import 'dart:async'; import 'dart:convert'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:uuid/uuid.dart'; import '../../../core/providers/app_providers.dart'; import 'outbound_task.dart'; import 'task_worker.dart'; import '../../../core/utils/debug_logger.dart'; void debugPrint(String? message, {int? wrapWidth}) { if (message == null) return; DebugLogger.fromLegacy(message, scope: 'tasks/queue'); } final taskQueueProvider = NotifierProvider>( TaskQueueNotifier.new, ); class TaskQueueNotifier extends Notifier> { static const _prefsKey = 'outbound_task_queue_v1'; final _uuid = const Uuid(); bool _bootstrapScheduled = false; @override List build() { if (!_bootstrapScheduled) { _bootstrapScheduled = true; Future.microtask(_load); } return const []; } bool _processing = false; final Set _activeThreads = {}; final int _maxParallel = 2; // bounded parallelism across conversations Future _load() async { try { final prefs = ref.read(sharedPreferencesProvider); final jsonStr = prefs.getString(_prefsKey); if (jsonStr == null || jsonStr.isEmpty) return; final raw = (jsonDecode(jsonStr) as List).cast>(); final tasks = raw.map(OutboundTask.fromJson).toList(); // Only restore non-completed tasks state = tasks .where( (t) => t.status == TaskStatus.queued || t.status == TaskStatus.running, ) .map( (t) => t.copyWith( status: TaskStatus.queued, startedAt: null, completedAt: null, ), ) .toList(); // Kick processing after load _process(); } catch (e) { debugPrint('DEBUG: Failed to load task queue: $e'); } } Future _save() async { try { final prefs = ref.read(sharedPreferencesProvider); final retained = [ for (final task in state) if (task.status == TaskStatus.queued || task.status == TaskStatus.running) task, ]; if (retained.length != state.length) { // Remove completed entries from state to keep the in-memory queue lean. state = retained; } final raw = retained.map((t) => t.toJson()).toList(growable: false); await prefs.setString(_prefsKey, jsonEncode(raw)); } catch (e) { debugPrint('DEBUG: Failed to persist task queue: $e'); } } Future enqueueSendText({ required String? conversationId, required String text, List? attachments, List? toolIds, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.sendTextMessage( id: id, conversationId: conversationId, text: text, attachments: attachments ?? const [], toolIds: toolIds ?? const [], idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future cancel(String id) async { state = [ for (final t in state) if (t.id == id) t.copyWith(status: TaskStatus.cancelled, completedAt: DateTime.now()) else t, ]; await _save(); } Future cancelByConversation(String conversationId) async { state = [ for (final t in state) if ((t.maybeConversationId ?? '') == conversationId && (t.status == TaskStatus.queued || t.status == TaskStatus.running)) t.copyWith(status: TaskStatus.cancelled, completedAt: DateTime.now()) else t, ]; await _save(); } Future retry(String id) async { state = [ for (final t in state) if (t.id == id) t.copyWith( status: TaskStatus.queued, attempt: (t.attempt + 1), error: null, startedAt: null, completedAt: null, ) else t, ]; await _save(); _process(); } Future enqueueGenerateImage({ required String? conversationId, required String prompt, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.generateImage( id: id, conversationId: conversationId, prompt: prompt, idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future enqueueExecuteToolCall({ required String? conversationId, required String toolName, Map arguments = const {}, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.executeToolCall( id: id, conversationId: conversationId, toolName: toolName, arguments: arguments, idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } Future _process() async { if (_processing) return; _processing = true; try { // Pump while there is capacity and queued tasks remain while (true) { // Filter runnable tasks final queued = state .where((t) => t.status == TaskStatus.queued) .toList(); if (queued.isEmpty) break; // Respect parallelism and one-per-thread final availableCapacity = _maxParallel - _activeThreads.length; if (availableCapacity <= 0) break; OutboundTask? next; for (final t in queued) { final thread = t.threadKey; if (!_activeThreads.contains(thread)) { next = t; break; } } // If no eligible task (all threads busy), exit loop if (next == null) break; // Mark running and launch without awaiting (parallel across threads) final threadKey = next.threadKey; _activeThreads.add(threadKey); state = [ for (final t in state) if (t.id == next.id) next.copyWith( status: TaskStatus.running, startedAt: DateTime.now(), ) else t, ]; await _save(); // Launch worker unawaited( _run(next).whenComplete(() { _activeThreads.remove(threadKey); // After a task completes, try to schedule more _process(); }), ); } } finally { _processing = false; } } Future _run(OutboundTask task) async { try { await TaskWorker(ref).perform(task); state = [ for (final t in state) if (t.id == task.id) t.copyWith( status: TaskStatus.succeeded, completedAt: DateTime.now(), ) else t, ]; } catch (e, st) { debugPrint('Task failed (${task.runtimeType}): $e\n$st'); state = [ for (final t in state) if (t.id == task.id) t.copyWith( status: TaskStatus.failed, error: e.toString(), completedAt: DateTime.now(), ) else t, ]; } finally { await _save(); } } Future enqueueUploadMedia({ required String? conversationId, required String filePath, required String fileName, int? fileSize, String? mimeType, String? checksum, }) async { final id = _uuid.v4(); final task = OutboundTask.uploadMedia( id: id, conversationId: conversationId, filePath: filePath, fileName: fileName, fileSize: fileSize, mimeType: mimeType, checksum: checksum, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } // Removed: enqueueSaveConversation — mobile app no longer persists chats to server. Future enqueueImageToDataUrl({ required String? conversationId, required String filePath, required String fileName, String? idempotencyKey, }) async { final id = _uuid.v4(); final task = OutboundTask.imageToDataUrl( id: id, conversationId: conversationId, filePath: filePath, fileName: fileName, idempotencyKey: idempotencyKey, enqueuedAt: DateTime.now(), ); state = [...state, task]; await _save(); _process(); return id; } }