449 lines
17 KiB
Python
449 lines
17 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
Воркер синхронизации Nextcloud -> Qdrant через Open WebUI API
|
|||
|
|
Background процесс для автоматической синхронизации документов
|
|||
|
|
"""
|
|||
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
import logging
|
|||
|
|
import sqlite3
|
|||
|
|
import hashlib
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Dict, Optional, List
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
# Импорт локальных модулей
|
|||
|
|
from config import load_config, WorkerConfig
|
|||
|
|
from nextcloud_client import NextcloudClient
|
|||
|
|
from openwebui_client import OpenWebUIClient
|
|||
|
|
from document_processor import DocumentProcessor
|
|||
|
|
|
|||
|
|
# Настройка логирования
|
|||
|
|
logging.basicConfig(
|
|||
|
|
level=logging.INFO,
|
|||
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|||
|
|
handlers=[
|
|||
|
|
logging.FileHandler('sync.log'),
|
|||
|
|
logging.StreamHandler(sys.stdout)
|
|||
|
|
]
|
|||
|
|
)
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SyncStateDB:
|
|||
|
|
"""Управление состоянием синхронизации через SQLite"""
|
|||
|
|
|
|||
|
|
def __init__(self, db_path: str):
|
|||
|
|
"""
|
|||
|
|
Инициализация БД состояния
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
db_path: Путь к файлу БД
|
|||
|
|
"""
|
|||
|
|
self.db_path = db_path
|
|||
|
|
self._init_db()
|
|||
|
|
|
|||
|
|
def _init_db(self):
|
|||
|
|
"""Инициализация схемы БД"""
|
|||
|
|
conn = sqlite3.connect(self.db_path)
|
|||
|
|
cursor = conn.cursor()
|
|||
|
|
|
|||
|
|
cursor.execute("""
|
|||
|
|
CREATE TABLE IF NOT EXISTS synced_files (
|
|||
|
|
file_path TEXT PRIMARY KEY,
|
|||
|
|
file_hash TEXT NOT NULL,
|
|||
|
|
file_size INTEGER NOT NULL,
|
|||
|
|
last_modified TEXT,
|
|||
|
|
synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|||
|
|
user_id TEXT,
|
|||
|
|
document_id TEXT
|
|||
|
|
)
|
|||
|
|
""")
|
|||
|
|
|
|||
|
|
cursor.execute("""
|
|||
|
|
CREATE INDEX IF NOT EXISTS idx_file_hash ON synced_files(file_hash)
|
|||
|
|
""")
|
|||
|
|
|
|||
|
|
conn.commit()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
def get_file_hash(self, file_path: str, file_content: bytes) -> str:
|
|||
|
|
"""Вычисление хеша файла"""
|
|||
|
|
return hashlib.sha256(file_content).hexdigest()
|
|||
|
|
|
|||
|
|
def is_file_synced(self, file_path: str, file_hash: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
Проверка, синхронизирован ли файл
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
file_path: Путь к файлу
|
|||
|
|
file_hash: Хеш содержимого файла
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
True если файл уже синхронизирован с таким хешем
|
|||
|
|
"""
|
|||
|
|
conn = sqlite3.connect(self.db_path)
|
|||
|
|
cursor = conn.cursor()
|
|||
|
|
|
|||
|
|
cursor.execute("""
|
|||
|
|
SELECT file_hash FROM synced_files
|
|||
|
|
WHERE file_path = ? AND file_hash = ?
|
|||
|
|
""", (file_path, file_hash))
|
|||
|
|
|
|||
|
|
result = cursor.fetchone()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
return result is not None
|
|||
|
|
|
|||
|
|
def mark_file_synced(
|
|||
|
|
self,
|
|||
|
|
file_path: str,
|
|||
|
|
file_hash: str,
|
|||
|
|
file_size: int,
|
|||
|
|
last_modified: Optional[str],
|
|||
|
|
user_id: Optional[str],
|
|||
|
|
document_id: Optional[str]
|
|||
|
|
):
|
|||
|
|
"""Отметка файла как синхронизированного"""
|
|||
|
|
conn = sqlite3.connect(self.db_path)
|
|||
|
|
cursor = conn.cursor()
|
|||
|
|
|
|||
|
|
cursor.execute("""
|
|||
|
|
INSERT OR REPLACE INTO synced_files
|
|||
|
|
(file_path, file_hash, file_size, last_modified, synced_at, user_id, document_id)
|
|||
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|||
|
|
""", (
|
|||
|
|
file_path,
|
|||
|
|
file_hash,
|
|||
|
|
file_size,
|
|||
|
|
last_modified,
|
|||
|
|
datetime.now().isoformat(),
|
|||
|
|
user_id,
|
|||
|
|
document_id
|
|||
|
|
))
|
|||
|
|
|
|||
|
|
conn.commit()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
def get_sync_stats(self) -> Dict:
|
|||
|
|
"""Получение статистики синхронизации"""
|
|||
|
|
conn = sqlite3.connect(self.db_path)
|
|||
|
|
cursor = conn.cursor()
|
|||
|
|
|
|||
|
|
cursor.execute("SELECT COUNT(*) FROM synced_files")
|
|||
|
|
total_files = cursor.fetchone()[0]
|
|||
|
|
|
|||
|
|
cursor.execute("SELECT SUM(file_size) FROM synced_files")
|
|||
|
|
total_size = cursor.fetchone()[0] or 0
|
|||
|
|
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"total_files": total_files,
|
|||
|
|
"total_size": total_size
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
class UserMapper:
|
|||
|
|
"""Маппинг пользователей Nextcloud -> Authentik"""
|
|||
|
|
|
|||
|
|
def __init__(self, config: WorkerConfig):
|
|||
|
|
"""
|
|||
|
|
Инициализация маппера пользователей
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
config: Конфигурация воркера
|
|||
|
|
"""
|
|||
|
|
self.config = config
|
|||
|
|
# Кеш маппинга (в production можно использовать БД или Authentik API)
|
|||
|
|
self._cache: Dict[str, str] = {}
|
|||
|
|
|
|||
|
|
def map_nextcloud_user_to_authentik(self, nc_username: str) -> Optional[str]:
|
|||
|
|
"""
|
|||
|
|
Маппинг имени пользователя Nextcloud в user_id Authentik
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
nc_username: Имя пользователя в Nextcloud
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
user_id Authentik или None если маппинг не найден
|
|||
|
|
"""
|
|||
|
|
# Проверка кеша
|
|||
|
|
if nc_username in self._cache:
|
|||
|
|
return self._cache[nc_username]
|
|||
|
|
|
|||
|
|
# В production здесь должен быть запрос к Authentik API
|
|||
|
|
# или использование общей БД пользователей
|
|||
|
|
# Пока используем простое предположение: username совпадает
|
|||
|
|
# или можно настроить через переменные окружения
|
|||
|
|
|
|||
|
|
# Попытка получить из переменных окружения (формат: NC_USER_username=AUTHENTIK_USER_ID)
|
|||
|
|
env_key = f"NC_USER_{nc_username}"
|
|||
|
|
authentik_user_id = None
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
if env_key in os.environ:
|
|||
|
|
authentik_user_id = os.environ[env_key]
|
|||
|
|
else:
|
|||
|
|
# По умолчанию предполагаем, что username совпадает
|
|||
|
|
# В production это должно быть через Authentik API
|
|||
|
|
authentik_user_id = nc_username
|
|||
|
|
|
|||
|
|
self._cache[nc_username] = authentik_user_id
|
|||
|
|
return authentik_user_id
|
|||
|
|
|
|||
|
|
def get_user_groups(self, nc_username: str) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
Получение списка групп доступа для пользователя
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
nc_username: Имя пользователя Nextcloud
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Список групп доступа
|
|||
|
|
"""
|
|||
|
|
# В production получать из Authentik API или Nextcloud групп
|
|||
|
|
# Пока возвращаем пустой список (только личные файлы)
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
|
|||
|
|
class NextcloudSyncWorker:
|
|||
|
|
"""Основной класс воркера синхронизации"""
|
|||
|
|
|
|||
|
|
def __init__(self, config: WorkerConfig):
|
|||
|
|
"""
|
|||
|
|
Инициализация воркера
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
config: Конфигурация воркера
|
|||
|
|
"""
|
|||
|
|
self.config = config
|
|||
|
|
self.nc_client = NextcloudClient(
|
|||
|
|
config.nextcloud.url,
|
|||
|
|
config.nextcloud.username,
|
|||
|
|
config.nextcloud.password
|
|||
|
|
)
|
|||
|
|
self.webui_client = OpenWebUIClient(
|
|||
|
|
config.openwebui.api_url,
|
|||
|
|
config.openwebui.api_key,
|
|||
|
|
config.openwebui.timeout
|
|||
|
|
)
|
|||
|
|
self.processor = DocumentProcessor(config.max_file_size)
|
|||
|
|
self.state_db = SyncStateDB(config.db_path)
|
|||
|
|
self.user_mapper = UserMapper(config)
|
|||
|
|
|
|||
|
|
def sync_path(self, path_template: str) -> int:
|
|||
|
|
"""
|
|||
|
|
Синхронизация указанного пути
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
path_template: Шаблон пути (может содержать {username})
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Количество синхронизированных файлов
|
|||
|
|
"""
|
|||
|
|
synced_count = 0
|
|||
|
|
|
|||
|
|
# Замена {username} в шаблоне пути
|
|||
|
|
if "{username}" in path_template:
|
|||
|
|
# Сканируем все домашние директории
|
|||
|
|
# В production можно получить список пользователей из Nextcloud API
|
|||
|
|
username = self.config.nextcloud.username
|
|||
|
|
path = path_template.replace("{username}", username)
|
|||
|
|
else:
|
|||
|
|
path = path_template
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
logger.info(f"Сканирование пути: {path}")
|
|||
|
|
files = self.nc_client.scan_directory_recursive(path)
|
|||
|
|
logger.info(f"Найдено файлов: {len(files)}")
|
|||
|
|
|
|||
|
|
for file_info in files:
|
|||
|
|
try:
|
|||
|
|
if self._sync_file(file_info, path_template):
|
|||
|
|
synced_count += 1
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Ошибка при синхронизации файла {file_info['path']}: {e}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Ошибка при сканировании пути {path}: {e}")
|
|||
|
|
|
|||
|
|
return synced_count
|
|||
|
|
|
|||
|
|
def _sync_file(self, file_info: Dict, path_template: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
Синхронизация одного файла
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
file_info: Информация о файле
|
|||
|
|
path_template: Шаблон пути (для определения владельца)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
True если файл был синхронизирован
|
|||
|
|
"""
|
|||
|
|
file_path = file_info["path"]
|
|||
|
|
file_size = file_info.get("size", 0)
|
|||
|
|
|
|||
|
|
# Пропускаем слишком большие файлы
|
|||
|
|
if file_size > self.config.max_file_size * 2: # Двойной лимит для безопасности
|
|||
|
|
logger.warning(f"Файл {file_path} слишком большой ({file_size} bytes), пропуск")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
# Проверка поддержки формата
|
|||
|
|
if not self.processor.is_supported_format(file_path, file_info.get("type")):
|
|||
|
|
logger.debug(f"Неподдерживаемый формат: {file_path}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# Загрузка файла
|
|||
|
|
file_content = self.nc_client.download_file(file_path)
|
|||
|
|
file_hash = self.state_db.get_file_hash(file_path, file_content)
|
|||
|
|
|
|||
|
|
# Проверка, не синхронизирован ли уже файл
|
|||
|
|
if self.state_db.is_file_synced(file_path, file_hash):
|
|||
|
|
logger.debug(f"Файл уже синхронизирован: {file_path}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
# Обработка файла
|
|||
|
|
text_content, is_large = self.processor.process_file(
|
|||
|
|
file_content,
|
|||
|
|
Path(file_path).name,
|
|||
|
|
file_info.get("type")
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# Определение владельца файла
|
|||
|
|
username = self.nc_client.extract_username_from_path(file_path)
|
|||
|
|
if not username:
|
|||
|
|
# Для общих папок используем None или специальную группу
|
|||
|
|
username = None
|
|||
|
|
|
|||
|
|
user_id = self.user_mapper.map_nextcloud_user_to_authentik(username) if username else None
|
|||
|
|
access_groups = self.user_mapper.get_user_groups(username) if username else []
|
|||
|
|
|
|||
|
|
# Метаданные для API
|
|||
|
|
metadata = {
|
|||
|
|
"source": "nextcloud",
|
|||
|
|
"path": file_path,
|
|||
|
|
"original_size": file_size,
|
|||
|
|
"is_large_file": is_large
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# Загрузка в Open WebUI
|
|||
|
|
if is_large or len(text_content.encode('utf-8')) > self.config.max_file_size:
|
|||
|
|
# Для больших файлов загружаем как текст
|
|||
|
|
result = self.webui_client.upload_document_with_text(
|
|||
|
|
text_content=text_content,
|
|||
|
|
filename=Path(file_path).name,
|
|||
|
|
user_id=user_id,
|
|||
|
|
access_groups=access_groups,
|
|||
|
|
metadata=metadata
|
|||
|
|
)
|
|||
|
|
else:
|
|||
|
|
# Для обычных файлов загружаем оригинал
|
|||
|
|
result = self.webui_client.upload_document(
|
|||
|
|
file_content=file_content,
|
|||
|
|
filename=Path(file_path).name,
|
|||
|
|
user_id=user_id,
|
|||
|
|
access_groups=access_groups,
|
|||
|
|
metadata=metadata
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# Сохранение состояния
|
|||
|
|
document_id = result.get("id") if isinstance(result, dict) else None
|
|||
|
|
self.state_db.mark_file_synced(
|
|||
|
|
file_path=file_path,
|
|||
|
|
file_hash=file_hash,
|
|||
|
|
file_size=file_size,
|
|||
|
|
last_modified=file_info.get("modified"),
|
|||
|
|
user_id=user_id,
|
|||
|
|
document_id=document_id
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
logger.info(f"Файл синхронизирован: {file_path} -> документ ID: {document_id}")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Ошибка при синхронизации файла {file_path}: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def run_once(self):
|
|||
|
|
"""Однократный запуск синхронизации"""
|
|||
|
|
logger.info("=== Начало синхронизации ===")
|
|||
|
|
|
|||
|
|
total_synced = 0
|
|||
|
|
|
|||
|
|
for path_template in self.config.nextcloud.scan_paths:
|
|||
|
|
try:
|
|||
|
|
synced = self.sync_path(path_template)
|
|||
|
|
total_synced += synced
|
|||
|
|
logger.info(f"Синхронизировано файлов из {path_template}: {synced}")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Ошибка при синхронизации пути {path_template}: {e}")
|
|||
|
|
|
|||
|
|
stats = self.state_db.get_sync_stats()
|
|||
|
|
logger.info(f"=== Синхронизация завершена ===")
|
|||
|
|
logger.info(f"Всего синхронизировано в этом цикле: {total_synced}")
|
|||
|
|
logger.info(f"Всего файлов в БД: {stats['total_files']}")
|
|||
|
|
logger.info(f"Общий размер: {stats['total_size'] / 1024 / 1024:.2f} MB")
|
|||
|
|
|
|||
|
|
def run_daemon(self):
|
|||
|
|
"""Запуск воркера в режиме daemon (бесконечный цикл)"""
|
|||
|
|
logger.info("Запуск воркера в режиме daemon")
|
|||
|
|
logger.info(f"Интервал синхронизации: {self.config.sync_interval} секунд")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
while True:
|
|||
|
|
self.run_once()
|
|||
|
|
logger.info(f"Ожидание {self.config.sync_interval} секунд до следующей синхронизации...")
|
|||
|
|
time.sleep(self.config.sync_interval)
|
|||
|
|
except KeyboardInterrupt:
|
|||
|
|
logger.info("Получен сигнал остановки, завершение работы...")
|
|||
|
|
finally:
|
|||
|
|
self.cleanup()
|
|||
|
|
|
|||
|
|
def cleanup(self):
|
|||
|
|
"""Очистка ресурсов"""
|
|||
|
|
logger.info("Очистка ресурсов...")
|
|||
|
|
self.nc_client.close()
|
|||
|
|
self.webui_client.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
"""Точка входа"""
|
|||
|
|
import argparse
|
|||
|
|
|
|||
|
|
parser = argparse.ArgumentParser(description="Воркер синхронизации Nextcloud -> Qdrant")
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--once",
|
|||
|
|
action="store_true",
|
|||
|
|
help="Запустить синхронизацию один раз и выйти"
|
|||
|
|
)
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--daemon",
|
|||
|
|
action="store_true",
|
|||
|
|
help="Запустить в режиме daemon (бесконечный цикл)"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
args = parser.parse_args()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
config = load_config()
|
|||
|
|
worker = NextcloudSyncWorker(config)
|
|||
|
|
|
|||
|
|
if args.once:
|
|||
|
|
worker.run_once()
|
|||
|
|
elif args.daemon:
|
|||
|
|
worker.run_daemon()
|
|||
|
|
else:
|
|||
|
|
# По умолчанию запускаем один раз
|
|||
|
|
worker.run_once()
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Критическая ошибка: {e}", exc_info=True)
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|