import 'dart:async'; import 'dart:convert'; import 'package:flutter/foundation.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:uuid/uuid.dart'; import '../../../core/providers/app_providers.dart'; import 'outbound_task.dart'; import 'task_worker.dart'; final taskQueueProvider = StateNotifierProvider>((ref) { return TaskQueueNotifier(ref); }); class TaskQueueNotifier extends StateNotifier> { TaskQueueNotifier(this._ref) : super(const []) { _load(); } static const _prefsKey = 'outbound_task_queue_v1'; final Ref _ref; final _uuid = const Uuid(); bool _processing = false; final Set _activeThreads = {}; final int _maxParallel = 2; // bounded parallelism across conversations Future _load() async { try { final prefs = _ref.read(sharedPreferencesProvider); final jsonStr = prefs.getString(_prefsKey); if (jsonStr == null || jsonStr.isEmpty) return; final raw = (jsonDecode(jsonStr) as List).cast>(); final tasks = raw.map(OutboundTask.fromJson).toList(); // Only restore non-completed tasks state = tasks .where((t) => t.status == TaskStatus.queued || t.status == TaskStatus.running) .map((t) => t.copyWith( status: TaskStatus.queued, startedAt: null, completedAt: null, )) .toList(); // Kick processing after load _process(); } catch (e) { debugPrint('DEBUG: Failed to load task queue: $e'); } } Future _save() async { try { final prefs = _ref.read(sharedPreferencesProvider); final raw = state.map((t) => t.toJson()).toList(growable: false); await prefs.setString(_prefsKey, jsonEncode(raw)); } catch (e) { debugPrint('DEBUG: Failed to persist task queue: $e'); } } Future enqueueSendText({ required String? conversationId, required String text, List? attachments, List? toolIds, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.sendTextMessage( id: id, conversationId: conversationId, text: text, attachments: attachments ?? const [], toolIds: toolIds ?? const [], idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future cancel(String id) async { state = [ for (final t in state) if (t.id == id) t.copyWith( status: TaskStatus.cancelled, completedAt: DateTime.now(), ) else t, ]; await _save(); } Future cancelByConversation(String conversationId) async { state = [ for (final t in state) if ((t.maybeConversationId ?? '') == conversationId && (t.status == TaskStatus.queued || t.status == TaskStatus.running)) t.copyWith( status: TaskStatus.cancelled, completedAt: DateTime.now(), ) else t, ]; await _save(); } Future retry(String id) async { state = [ for (final t in state) if (t.id == id) t.copyWith( status: TaskStatus.queued, attempt: (t.attempt + 1), error: null, startedAt: null, completedAt: null, ) else t, ]; await _save(); _process(); } Future enqueueGenerateImage({ required String? conversationId, required String prompt, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.generateImage( id: id, conversationId: conversationId, prompt: prompt, idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future enqueueExecuteToolCall({ required String? conversationId, required String toolName, Map arguments = const {}, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.executeToolCall( id: id, conversationId: conversationId, toolName: toolName, arguments: arguments, idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future _process() async { if (_processing) return; _processing = true; try { // Pump while there is capacity and queued tasks remain while (true) { // Filter runnable tasks final queued = state.where((t) => t.status == TaskStatus.queued).toList(); if (queued.isEmpty) break; // Respect parallelism and one-per-thread final availableCapacity = _maxParallel - _activeThreads.length; if (availableCapacity <= 0) break; OutboundTask? next; for (final t in queued) { final thread = t.threadKey; if (!_activeThreads.contains(thread)) { next = t; break; } } // If no eligible task (all threads busy), exit loop if (next == null) break; // Mark running and launch without awaiting (parallel across threads) final threadKey = next.threadKey; _activeThreads.add(threadKey); state = [ for (final t in state) if (t.id == next.id) next.copyWith(status: TaskStatus.running, startedAt: DateTime.now()) else t, ]; await _save(); // Launch worker unawaited(_run(next).whenComplete(() { _activeThreads.remove(threadKey); // After a task completes, try to schedule more _process(); })); } } finally { _processing = false; } } Future _run(OutboundTask task) async { try { await TaskWorker(_ref).perform(task); state = [ for (final t in state) if (t.id == task.id) t.copyWith(status: TaskStatus.succeeded, completedAt: DateTime.now()) else t, ]; } catch (e, st) { debugPrint('Task failed (${task.runtimeType}): $e\n$st'); state = [ for (final t in state) if (t.id == task.id) t.copyWith( status: TaskStatus.failed, error: e.toString(), completedAt: DateTime.now(), ) else t, ]; } finally { await _save(); } } Future enqueueUploadMedia({ required String? conversationId, required String filePath, required String fileName, int? fileSize, String? mimeType, String? checksum, }) async { final id = _uuid.v4(); final task = OutboundTask.uploadMedia( id: id, conversationId: conversationId, filePath: filePath, fileName: fileName, fileSize: fileSize, mimeType: mimeType, checksum: checksum, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future enqueueSaveConversation({ required String? conversationId, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.saveConversation( id: id, conversationId: conversationId, idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future enqueueGenerateTitle({ required String conversationId, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.generateTitle( id: id, conversationId: conversationId, idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future enqueueImageToDataUrl({ required String? conversationId, required String filePath, required String fileName, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.imageToDataUrl( id: id, conversationId: conversationId, filePath: filePath, fileName: fileName, idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } }