Files
iiEsaywebUI/worker/nextcloud_client.py

250 lines
9.6 KiB
Python
Raw Normal View History

"""
WebDAV клиент для работы с Nextcloud
Сканирование директорий, загрузка файлов, получение метаданных
"""
import os
import logging
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from pathlib import Path
import requests
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException, Timeout
logger = logging.getLogger(__name__)
class NextcloudClient:
"""Клиент для работы с Nextcloud через WebDAV API"""
def __init__(self, url: str, username: str, password: str):
"""
Инициализация WebDAV клиента
Args:
url: URL Nextcloud (например, https://next.iieasy.ru)
username: Имя пользователя Nextcloud
password: App Password (не основной пароль!)
"""
self.base_url = url.rstrip("/")
self.webdav_url = f"{self.base_url}/remote.php/dav/files/{username}"
self.auth = HTTPBasicAuth(username, password)
self.session = requests.Session()
self.session.auth = self.auth
self.session.headers.update({
"User-Agent": "iiEasy-Nextcloud-Sync/1.0"
})
def list_directory(self, path: str, depth: int = 1) -> List[Dict]:
"""
Получение списка файлов и директорий через PROPFIND
Args:
path: Путь к директории (относительно WebDAV root)
depth: Глубина рекурсии (0 - только указанный ресурс, 1 - + дочерние)
Returns:
Список словарей с информацией о файлах/директориях
Raises:
RequestException: При ошибках сетевого запроса
"""
url = f"{self.webdav_url}/{path.lstrip('/')}"
try:
# PROPFIND запрос для получения списка ресурсов
headers = {
"Depth": str(depth),
"Content-Type": "application/xml"
}
body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
<d:getlastmodified/>
<d:getcontentlength/>
<d:getcontenttype/>
<oc:fileid/>
<nc:has-preview/>
</d:prop>
</d:propfind>"""
response = self.session.request(
"PROPFIND",
url,
headers=headers,
data=body,
timeout=30
)
response.raise_for_status()
# Парсинг XML ответа (упрощенная версия)
# В production лучше использовать xml.etree.ElementTree
files = []
# Здесь должен быть парсинг XML, но для простоты используем альтернативный метод
return self._parse_propfind_response(response.text)
except Timeout:
logger.error(f"Таймаут при получении списка файлов: {path}")
raise
except RequestException as e:
logger.error(f"Ошибка при получении списка файлов {path}: {e}")
raise
def _parse_propfind_response(self, xml_content: str) -> List[Dict]:
"""
Упрощенный парсинг PROPFIND ответа
В production лучше использовать xml.etree.ElementTree или lxml
"""
import re
files = []
# Простой regex парсинг (для production использовать XML парсер)
# Ищем href и getlastmodified
href_pattern = r'<d:href>(.*?)</d:href>'
modified_pattern = r'<d:getlastmodified>(.*?)</d:getlastmodified>'
size_pattern = r'<d:getcontentlength>(.*?)</d:getcontentlength>'
type_pattern = r'<d:getcontenttype>(.*?)</d:getcontenttype>'
hrefs = re.findall(href_pattern, xml_content)
modifieds = re.findall(modified_pattern, xml_content)
sizes = re.findall(size_pattern, xml_content)
types = re.findall(type_pattern, xml_content)
for i, href in enumerate(hrefs):
# Убираем префикс /remote.php/dav/files/username
clean_href = href.replace(f"/remote.php/dav/files/{self.auth.username}", "")
if clean_href == "":
continue
files.append({
"path": clean_href,
"modified": modifieds[i] if i < len(modifieds) else None,
"size": int(sizes[i]) if i < len(sizes) and sizes[i] else 0,
"type": types[i] if i < len(types) else "application/octet-stream",
"is_directory": i < len(types) and types[i] == "httpd/unix-directory"
})
return files
def download_file(self, path: str) -> bytes:
"""
Загрузка файла из Nextcloud
Args:
path: Путь к файлу (относительно WebDAV root)
Returns:
Содержимое файла в виде bytes
Raises:
RequestException: При ошибках загрузки
"""
url = f"{self.webdav_url}/{path.lstrip('/')}"
try:
response = self.session.get(url, timeout=300, stream=True)
response.raise_for_status()
return response.content
except Timeout:
logger.error(f"Таймаут при загрузке файла: {path}")
raise
except RequestException as e:
logger.error(f"Ошибка при загрузке файла {path}: {e}")
raise
def get_file_metadata(self, path: str) -> Dict:
"""
Получение метаданных файла (размер, дата изменения)
Args:
path: Путь к файлу
Returns:
Словарь с метаданными
"""
try:
url = f"{self.webdav_url}/{path.lstrip('/')}"
response = self.session.head(url, timeout=30)
response.raise_for_status()
return {
"size": int(response.headers.get("Content-Length", 0)),
"modified": response.headers.get("Last-Modified"),
"etag": response.headers.get("ETag", "").strip('"')
}
except RequestException as e:
logger.warning(f"Не удалось получить метаданные для {path}: {e}")
return {
"size": 0,
"modified": None,
"etag": ""
}
def scan_directory_recursive(self, base_path: str, max_depth: int = 10) -> List[Dict]:
"""
Рекурсивное сканирование директории
Args:
base_path: Базовый путь для сканирования
max_depth: Максимальная глубина рекурсии
Returns:
Список всех файлов с их метаданными
"""
all_files = []
def scan_recursive(current_path: str, depth: int = 0):
if depth > max_depth:
return
try:
items = self.list_directory(current_path, depth=1)
for item in items:
item_path = item["path"]
# Пропускаем сам каталог
if item_path == current_path:
continue
if item.get("is_directory"):
# Рекурсивный обход поддиректорий
scan_recursive(item_path, depth + 1)
else:
# Добавляем файл в список
all_files.append({
"path": item_path,
"size": item.get("size", 0),
"modified": item.get("modified"),
"type": item.get("type", "application/octet-stream")
})
except Exception as e:
logger.error(f"Ошибка при сканировании {current_path}: {e}")
scan_recursive(base_path)
return all_files
def extract_username_from_path(self, path: str) -> Optional[str]:
"""
Извлечение имени пользователя из пути Nextcloud
Например: /home/username/Documents -> username
Args:
path: Путь к файлу/директории
Returns:
Имя пользователя или None
"""
# Формат путей: /home/{username}/...
parts = path.strip("/").split("/")
if len(parts) >= 2 and parts[0] == "home":
return parts[1]
return None
def close(self):
"""Закрытие сессии"""
self.session.close()