From 77e6a15215490b84f14db46896acd07dda0657ae Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Tue, 2 Sep 2025 13:20:02 +0530 Subject: [PATCH] refactor: more flows --- lib/core/services/share_receiver_service.dart | 18 ++- .../chat/providers/chat_providers.dart | 106 ++---------------- lib/shared/services/tasks/outbound_task.dart | 24 ++++ lib/shared/services/tasks/task_queue.dart | 59 ++++++++++ lib/shared/services/tasks/task_worker.dart | 76 +++++++++++++ 5 files changed, 182 insertions(+), 101 deletions(-) diff --git a/lib/core/services/share_receiver_service.dart b/lib/core/services/share_receiver_service.dart index d7d0375..6cf86ee 100644 --- a/lib/core/services/share_receiver_service.dart +++ b/lib/core/services/share_receiver_service.dart @@ -9,6 +9,8 @@ import '../../features/auth/providers/unified_auth_providers.dart'; import '../../features/chat/providers/chat_providers.dart'; import '../../features/chat/services/file_attachment_service.dart'; import '../../core/providers/app_providers.dart'; +import '../../shared/services/tasks/task_queue.dart'; +import 'package:path/path.dart' as path; import 'navigation_service.dart'; // Server chat creation/title generation occur on first send via chat providers @@ -152,13 +154,17 @@ Future _processPayload(Ref ref, SharedPayload payload) async { if (files.isNotEmpty) { ref.read(attachedFilesProvider.notifier).addFiles(files); + // Enqueue uploads via task queue to unify progress + retry + final activeConv = ref.read(activeConversationProvider); for (final file in files) { - final uploadStream = svc.uploadFile(file); - uploadStream.listen((state) { - ref - .read(attachedFilesProvider.notifier) - .updateFileState(file.path, state); - }, onError: (_) {}); + try { + await ref.read(taskQueueProvider.notifier).enqueueUploadMedia( + conversationId: activeConv?.id, + filePath: file.path, + fileName: path.basename(file.path), + fileSize: await file.length(), + ); + } catch (_) {} } } } diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 21c64c9..5135c38 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -2026,51 +2026,13 @@ Future _triggerTitleGeneration( List> messages, String model, ) async { + // Enqueue background title generation task try { - final api = ref.read(apiServiceProvider); - if (api == null) return; - - // Call the title generation endpoint - final generatedTitle = await api.generateTitle( - conversationId: conversationId, - messages: messages, - model: model, - ); - - if (generatedTitle != null && - generatedTitle.isNotEmpty && - generatedTitle != 'New Chat') { - // Update the active conversation with the new title - final activeConversation = ref.read(activeConversationProvider); - if (activeConversation?.id == conversationId) { - final updated = activeConversation!.copyWith( - title: generatedTitle, - updatedAt: DateTime.now(), - ); - ref.read(activeConversationProvider.notifier).state = updated; - - // Save the updated title to the server - try { - final currentMessages = ref.read(chatMessagesProvider); - await api.updateConversationWithMessages( - conversationId, - currentMessages, - title: generatedTitle, - model: model, - ); - } catch (e) { - // Handle title save errors silently - } - - // Refresh the conversations list - ref.invalidate(conversationsProvider); - } - } else { - // Fall back to background checking - _checkForTitleInBackground(ref, conversationId); - } - } catch (e) { - // Fall back to background checking + await ref + .read(taskQueueProvider.notifier) + .enqueueGenerateTitle(conversationId: conversationId); + } catch (_) { + // Best effort background check remains _checkForTitleInBackground(ref, conversationId); } } @@ -2124,59 +2086,13 @@ Future _checkForTitleInBackground( // Save current conversation to OpenWebUI server Future _saveConversationToServer(dynamic ref) async { + // Enqueue save task; local fallback remains if queue fails try { - final api = ref.read(apiServiceProvider); - final messages = ref.read(chatMessagesProvider); final activeConversation = ref.read(activeConversationProvider); - final selectedModel = ref.read(selectedModelProvider); - - if (api == null || messages.isEmpty || activeConversation == null) { - return; - } - - // Check if the last assistant message is truly empty (no text and no files) - final lastMessage = messages.last; - if (lastMessage.role == 'assistant' && - lastMessage.content.trim().isEmpty && - (lastMessage.files == null || lastMessage.files!.isEmpty) && - (lastMessage.attachmentIds == null || - lastMessage.attachmentIds!.isEmpty)) { - return; - } - - // Update the existing conversation with all messages (including assistant response) - - try { - await api.updateConversationWithMessages( - activeConversation.id, - messages, - model: selectedModel?.id, - ); - - // Update local state - final updatedConversation = activeConversation.copyWith( - messages: messages, - updatedAt: DateTime.now(), - ); - - ref.read(activeConversationProvider.notifier).state = updatedConversation; - } catch (e) { - // Fallback to local storage if server update fails - await _saveConversationLocally(ref); - return; - } - - // Refresh conversations list to show the updated conversation - // Adding a small delay to prevent rapid invalidations that could cause duplicates - Future.delayed(const Duration(milliseconds: 100), () { - try { - if (ref.mounted == true) { - ref.invalidate(conversationsProvider); - } - } catch (_) {} - }); - } catch (e) { - // Fallback to local storage + await ref + .read(taskQueueProvider.notifier) + .enqueueSaveConversation(conversationId: activeConversation?.id); + } catch (_) { await _saveConversationLocally(ref); } } diff --git a/lib/shared/services/tasks/outbound_task.dart b/lib/shared/services/tasks/outbound_task.dart index d5d54af..7896b9c 100644 --- a/lib/shared/services/tasks/outbound_task.dart +++ b/lib/shared/services/tasks/outbound_task.dart @@ -74,6 +74,30 @@ abstract class OutboundTask with _$OutboundTask { String? error, }) = GenerateImageTask; + const factory OutboundTask.saveConversation({ + required String id, + String? conversationId, + @Default(TaskStatus.queued) TaskStatus status, + @Default(0) int attempt, + String? idempotencyKey, + DateTime? enqueuedAt, + DateTime? startedAt, + DateTime? completedAt, + String? error, + }) = SaveConversationTask; + + const factory OutboundTask.generateTitle({ + required String id, + required String conversationId, + @Default(TaskStatus.queued) TaskStatus status, + @Default(0) int attempt, + String? idempotencyKey, + DateTime? enqueuedAt, + DateTime? startedAt, + DateTime? completedAt, + String? error, + }) = GenerateTitleTask; + factory OutboundTask.fromJson(Map json) => _$OutboundTaskFromJson(json); diff --git a/lib/shared/services/tasks/task_queue.dart b/lib/shared/services/tasks/task_queue.dart index 2273e5d..0dc48c0 100644 --- a/lib/shared/services/tasks/task_queue.dart +++ b/lib/shared/services/tasks/task_queue.dart @@ -226,4 +226,63 @@ class TaskQueueNotifier extends StateNotifier> { await _save(); } } + + Future enqueueUploadMedia({ + required String? conversationId, + required String filePath, + required String fileName, + int? fileSize, + String? mimeType, + String? checksum, + }) async { + final id = _uuid.v4(); + final task = OutboundTask.uploadMedia( + id: id, + conversationId: conversationId, + filePath: filePath, + fileName: fileName, + fileSize: fileSize, + mimeType: mimeType, + checksum: checksum, + enqueuedAt: DateTime.now(), + ); + state = [...state, task]; + await _save(); + _process(); + return id; + } + + Future enqueueSaveConversation({ + required String? conversationId, + String? idempotencyKey, + }) async { + final id = _uuid.v4(); + final task = OutboundTask.saveConversation( + id: id, + conversationId: conversationId, + idempotencyKey: idempotencyKey, + enqueuedAt: DateTime.now(), + ); + state = [...state, task]; + await _save(); + _process(); + return id; + } + + Future enqueueGenerateTitle({ + required String conversationId, + String? idempotencyKey, + }) async { + final id = _uuid.v4(); + final task = OutboundTask.generateTitle( + id: id, + conversationId: conversationId, + idempotencyKey: idempotencyKey, + enqueuedAt: DateTime.now(), + ); + state = [...state, task]; + await _save(); + _process(); + return id; + } } diff --git a/lib/shared/services/tasks/task_worker.dart b/lib/shared/services/tasks/task_worker.dart index e1c6ac3..f97c190 100644 --- a/lib/shared/services/tasks/task_worker.dart +++ b/lib/shared/services/tasks/task_worker.dart @@ -20,6 +20,8 @@ class TaskWorker { uploadMedia: _performUploadMedia, executeToolCall: _performExecuteToolCall, generateImage: _performGenerateImage, + saveConversation: _performSaveConversation, + generateTitle: _performGenerateTitle, ); } @@ -278,4 +280,78 @@ class TaskWorker { _ref.read(chat.chatMessagesProvider.notifier).finishStreaming(); } } + + Future _performSaveConversation(SaveConversationTask task) async { + final api = _ref.read(apiServiceProvider); + final messages = _ref.read(chat.chatMessagesProvider); + final activeConv = _ref.read(activeConversationProvider); + final selectedModel = _ref.read(selectedModelProvider); + if (api == null || messages.isEmpty || activeConv == null) return; + + // Skip if last assistant is empty placeholder + final last = messages.last; + if (last.role == 'assistant' && + last.content.trim().isEmpty && + (last.files?.isEmpty ?? true) && + (last.attachmentIds?.isEmpty ?? true)) { + return; + } + + try { + await api.updateConversationWithMessages( + activeConv.id, + messages, + model: selectedModel?.id, + ); + final updated = activeConv.copyWith( + messages: messages, + updatedAt: DateTime.now(), + ); + _ref.read(activeConversationProvider.notifier).state = updated; + _ref.invalidate(conversationsProvider); + } catch (_) {} + } + + Future _performGenerateTitle(GenerateTitleTask task) async { + final api = _ref.read(apiServiceProvider); + final activeConv = _ref.read(activeConversationProvider); + final selectedModel = _ref.read(selectedModelProvider); + if (api == null || selectedModel == null) return; + try { + final messages = _ref.read(chat.chatMessagesProvider); + final formatted = >[]; + for (final msg in messages) { + formatted.add({ + 'id': msg.id, + 'role': msg.role, + 'content': msg.content, + 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, + }); + } + final title = await api.generateTitle( + conversationId: task.conversationId, + messages: formatted, + model: selectedModel.id, + ); + if (title != null && title.isNotEmpty && title != 'New Chat') { + if (activeConv != null && activeConv.id == task.conversationId) { + final updated = activeConv.copyWith( + title: title.length > 100 ? '${title.substring(0, 100)}...' : title, + updatedAt: DateTime.now(), + ); + _ref.read(activeConversationProvider.notifier).state = updated; + try { + final cur = _ref.read(chat.chatMessagesProvider); + await api.updateConversationWithMessages( + updated.id, + cur, + title: updated.title, + model: selectedModel.id, + ); + } catch (_) {} + _ref.invalidate(conversationsProvider); + } + } + } catch (_) {} + } }