Files

133 lines
4.6 KiB
Python
Raw Permalink Normal View History

"""
Конфигурация для воркера синхронизации Nextcloud -> Qdrant
Типизация и валидация настроек
"""
import os
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional
from dotenv import load_dotenv
# Загрузка переменных окружения: сначала worker/.env, затем корневой .env
load_dotenv()
root_env = Path(__file__).resolve().parent.parent / ".env"
load_dotenv(root_env)
@dataclass
class NextcloudConfig:
"""Конфигурация подключения к Nextcloud"""
url: str
username: str
password: str
scan_paths: List[str] # Пути для сканирования
@dataclass
class OpenWebUIConfig:
"""Конфигурация подключения к Open WebUI API"""
api_url: str
api_key: str
timeout: int = 300 # Таймаут для больших файлов (секунды)
@dataclass
class AuthentikConfig:
"""Конфигурация для маппинга пользователей Authentik"""
api_url: Optional[str] = None
api_token: Optional[str] = None
@dataclass
class WorkerConfig:
"""Основная конфигурация воркера"""
nextcloud: NextcloudConfig
openwebui: OpenWebUIConfig
authentik: AuthentikConfig
sync_interval: int = 300 # Интервал синхронизации (секунды)
max_file_size: int = 100 * 1024 * 1024 # Максимальный размер файла (100MB)
db_path: str = "sync_state.db" # Путь к SQLite БД для отслеживания состояния
log_level: str = "INFO"
def load_config() -> WorkerConfig:
"""
Загрузка и валидация конфигурации из переменных окружения
Returns:
WorkerConfig: Валидированная конфигурация
Raises:
ValueError: Если обязательные переменные не установлены
"""
# Nextcloud конфигурация
nc_url = os.getenv("DOMAIN_NEXTCLOUD", "").rstrip("/")
if not nc_url:
raise ValueError("DOMAIN_NEXTCLOUD не установлен в .env")
nc_user = os.getenv("NC_USER")
if not nc_user:
raise ValueError("NC_USER не установлен в .env")
nc_password = os.getenv("NC_APP_PASSWORD")
if not nc_password:
raise ValueError("NC_APP_PASSWORD не установлен в .env")
# Пути для сканирования (можно настроить через переменные окружения)
scan_paths = os.getenv(
"NC_SCAN_PATHS",
"/home/{username}/Documents,/home/{username}/Files,/Shared/Documents"
).split(",")
nextcloud_config = NextcloudConfig(
url=nc_url,
username=nc_user,
password=nc_password,
scan_paths=[path.strip() for path in scan_paths]
)
# Open WebUI конфигурация
webui_url = os.getenv("DOMAIN_OPENWEBUI", "").rstrip("/")
if not webui_url:
raise ValueError("DOMAIN_OPENWEBUI не установлен в .env")
webui_api_key = os.getenv("OPENWEBUI_API_KEY")
if not webui_api_key or webui_api_key == "твой_api_ключ_от_openwebui":
raise ValueError(
"OPENWEBUI_API_KEY не установлен или имеет значение по умолчанию. "
"Создайте API ключ в Open WebUI -> Settings -> Account -> API Keys"
)
timeout = int(os.getenv("OPENWEBUI_TIMEOUT", "300"))
openwebui_config = OpenWebUIConfig(
api_url=f"{webui_url}/api/v1",
api_key=webui_api_key,
timeout=timeout
)
# Authentik конфигурация (опционально)
authentik_url = os.getenv("DOMAIN_AUTHENTIK", "").rstrip("/")
authentik_token = os.getenv("AUTHENTIK_API_TOKEN")
authentik_config = AuthentikConfig(
api_url=authentik_url if authentik_url else None,
api_token=authentik_token
)
# Общие настройки воркера
sync_interval = int(os.getenv("SYNC_INTERVAL", "300"))
max_file_size = int(os.getenv("MAX_FILE_SIZE", str(100 * 1024 * 1024)))
db_path = os.getenv("DB_PATH", "sync_state.db")
log_level = os.getenv("LOG_LEVEL", "INFO")
return WorkerConfig(
nextcloud=nextcloud_config,
openwebui=openwebui_config,
authentik=authentik_config,
sync_interval=sync_interval,
max_file_size=max_file_size,
db_path=db_path,
log_level=log_level
)