#!/usr/bin/env python3 """ Воркер синхронизации Nextcloud -> Qdrant через Open WebUI API Background процесс для автоматической синхронизации документов """ import sys import time import logging import sqlite3 import hashlib from pathlib import Path from typing import Dict, Optional, List from datetime import datetime # Импорт локальных модулей from config import load_config, WorkerConfig from nextcloud_client import NextcloudClient from openwebui_client import OpenWebUIClient from document_processor import DocumentProcessor # Настройка логирования logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('sync.log'), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) class SyncStateDB: """Управление состоянием синхронизации через SQLite""" def __init__(self, db_path: str): """ Инициализация БД состояния Args: db_path: Путь к файлу БД """ self.db_path = db_path self._init_db() def _init_db(self): """Инициализация схемы БД""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS synced_files ( file_path TEXT PRIMARY KEY, file_hash TEXT NOT NULL, file_size INTEGER NOT NULL, last_modified TEXT, synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, user_id TEXT, document_id TEXT ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_file_hash ON synced_files(file_hash) """) conn.commit() conn.close() def get_file_hash(self, file_path: str, file_content: bytes) -> str: """Вычисление хеша файла""" return hashlib.sha256(file_content).hexdigest() def is_file_synced(self, file_path: str, file_hash: str) -> bool: """ Проверка, синхронизирован ли файл Args: file_path: Путь к файлу file_hash: Хеш содержимого файла Returns: True если файл уже синхронизирован с таким хешем """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" SELECT file_hash FROM synced_files WHERE file_path = ? AND file_hash = ? """, (file_path, file_hash)) result = cursor.fetchone() conn.close() return result is not None def mark_file_synced( self, file_path: str, file_hash: str, file_size: int, last_modified: Optional[str], user_id: Optional[str], document_id: Optional[str] ): """Отметка файла как синхронизированного""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO synced_files (file_path, file_hash, file_size, last_modified, synced_at, user_id, document_id) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( file_path, file_hash, file_size, last_modified, datetime.now().isoformat(), user_id, document_id )) conn.commit() conn.close() def get_sync_stats(self) -> Dict: """Получение статистики синхронизации""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM synced_files") total_files = cursor.fetchone()[0] cursor.execute("SELECT SUM(file_size) FROM synced_files") total_size = cursor.fetchone()[0] or 0 conn.close() return { "total_files": total_files, "total_size": total_size } class UserMapper: """Маппинг пользователей Nextcloud -> Authentik""" def __init__(self, config: WorkerConfig): """ Инициализация маппера пользователей Args: config: Конфигурация воркера """ self.config = config # Кеш маппинга (в production можно использовать БД или Authentik API) self._cache: Dict[str, str] = {} def map_nextcloud_user_to_authentik(self, nc_username: str) -> Optional[str]: """ Маппинг имени пользователя Nextcloud в user_id Authentik Args: nc_username: Имя пользователя в Nextcloud Returns: user_id Authentik или None если маппинг не найден """ # Проверка кеша if nc_username in self._cache: return self._cache[nc_username] # В production здесь должен быть запрос к Authentik API # или использование общей БД пользователей # Пока используем простое предположение: username совпадает # или можно настроить через переменные окружения # Попытка получить из переменных окружения (формат: NC_USER_username=AUTHENTIK_USER_ID) env_key = f"NC_USER_{nc_username}" authentik_user_id = None import os if env_key in os.environ: authentik_user_id = os.environ[env_key] else: # По умолчанию предполагаем, что username совпадает # В production это должно быть через Authentik API authentik_user_id = nc_username self._cache[nc_username] = authentik_user_id return authentik_user_id def get_user_groups(self, nc_username: str) -> List[str]: """ Получение списка групп доступа для пользователя Args: nc_username: Имя пользователя Nextcloud Returns: Список групп доступа """ # В production получать из Authentik API или Nextcloud групп # Пока возвращаем пустой список (только личные файлы) return [] class NextcloudSyncWorker: """Основной класс воркера синхронизации""" def __init__(self, config: WorkerConfig): """ Инициализация воркера Args: config: Конфигурация воркера """ self.config = config self.nc_client = NextcloudClient( config.nextcloud.url, config.nextcloud.username, config.nextcloud.password ) self.webui_client = OpenWebUIClient( config.openwebui.api_url, config.openwebui.api_key, config.openwebui.timeout ) self.processor = DocumentProcessor(config.max_file_size) self.state_db = SyncStateDB(config.db_path) self.user_mapper = UserMapper(config) def sync_path(self, path_template: str) -> int: """ Синхронизация указанного пути Args: path_template: Шаблон пути (может содержать {username}) Returns: Количество синхронизированных файлов """ synced_count = 0 # Замена {username} в шаблоне пути if "{username}" in path_template: # Сканируем все домашние директории # В production можно получить список пользователей из Nextcloud API username = self.config.nextcloud.username path = path_template.replace("{username}", username) else: path = path_template try: logger.info(f"Сканирование пути: {path}") files = self.nc_client.scan_directory_recursive(path) logger.info(f"Найдено файлов: {len(files)}") for file_info in files: try: if self._sync_file(file_info, path_template): synced_count += 1 except Exception as e: logger.error(f"Ошибка при синхронизации файла {file_info['path']}: {e}") continue except Exception as e: logger.error(f"Ошибка при сканировании пути {path}: {e}") return synced_count def _sync_file(self, file_info: Dict, path_template: str) -> bool: """ Синхронизация одного файла Args: file_info: Информация о файле path_template: Шаблон пути (для определения владельца) Returns: True если файл был синхронизирован """ file_path = file_info["path"] file_size = file_info.get("size", 0) # Пропускаем слишком большие файлы if file_size > self.config.max_file_size * 2: # Двойной лимит для безопасности logger.warning(f"Файл {file_path} слишком большой ({file_size} bytes), пропуск") return False # Проверка поддержки формата if not self.processor.is_supported_format(file_path, file_info.get("type")): logger.debug(f"Неподдерживаемый формат: {file_path}") return False try: # Загрузка файла file_content = self.nc_client.download_file(file_path) file_hash = self.state_db.get_file_hash(file_path, file_content) # Проверка, не синхронизирован ли уже файл if self.state_db.is_file_synced(file_path, file_hash): logger.debug(f"Файл уже синхронизирован: {file_path}") return False # Обработка файла text_content, is_large = self.processor.process_file( file_content, Path(file_path).name, file_info.get("type") ) # Определение владельца файла username = self.nc_client.extract_username_from_path(file_path) if not username: # Для общих папок используем None или специальную группу username = None user_id = self.user_mapper.map_nextcloud_user_to_authentik(username) if username else None access_groups = self.user_mapper.get_user_groups(username) if username else [] # Метаданные для API metadata = { "source": "nextcloud", "path": file_path, "original_size": file_size, "is_large_file": is_large } # Загрузка в Open WebUI if is_large or len(text_content.encode('utf-8')) > self.config.max_file_size: # Для больших файлов загружаем как текст result = self.webui_client.upload_document_with_text( text_content=text_content, filename=Path(file_path).name, user_id=user_id, access_groups=access_groups, metadata=metadata ) else: # Для обычных файлов загружаем оригинал result = self.webui_client.upload_document( file_content=file_content, filename=Path(file_path).name, user_id=user_id, access_groups=access_groups, metadata=metadata ) # Сохранение состояния document_id = result.get("id") if isinstance(result, dict) else None self.state_db.mark_file_synced( file_path=file_path, file_hash=file_hash, file_size=file_size, last_modified=file_info.get("modified"), user_id=user_id, document_id=document_id ) logger.info(f"Файл синхронизирован: {file_path} -> документ ID: {document_id}") return True except Exception as e: logger.error(f"Ошибка при синхронизации файла {file_path}: {e}") return False def run_once(self): """Однократный запуск синхронизации""" logger.info("=== Начало синхронизации ===") total_synced = 0 for path_template in self.config.nextcloud.scan_paths: try: synced = self.sync_path(path_template) total_synced += synced logger.info(f"Синхронизировано файлов из {path_template}: {synced}") except Exception as e: logger.error(f"Ошибка при синхронизации пути {path_template}: {e}") stats = self.state_db.get_sync_stats() logger.info(f"=== Синхронизация завершена ===") logger.info(f"Всего синхронизировано в этом цикле: {total_synced}") logger.info(f"Всего файлов в БД: {stats['total_files']}") logger.info(f"Общий размер: {stats['total_size'] / 1024 / 1024:.2f} MB") def run_daemon(self): """Запуск воркера в режиме daemon (бесконечный цикл)""" logger.info("Запуск воркера в режиме daemon") logger.info(f"Интервал синхронизации: {self.config.sync_interval} секунд") try: while True: self.run_once() logger.info(f"Ожидание {self.config.sync_interval} секунд до следующей синхронизации...") time.sleep(self.config.sync_interval) except KeyboardInterrupt: logger.info("Получен сигнал остановки, завершение работы...") finally: self.cleanup() def cleanup(self): """Очистка ресурсов""" logger.info("Очистка ресурсов...") self.nc_client.close() self.webui_client.close() def main(): """Точка входа""" import argparse parser = argparse.ArgumentParser(description="Воркер синхронизации Nextcloud -> Qdrant") parser.add_argument( "--once", action="store_true", help="Запустить синхронизацию один раз и выйти" ) parser.add_argument( "--daemon", action="store_true", help="Запустить в режиме daemon (бесконечный цикл)" ) args = parser.parse_args() try: config = load_config() worker = NextcloudSyncWorker(config) if args.once: worker.run_once() elif args.daemon: worker.run_daemon() else: # По умолчанию запускаем один раз worker.run_once() except Exception as e: logger.error(f"Критическая ошибка: {e}", exc_info=True) sys.exit(1) if __name__ == "__main__": main()