From 1b65743b06afabb4e871d5efda48abb959087079 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Mon, 1 Sep 2025 23:41:22 +0530 Subject: [PATCH] feat: task based send flow --- lib/core/services/api_service.dart | 21 +- .../chat/providers/chat_providers.dart | 23 +- lib/features/chat/views/chat_page.dart | 34 +-- lib/shared/services/tasks/outbound_task.dart | 70 ++++++ lib/shared/services/tasks/task_queue.dart | 210 ++++++++++++++++++ lib/shared/services/tasks/task_worker.dart | 120 ++++++++++ 6 files changed, 445 insertions(+), 33 deletions(-) create mode 100644 lib/shared/services/tasks/outbound_task.dart create mode 100644 lib/shared/services/tasks/task_queue.dart create mode 100644 lib/shared/services/tasks/task_worker.dart diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 5e20901..45c59f0 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -625,7 +625,7 @@ class ApiService { // If this assistant message includes tool_calls, merge following tool results final historyMsg = historyMessagesMap != null - ? (historyMessagesMap![msgData['id']] as Map?) + ? (historyMessagesMap[msgData['id']] as Map?) : null; final toolCalls = (msgData['tool_calls'] is List) @@ -2652,9 +2652,7 @@ class ApiService { // Add only essential parameters if (conversationId != null) { - if (conversationId != null) { - data['chat_id'] = conversationId; - } + data['chat_id'] = conversationId; } // Add feature flags if enabled @@ -2737,13 +2735,10 @@ class ApiService { // Decide whether to use background task flow. // Only enable background task mode when we actually need socket/dynamic-channel // behavior (e.g., provider-native tools or explicit background tasks with a session). - final bool useBackgroundTasks = ( - // Use background flow only for provider-native tools or explicit tool servers. - // Pure text generation should prefer SSE even if a socket session exists, - // and background_tasks can still be honored at the end of SSE. - (toolIds != null && toolIds.isNotEmpty) || - (toolServers != null && toolServers.isNotEmpty) - ); + // Always use task-based background flow for unified pipeline. + // When a dynamic channel (session_id) is not provided, this method falls + // back to polling and streams deltas to the UI. + final bool useBackgroundTasks = true; // Use background flow only when required; otherwise prefer SSE even with chat_id. // SSE must not include session_id/id to avoid server falling back to task mode. @@ -3625,12 +3620,12 @@ class ApiService { : ''; String toEmit; - if ((contentVal as String).startsWith(last)) { + if (contentVal.startsWith(last)) { toEmit = contentVal.substring(last.length); } else { // Fallback: emit suffix after longest common prefix int i = 0; - final s = contentVal as String; + final s = contentVal; final minLen = last.length < s.length ? last.length : s.length; while (i < minLen && last.codeUnitAt(i) == s.codeUnitAt(i)) { i++; diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index f3fbaaa..8a8fcba 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -13,6 +13,7 @@ import '../../../core/utils/stream_chunker.dart'; import '../../../core/services/persistent_streaming_service.dart'; import '../../../core/utils/debug_logger.dart'; import '../services/reviewer_mode_service.dart'; +import '../../../shared/services/tasks/task_queue.dart'; const bool kSocketVerboseLogging = false; @@ -1044,6 +1045,11 @@ Future _sendMessageInternal( 'follow_up_generation': true, }; + // Determine if we need background task flow (tools/tool servers) + final bool isBackgroundToolsFlowPre = + (toolIdsForApi != null && toolIdsForApi.isNotEmpty) || + (toolServers != null && toolServers.isNotEmpty); + final response = await api.sendMessage( messages: conversationMessages, model: selectedModel.id, @@ -1054,7 +1060,9 @@ Future _sendMessageInternal( // handled via pre-stream client-side request above enableImageGeneration: false, modelItem: modelItem, - sessionIdOverride: socketSessionId, + // Only pass a session when we truly want task-based dynamic-channel + // behavior; for pure text flows prefer polling (if background mode). + sessionIdOverride: isBackgroundToolsFlowPre ? socketSessionId : null, toolServers: toolServers, backgroundTasks: bgTasks, ); @@ -1078,9 +1086,7 @@ Future _sendMessageInternal( // Background-tools flow (tools/tool servers) relies on socket/dynamic channel for // streaming content. Allow socket TEXT in that mode. For pure SSE flows, suppress // socket TEXT to avoid duplicates (still surface tool_call status). - final bool isBackgroundToolsFlow = - (toolIdsForApi != null && toolIdsForApi.isNotEmpty) || - (toolServers != null && toolServers.isNotEmpty); + final bool isBackgroundToolsFlow = isBackgroundToolsFlowPre; bool suppressSocketContent = !isBackgroundToolsFlow; // allow socket text for tools bool usingDynamicChannel = false; // set true when server provides a channel if (socketService != null) { @@ -2555,6 +2561,15 @@ final stopGenerationProvider = Provider((ref) { } } catch (_) {} }()); + + // Also cancel local queue tasks for this conversation + try { + // Fire-and-forget local queue cancellation + // ignore: unawaited_futures + ref + .read(taskQueueProvider.notifier) + .cancelByConversation(activeConv.id); + } catch (_) {} } } catch (_) {} diff --git a/lib/features/chat/views/chat_page.dart b/lib/features/chat/views/chat_page.dart index 7447e9f..54a01a3 100644 --- a/lib/features/chat/views/chat_page.dart +++ b/lib/features/chat/views/chat_page.dart @@ -33,6 +33,7 @@ import '../../onboarding/views/onboarding_sheet.dart'; import '../../../shared/widgets/sheet_handle.dart'; import '../../../shared/widgets/conduit_components.dart'; import '../../../core/services/settings_service.dart'; +import '../../../shared/services/tasks/task_queue.dart'; // Removed unused PlatformUtils import import '../../../core/services/platform_service.dart' as ps; import 'package:flutter/gestures.dart' show DragStartBehavior; @@ -280,33 +281,30 @@ class _ChatPageState extends ConsumerState { } try { - // Get attached files and use uploadedFileIds when sendMessage is updated to accept file IDs + // Get attached files and collect uploaded file IDs (including data URLs for images) final attachedFiles = ref.read(attachedFilesProvider); - final uploadedFileIds = attachedFiles - .where( - (file) => - file.status == FileUploadStatus.completed && - file.fileId != null, - ) + .where((file) => + file.status == FileUploadStatus.completed && file.fileId != null) .map((file) => file.fileId!) .toList(); // Get selected tools final toolIds = ref.read(selectedToolIdsProvider); - // Send message with file attachments and tools using existing provider logic - await sendMessage( - ref, - text, - uploadedFileIds.isNotEmpty ? uploadedFileIds : null, - toolIds.isNotEmpty ? toolIds : null, - ); + // Enqueue task-based send to unify flow across text, images, and tools + final activeConv = ref.read(activeConversationProvider); + await ref.read(taskQueueProvider.notifier).enqueueSendText( + conversationId: activeConv?.id, + text: text, + attachments: uploadedFileIds.isNotEmpty ? uploadedFileIds : null, + toolIds: toolIds.isNotEmpty ? toolIds : null, + ); // Clear attachments after successful send ref.read(attachedFilesProvider.notifier).clearAll(); - // Scroll to bottom after sending message (only if user was near bottom) + // Scroll to bottom after enqueuing (only if user was near bottom) WidgetsBinding.instance.addPostFrameCallback((_) { if (_scrollController.hasClients) { final maxScroll = _scrollController.position.maxScrollExtent; @@ -879,7 +877,11 @@ class _ChatPageState extends ConsumerState { // Send the edited message final selectedModel = ref.read(selectedModelProvider); if (selectedModel != null) { - await sendMessage(ref, result, null); + final activeConv = ref.read(activeConversationProvider); + await ref.read(taskQueueProvider.notifier).enqueueSendText( + conversationId: activeConv?.id, + text: result, + ); if (mounted) {} } diff --git a/lib/shared/services/tasks/outbound_task.dart b/lib/shared/services/tasks/outbound_task.dart new file mode 100644 index 0000000..00264c8 --- /dev/null +++ b/lib/shared/services/tasks/outbound_task.dart @@ -0,0 +1,70 @@ +import 'package:freezed_annotation/freezed_annotation.dart'; + +part 'outbound_task.freezed.dart'; +part 'outbound_task.g.dart'; + +enum TaskStatus { + queued, + running, + succeeded, + failed, + cancelled, +} + +@freezed +abstract class OutboundTask with _$OutboundTask { + const OutboundTask._(); + + const factory OutboundTask.sendTextMessage({ + required String id, + String? conversationId, + required String text, + @Default([]) List attachments, + @Default([]) List toolIds, + @Default(TaskStatus.queued) TaskStatus status, + @Default(0) int attempt, + String? idempotencyKey, + DateTime? enqueuedAt, + DateTime? startedAt, + DateTime? completedAt, + String? error, + }) = SendTextMessageTask; + + const factory OutboundTask.uploadMedia({ + required String id, + String? conversationId, + required String filePath, + required String fileName, + int? fileSize, + String? mimeType, + String? checksum, + @Default(TaskStatus.queued) TaskStatus status, + @Default(0) int attempt, + String? idempotencyKey, + DateTime? enqueuedAt, + DateTime? startedAt, + DateTime? completedAt, + String? error, + }) = UploadMediaTask; + + const factory OutboundTask.executeToolCall({ + required String id, + String? conversationId, + required String toolName, + @Default({}) Map arguments, + @Default(TaskStatus.queued) TaskStatus status, + @Default(0) int attempt, + String? idempotencyKey, + DateTime? enqueuedAt, + DateTime? startedAt, + DateTime? completedAt, + String? error, + }) = ExecuteToolCallTask; + + factory OutboundTask.fromJson(Map json) => + _$OutboundTaskFromJson(json); + + String get threadKey => (conversationId == null || conversationId!.isEmpty) + ? 'new' + : conversationId!; +} diff --git a/lib/shared/services/tasks/task_queue.dart b/lib/shared/services/tasks/task_queue.dart new file mode 100644 index 0000000..e740458 --- /dev/null +++ b/lib/shared/services/tasks/task_queue.dart @@ -0,0 +1,210 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:flutter/foundation.dart'; +import 'package:flutter_riverpod/flutter_riverpod.dart'; +import 'package:uuid/uuid.dart'; + +import '../../../core/providers/app_providers.dart'; +import 'outbound_task.dart'; +import 'task_worker.dart'; + +final taskQueueProvider = + StateNotifierProvider>((ref) { + return TaskQueueNotifier(ref); +}); + +class TaskQueueNotifier extends StateNotifier> { + TaskQueueNotifier(this._ref) : super(const []) { + _load(); + } + + static const _prefsKey = 'outbound_task_queue_v1'; + final Ref _ref; + final _uuid = const Uuid(); + + bool _processing = false; + final Set _activeThreads = {}; + int _maxParallel = 2; // bounded parallelism across conversations + + Future _load() async { + try { + final prefs = _ref.read(sharedPreferencesProvider); + final jsonStr = prefs.getString(_prefsKey); + if (jsonStr == null || jsonStr.isEmpty) return; + final raw = (jsonDecode(jsonStr) as List).cast>(); + final tasks = raw.map(OutboundTask.fromJson).toList(); + // Only restore non-completed tasks + state = tasks + .where((t) => t.status == TaskStatus.queued || t.status == TaskStatus.running) + .map((t) => t.copyWith( + status: TaskStatus.queued, + startedAt: null, + completedAt: null, + )) + .toList(); + // Kick processing after load + _process(); + } catch (e) { + debugPrint('DEBUG: Failed to load task queue: $e'); + } + } + + Future _save() async { + try { + final prefs = _ref.read(sharedPreferencesProvider); + final raw = state.map((t) => t.toJson()).toList(growable: false); + await prefs.setString(_prefsKey, jsonEncode(raw)); + } catch (e) { + debugPrint('DEBUG: Failed to persist task queue: $e'); + } + } + + Future enqueueSendText({ + required String? conversationId, + required String text, + List? attachments, + List? toolIds, + String? idempotencyKey, + }) async { + final id = _uuid.v4(); + final task = OutboundTask.sendTextMessage( + id: id, + conversationId: conversationId, + text: text, + attachments: attachments ?? const [], + toolIds: toolIds ?? const [], + idempotencyKey: idempotencyKey, + enqueuedAt: DateTime.now(), + ); + state = [...state, task]; + await _save(); + _process(); + return id; + } + + Future cancel(String id) async { + state = [ + for (final t in state) + if (t.id == id) + t.copyWith( + status: TaskStatus.cancelled, + completedAt: DateTime.now(), + ) + else + t, + ]; + await _save(); + } + + Future cancelByConversation(String conversationId) async { + state = [ + for (final t in state) + if ((t.conversationId ?? '') == conversationId && + (t.status == TaskStatus.queued || t.status == TaskStatus.running)) + t.copyWith( + status: TaskStatus.cancelled, + completedAt: DateTime.now(), + ) + else + t, + ]; + await _save(); + } + + Future retry(String id) async { + state = [ + for (final t in state) + if (t.id == id) + t.copyWith( + status: TaskStatus.queued, + attempt: (t.attempt + 1), + error: null, + startedAt: null, + completedAt: null, + ) + else + t, + ]; + await _save(); + _process(); + } + + Future _process() async { + if (_processing) return; + _processing = true; + try { + // Pump while there is capacity and queued tasks remain + while (true) { + // Filter runnable tasks + final queued = state.where((t) => t.status == TaskStatus.queued).toList(); + if (queued.isEmpty) break; + + // Respect parallelism and one-per-thread + final availableCapacity = _maxParallel - _activeThreads.length; + if (availableCapacity <= 0) break; + + OutboundTask? next; + for (final t in queued) { + final thread = t.threadKey; + if (!_activeThreads.contains(thread)) { + next = t; + break; + } + } + + // If no eligible task (all threads busy), exit loop + if (next == null) break; + + // Mark running and launch without awaiting (parallel across threads) + final threadKey = next.threadKey; + _activeThreads.add(threadKey); + state = [ + for (final t in state) + if (t.id == next.id) + next.copyWith(status: TaskStatus.running, startedAt: DateTime.now()) + else + t, + ]; + await _save(); + + // Launch worker + unawaited(_run(next).whenComplete(() { + _activeThreads.remove(threadKey); + // After a task completes, try to schedule more + _process(); + })); + } + } finally { + _processing = false; + } + } + + Future _run(OutboundTask task) async { + try { + await TaskWorker(_ref).perform(task); + state = [ + for (final t in state) + if (t.id == task.id) + t.copyWith(status: TaskStatus.succeeded, completedAt: DateTime.now()) + else + t, + ]; + } catch (e, st) { + debugPrint('Task failed (${task.runtimeType}): $e\n$st'); + state = [ + for (final t in state) + if (t.id == task.id) + t.copyWith( + status: TaskStatus.failed, + error: e.toString(), + completedAt: DateTime.now(), + ) + else + t, + ]; + } finally { + await _save(); + } + } +} diff --git a/lib/shared/services/tasks/task_worker.dart b/lib/shared/services/tasks/task_worker.dart new file mode 100644 index 0000000..ed2d6d6 --- /dev/null +++ b/lib/shared/services/tasks/task_worker.dart @@ -0,0 +1,120 @@ +import 'dart:async'; +import 'package:flutter/foundation.dart'; +import 'package:flutter_riverpod/flutter_riverpod.dart'; + +import '../../../core/providers/app_providers.dart'; +import '../../../core/services/attachment_upload_queue.dart'; +import '../../../core/utils/debug_logger.dart'; +import '../../../features/chat/providers/chat_providers.dart' as chat; +import 'outbound_task.dart'; + +class TaskWorker { + final Ref _ref; + TaskWorker(this._ref); + + Future perform(OutboundTask task) async { + await task.map>( + sendTextMessage: _performSendText, + uploadMedia: _performUploadMedia, + executeToolCall: _performExecuteToolCall, + ); + } + + Future _performSendText(SendTextMessageTask task) async { + // Ensure uploads referenced in attachments are completed if they are local queued ids + // For now, assume attachments are already uploaded (fileIds or data URLs) as UI uploads eagerly. + // If needed, we could resolve queued uploads here by integrating with AttachmentUploadQueue. + final isReviewer = _ref.read(reviewerModeProvider); + if (!isReviewer) { + final api = _ref.read(apiServiceProvider); + if (api == null) { + throw Exception('API not available'); + } + } + + // Set active conversation if provided; otherwise keep current + try { + // If a specific conversation id is provided and differs from current, load it + final active = _ref.read(activeConversationProvider); + if (task.conversationId != null && + task.conversationId!.isNotEmpty && + (active == null || active.id != task.conversationId)) { + try { + final api = _ref.read(apiServiceProvider); + if (api != null) { + final conv = await api.getConversation(task.conversationId!); + _ref.read(activeConversationProvider.notifier).state = conv; + } + } catch (_) { + // If loading fails, proceed; send flow can create a new conversation + } + } + } catch (_) {} + + // Delegate to existing unified send implementation + await chat.sendMessageFromService( + _ref, + task.text, + task.attachments.isEmpty ? null : task.attachments, + task.toolIds.isEmpty ? null : task.toolIds, + ); + } + + Future _performUploadMedia(UploadMediaTask task) async { + final uploader = AttachmentUploadQueue(); + // Ensure queue initialized with API upload callback + try { + final api = _ref.read(apiServiceProvider); + if (api != null) { + await uploader.initialize( + onUpload: (p, n) => api.uploadFile(p, n), + ); + } + } catch (_) {} + + // Enqueue and then wait until the item reaches a terminal state for basic parity + final id = await uploader.enqueue( + filePath: task.filePath, + fileName: task.fileName, + fileSize: task.fileSize ?? 0, + mimeType: task.mimeType, + checksum: task.checksum, + ); + + final completer = Completer(); + late final StreamSubscription> sub; + sub = uploader.queueStream.listen((items) { + QueuedAttachment? entry; + try { + entry = items.firstWhere((e) => e.id == id); + } catch (_) { + entry = null; + } + if (entry == null) return; + switch (entry.status) { + case QueuedAttachmentStatus.completed: + case QueuedAttachmentStatus.failed: + case QueuedAttachmentStatus.cancelled: + sub.cancel(); + completer.complete(); + break; + default: + break; + } + }); + + // Fire a process tick + unawaited(uploader.processQueue()); + await completer.future.timeout(const Duration(minutes: 2), onTimeout: () { + try { sub.cancel(); } catch (_) {} + DebugLogger.warning('UploadMediaTask timed out: ${task.fileName}'); + return; + }); + } + + Future _performExecuteToolCall(ExecuteToolCallTask task) async { + // Placeholder: In this client, native tool execution is orchestrated server-side. + // We keep this task type for future local tools or MCP bridges. + debugPrint('ExecuteToolCallTask stub: ${task.toolName}'); + } +}