import 'dart:async'; import 'dart:io'; import 'dart:convert'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import '../../../core/providers/app_providers.dart'; import '../../../core/services/attachment_upload_queue.dart'; import '../../../core/utils/debug_logger.dart'; import '../../../features/chat/providers/chat_providers.dart' as chat; import '../../../features/chat/services/file_attachment_service.dart'; import 'package:path/path.dart' as path; import 'outbound_task.dart'; class TaskWorker { final Ref _ref; TaskWorker(this._ref); Future perform(OutboundTask task) async { await task.map>( sendTextMessage: _performSendText, uploadMedia: _performUploadMedia, executeToolCall: _performExecuteToolCall, generateImage: _performGenerateImage, imageToDataUrl: _performImageToDataUrl, saveConversation: _performSaveConversation, generateTitle: _performGenerateTitle, ); } Future _performSendText(SendTextMessageTask task) async { // Ensure uploads referenced in attachments are completed if they are local queued ids // For now, assume attachments are already uploaded (fileIds or data URLs) as UI uploads eagerly. // If needed, we could resolve queued uploads here by integrating with AttachmentUploadQueue. final isReviewer = _ref.read(reviewerModeProvider); if (!isReviewer) { final api = _ref.read(apiServiceProvider); if (api == null) { throw Exception('API not available'); } } // Set active conversation if provided; otherwise keep current try { // If a specific conversation id is provided and differs from current, load it final active = _ref.read(activeConversationProvider); if (task.conversationId != null && task.conversationId!.isNotEmpty && (active == null || active.id != task.conversationId)) { try { final api = _ref.read(apiServiceProvider); if (api != null) { final conv = await api.getConversation(task.conversationId!); _ref.read(activeConversationProvider.notifier).state = conv; } } catch (_) { // If loading fails, proceed; send flow can create a new conversation } } } catch (_) {} // Delegate to existing unified send implementation await chat.sendMessageFromService( _ref, task.text, task.attachments.isEmpty ? null : task.attachments, task.toolIds.isEmpty ? null : task.toolIds, ); } Future _performUploadMedia(UploadMediaTask task) async { final uploader = AttachmentUploadQueue(); // Ensure queue initialized with API upload callback try { final api = _ref.read(apiServiceProvider); if (api != null) { await uploader.initialize( onUpload: (p, n) => api.uploadFile(p, n), ); } } catch (_) {} // Enqueue and then wait until the item reaches a terminal state for basic parity final id = await uploader.enqueue( filePath: task.filePath, fileName: task.fileName, fileSize: task.fileSize ?? 0, mimeType: task.mimeType, checksum: task.checksum, ); final completer = Completer(); late final StreamSubscription> sub; sub = uploader.queueStream.listen((items) { QueuedAttachment? entry; try { entry = items.firstWhere((e) => e.id == id); } catch (_) { entry = null; } if (entry == null) return; // Reflect progress into UI attachment state if that file is present try { final current = _ref.read(attachedFilesProvider); final idx = current.indexWhere((f) => f.file.path == task.filePath); if (idx != -1) { final existing = current[idx]; final status = switch (entry.status) { QueuedAttachmentStatus.pending => FileUploadStatus.uploading, QueuedAttachmentStatus.uploading => FileUploadStatus.uploading, QueuedAttachmentStatus.completed => FileUploadStatus.completed, QueuedAttachmentStatus.failed => FileUploadStatus.failed, QueuedAttachmentStatus.cancelled => FileUploadStatus.failed, }; final newState = FileUploadState( file: File(task.filePath), fileName: task.fileName, fileSize: task.fileSize ?? existing.fileSize, progress: status == FileUploadStatus.completed ? 1.0 : existing.progress, status: status, fileId: entry.fileId ?? existing.fileId, error: entry.lastError, ); _ref .read(attachedFilesProvider.notifier) .updateFileState(task.filePath, newState); } } catch (_) {} switch (entry.status) { case QueuedAttachmentStatus.completed: case QueuedAttachmentStatus.failed: case QueuedAttachmentStatus.cancelled: sub.cancel(); completer.complete(); break; default: break; } }); // Fire a process tick unawaited(uploader.processQueue()); await completer.future.timeout(const Duration(minutes: 2), onTimeout: () { try { sub.cancel(); } catch (_) {} DebugLogger.warning('UploadMediaTask timed out: ${task.fileName}'); return; }); } Future _performExecuteToolCall(ExecuteToolCallTask task) async { // Resolve API + selected model final api = _ref.read(apiServiceProvider); final selectedModel = _ref.read(selectedModelProvider); if (api == null || selectedModel == null) { throw Exception('API or model not available'); } // Optionally bring the target conversation to foreground try { final active = _ref.read(activeConversationProvider); if (task.conversationId != null && task.conversationId!.isNotEmpty && (active == null || active.id != task.conversationId)) { try { final conv = await api.getConversation(task.conversationId!); _ref.read(activeConversationProvider.notifier).state = conv; } catch (_) {} } } catch (_) {} // Lookup tool by name (or id fallback) String? resolvedToolId; try { final tools = await api.getAvailableTools(); for (final t in tools) { final id = (t['id'] ?? '').toString(); final name = (t['name'] ?? '').toString(); if (name.toLowerCase() == task.toolName.toLowerCase() || id.toLowerCase() == task.toolName.toLowerCase()) { resolvedToolId = id; break; } } } catch (_) {} // Build an explicit user instruction to run the tool with arguments. // Passing the specific tool id hints the server/provider to execute it via native function calling. final args = task.arguments; String argsSnippet; try { argsSnippet = const JsonEncoder.withIndent(' ').convert(args); } catch (_) { argsSnippet = args.toString(); } final instruction = 'Run the tool "${task.toolName}" with the following JSON arguments and return the result succinctly.\n' 'If the tool is not available, respond with a brief error.\n\n' 'Arguments:\n' '```json\n$argsSnippet\n```'; // Send as a normal message but constrain tools to the resolved tool (if found) final toolIds = (resolvedToolId != null && resolvedToolId.isNotEmpty) ? [resolvedToolId] : null; await chat.sendMessageFromService( _ref, instruction, null, toolIds, ); } Future _performGenerateImage(GenerateImageTask task) async { final api = _ref.read(apiServiceProvider); final selectedModel = _ref.read(selectedModelProvider); if (api == null || selectedModel == null) { throw Exception('API or model not available'); } // Ensure the target conversation is active if provided try { final active = _ref.read(activeConversationProvider); if (task.conversationId != null && task.conversationId!.isNotEmpty && (active == null || active.id != task.conversationId)) { try { final conv = await api.getConversation(task.conversationId!); _ref.read(activeConversationProvider.notifier).state = conv; } catch (_) {} } } catch (_) {} // Temporarily enable image-generation background flow for this send final prev = _ref.read(chat.imageGenerationEnabledProvider); try { _ref.read(chat.imageGenerationEnabledProvider.notifier).state = true; await chat.sendMessageFromService( _ref, task.prompt, null, null, ); } finally { _ref.read(chat.imageGenerationEnabledProvider.notifier).state = prev; } } Future _performImageToDataUrl(ImageToDataUrlTask task) async { try { // Update UI to uploading state first try { final current = _ref.read(attachedFilesProvider); final idx = current.indexWhere((f) => f.file.path == task.filePath); if (idx != -1) { final existing = current[idx]; final uploading = FileUploadState( file: existing.file, fileName: task.fileName, fileSize: existing.fileSize, progress: 0.5, status: FileUploadStatus.uploading, fileId: existing.fileId, ); _ref.read(attachedFilesProvider.notifier).updateFileState( task.filePath, uploading, ); } } catch (_) {} // Read file and convert to data URL final file = File(task.filePath); final bytes = await file.readAsBytes(); final b64 = base64Encode(bytes); final ext = path.extension(task.fileName).toLowerCase(); String mime = 'image/png'; if (ext == '.jpg' || ext == '.jpeg') { mime = 'image/jpeg'; } else if (ext == '.gif') { mime = 'image/gif'; } else if (ext == '.webp') { mime = 'image/webp'; } final dataUrl = 'data:$mime;base64,$b64'; // Mark as completed with data URL as fileId try { final current = _ref.read(attachedFilesProvider); final idx = current.indexWhere((f) => f.file.path == task.filePath); if (idx != -1) { final existing = current[idx]; final done = FileUploadState( file: existing.file, fileName: task.fileName, fileSize: existing.fileSize, progress: 1.0, status: FileUploadStatus.completed, fileId: dataUrl, ); _ref.read(attachedFilesProvider.notifier).updateFileState( task.filePath, done, ); } } catch (_) {} } catch (e) { try { final current = _ref.read(attachedFilesProvider); final idx = current.indexWhere((f) => f.file.path == task.filePath); if (idx != -1) { final existing = current[idx]; final failed = FileUploadState( file: existing.file, fileName: task.fileName, fileSize: existing.fileSize, progress: 0.0, status: FileUploadStatus.failed, fileId: existing.fileId, error: e.toString(), ); _ref.read(attachedFilesProvider.notifier).updateFileState( task.filePath, failed, ); } } catch (_) {} } } Future _performSaveConversation(SaveConversationTask task) async { final api = _ref.read(apiServiceProvider); final messages = _ref.read(chat.chatMessagesProvider); final activeConv = _ref.read(activeConversationProvider); final selectedModel = _ref.read(selectedModelProvider); if (api == null || messages.isEmpty || activeConv == null) return; // Skip if last assistant is empty placeholder final last = messages.last; if (last.role == 'assistant' && last.content.trim().isEmpty && (last.files?.isEmpty ?? true) && (last.attachmentIds?.isEmpty ?? true)) { return; } try { await api.updateConversationWithMessages( activeConv.id, messages, model: selectedModel?.id, ); final updated = activeConv.copyWith( messages: messages, updatedAt: DateTime.now(), ); _ref.read(activeConversationProvider.notifier).state = updated; _ref.invalidate(conversationsProvider); } catch (_) {} } Future _performGenerateTitle(GenerateTitleTask task) async { final api = _ref.read(apiServiceProvider); final activeConv = _ref.read(activeConversationProvider); final selectedModel = _ref.read(selectedModelProvider); if (api == null || selectedModel == null) return; try { final messages = _ref.read(chat.chatMessagesProvider); final formatted = >[]; for (final msg in messages) { formatted.add({ 'id': msg.id, 'role': msg.role, 'content': msg.content, 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, }); } final title = await api.generateTitle( conversationId: task.conversationId, messages: formatted, model: selectedModel.id, ); if (title != null && title.isNotEmpty && title != 'New Chat') { if (activeConv != null && activeConv.id == task.conversationId) { final updated = activeConv.copyWith( title: title.length > 100 ? '${title.substring(0, 100)}...' : title, updatedAt: DateTime.now(), ); _ref.read(activeConversationProvider.notifier).state = updated; try { final cur = _ref.read(chat.chatMessagesProvider); await api.updateConversationWithMessages( updated.id, cur, title: updated.title, model: selectedModel.id, ); } catch (_) {} _ref.invalidate(conversationsProvider); } } } catch (_) {} } }