Files
iiEsaywebUI/worker/openwebui_client.py
2026-02-19 18:12:09 +00:00

180 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HTTP клиент для загрузки документов в Open WebUI API
Поддержка метаданных user_id и access_groups для изоляции доступа
"""
import logging
from typing import Dict, Optional, List
import requests
from requests.exceptions import RequestException, Timeout
logger = logging.getLogger(__name__)
class OpenWebUIClient:
"""Клиент для работы с Open WebUI API"""
def __init__(self, api_url: str, api_key: str, timeout: int = 300):
"""
Инициализация клиента Open WebUI API
Args:
api_url: Базовый URL API (например, https://odo.iieasy.ru/api/v1)
api_key: API ключ из Open WebUI Settings -> Account -> API Keys
timeout: Таймаут для запросов (секунды)
"""
self.api_url = api_url.rstrip("/")
self.api_key = api_key
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"User-Agent": "iiEasy-Nextcloud-Sync/1.0"
})
def upload_document(
self,
file_content: bytes,
filename: str,
user_id: Optional[str] = None,
access_groups: Optional[List[str]] = None,
metadata: Optional[Dict] = None
) -> Dict:
"""
Загрузка документа в Open WebUI через API
Args:
file_content: Содержимое файла в виде bytes
filename: Имя файла
user_id: ID пользователя Authentik для изоляции доступа
access_groups: Список групп доступа (для совместного доступа)
metadata: Дополнительные метаданные (source, path и т.д.)
Returns:
Ответ API в виде словаря
Raises:
RequestException: При ошибках загрузки
"""
url = f"{self.api_url}/documents"
# Подготовка данных для multipart/form-data
files = {
"file": (filename, file_content)
}
# Метаданные передаются через form-data или JSON
data = {}
if user_id:
data["user_id"] = user_id
if access_groups:
# Если API поддерживает список групп, передаем как JSON строку или отдельные поля
data["access_groups"] = ",".join(access_groups) if isinstance(access_groups, list) else access_groups
if metadata:
# Добавляем дополнительные метаданные
for key, value in metadata.items():
if key not in data:
data[key] = str(value)
try:
logger.info(f"Загрузка документа {filename} (размер: {len(file_content)} bytes)")
response = self.session.post(
url,
files=files,
data=data,
timeout=self.timeout
)
response.raise_for_status()
result = response.json() if response.content else {}
logger.info(f"Документ {filename} успешно загружен. ID: {result.get('id', 'unknown')}")
return result
except Timeout:
logger.error(f"Таймаут при загрузке документа {filename}")
raise
except RequestException as e:
logger.error(f"Ошибка при загрузке документа {filename}: {e}")
if hasattr(e.response, 'text'):
logger.error(f"Ответ сервера: {e.response.text}")
raise
def upload_document_with_text(
self,
text_content: str,
filename: str,
user_id: Optional[str] = None,
access_groups: Optional[List[str]] = None,
metadata: Optional[Dict] = None
) -> Dict:
"""
Загрузка текстового документа (например, извлеченного из PDF)
Args:
text_content: Текст документа
filename: Имя файла (будет изменено на .txt если нужно)
user_id: ID пользователя Authentik
access_groups: Список групп доступа
metadata: Дополнительные метаданные
Returns:
Ответ API
"""
# Убеждаемся, что файл имеет расширение .txt
if not filename.endswith('.txt'):
filename = filename.rsplit('.', 1)[0] + '.txt'
file_content = text_content.encode('utf-8')
return self.upload_document(
file_content=file_content,
filename=filename,
user_id=user_id,
access_groups=access_groups,
metadata=metadata
)
def check_document_exists(self, document_id: str) -> bool:
"""
Проверка существования документа по ID
Args:
document_id: ID документа
Returns:
True если документ существует
"""
try:
url = f"{self.api_url}/documents/{document_id}"
response = self.session.get(url, timeout=30)
return response.status_code == 200
except RequestException:
return False
def delete_document(self, document_id: str) -> bool:
"""
Удаление документа по ID
Args:
document_id: ID документа
Returns:
True если удаление успешно
"""
try:
url = f"{self.api_url}/documents/{document_id}"
response = self.session.delete(url, timeout=30)
response.raise_for_status()
return True
except RequestException as e:
logger.error(f"Ошибка при удалении документа {document_id}: {e}")
return False
def close(self):
"""Закрытие сессии"""
self.session.close()