Files
iiEsaywebUI/worker/nextcloud_sync.py
2026-02-19 18:12:09 +00:00

449 lines
17 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Воркер синхронизации Nextcloud -> Qdrant через Open WebUI API
Background процесс для автоматической синхронизации документов
"""
import sys
import time
import logging
import sqlite3
import hashlib
from pathlib import Path
from typing import Dict, Optional, List
from datetime import datetime
# Импорт локальных модулей
from config import load_config, WorkerConfig
from nextcloud_client import NextcloudClient
from openwebui_client import OpenWebUIClient
from document_processor import DocumentProcessor
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('sync.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class SyncStateDB:
"""Управление состоянием синхронизации через SQLite"""
def __init__(self, db_path: str):
"""
Инициализация БД состояния
Args:
db_path: Путь к файлу БД
"""
self.db_path = db_path
self._init_db()
def _init_db(self):
"""Инициализация схемы БД"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS synced_files (
file_path TEXT PRIMARY KEY,
file_hash TEXT NOT NULL,
file_size INTEGER NOT NULL,
last_modified TEXT,
synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_id TEXT,
document_id TEXT
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_file_hash ON synced_files(file_hash)
""")
conn.commit()
conn.close()
def get_file_hash(self, file_path: str, file_content: bytes) -> str:
"""Вычисление хеша файла"""
return hashlib.sha256(file_content).hexdigest()
def is_file_synced(self, file_path: str, file_hash: str) -> bool:
"""
Проверка, синхронизирован ли файл
Args:
file_path: Путь к файлу
file_hash: Хеш содержимого файла
Returns:
True если файл уже синхронизирован с таким хешем
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT file_hash FROM synced_files
WHERE file_path = ? AND file_hash = ?
""", (file_path, file_hash))
result = cursor.fetchone()
conn.close()
return result is not None
def mark_file_synced(
self,
file_path: str,
file_hash: str,
file_size: int,
last_modified: Optional[str],
user_id: Optional[str],
document_id: Optional[str]
):
"""Отметка файла как синхронизированного"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO synced_files
(file_path, file_hash, file_size, last_modified, synced_at, user_id, document_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
file_path,
file_hash,
file_size,
last_modified,
datetime.now().isoformat(),
user_id,
document_id
))
conn.commit()
conn.close()
def get_sync_stats(self) -> Dict:
"""Получение статистики синхронизации"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM synced_files")
total_files = cursor.fetchone()[0]
cursor.execute("SELECT SUM(file_size) FROM synced_files")
total_size = cursor.fetchone()[0] or 0
conn.close()
return {
"total_files": total_files,
"total_size": total_size
}
class UserMapper:
"""Маппинг пользователей Nextcloud -> Authentik"""
def __init__(self, config: WorkerConfig):
"""
Инициализация маппера пользователей
Args:
config: Конфигурация воркера
"""
self.config = config
# Кеш маппинга (в production можно использовать БД или Authentik API)
self._cache: Dict[str, str] = {}
def map_nextcloud_user_to_authentik(self, nc_username: str) -> Optional[str]:
"""
Маппинг имени пользователя Nextcloud в user_id Authentik
Args:
nc_username: Имя пользователя в Nextcloud
Returns:
user_id Authentik или None если маппинг не найден
"""
# Проверка кеша
if nc_username in self._cache:
return self._cache[nc_username]
# В production здесь должен быть запрос к Authentik API
# или использование общей БД пользователей
# Пока используем простое предположение: username совпадает
# или можно настроить через переменные окружения
# Попытка получить из переменных окружения (формат: NC_USER_username=AUTHENTIK_USER_ID)
env_key = f"NC_USER_{nc_username}"
authentik_user_id = None
import os
if env_key in os.environ:
authentik_user_id = os.environ[env_key]
else:
# По умолчанию предполагаем, что username совпадает
# В production это должно быть через Authentik API
authentik_user_id = nc_username
self._cache[nc_username] = authentik_user_id
return authentik_user_id
def get_user_groups(self, nc_username: str) -> List[str]:
"""
Получение списка групп доступа для пользователя
Args:
nc_username: Имя пользователя Nextcloud
Returns:
Список групп доступа
"""
# В production получать из Authentik API или Nextcloud групп
# Пока возвращаем пустой список (только личные файлы)
return []
class NextcloudSyncWorker:
"""Основной класс воркера синхронизации"""
def __init__(self, config: WorkerConfig):
"""
Инициализация воркера
Args:
config: Конфигурация воркера
"""
self.config = config
self.nc_client = NextcloudClient(
config.nextcloud.url,
config.nextcloud.username,
config.nextcloud.password
)
self.webui_client = OpenWebUIClient(
config.openwebui.api_url,
config.openwebui.api_key,
config.openwebui.timeout
)
self.processor = DocumentProcessor(config.max_file_size)
self.state_db = SyncStateDB(config.db_path)
self.user_mapper = UserMapper(config)
def sync_path(self, path_template: str) -> int:
"""
Синхронизация указанного пути
Args:
path_template: Шаблон пути (может содержать {username})
Returns:
Количество синхронизированных файлов
"""
synced_count = 0
# Замена {username} в шаблоне пути
if "{username}" in path_template:
# Сканируем все домашние директории
# В production можно получить список пользователей из Nextcloud API
username = self.config.nextcloud.username
path = path_template.replace("{username}", username)
else:
path = path_template
try:
logger.info(f"Сканирование пути: {path}")
files = self.nc_client.scan_directory_recursive(path)
logger.info(f"Найдено файлов: {len(files)}")
for file_info in files:
try:
if self._sync_file(file_info, path_template):
synced_count += 1
except Exception as e:
logger.error(f"Ошибка при синхронизации файла {file_info['path']}: {e}")
continue
except Exception as e:
logger.error(f"Ошибка при сканировании пути {path}: {e}")
return synced_count
def _sync_file(self, file_info: Dict, path_template: str) -> bool:
"""
Синхронизация одного файла
Args:
file_info: Информация о файле
path_template: Шаблон пути (для определения владельца)
Returns:
True если файл был синхронизирован
"""
file_path = file_info["path"]
file_size = file_info.get("size", 0)
# Пропускаем слишком большие файлы
if file_size > self.config.max_file_size * 2: # Двойной лимит для безопасности
logger.warning(f"Файл {file_path} слишком большой ({file_size} bytes), пропуск")
return False
# Проверка поддержки формата
if not self.processor.is_supported_format(file_path, file_info.get("type")):
logger.debug(f"Неподдерживаемый формат: {file_path}")
return False
try:
# Загрузка файла
file_content = self.nc_client.download_file(file_path)
file_hash = self.state_db.get_file_hash(file_path, file_content)
# Проверка, не синхронизирован ли уже файл
if self.state_db.is_file_synced(file_path, file_hash):
logger.debug(f"Файл уже синхронизирован: {file_path}")
return False
# Обработка файла
text_content, is_large = self.processor.process_file(
file_content,
Path(file_path).name,
file_info.get("type")
)
# Определение владельца файла
username = self.nc_client.extract_username_from_path(file_path)
if not username:
# Для общих папок используем None или специальную группу
username = None
user_id = self.user_mapper.map_nextcloud_user_to_authentik(username) if username else None
access_groups = self.user_mapper.get_user_groups(username) if username else []
# Метаданные для API
metadata = {
"source": "nextcloud",
"path": file_path,
"original_size": file_size,
"is_large_file": is_large
}
# Загрузка в Open WebUI
if is_large or len(text_content.encode('utf-8')) > self.config.max_file_size:
# Для больших файлов загружаем как текст
result = self.webui_client.upload_document_with_text(
text_content=text_content,
filename=Path(file_path).name,
user_id=user_id,
access_groups=access_groups,
metadata=metadata
)
else:
# Для обычных файлов загружаем оригинал
result = self.webui_client.upload_document(
file_content=file_content,
filename=Path(file_path).name,
user_id=user_id,
access_groups=access_groups,
metadata=metadata
)
# Сохранение состояния
document_id = result.get("id") if isinstance(result, dict) else None
self.state_db.mark_file_synced(
file_path=file_path,
file_hash=file_hash,
file_size=file_size,
last_modified=file_info.get("modified"),
user_id=user_id,
document_id=document_id
)
logger.info(f"Файл синхронизирован: {file_path} -> документ ID: {document_id}")
return True
except Exception as e:
logger.error(f"Ошибка при синхронизации файла {file_path}: {e}")
return False
def run_once(self):
"""Однократный запуск синхронизации"""
logger.info("=== Начало синхронизации ===")
total_synced = 0
for path_template in self.config.nextcloud.scan_paths:
try:
synced = self.sync_path(path_template)
total_synced += synced
logger.info(f"Синхронизировано файлов из {path_template}: {synced}")
except Exception as e:
logger.error(f"Ошибка при синхронизации пути {path_template}: {e}")
stats = self.state_db.get_sync_stats()
logger.info(f"=== Синхронизация завершена ===")
logger.info(f"Всего синхронизировано в этом цикле: {total_synced}")
logger.info(f"Всего файлов в БД: {stats['total_files']}")
logger.info(f"Общий размер: {stats['total_size'] / 1024 / 1024:.2f} MB")
def run_daemon(self):
"""Запуск воркера в режиме daemon (бесконечный цикл)"""
logger.info("Запуск воркера в режиме daemon")
logger.info(f"Интервал синхронизации: {self.config.sync_interval} секунд")
try:
while True:
self.run_once()
logger.info(f"Ожидание {self.config.sync_interval} секунд до следующей синхронизации...")
time.sleep(self.config.sync_interval)
except KeyboardInterrupt:
logger.info("Получен сигнал остановки, завершение работы...")
finally:
self.cleanup()
def cleanup(self):
"""Очистка ресурсов"""
logger.info("Очистка ресурсов...")
self.nc_client.close()
self.webui_client.close()
def main():
"""Точка входа"""
import argparse
parser = argparse.ArgumentParser(description="Воркер синхронизации Nextcloud -> Qdrant")
parser.add_argument(
"--once",
action="store_true",
help="Запустить синхронизацию один раз и выйти"
)
parser.add_argument(
"--daemon",
action="store_true",
help="Запустить в режиме daemon (бесконечный цикл)"
)
args = parser.parse_args()
try:
config = load_config()
worker = NextcloudSyncWorker(config)
if args.once:
worker.run_once()
elif args.daemon:
worker.run_daemon()
else:
# По умолчанию запускаем один раз
worker.run_once()
except Exception as e:
logger.error(f"Критическая ошибка: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
main()