449 lines
17 KiB
Python
Executable File
449 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Воркер синхронизации Nextcloud -> Qdrant через Open WebUI API
|
||
Background процесс для автоматической синхронизации документов
|
||
"""
|
||
import sys
|
||
import time
|
||
import logging
|
||
import sqlite3
|
||
import hashlib
|
||
from pathlib import Path
|
||
from typing import Dict, Optional, List
|
||
from datetime import datetime
|
||
|
||
# Импорт локальных модулей
|
||
from config import load_config, WorkerConfig
|
||
from nextcloud_client import NextcloudClient
|
||
from openwebui_client import OpenWebUIClient
|
||
from document_processor import DocumentProcessor
|
||
|
||
# Настройка логирования
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.FileHandler('sync.log'),
|
||
logging.StreamHandler(sys.stdout)
|
||
]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class SyncStateDB:
|
||
"""Управление состоянием синхронизации через SQLite"""
|
||
|
||
def __init__(self, db_path: str):
|
||
"""
|
||
Инициализация БД состояния
|
||
|
||
Args:
|
||
db_path: Путь к файлу БД
|
||
"""
|
||
self.db_path = db_path
|
||
self._init_db()
|
||
|
||
def _init_db(self):
|
||
"""Инициализация схемы БД"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS synced_files (
|
||
file_path TEXT PRIMARY KEY,
|
||
file_hash TEXT NOT NULL,
|
||
file_size INTEGER NOT NULL,
|
||
last_modified TEXT,
|
||
synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
user_id TEXT,
|
||
document_id TEXT
|
||
)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_file_hash ON synced_files(file_hash)
|
||
""")
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
def get_file_hash(self, file_path: str, file_content: bytes) -> str:
|
||
"""Вычисление хеша файла"""
|
||
return hashlib.sha256(file_content).hexdigest()
|
||
|
||
def is_file_synced(self, file_path: str, file_hash: str) -> bool:
|
||
"""
|
||
Проверка, синхронизирован ли файл
|
||
|
||
Args:
|
||
file_path: Путь к файлу
|
||
file_hash: Хеш содержимого файла
|
||
|
||
Returns:
|
||
True если файл уже синхронизирован с таким хешем
|
||
"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute("""
|
||
SELECT file_hash FROM synced_files
|
||
WHERE file_path = ? AND file_hash = ?
|
||
""", (file_path, file_hash))
|
||
|
||
result = cursor.fetchone()
|
||
conn.close()
|
||
|
||
return result is not None
|
||
|
||
def mark_file_synced(
|
||
self,
|
||
file_path: str,
|
||
file_hash: str,
|
||
file_size: int,
|
||
last_modified: Optional[str],
|
||
user_id: Optional[str],
|
||
document_id: Optional[str]
|
||
):
|
||
"""Отметка файла как синхронизированного"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute("""
|
||
INSERT OR REPLACE INTO synced_files
|
||
(file_path, file_hash, file_size, last_modified, synced_at, user_id, document_id)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||
""", (
|
||
file_path,
|
||
file_hash,
|
||
file_size,
|
||
last_modified,
|
||
datetime.now().isoformat(),
|
||
user_id,
|
||
document_id
|
||
))
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
def get_sync_stats(self) -> Dict:
|
||
"""Получение статистики синхронизации"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute("SELECT COUNT(*) FROM synced_files")
|
||
total_files = cursor.fetchone()[0]
|
||
|
||
cursor.execute("SELECT SUM(file_size) FROM synced_files")
|
||
total_size = cursor.fetchone()[0] or 0
|
||
|
||
conn.close()
|
||
|
||
return {
|
||
"total_files": total_files,
|
||
"total_size": total_size
|
||
}
|
||
|
||
|
||
class UserMapper:
|
||
"""Маппинг пользователей Nextcloud -> Authentik"""
|
||
|
||
def __init__(self, config: WorkerConfig):
|
||
"""
|
||
Инициализация маппера пользователей
|
||
|
||
Args:
|
||
config: Конфигурация воркера
|
||
"""
|
||
self.config = config
|
||
# Кеш маппинга (в production можно использовать БД или Authentik API)
|
||
self._cache: Dict[str, str] = {}
|
||
|
||
def map_nextcloud_user_to_authentik(self, nc_username: str) -> Optional[str]:
|
||
"""
|
||
Маппинг имени пользователя Nextcloud в user_id Authentik
|
||
|
||
Args:
|
||
nc_username: Имя пользователя в Nextcloud
|
||
|
||
Returns:
|
||
user_id Authentik или None если маппинг не найден
|
||
"""
|
||
# Проверка кеша
|
||
if nc_username in self._cache:
|
||
return self._cache[nc_username]
|
||
|
||
# В production здесь должен быть запрос к Authentik API
|
||
# или использование общей БД пользователей
|
||
# Пока используем простое предположение: username совпадает
|
||
# или можно настроить через переменные окружения
|
||
|
||
# Попытка получить из переменных окружения (формат: NC_USER_username=AUTHENTIK_USER_ID)
|
||
env_key = f"NC_USER_{nc_username}"
|
||
authentik_user_id = None
|
||
|
||
import os
|
||
if env_key in os.environ:
|
||
authentik_user_id = os.environ[env_key]
|
||
else:
|
||
# По умолчанию предполагаем, что username совпадает
|
||
# В production это должно быть через Authentik API
|
||
authentik_user_id = nc_username
|
||
|
||
self._cache[nc_username] = authentik_user_id
|
||
return authentik_user_id
|
||
|
||
def get_user_groups(self, nc_username: str) -> List[str]:
|
||
"""
|
||
Получение списка групп доступа для пользователя
|
||
|
||
Args:
|
||
nc_username: Имя пользователя Nextcloud
|
||
|
||
Returns:
|
||
Список групп доступа
|
||
"""
|
||
# В production получать из Authentik API или Nextcloud групп
|
||
# Пока возвращаем пустой список (только личные файлы)
|
||
return []
|
||
|
||
|
||
class NextcloudSyncWorker:
|
||
"""Основной класс воркера синхронизации"""
|
||
|
||
def __init__(self, config: WorkerConfig):
|
||
"""
|
||
Инициализация воркера
|
||
|
||
Args:
|
||
config: Конфигурация воркера
|
||
"""
|
||
self.config = config
|
||
self.nc_client = NextcloudClient(
|
||
config.nextcloud.url,
|
||
config.nextcloud.username,
|
||
config.nextcloud.password
|
||
)
|
||
self.webui_client = OpenWebUIClient(
|
||
config.openwebui.api_url,
|
||
config.openwebui.api_key,
|
||
config.openwebui.timeout
|
||
)
|
||
self.processor = DocumentProcessor(config.max_file_size)
|
||
self.state_db = SyncStateDB(config.db_path)
|
||
self.user_mapper = UserMapper(config)
|
||
|
||
def sync_path(self, path_template: str) -> int:
|
||
"""
|
||
Синхронизация указанного пути
|
||
|
||
Args:
|
||
path_template: Шаблон пути (может содержать {username})
|
||
|
||
Returns:
|
||
Количество синхронизированных файлов
|
||
"""
|
||
synced_count = 0
|
||
|
||
# Замена {username} в шаблоне пути
|
||
if "{username}" in path_template:
|
||
# Сканируем все домашние директории
|
||
# В production можно получить список пользователей из Nextcloud API
|
||
username = self.config.nextcloud.username
|
||
path = path_template.replace("{username}", username)
|
||
else:
|
||
path = path_template
|
||
|
||
try:
|
||
logger.info(f"Сканирование пути: {path}")
|
||
files = self.nc_client.scan_directory_recursive(path)
|
||
logger.info(f"Найдено файлов: {len(files)}")
|
||
|
||
for file_info in files:
|
||
try:
|
||
if self._sync_file(file_info, path_template):
|
||
synced_count += 1
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при синхронизации файла {file_info['path']}: {e}")
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при сканировании пути {path}: {e}")
|
||
|
||
return synced_count
|
||
|
||
def _sync_file(self, file_info: Dict, path_template: str) -> bool:
|
||
"""
|
||
Синхронизация одного файла
|
||
|
||
Args:
|
||
file_info: Информация о файле
|
||
path_template: Шаблон пути (для определения владельца)
|
||
|
||
Returns:
|
||
True если файл был синхронизирован
|
||
"""
|
||
file_path = file_info["path"]
|
||
file_size = file_info.get("size", 0)
|
||
|
||
# Пропускаем слишком большие файлы
|
||
if file_size > self.config.max_file_size * 2: # Двойной лимит для безопасности
|
||
logger.warning(f"Файл {file_path} слишком большой ({file_size} bytes), пропуск")
|
||
return False
|
||
|
||
# Проверка поддержки формата
|
||
if not self.processor.is_supported_format(file_path, file_info.get("type")):
|
||
logger.debug(f"Неподдерживаемый формат: {file_path}")
|
||
return False
|
||
|
||
try:
|
||
# Загрузка файла
|
||
file_content = self.nc_client.download_file(file_path)
|
||
file_hash = self.state_db.get_file_hash(file_path, file_content)
|
||
|
||
# Проверка, не синхронизирован ли уже файл
|
||
if self.state_db.is_file_synced(file_path, file_hash):
|
||
logger.debug(f"Файл уже синхронизирован: {file_path}")
|
||
return False
|
||
|
||
# Обработка файла
|
||
text_content, is_large = self.processor.process_file(
|
||
file_content,
|
||
Path(file_path).name,
|
||
file_info.get("type")
|
||
)
|
||
|
||
# Определение владельца файла
|
||
username = self.nc_client.extract_username_from_path(file_path)
|
||
if not username:
|
||
# Для общих папок используем None или специальную группу
|
||
username = None
|
||
|
||
user_id = self.user_mapper.map_nextcloud_user_to_authentik(username) if username else None
|
||
access_groups = self.user_mapper.get_user_groups(username) if username else []
|
||
|
||
# Метаданные для API
|
||
metadata = {
|
||
"source": "nextcloud",
|
||
"path": file_path,
|
||
"original_size": file_size,
|
||
"is_large_file": is_large
|
||
}
|
||
|
||
# Загрузка в Open WebUI
|
||
if is_large or len(text_content.encode('utf-8')) > self.config.max_file_size:
|
||
# Для больших файлов загружаем как текст
|
||
result = self.webui_client.upload_document_with_text(
|
||
text_content=text_content,
|
||
filename=Path(file_path).name,
|
||
user_id=user_id,
|
||
access_groups=access_groups,
|
||
metadata=metadata
|
||
)
|
||
else:
|
||
# Для обычных файлов загружаем оригинал
|
||
result = self.webui_client.upload_document(
|
||
file_content=file_content,
|
||
filename=Path(file_path).name,
|
||
user_id=user_id,
|
||
access_groups=access_groups,
|
||
metadata=metadata
|
||
)
|
||
|
||
# Сохранение состояния
|
||
document_id = result.get("id") if isinstance(result, dict) else None
|
||
self.state_db.mark_file_synced(
|
||
file_path=file_path,
|
||
file_hash=file_hash,
|
||
file_size=file_size,
|
||
last_modified=file_info.get("modified"),
|
||
user_id=user_id,
|
||
document_id=document_id
|
||
)
|
||
|
||
logger.info(f"Файл синхронизирован: {file_path} -> документ ID: {document_id}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при синхронизации файла {file_path}: {e}")
|
||
return False
|
||
|
||
def run_once(self):
|
||
"""Однократный запуск синхронизации"""
|
||
logger.info("=== Начало синхронизации ===")
|
||
|
||
total_synced = 0
|
||
|
||
for path_template in self.config.nextcloud.scan_paths:
|
||
try:
|
||
synced = self.sync_path(path_template)
|
||
total_synced += synced
|
||
logger.info(f"Синхронизировано файлов из {path_template}: {synced}")
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при синхронизации пути {path_template}: {e}")
|
||
|
||
stats = self.state_db.get_sync_stats()
|
||
logger.info(f"=== Синхронизация завершена ===")
|
||
logger.info(f"Всего синхронизировано в этом цикле: {total_synced}")
|
||
logger.info(f"Всего файлов в БД: {stats['total_files']}")
|
||
logger.info(f"Общий размер: {stats['total_size'] / 1024 / 1024:.2f} MB")
|
||
|
||
def run_daemon(self):
|
||
"""Запуск воркера в режиме daemon (бесконечный цикл)"""
|
||
logger.info("Запуск воркера в режиме daemon")
|
||
logger.info(f"Интервал синхронизации: {self.config.sync_interval} секунд")
|
||
|
||
try:
|
||
while True:
|
||
self.run_once()
|
||
logger.info(f"Ожидание {self.config.sync_interval} секунд до следующей синхронизации...")
|
||
time.sleep(self.config.sync_interval)
|
||
except KeyboardInterrupt:
|
||
logger.info("Получен сигнал остановки, завершение работы...")
|
||
finally:
|
||
self.cleanup()
|
||
|
||
def cleanup(self):
|
||
"""Очистка ресурсов"""
|
||
logger.info("Очистка ресурсов...")
|
||
self.nc_client.close()
|
||
self.webui_client.close()
|
||
|
||
|
||
def main():
|
||
"""Точка входа"""
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description="Воркер синхронизации Nextcloud -> Qdrant")
|
||
parser.add_argument(
|
||
"--once",
|
||
action="store_true",
|
||
help="Запустить синхронизацию один раз и выйти"
|
||
)
|
||
parser.add_argument(
|
||
"--daemon",
|
||
action="store_true",
|
||
help="Запустить в режиме daemon (бесконечный цикл)"
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
try:
|
||
config = load_config()
|
||
worker = NextcloudSyncWorker(config)
|
||
|
||
if args.once:
|
||
worker.run_once()
|
||
elif args.daemon:
|
||
worker.run_daemon()
|
||
else:
|
||
# По умолчанию запускаем один раз
|
||
worker.run_once()
|
||
|
||
except Exception as e:
|
||
logger.error(f"Критическая ошибка: {e}", exc_info=True)
|
||
sys.exit(1)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|