Files
iiEsaywebUIapp/lib/shared/services/tasks/task_worker.dart

355 lines
12 KiB
Dart
Raw Normal View History

2025-09-01 23:41:22 +05:30
import 'dart:async';
2025-09-02 19:08:23 +05:30
import 'dart:io';
import 'dart:convert';
2025-09-01 23:41:22 +05:30
import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../../../core/providers/app_providers.dart';
import '../../../core/services/attachment_upload_queue.dart';
import '../../../core/utils/debug_logger.dart';
import '../../../features/chat/providers/chat_providers.dart' as chat;
import '../../../features/chat/providers/context_attachments_provider.dart';
2025-09-02 19:08:23 +05:30
import '../../../features/chat/services/file_attachment_service.dart';
2025-09-01 23:41:22 +05:30
import 'outbound_task.dart';
/// Executes [OutboundTask]s by delegating to the chat send pipeline and the
/// attachment upload queue.
///
/// All provider access goes through the [Ref] supplied at construction time.
class TaskWorker {
  /// Upper bound on how long an upload task waits for the queue to report a
  /// terminal state before giving up.
  static const _uploadTimeout = Duration(minutes: 2);

  /// File-name suffixes treated as images when the attachment state does not
  /// already say whether the file is an image.
  static const _imageExtensions = <String>{
    '.jpg',
    '.jpeg',
    '.png',
    '.gif',
    '.webp',
  };

  final Ref _ref;

  TaskWorker(this._ref);

  /// Runs [task] with the handler that matches its variant.
  ///
  /// Errors thrown by the handler propagate to the caller.
  Future<void> perform(OutboundTask task) async {
    await task.map<Future<void>>(
      sendTextMessage: _performSendText,
      uploadMedia: _performUploadMedia,
      executeToolCall: _performExecuteToolCall,
      generateImage: _performGenerateImage,
      imageToDataUrl: _performImageToDataUrl,
    );
  }

  /// Sends a text message through the unified chat send implementation.
  ///
  /// Attachments are assumed to be uploaded already (file ids or data URLs);
  /// queued local uploads are not resolved here.
  ///
  /// Throws an [Exception] when reviewer mode is off and no API service is
  /// available.
  Future<void> _performSendText(SendTextMessageTask task) async {
    final isReviewer = _ref.read(reviewerModeProvider);
    if (!isReviewer && _ref.read(apiServiceProvider) == null) {
      throw Exception('API not available');
    }

    // Best effort: if loading fails, the send flow can create a new
    // conversation.
    await _activateConversation(task.conversationId);

    // Always clear context attachments after send, even on failure, to
    // prevent stale attachments from leaking into subsequent messages.
    try {
      await chat.sendMessageFromService(
        _ref,
        task.text,
        task.attachments.isEmpty ? null : task.attachments,
        task.toolIds.isEmpty ? null : task.toolIds,
      );
    } finally {
      try {
        _ref.read(contextAttachmentsProvider.notifier).clear();
      } catch (e) {
        // Cleanup must never mask the outcome of the send itself.
        DebugLogger.warning('Failed to clear context attachments: $e');
      }
    }
  }

  /// Enqueues a media file for upload and waits until the queue reports a
  /// terminal state for it, or until [_uploadTimeout] elapses.
  ///
  /// Queue updates are mirrored into the attached-files UI state when an
  /// entry with the same file path is present there.
  Future<void> _performUploadMedia(UploadMediaTask task) async {
    final uploader = await _prepareUploader();

    final queuedId = await uploader.enqueue(
      filePath: task.filePath,
      fileName: task.fileName,
      fileSize: task.fileSize ?? 0,
      mimeType: task.mimeType,
      checksum: task.checksum,
    );

    final lowerName = task.fileName.toLowerCase();

    await _waitForUpload(
      uploader: uploader,
      queuedId: queuedId,
      timeoutMessage: 'UploadMediaTask timed out: ${task.fileName}',
      onUpdate: (entry) => _updateAttachedFile(task.filePath, (existing) {
        final status = _toUploadStatus(entry.status);
        return FileUploadState(
          file: File(task.filePath),
          fileName: task.fileName,
          fileSize: task.fileSize ?? existing.fileSize,
          // The queue entry exposes no progress here, so keep whatever the
          // UI already shows until the upload completes.
          progress: status == FileUploadStatus.completed
              ? 1.0
              : existing.progress,
          status: status,
          fileId: entry.fileId ?? existing.fileId,
          error: entry.lastError,
          isImage:
              existing.isImage ?? _imageExtensions.any(lowerName.endsWith),
        );
      }),
    );
  }

  /// Asks the chat pipeline to run a named tool with the task's arguments.
  ///
  /// The tool is looked up by name (or id) among the tools the API reports.
  /// When found, the send is constrained to that tool id; otherwise the
  /// instruction is sent without a tool constraint.
  ///
  /// Throws an [Exception] when the API service or selected model is missing.
  Future<void> _performExecuteToolCall(ExecuteToolCallTask task) async {
    final api = _ref.read(apiServiceProvider);
    final selectedModel = _ref.read(selectedModelProvider);
    if (api == null || selectedModel == null) {
      throw Exception('API or model not available');
    }

    await _activateConversation(task.conversationId);

    // Look the tool up by name, falling back to a match on its id.
    String? resolvedToolId;
    try {
      final wanted = task.toolName.toLowerCase();
      final tools = await api.getAvailableTools();
      for (final tool in tools) {
        final id = (tool['id'] ?? '').toString();
        final name = (tool['name'] ?? '').toString();
        if (name.toLowerCase() == wanted || id.toLowerCase() == wanted) {
          resolvedToolId = id;
          break;
        }
      }
    } catch (e) {
      // Best effort: without a resolved id the message is sent unconstrained.
      DebugLogger.warning('Tool lookup failed for "${task.toolName}": $e');
    }

    // Build an explicit user instruction to run the tool with arguments.
    final args = task.arguments;
    String argsSnippet;
    try {
      argsSnippet = const JsonEncoder.withIndent(' ').convert(args);
    } catch (_) {
      // Arguments that cannot be JSON-encoded are embedded via toString().
      argsSnippet = args.toString();
    }

    final instruction =
        'Run the tool "${task.toolName}" with the following JSON arguments and return the result succinctly.\n'
        'If the tool is not available, respond with a brief error.\n\n'
        'Arguments:\n'
        '```json\n$argsSnippet\n```';

    // Constrain tools to the resolved tool, if one was found.
    final toolIds = (resolvedToolId != null && resolvedToolId.isNotEmpty)
        ? <String>[resolvedToolId]
        : null;

    await chat.sendMessageFromService(_ref, instruction, null, toolIds);
  }

  /// Sends the task's prompt with image generation temporarily enabled.
  ///
  /// The previous value of the image-generation flag is restored afterwards,
  /// whether or not the send succeeds.
  ///
  /// Throws an [Exception] when the API service or selected model is missing.
  Future<void> _performGenerateImage(GenerateImageTask task) async {
    final api = _ref.read(apiServiceProvider);
    final selectedModel = _ref.read(selectedModelProvider);
    if (api == null || selectedModel == null) {
      throw Exception('API or model not available');
    }

    await _activateConversation(task.conversationId);

    final previous = _ref.read(chat.imageGenerationEnabledProvider);
    try {
      _ref.read(chat.imageGenerationEnabledProvider.notifier).set(true);
      await chat.sendMessageFromService(_ref, task.prompt, null, null);
    } finally {
      _ref.read(chat.imageGenerationEnabledProvider.notifier).set(previous);
    }
  }

  /// Uploads an image through the upload queue (rather than converting it to
  /// a data URL) and waits for a terminal state or [_uploadTimeout].
  ///
  /// Throws a [FileSystemException] when the file's length cannot be read.
  Future<void> _performImageToDataUrl(ImageToDataUrlTask task) async {
    final uploader = await _prepareUploader();

    // Stat the file without blocking, and before touching UI state, so that
    // a missing file does not leave the attachment stuck in `uploading`.
    final fileSize = await File(task.filePath).length();

    _updateAttachedFile(
      task.filePath,
      (existing) => FileUploadState(
        file: existing.file,
        fileName: task.fileName,
        fileSize: existing.fileSize,
        progress: 0.0,
        status: FileUploadStatus.uploading,
        fileId: existing.fileId,
        isImage: existing.isImage ?? true,
      ),
    );

    final queuedId = await uploader.enqueue(
      filePath: task.filePath,
      fileName: task.fileName,
      fileSize: fileSize,
    );

    await _waitForUpload(
      uploader: uploader,
      queuedId: queuedId,
      timeoutMessage: 'Image upload timed out: ${task.fileName}',
      onUpdate: (entry) => _updateAttachedFile(task.filePath, (existing) {
        final status = _toUploadStatus(entry.status);
        return FileUploadState(
          file: File(task.filePath),
          fileName: task.fileName,
          fileSize: existing.fileSize,
          progress: status == FileUploadStatus.completed
              ? 1.0
              : existing.progress,
          status: status,
          fileId: entry.fileId ?? existing.fileId,
          isImage: true,
          error: entry.lastError,
        );
      }),
    );
  }

  /// Loads [conversationId] and makes it the active conversation when it is
  /// non-empty and differs from the currently active one.
  ///
  /// Best effort: does nothing when no API service is available, and logs
  /// (rather than throws) when loading fails.
  Future<void> _activateConversation(String? conversationId) async {
    if (conversationId == null || conversationId.isEmpty) return;
    try {
      final active = _ref.read(activeConversationProvider);
      if (active != null && active.id == conversationId) return;

      final api = _ref.read(apiServiceProvider);
      if (api == null) return;

      final conversation = await api.getConversation(conversationId);
      _ref.read(activeConversationProvider.notifier).set(conversation);
    } catch (e) {
      DebugLogger.warning(
        'Could not activate conversation $conversationId: $e',
      );
    }
  }

  /// Returns an upload queue, initialised with the API upload callback when
  /// an API service is available.
  ///
  /// Initialisation failures are logged and the queue is returned anyway.
  Future<AttachmentUploadQueue> _prepareUploader() async {
    final uploader = AttachmentUploadQueue();
    try {
      final api = _ref.read(apiServiceProvider);
      if (api != null) {
        await uploader.initialize(onUpload: (p, n) => api.uploadFile(p, n));
      }
    } catch (e) {
      DebugLogger.warning('Upload queue initialization failed: $e');
    }
    return uploader;
  }

  /// Replaces the attached-file state whose file path equals [filePath] with
  /// the result of [build]; does nothing when no such entry exists.
  ///
  /// Best effort: failures are logged and never thrown, so a UI update
  /// problem cannot abort the upload wait.
  void _updateAttachedFile(
    String filePath,
    FileUploadState Function(FileUploadState existing) build,
  ) {
    try {
      final attached = _ref.read(attachedFilesProvider);
      final index = attached.indexWhere((f) => f.file.path == filePath);
      if (index == -1) return;
      _ref
          .read(attachedFilesProvider.notifier)
          .updateFileState(filePath, build(attached[index]));
    } catch (e) {
      DebugLogger.warning(
        'Failed to update attachment state for $filePath: $e',
      );
    }
  }

  /// Waits until the queue entry [queuedId] is completed, failed or
  /// cancelled, calling [onUpdate] for every queue emission containing it.
  ///
  /// Kicks off queue processing after subscribing. When [_uploadTimeout]
  /// elapses first, [timeoutMessage] is logged and the method returns
  /// normally. The subscription is always cancelled before returning.
  Future<void> _waitForUpload({
    required AttachmentUploadQueue uploader,
    required String queuedId,
    required String timeoutMessage,
    required void Function(QueuedAttachment entry) onUpdate,
  }) async {
    final done = Completer<void>();
    final subscription = uploader.queueStream.listen((items) {
      // Ignore anything delivered after the terminal state was observed.
      if (done.isCompleted) return;

      final entry = _findQueued(items, queuedId);
      if (entry == null) return;

      onUpdate(entry);
      if (_isTerminal(entry.status)) done.complete();
    });

    // Fire a process tick.
    unawaited(uploader.processQueue());

    try {
      await done.future.timeout(
        _uploadTimeout,
        onTimeout: () => DebugLogger.warning(timeoutMessage),
      );
    } finally {
      await subscription.cancel();
    }
  }

  /// Returns the queue entry with the given [id], or `null` when absent.
  static QueuedAttachment? _findQueued(
    List<QueuedAttachment> items,
    String id,
  ) {
    for (final item in items) {
      if (item.id == id) return item;
    }
    return null;
  }

  /// Maps a queue status onto the status shown for an attached file.
  ///
  /// Pending entries are shown as uploading; cancelled entries as failed.
  static FileUploadStatus _toUploadStatus(QueuedAttachmentStatus status) =>
      switch (status) {
        QueuedAttachmentStatus.pending => FileUploadStatus.uploading,
        QueuedAttachmentStatus.uploading => FileUploadStatus.uploading,
        QueuedAttachmentStatus.completed => FileUploadStatus.completed,
        QueuedAttachmentStatus.failed => FileUploadStatus.failed,
        QueuedAttachmentStatus.cancelled => FileUploadStatus.failed,
      };

  /// Whether [status] ends the wait for an upload.
  static bool _isTerminal(QueuedAttachmentStatus status) =>
      status == QueuedAttachmentStatus.completed ||
      status == QueuedAttachmentStatus.failed ||
      status == QueuedAttachmentStatus.cancelled;
}