From 679eac4dd6c152d1d584fc52a1ed8b785c1aedea Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sun, 7 Sep 2025 21:41:13 +0530 Subject: [PATCH] refactor: regen and socket flows --- lib/core/services/streaming_helper.dart | 735 ++++++++ .../chat/providers/chat_providers.dart | 1555 ++++++++++++----- 2 files changed, 1886 insertions(+), 404 deletions(-) create mode 100644 lib/core/services/streaming_helper.dart diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart new file mode 100644 index 0000000..3330906 --- /dev/null +++ b/lib/core/services/streaming_helper.dart @@ -0,0 +1,735 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:flutter/foundation.dart'; + +import '../../core/models/chat_message.dart'; +import '../../core/services/persistent_streaming_service.dart'; +import '../../core/services/socket_service.dart'; +import '../../core/utils/stream_chunker.dart'; +import '../../core/utils/tool_calls_parser.dart'; + +// Keep local verbosity toggle for socket logs +const bool kSocketVerboseLogging = false; + +/// Unified streaming helper for chat send/regenerate flows. +/// +/// This attaches chunked SSE streaming handlers, optional WebSocket event handlers, +/// and manages background search/image-gen UI updates. It operates via callbacks to +/// avoid tight coupling with provider files for easier reuse and testing. +StreamSubscription attachUnifiedChunkedStreaming({ + required Stream stream, + required bool webSearchEnabled, + required bool isBackgroundFlow, + required bool suppressSocketContentInitially, + required bool usingDynamicChannelInitially, + required String assistantMessageId, + required String modelId, + required Map modelItem, + required String sessionId, + required String? activeConversationId, + required dynamic api, + required SocketService? socketService, + // Message update callbacks + required void Function(String) appendToLastMessage, + required void Function(String) replaceLastMessageContent, + required void Function(ChatMessage Function(ChatMessage)) updateLastMessageWith, + required void Function() finishStreaming, + required List Function() getMessages, +}) { + // Chunk the incoming stream for smoother UI updates + final chunkedStream = StreamChunker.chunkStream( + stream, + enableChunking: true, + minChunkSize: 5, + maxChunkLength: 3, + delayBetweenChunks: const Duration(milliseconds: 15), + ); + + // Persistable controller to survive brief app suspensions + final persistentController = StreamController.broadcast(); + final persistentService = PersistentStreamingService(); + + final streamId = persistentService.registerStream( + subscription: chunkedStream.listen( + persistentController.add, + onDone: persistentController.close, + onError: persistentController.addError, + ), + controller: persistentController, + recoveryCallback: () async { + debugPrint('DEBUG: Attempting to recover interrupted stream'); + }, + metadata: { + 'conversationId': activeConversationId, + 'messageId': assistantMessageId, + 'modelId': modelId, + }, + ); + + bool isSearching = false; + bool suppressSocketContent = suppressSocketContentInitially; + bool usingDynamicChannel = usingDynamicChannelInitially; + + void _updateImagesFromCurrentContent() { + try { + final msgs = getMessages(); + if (msgs.isEmpty || msgs.last.role != 'assistant') return; + final content = msgs.last.content; + if (content.isEmpty) return; + + final collected = >[]; + + if (content.contains('\"]+\.(jpg|jpeg|png|gif|webp)', + caseSensitive: false, + ); + final urlMatches = urlPattern.allMatches(content); + for (final match in urlMatches) { + final url = match.group(0); + if (url != null && url.isNotEmpty) { + collected.add({'type': 'image', 'url': url}); + } + } + + final jsonPattern = RegExp( + r'\{[^}]*"url"[^}]*:[^}]*"(data:image/[^"]+|https?://[^"]+\.(jpg|jpeg|png|gif|webp))"[^}]*\}', + caseSensitive: false, + ); + final jsonMatches = jsonPattern.allMatches(content); + for (final match in jsonMatches) { + final url = RegExp( + r'"url"[^:]*:[^"]*"([^"]+)"', + ).firstMatch(match.group(0) ?? '')?.group(1); + if (url != null && url.isNotEmpty) { + collected.add({'type': 'image', 'url': url}); + } + } + + final partialResultsPattern = RegExp( + r'(result|files)="([^"]*(?:data:image/[^"]*|https?://[^"]*\.(jpg|jpeg|png|gif|webp))[^"]*)"', + caseSensitive: false, + ); + final partialMatches = partialResultsPattern.allMatches(content); + for (final match in partialMatches) { + final attrValue = match.group(2); + if (attrValue != null) { + try { + final decoded = json.decode(attrValue); + collected.addAll(_extractFilesFromResult(decoded)); + } catch (_) { + if (attrValue.startsWith('data:image/') || + RegExp( + r'https?://[^\s]+\.(jpg|jpeg|png|gif|webp)$', + caseSensitive: false, + ).hasMatch(attrValue)) { + collected.add({'type': 'image', 'url': attrValue}); + } + } + } + } + } + + if (collected.isEmpty) return; + + final existing = msgs.last.files ?? >[]; + final seen = { + for (final f in existing) + if (f['url'] is String) (f['url'] as String) else '', + }..removeWhere((e) => e.isEmpty); + + final merged = >[...existing]; + for (final f in collected) { + final url = f['url'] as String?; + if (url != null && url.isNotEmpty && !seen.contains(url)) { + merged.add({'type': 'image', 'url': url}); + seen.add(url); + } + } + + if (merged.length != existing.length) { + updateLastMessageWith((m) => m.copyWith(files: merged)); + } + } catch (_) {} + } + + void channelLineHandlerFactory(String channel) { + void handler(dynamic line) { + try { + if (line is String) { + final s = line.trim(); + if (s == '[DONE]' || s == 'DONE') { + try { + socketService?.offEvent(channel); + } catch (_) {} + try { + // Fire and forget + // ignore: unawaited_futures + api?.sendChatCompleted( + chatId: activeConversationId ?? '', + messageId: assistantMessageId, + messages: const [], + model: modelId, + modelItem: modelItem, + sessionId: sessionId, + ); + } catch (_) {} + finishStreaming(); + return; + } + if (s.startsWith('data:')) { + final dataStr = s.substring(5).trim(); + if (dataStr == '[DONE]') { + try { + socketService?.offEvent(channel); + } catch (_) {} + try { + // ignore: unawaited_futures + api?.sendChatCompleted( + chatId: activeConversationId ?? '', + messageId: assistantMessageId, + messages: const [], + model: modelId, + modelItem: modelItem, + sessionId: sessionId, + ); + } catch (_) {} + finishStreaming(); + return; + } + try { + final Map j = jsonDecode(dataStr); + final choices = j['choices']; + if (choices is List && choices.isNotEmpty) { + final choice = choices.first; + final delta = choice is Map ? choice['delta'] : null; + if (delta is Map) { + if (delta.containsKey('tool_calls')) { + final tc = delta['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = + (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; + if (name is String && name.isNotEmpty) { + final msgs = getMessages(); + final exists = (msgs.isNotEmpty) && + RegExp( + r']*\bname=\"' + + RegExp.escape(name) + + r'\"', + multiLine: true, + ).hasMatch(msgs.last.content); + if (!exists) { + final status = + '\n
Executing...\n
\n'; + appendToLastMessage(status); + } + } + } + } + } + } + final content = delta['content']?.toString() ?? ''; + if (content.isNotEmpty) { + appendToLastMessage(content); + _updateImagesFromCurrentContent(); + } + } + } + } catch (_) { + if (s.isNotEmpty) { + appendToLastMessage(s); + _updateImagesFromCurrentContent(); + } + } + } else { + if (s.isNotEmpty) { + appendToLastMessage(s); + _updateImagesFromCurrentContent(); + } + } + } else if (line is Map) { + if (line['done'] == true) { + try { + socketService?.offEvent(channel); + } catch (_) {} + finishStreaming(); + return; + } + } + } catch (_) {} + } + + try { + socketService?.onEvent(channel, handler); + } catch (_) {} + Future.delayed(const Duration(minutes: 3), () { + try { + socketService?.offEvent(channel); + } catch (_) {} + }); + } + + void chatHandler(Map ev) { + try { + final data = ev['data']; + if (data == null) return; + final type = data['type']; + final payload = data['data']; + + if (type == 'chat:completion' && payload != null) { + if (payload is Map) { + if (payload.containsKey('tool_calls')) { + final tc = payload['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; + if (name is String && name.isNotEmpty) { + final msgs = getMessages(); + final exists = (msgs.isNotEmpty) && + RegExp( + r']*\bname=\"' + + RegExp.escape(name) + r'\"', + multiLine: true, + ).hasMatch(msgs.last.content); + if (!exists) { + final status = + '\n
Executing...\n
\n'; + appendToLastMessage(status); + } + } + } + } + } + } + if (!suppressSocketContent && payload.containsKey('choices')) { + final choices = payload['choices']; + if (choices is List && choices.isNotEmpty) { + final choice = choices.first; + final delta = choice is Map ? choice['delta'] : null; + if (delta is Map) { + if (delta.containsKey('tool_calls')) { + final tc = delta['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; + if (name is String && name.isNotEmpty) { + final msgs = getMessages(); + final exists = (msgs.isNotEmpty) && + RegExp( + r']*\bname=\"' + + RegExp.escape(name) + r'\"', + multiLine: true, + ).hasMatch(msgs.last.content); + if (!exists) { + final status = + '\n
Executing...\n
\n'; + appendToLastMessage(status); + } + } + } + } + } + } + final content = delta['content']?.toString() ?? ''; + if (content.isNotEmpty) { + appendToLastMessage(content); + _updateImagesFromCurrentContent(); + } + } + } + } + if (payload['done'] == true) { + try { + socketService?.offChatEvents(); + } catch (_) {} + try { + // ignore: unawaited_futures + api?.sendChatCompleted( + chatId: activeConversationId ?? '', + messageId: assistantMessageId, + messages: const [], + model: modelId, + modelItem: modelItem, + sessionId: sessionId, + ); + } catch (_) {} + + final msgs = getMessages(); + if (msgs.isNotEmpty && msgs.last.role == 'assistant') { + final lastContent = msgs.last.content.trim(); + if (lastContent.isEmpty) { + Future.microtask(() async { + try { + final chatId = activeConversationId; + if (chatId != null && chatId.isNotEmpty) { + final resp = await api?.dio.get('/api/v1/chats/$chatId'); + final data = resp?.data as Map?; + String content = ''; + final chatObj = data?['chat'] as Map?; + if (chatObj != null) { + final list = chatObj['messages']; + if (list is List) { + final target = list.firstWhere( + (m) => (m is Map && (m['id']?.toString() == assistantMessageId)), + orElse: () => null, + ); + if (target != null) { + final rawContent = (target as Map)['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + } + } + if (content.isEmpty) { + final history = chatObj['history']; + if (history is Map && history['messages'] is Map) { + final Map messagesMap = + (history['messages'] as Map).cast(); + final msg = messagesMap[assistantMessageId]; + if (msg is Map) { + final rawContent = msg['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + } + } + } + } + if (content.isNotEmpty) { + replaceLastMessageContent(content); + } + } + } catch (_) {} finally { + finishStreaming(); + } + }); + return; + } + } + finishStreaming(); + } + } + } else if (type == 'request:chat:completion' && payload != null) { + final channel = payload['channel']; + if (channel is String && channel.isNotEmpty) { + suppressSocketContent = true; + channelLineHandlerFactory(channel); + } + } else if (type == 'execute:tool' && payload != null) { + // Show an executing tile immediately; also surface any inline files/result + try { + final name = payload['name']?.toString() ?? 'tool'; + final status = + '\n
Executing...\n
\n'; + appendToLastMessage(status); + try { + final filesA = _extractFilesFromResult(payload['files']); + final filesB = _extractFilesFromResult(payload['result']); + final all = [...filesA, ...filesB]; + if (all.isNotEmpty) { + final msgs = getMessages(); + if (msgs.isNotEmpty && msgs.last.role == 'assistant') { + final existing = msgs.last.files ?? >[]; + final seen = { + for (final f in existing) + if (f['url'] is String) (f['url'] as String) else '', + }..removeWhere((e) => e.isEmpty); + final merged = >[...existing]; + for (final f in all) { + final url = f['url'] as String?; + if (url != null && url.isNotEmpty && !seen.contains(url)) { + merged.add({'type': 'image', 'url': url}); + seen.add(url); + } + } + if (merged.length != existing.length) { + updateLastMessageWith((m) => m.copyWith(files: merged)); + } + } + } + } catch (_) {} + } catch (_) {} + } else if (type == 'files' && payload != null) { + // Handle raw files event (image generation results) + try { + final files = _extractFilesFromResult(payload); + if (files.isNotEmpty) { + final msgs = getMessages(); + if (msgs.isNotEmpty && msgs.last.role == 'assistant') { + final existing = msgs.last.files ?? >[]; + final seen = { + for (final f in existing) + if (f['url'] is String) (f['url'] as String) else '', + }..removeWhere((e) => e.isEmpty); + final merged = >[...existing]; + for (final f in files) { + final url = f['url'] as String?; + if (url != null && url.isNotEmpty && !seen.contains(url)) { + merged.add({'type': 'image', 'url': url}); + seen.add(url); + } + } + if (merged.length != existing.length) { + updateLastMessageWith((m) => m.copyWith(files: merged)); + } + } + } + } catch (_) {} + } else if (type == 'event:status' && payload != null) { + final status = payload['status']?.toString() ?? ''; + if (status.isNotEmpty) { + updateLastMessageWith((m) => m.copyWith(metadata: { + ...?m.metadata, + 'status': status, + })); + } + } else if (type == 'event:tool' && payload != null) { + // Accept files from both 'result' and 'files' + final files = [ + ..._extractFilesFromResult(payload['files']), + ..._extractFilesFromResult(payload['result']), + ]; + if (files.isNotEmpty) { + final msgs = getMessages(); + if (msgs.isNotEmpty && msgs.last.role == 'assistant') { + final existing = msgs.last.files ?? >[]; + final merged = [...existing, ...files]; + updateLastMessageWith((m) => m.copyWith(files: merged)); + } + } + } else if (type == 'event:message:delta' && payload != null) { + if (suppressSocketContent) return; + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + appendToLastMessage(content); + _updateImagesFromCurrentContent(); + } + } + } catch (_) {} + } + + void channelEventsHandler(Map ev) { + try { + final data = ev['data']; + if (data == null) return; + final type = data['type']; + final payload = data['data']; + if (type == 'message' && payload is Map) { + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + appendToLastMessage(content); + _updateImagesFromCurrentContent(); + } + } + } catch (_) {} + } + + if (socketService != null) { + socketService.onChatEvents(chatHandler); + socketService.onChannelEvents(channelEventsHandler); + Future.delayed(const Duration(seconds: 90), () { + try { + socketService.offChatEvents(); + socketService.offChannelEvents(); + } catch (_) {} + try { + final msgs = getMessages(); + if (msgs.isNotEmpty && msgs.last.role == 'assistant' && msgs.last.isStreaming) { + finishStreaming(); + } + } catch (_) {} + }); + } + + final subscription = persistentController.stream.listen( + (chunk) { + var effectiveChunk = chunk; + if (webSearchEnabled && !isSearching) { + if (chunk.contains('[SEARCHING]') || + chunk.contains('Searching the web') || + chunk.contains('web search')) { + isSearching = true; + updateLastMessageWith( + (message) => message.copyWith( + content: '🔍 Searching the web...', + metadata: {'webSearchActive': true}, + ), + ); + return; // Don't append this chunk + } + } + + if (isSearching && (chunk.contains('[/SEARCHING]') || chunk.contains('Search complete'))) { + isSearching = false; + updateLastMessageWith( + (message) => message.copyWith(metadata: {'webSearchActive': false}), + ); + effectiveChunk = effectiveChunk.replaceAll('[SEARCHING]', '').replaceAll('[/SEARCHING]', ''); + } + + if (effectiveChunk.trim().isNotEmpty) { + appendToLastMessage(effectiveChunk); + _updateImagesFromCurrentContent(); + } + }, + onDone: () async { + // Unregister from persistent service + persistentService.unregisterStream(streamId); + + // Stop socket events now that streaming finished only for SSE-driven streams + if (socketService != null && suppressSocketContent == true) { + try { + socketService.offChatEvents(); + } catch (_) {} + } + // Allow socket content again for future sessions + suppressSocketContent = false; + + // If SSE-driven (no dynamic channel/background flow), finish now + if (!usingDynamicChannel && !isBackgroundFlow) { + finishStreaming(); + } + }, + onError: (error) async { + try { + persistentService.unregisterStream(streamId); + } catch (_) {} + finishStreaming(); + if (socketService != null && suppressSocketContent == true) { + try { + socketService.offChatEvents(); + } catch (_) {} + } + }, + ); + + return subscription; +} + +List> _extractFilesFromResult(dynamic resp) { + final results = >[]; + if (resp == null) return results; + dynamic r = resp; + if (r is String) { + try { + r = jsonDecode(r); + } catch (_) {} + } + if (r is List) { + for (final item in r) { + if (item is String && item.isNotEmpty) { + results.add({'type': 'image', 'url': item}); + } else if (item is Map) { + final url = item['url']; + final b64 = item['b64_json'] ?? item['b64']; + if (url is String && url.isNotEmpty) { + results.add({'type': 'image', 'url': url}); + } else if (b64 is String && b64.isNotEmpty) { + results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); + } + } + } + return results; + } + if (r is! Map) return results; + final data = r['data']; + if (data is List) { + for (final item in data) { + if (item is Map) { + final url = item['url']; + final b64 = item['b64_json'] ?? item['b64']; + if (url is String && url.isNotEmpty) { + results.add({'type': 'image', 'url': url}); + } else if (b64 is String && b64.isNotEmpty) { + results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); + } + } else if (item is String && item.isNotEmpty) { + results.add({'type': 'image', 'url': item}); + } + } + } + final images = r['images']; + if (images is List) { + for (final item in images) { + if (item is String && item.isNotEmpty) { + results.add({'type': 'image', 'url': item}); + } else if (item is Map) { + final url = item['url']; + final b64 = item['b64_json'] ?? item['b64']; + if (url is String && url.isNotEmpty) { + results.add({'type': 'image', 'url': url}); + } else if (b64 is String && b64.isNotEmpty) { + results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); + } + } + } + } + final files = r['files']; + if (files is List) { + results.addAll(_extractFilesFromResult(files)); + } + final singleUrl = r['url']; + if (singleUrl is String && singleUrl.isNotEmpty) { + results.add({'type': 'image', 'url': singleUrl}); + } + final singleB64 = r['b64_json'] ?? r['b64']; + if (singleB64 is String && singleB64.isNotEmpty) { + results.add({'type': 'image', 'url': 'data:image/png;base64,$singleB64'}); + } + return results; +} diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 53854a3..e60d6ad 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1,4 +1,3 @@ -import 'dart:async'; import 'dart:convert'; import 'package:yaml/yaml.dart' as yaml; @@ -6,6 +5,7 @@ import 'package:flutter/foundation.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:uuid/uuid.dart'; import '../../../core/utils/tool_calls_parser.dart'; +import '../../../core/services/streaming_helper.dart'; import '../../../core/models/chat_message.dart'; import '../../../core/models/conversation.dart'; import '../../../core/providers/app_providers.dart'; @@ -15,6 +15,8 @@ import '../../../core/services/persistent_streaming_service.dart'; import '../../../core/utils/debug_logger.dart'; import '../services/reviewer_mode_service.dart'; import '../../../shared/services/tasks/task_queue.dart'; +import '../../tools/providers/tools_providers.dart'; +import 'dart:async'; const bool kSocketVerboseLogging = false; @@ -70,9 +72,13 @@ class ChatMessagesNotifier extends StateNotifier> { final localText = localLast.content.trim(); final sameLastId = serverLast.id == localLast.id; final isAssistant = serverLast.role == 'assistant'; - final serverHasMore = serverText.isNotEmpty && serverText.length > localText.length; - final localEmptyButServerHas = localText.isEmpty && serverText.isNotEmpty; - if (sameLastId && isAssistant && (serverHasMore || localEmptyButServerHas)) { + final serverHasMore = + serverText.isNotEmpty && serverText.length > localText.length; + final localEmptyButServerHas = + localText.isEmpty && serverText.isNotEmpty; + if (sameLastId && + isAssistant && + (serverHasMore || localEmptyButServerHas)) { state = serverMessages; return; } @@ -173,7 +179,8 @@ class ChatMessagesNotifier extends StateNotifier> { orElse: () => null, ); if (textItem != null) { - content = (textItem as Map)['text']?.toString() ?? ''; + content = + (textItem as Map)['text']?.toString() ?? ''; } } } @@ -206,7 +213,8 @@ class ChatMessagesNotifier extends StateNotifier> { final meta = last.metadata ?? const {}; final isBgFlow = (meta['backgroundFlow'] == true); final isWebSearchFlow = - (meta['webSearchFlow'] == true) || (meta['webSearchActive'] == true); + (meta['webSearchFlow'] == true) || + (meta['webSearchActive'] == true); final isImageGenFlow = (meta['imageGenerationFlow'] == true); // Also consult global toggles if metadata not present @@ -215,7 +223,9 @@ class ChatMessagesNotifier extends StateNotifier> { final globalImageGen = _ref.read(imageGenerationEnabledProvider); if (isWebSearchFlow || (globalWebSearch && webSearchAvailable)) { - timeout = Duration(milliseconds: timeout.inMilliseconds.clamp(0, 45000)); + timeout = Duration( + milliseconds: timeout.inMilliseconds.clamp(0, 45000), + ); // If current < 45s, bump to 45s if (timeout.inSeconds < 45) timeout = const Duration(seconds: 45); } @@ -472,8 +482,8 @@ Future _preseedAssistantAndPersist( // Choose id: reuse existing if provided, else create new final String assistantMessageId = (existingAssistantId != null && existingAssistantId.isNotEmpty) - ? existingAssistantId - : const Uuid().v4(); + ? existingAssistantId + : const Uuid().v4(); // If the message with this id doesn't exist locally, add a placeholder final msgs = ref.read(chatMessagesProvider); @@ -492,8 +502,13 @@ Future _preseedAssistantAndPersist( // If it exists and is the last assistant, ensure we mark it streaming try { final last = msgs.isNotEmpty ? msgs.last : null; - if (last != null && last.id == assistantMessageId && last.role == 'assistant' && !last.isStreaming) { - ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction( + if (last != null && + last.id == assistantMessageId && + last.role == 'assistant' && + !last.isStreaming) { + ref + .read(chatMessagesProvider.notifier) + .updateLastMessageWithFunction( (m) => m.copyWith(isStreaming: true), ); } @@ -609,9 +624,75 @@ Future _getFileAsBase64(dynamic api, String fileId) async { } } +// Small internal helper to convert a message with attachments into the +// OpenWebUI content payload format (text + image_url + files). +// - Adds text first (if non-empty) +// - Converts image attachments to image_url with data URLs (resolving MIME type when needed) +// - Includes non-image attachments in a 'files' array for server-side resolution +Future> _buildMessagePayloadWithAttachments({ + required dynamic api, + required String role, + required String cleanedText, + required List attachmentIds, +}) async { + final List> contentArray = []; + final List> nonImageFiles = []; + + if (cleanedText.isNotEmpty) { + contentArray.add({'type': 'text', 'text': cleanedText}); + } + + for (final attachmentId in attachmentIds) { + try { + final base64Data = await _getFileAsBase64(api, attachmentId); + if (base64Data != null) { + if (base64Data.startsWith('data:')) { + contentArray.add({ + 'type': 'image_url', + 'image_url': {'url': base64Data}, + }); + } else { + if (!attachmentId.startsWith('data:')) { + final fileInfo = await api.getFileInfo(attachmentId); + final fileName = fileInfo['filename'] ?? ''; + final ext = fileName.toLowerCase().split('.').last; + + String mimeType = 'image/png'; + if (ext == 'jpg' || ext == 'jpeg') { + mimeType = 'image/jpeg'; + } else if (ext == 'gif') { + mimeType = 'image/gif'; + } else if (ext == 'webp') { + mimeType = 'image/webp'; + } + + contentArray.add({ + 'type': 'image_url', + 'image_url': {'url': 'data:$mimeType;base64,$base64Data'}, + }); + } + } + } else { + nonImageFiles.add({'id': attachmentId, 'type': 'file'}); + } + } catch (_) { + // Swallow and continue to keep regeneration robust + } + } + + final messageMap = { + 'role': role, + 'content': contentArray.isNotEmpty ? contentArray : cleanedText, + }; + if (nonImageFiles.isNotEmpty) { + messageMap['files'] = nonImageFiles; + } + return messageMap; +} + // Regenerate message function that doesn't duplicate user message Future regenerateMessage( - WidgetRef ref, + dynamic ref, String userMessageContent, List? attachments, ) async { @@ -665,31 +746,34 @@ Future regenerateMessage( // For real API, proceed with regeneration using existing conversation messages try { + // Include selected tool ids so provider-native tool calling is triggered + final selectedToolIds = ref.read(selectedToolIdsProvider); // Get conversation history for context (excluding the removed assistant message) final List messages = ref.read(chatMessagesProvider); final List> conversationMessages = >[]; - for (final msg in messages) { + for (int i = 0; i < messages.length; i++) { + final msg = messages[i]; if (msg.role.isNotEmpty && msg.content.isNotEmpty && !msg.isStreaming) { - // Clean up tool/details markup to match web client behavior final cleaned = ToolCallsParser.sanitizeForApi(msg.content); - // Handle messages with attachments - if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty) { - final List> contentArray = []; + // Prefer provided attachments for the last user message; otherwise use message attachments + final bool isLastUser = (i == messages.length - 1) && msg.role == 'user'; + final List messageAttachments = + (isLastUser && (attachments != null && attachments.isNotEmpty)) + ? List.from(attachments!) + : (msg.attachmentIds ?? const []); - // Add text content first - if (cleaned.isNotEmpty) { - contentArray.add({'type': 'text', 'text': cleaned}); - } - - conversationMessages.add({ - 'role': msg.role, - 'content': contentArray.isNotEmpty ? contentArray : cleaned, - }); + if (messageAttachments.isNotEmpty) { + final messageMap = await _buildMessagePayloadWithAttachments( + api: api, + role: msg.role, + cleanedText: cleaned, + attachmentIds: messageAttachments, + ); + conversationMessages.add(messageMap); } else { - // Regular text message conversationMessages.add({'role': msg.role, 'content': cleaned}); } } @@ -701,31 +785,213 @@ Future regenerateMessage( modelId: selectedModel.id, ); - // Stream response via background task (socket/dynamic channel or polling) + // Feature toggles + final webSearchEnabled = + ref.read(webSearchEnabledProvider) && + ref.read(webSearchAvailableProvider); + final imageGenerationEnabled = ref.read(imageGenerationEnabledProvider); + + // Model metadata for completion notifications + final supportedParams = + selectedModel.supportedParameters ?? + [ + 'max_tokens', + 'tool_choice', + 'tools', + 'response_format', + 'structured_outputs', + ]; + final modelItem = { + 'id': selectedModel.id, + 'canonical_slug': selectedModel.id, + 'hugging_face_id': '', + 'name': selectedModel.name, + 'created': 1754089419, + 'description': + selectedModel.description ?? + 'This is a cloaked model provided to the community to gather feedback. This is an improved version of [Horizon Alpha](/openrouter/horizon-alpha)\n\nNote: It\'s free to use during this testing period, and prompts and completions are logged by the model creator for feedback and training.', + 'context_length': 256000, + 'architecture': { + 'modality': 'text+image->text', + 'input_modalities': ['image', 'text'], + 'output_modalities': ['text'], + 'tokenizer': 'Other', + 'instruct_type': null, + }, + 'pricing': { + 'prompt': '0', + 'completion': '0', + 'request': '0', + 'image': '0', + 'audio': '0', + 'web_search': '0', + 'internal_reasoning': '0', + }, + 'top_provider': { + 'context_length': 256000, + 'max_completion_tokens': 128000, + 'is_moderated': false, + }, + 'per_request_limits': null, + 'supported_parameters': supportedParams, + 'connection_type': 'external', + 'owned_by': 'openai', + 'openai': { + 'id': selectedModel.id, + 'canonical_slug': selectedModel.id, + 'hugging_face_id': '', + 'name': selectedModel.name, + 'created': 1754089419, + 'description': + selectedModel.description ?? + 'This is a cloaked model provided to the community to gather feedback. This is an improved version of [Horizon Alpha](/openrout' + 'er/horizon-alpha)\n\nNote: It\'s free to use during this testing period, and prompts and completions are logged by the model creator for feedback and training.', + 'context_length': 256000, + 'architecture': { + 'modality': 'text+image->text', + 'input_modalities': ['image', 'text'], + 'output_modalities': ['text'], + 'tokenizer': 'Other', + 'instruct_type': null, + }, + 'pricing': { + 'prompt': '0', + 'completion': '0', + 'request': '0', + 'image': '0', + 'audio': '0', + 'web_search': '0', + 'internal_reasoning': '0', + }, + 'top_provider': { + 'context_length': 256000, + 'max_completion_tokens': 128000, + 'is_moderated': false, + }, + 'per_request_limits': null, + 'supported_parameters': [ + 'max_tokens', + 'tool_choice', + 'tools', + 'response_format', + 'structured_outputs', + ], + 'connection_type': 'external', + }, + 'urlIdx': 0, + 'actions': [], + 'filters': [], + 'tags': [], + }; + + // Socket binding for background flows + final socketService = ref.read(socketServiceProvider); + final socketSessionId = socketService?.sessionId; + final bool wantSessionBinding = + (socketService?.isConnected == true) && + (socketSessionId != null && socketSessionId.isNotEmpty); + + // Resolve tool servers from user settings (if any) + List>? toolServers; + try { + final userSettings = await api!.getUserSettings(); + final ui = userSettings['ui'] as Map?; + final rawServers = ui != null ? (ui['toolServers'] as List?) : null; + if (rawServers != null && rawServers.isNotEmpty) { + toolServers = await _resolveToolServers(rawServers, api); + } + } catch (_) {} + + // Background tasks parity with Web client (safe defaults) + bool shouldGenerateTitle = false; + try { + final conv = ref.read(activeConversationProvider); + final nonSystemCount = conversationMessages + .where((m) => (m['role']?.toString() ?? '') != 'system') + .length; + shouldGenerateTitle = + (conv == null) || + ((conv.title == 'New Chat' || (conv.title.isEmpty)) && + nonSystemCount == 1); + } catch (_) {} + + final bgTasks = { + if (shouldGenerateTitle) 'title_generation': true, + if (shouldGenerateTitle) 'tags_generation': true, + 'follow_up_generation': true, + if (webSearchEnabled) 'web_search': true, + if (imageGenerationEnabled) 'image_generation': true, + }; + + final bool isBackgroundToolsFlowPre = + (selectedToolIds.isNotEmpty) || + (toolServers != null && toolServers.isNotEmpty); + final bool isBackgroundWebSearchPre = webSearchEnabled; + + // Dispatch using unified send pipeline (background tools flow) final response = api!.sendMessage( messages: conversationMessages, model: selectedModel.id, conversationId: activeConversation.id, + toolIds: selectedToolIds.isNotEmpty ? selectedToolIds : null, + enableWebSearch: webSearchEnabled, + enableImageGeneration: imageGenerationEnabled, + modelItem: modelItem, + sessionIdOverride: wantSessionBinding ? socketSessionId : null, + toolServers: toolServers, + backgroundTasks: bgTasks, responseMessageId: assistantMessageId, ); final stream = response.stream; + final sessionId = response.sessionId; - // Handle streaming response (basic chunking for this path) - final chunkedStream = StreamChunker.chunkStream( - stream, - enableChunking: true, - minChunkSize: 5, - maxChunkLength: 3, - delayBetweenChunks: const Duration(milliseconds: 15), + // New unified streaming path via helper; bypass old inline socket block + final bool _isBackgroundFlow = + isBackgroundToolsFlowPre || + isBackgroundWebSearchPre || + imageGenerationEnabled || + wantSessionBinding; + try { + ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction(( + m, + ) { + final mergedMeta = { + if (m.metadata != null) ...m.metadata!, + 'backgroundFlow': _isBackgroundFlow, + if (isBackgroundWebSearchPre) 'webSearchFlow': true, + if (imageGenerationEnabled) 'imageGenerationFlow': true, + }; + return m.copyWith(metadata: mergedMeta); + }); + } catch (_) {} + + final _sendStreamSub = attachUnifiedChunkedStreaming( + stream: stream, + webSearchEnabled: webSearchEnabled, + isBackgroundFlow: _isBackgroundFlow, + suppressSocketContentInitially: !_isBackgroundFlow, + usingDynamicChannelInitially: false, + assistantMessageId: assistantMessageId, + modelId: selectedModel.id, + modelItem: modelItem, + sessionId: sessionId, + activeConversationId: activeConversation.id, + api: api, + socketService: socketService, + appendToLastMessage: (c) => + ref.read(chatMessagesProvider.notifier).appendToLastMessage(c), + replaceLastMessageContent: (c) => + ref.read(chatMessagesProvider.notifier).replaceLastMessageContent(c), + updateLastMessageWith: (updater) => ref + .read(chatMessagesProvider.notifier) + .updateLastMessageWithFunction(updater), + finishStreaming: () => + ref.read(chatMessagesProvider.notifier).finishStreaming(), + getMessages: () => ref.read(chatMessagesProvider), ); - - await for (final chunk in chunkedStream) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(chunk); - } - - ref.read(chatMessagesProvider.notifier).finishStreaming(); - await _saveConversationLocally(ref); + ref.read(chatMessagesProvider.notifier).setMessageStream(_sendStreamSub); + return; } catch (e) { rethrow; } @@ -928,70 +1194,14 @@ Future _sendMessageInternal( // Prepare cleaned text content (strip tool details etc.) final cleaned = ToolCallsParser.sanitizeForApi(msg.content); - // Check if message has attachments (images and non-images) - if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty) { - // All models use the same content array format (OpenWebUI standard) - - // Use the same content array format for all models (OpenWebUI standard) - final List> contentArray = []; - // Collect non-image files to include in the message map so API can forward top-level 'files' - final List> nonImageFiles = []; - - // Add text content first - if (cleaned.isNotEmpty) { - contentArray.add({'type': 'text', 'text': cleaned}); - } - - // Add image attachments with proper MIME type handling; collect non-image attachments - for (final attachmentId in msg.attachmentIds!) { - try { - final base64Data = await _getFileAsBase64(api, attachmentId); - if (base64Data != null) { - // Check if this is already a data URL - if (base64Data.startsWith('data:')) { - contentArray.add({ - 'type': 'image_url', - 'image_url': {'url': base64Data}, - }); - } else { - // For server files, determine MIME type from file extension - // Only call getFileInfo if attachmentId is not a data URL - if (!attachmentId.startsWith('data:')) { - final fileInfo = await api.getFileInfo(attachmentId); - final fileName = fileInfo['filename'] ?? ''; - final ext = fileName.toLowerCase().split('.').last; - - String mimeType = 'image/png'; // default - if (ext == 'jpg' || ext == 'jpeg') { - mimeType = 'image/jpeg'; - } else if (ext == 'gif') { - mimeType = 'image/gif'; - } else if (ext == 'webp') { - mimeType = 'image/webp'; - } - - contentArray.add({ - 'type': 'image_url', - 'image_url': {'url': 'data:$mimeType;base64,$base64Data'}, - }); - } - } - } else { - // Treat as non-image file; include minimal descriptor so server can resolve by id - nonImageFiles.add({'id': attachmentId, 'type': 'file'}); - } - } catch (e) { - // Handle attachment processing errors silently - } - } - - final messageMap = { - 'role': msg.role, - 'content': contentArray, - }; - if (nonImageFiles.isNotEmpty) { - messageMap['files'] = nonImageFiles; - } + final List ids = msg.attachmentIds ?? const []; + if (ids.isNotEmpty) { + final messageMap = await _buildMessagePayloadWithAttachments( + api: api, + role: msg.role, + cleanedText: cleaned, + attachmentIds: ids, + ); conversationMessages.add(messageMap); } else { // Regular text-only message @@ -1134,183 +1344,6 @@ Future _sendMessageInternal( 'tags': [], }; - // Image generation will be handled by server background tools; continue with unified flow - - // Define helpers for extracting/attaching image files from tool deltas/content - List> _extractFilesFromResult(dynamic resp) { - final results = >[]; - if (resp == null) return results; - dynamic r = resp; - if (r is String) { - try { r = jsonDecode(r); } catch (_) {} - } - if (r is List) { - for (final item in r) { - if (item is String && item.isNotEmpty) { - results.add({'type': 'image', 'url': item}); - } else if (item is Map) { - final url = item['url']; - final b64 = item['b64_json'] ?? item['b64']; - if (url is String && url.isNotEmpty) { - results.add({'type': 'image', 'url': url}); - } else if (b64 is String && b64.isNotEmpty) { - results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); - } - } - } - return results; - } - if (r is! Map) return results; - final data = r['data']; - if (data is List) { - for (final item in data) { - if (item is Map) { - final url = item['url']; - final b64 = item['b64_json'] ?? item['b64']; - if (url is String && url.isNotEmpty) { - results.add({'type': 'image', 'url': url}); - } else if (b64 is String && b64.isNotEmpty) { - results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); - } - } else if (item is String && item.isNotEmpty) { - results.add({'type': 'image', 'url': item}); - } - } - } - final images = r['images']; - if (images is List) { - for (final item in images) { - if (item is String && item.isNotEmpty) { - results.add({'type': 'image', 'url': item}); - } else if (item is Map) { - final url = item['url']; - final b64 = item['b64_json'] ?? item['b64']; - if (url is String && url.isNotEmpty) { - results.add({'type': 'image', 'url': url}); - } else if (b64 is String && b64.isNotEmpty) { - results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); - } - } - } - } - final files = r['files']; - if (files is List) { - results.addAll(_extractFilesFromResult(files)); - } - final singleUrl = r['url']; - if (singleUrl is String && singleUrl.isNotEmpty) { - results.add({'type': 'image', 'url': singleUrl}); - } - final singleB64 = r['b64_json'] ?? r['b64']; - if (singleB64 is String && singleB64.isNotEmpty) { - results.add({'type': 'image', 'url': 'data:image/png;base64,$singleB64'}); - } - return results; - } - - void _updateImagesFromCurrentContent() { - try { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isEmpty || msgs.last.role != 'assistant') return; - final content = msgs.last.content; - if (content.isEmpty) return; - - final collected = >[]; - - // First, try the complete parsing approach - if (content.contains('"]+\.(jpg|jpeg|png|gif|webp)', caseSensitive: false); - final urlMatches = urlPattern.allMatches(content); - for (final match in urlMatches) { - final url = match.group(0); - if (url != null && url.isNotEmpty) { - collected.add({'type': 'image', 'url': url}); - } - } - - // Look for JSON-like structures in streaming content - final jsonPattern = RegExp(r'\{[^}]*"url"[^}]*:[^}]*"(data:image/[^"]+|https?://[^"]+\.(jpg|jpeg|png|gif|webp))"[^}]*\}', caseSensitive: false); - final jsonMatches = jsonPattern.allMatches(content); - for (final match in jsonMatches) { - final url = RegExp(r'"url"[^:]*:[^"]*"([^"]+)"').firstMatch(match.group(0) ?? '')?.group(1); - if (url != null && url.isNotEmpty) { - collected.add({'type': 'image', 'url': url}); - } - } - - // Look for image generation results in partial results/files attributes - final partialResultsPattern = RegExp(r'(result|files)="([^"]*(?:data:image/[^"]*|https?://[^"]*\.(jpg|jpeg|png|gif|webp))[^"]*)"', caseSensitive: false); - final partialMatches = partialResultsPattern.allMatches(content); - for (final match in partialMatches) { - final attrValue = match.group(2); - if (attrValue != null) { - // Try to parse as JSON array or single value - try { - final decoded = json.decode(attrValue); - collected.addAll(_extractFilesFromResult(decoded)); - } catch (_) { - // If not JSON, check if it's a direct URL - if (attrValue.startsWith('data:image/') || - RegExp(r'https?://[^\s]+\.(jpg|jpeg|png|gif|webp)$', caseSensitive: false).hasMatch(attrValue)) { - collected.add({'type': 'image', 'url': attrValue}); - } - } - } - } - } - - if (collected.isEmpty) return; - - final existing = msgs.last.files ?? >[]; - final seen = { - for (final f in existing) - if (f['url'] is String) (f['url'] as String) else '', - }..removeWhere((e) => e.isEmpty); - - final merged = >[...existing]; - for (final f in collected) { - final url = f['url'] as String?; - if (url != null && url.isNotEmpty && !seen.contains(url)) { - merged.add({'type': 'image', 'url': url}); - seen.add(url); - } - } - - if (merged.length != existing.length) { - ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction( - (m) => m.copyWith(files: merged), - ); - } - } catch (_) {} - } - // Stream response using server-push via Socket when available, otherwise fallback // Resolve Socket session for background tasks parity final socketService = ref.read(socketServiceProvider); @@ -1339,8 +1372,10 @@ Future _sendMessageInternal( final nonSystemCount = conversationMessages .where((m) => (m['role']?.toString() ?? '') != 'system') .length; - shouldGenerateTitle = (conv == null) || - ((conv.title == 'New Chat' || (conv.title.isEmpty)) && nonSystemCount == 1); + shouldGenerateTitle = + (conv == null) || + ((conv.title == 'New Chat' || (conv.title.isEmpty)) && + nonSystemCount == 1); } catch (_) {} // Match web client: request background follow-ups always; title/tags on first turn @@ -1349,7 +1384,8 @@ Future _sendMessageInternal( if (shouldGenerateTitle) 'tags_generation': true, 'follow_up_generation': true, if (webSearchEnabled) 'web_search': true, // enable bg web search - if (imageGenerationEnabled) 'image_generation': true, // enable bg image flow + if (imageGenerationEnabled) + 'image_generation': true, // enable bg image flow }; // Determine if we need background task flow (tools/tool servers or web search) @@ -1378,17 +1414,38 @@ Future _sendMessageInternal( final stream = response.stream; final sessionId = response.sessionId; + // (socket handlers attached below after flow flags are set) + // If socket is available, start listening for chat-events immediately // Background-tools flow OR any session-bound flow relies on socket/dynamic channel for // streaming content. Allow socket TEXT in those modes. For pure SSE/polling flows, suppress // socket TEXT to avoid duplicates (still surface tool_call status). final bool isBackgroundFlow = - isBackgroundToolsFlowPre || isBackgroundWebSearchPre || wantSessionBinding; - bool suppressSocketContent = !isBackgroundFlow; // allow socket text when session-bound or tools + isBackgroundToolsFlowPre || + isBackgroundWebSearchPre || + wantSessionBinding; + bool suppressSocketContent = + !isBackgroundFlow; // allow socket text when session-bound or tools bool usingDynamicChannel = false; // set true when server provides a channel + // Attach socket handlers for background flows/dynamic channels + if (socketService != null) { + _attachSocketStreamingHandlers( + ref: ref, + socketService: socketService, + assistantMessageId: assistantMessageId, + modelId: selectedModel.id, + modelItem: modelItem, + sessionId: sessionId, + isBackgroundFlow: isBackgroundFlow, + suppressSocketContentInitially: suppressSocketContent, + activeConversationId: activeConversation?.id, + ); + } // Enrich the assistant placeholder metadata so the typing guard can use longer timeouts try { - ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((m) { + ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction(( + m, + ) { final mergedMeta = { if (m.metadata != null) ...m.metadata!, 'backgroundFlow': isBackgroundFlow, @@ -1417,16 +1474,25 @@ Future _sendMessageInternal( for (final call in tc) { if (call is Map) { final fn = call['function']; - final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null; + final name = (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; if (name is String && name.isNotEmpty) { final msgs = ref.read(chatMessagesProvider); - final exists = (msgs.isNotEmpty) && RegExp( - r']*\bname=\"' + RegExp.escape(name) + r'\"', + final exists = + (msgs.isNotEmpty) && + RegExp( + r']*\bname=\"' + + RegExp.escape(name) + + r'\"', multiLine: true, ).hasMatch(msgs.last.content); if (!exists) { - final status = '\n
Executing...\n
\n'; - ref.read(chatMessagesProvider.notifier).appendToLastMessage(status); + final status = + '\n
Executing...\n
\n'; + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(status); } } } @@ -1446,16 +1512,25 @@ Future _sendMessageInternal( for (final call in tc) { if (call is Map) { final fn = call['function']; - final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null; + final name = (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; if (name is String && name.isNotEmpty) { final msgs = ref.read(chatMessagesProvider); - final exists = (msgs.isNotEmpty) && RegExp( - r']*\bname=\"' + RegExp.escape(name) + r'\"', + final exists = + (msgs.isNotEmpty) && + RegExp( + r']*\bname=\"' + + RegExp.escape(name) + + r'\"', multiLine: true, ).hasMatch(msgs.last.content); if (!exists) { - final status = '\n
Executing...\n
\n'; - ref.read(chatMessagesProvider.notifier).appendToLastMessage(status); + final status = + '\n
Executing...\n
\n'; + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(status); } } } @@ -1464,8 +1539,10 @@ Future _sendMessageInternal( } final content = delta['content']?.toString() ?? ''; if (content.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); - _updateImagesFromCurrentContent(); + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(content); + _updateImagesFromCurrentContent(ref); } } } @@ -1489,35 +1566,39 @@ Future _sendMessageInternal( .read(chatMessagesProvider.notifier) .replaceLastMessageContent(content); } - _updateImagesFromCurrentContent(); + _updateImagesFromCurrentContent(ref); } else { ref .read(chatMessagesProvider.notifier) .appendToLastMessage(content); - _updateImagesFromCurrentContent(); + _updateImagesFromCurrentContent(ref); } } } if (payload['done'] == true) { // Stop listening to further socket events for this session. - try { socketService.offChatEvents(); } catch (_) {} + try { + socketService.offChatEvents(); + } catch (_) {} // Notify server that chat is completed (mirrors web client) try { final apiSvc = ref.read(apiServiceProvider); final chatId = activeConversation?.id ?? ''; if (apiSvc != null && chatId.isNotEmpty) { - unawaited(apiSvc - .sendChatCompleted( - chatId: chatId, - messageId: assistantMessageId, - messages: const [], - model: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - ) - .timeout(const Duration(seconds: 3)) - .catchError((_) {})); + unawaited( + apiSvc + .sendChatCompleted( + chatId: chatId, + messageId: assistantMessageId, + messages: const [], + model: selectedModel.id, + modelItem: modelItem, + sessionId: sessionId, + ) + .timeout(const Duration(seconds: 3)) + .catchError((_) {}), + ); } } catch (_) {} @@ -1532,7 +1613,9 @@ Future _sendMessageInternal( if (apiSvc != null && chatId != null && chatId.isNotEmpty) { Future.microtask(() async { try { - final resp = await apiSvc.dio.get('/api/v1/chats/$chatId'); + final resp = await apiSvc.dio.get( + '/api/v1/chats/$chatId', + ); final data = resp.data as Map; String content = ''; final chatObj = data['chat'] as Map?; @@ -1541,7 +1624,9 @@ Future _sendMessageInternal( final list = chatObj['messages']; if (list is List) { final target = list.firstWhere( - (m) => (m is Map && (m['id']?.toString() == msgId)), + (m) => + (m is Map && + (m['id']?.toString() == msgId)), orElse: () => null, ); if (target != null) { @@ -1554,7 +1639,8 @@ Future _sendMessageInternal( orElse: () => null, ); if (textItem != null) { - content = textItem['text']?.toString() ?? ''; + content = + textItem['text']?.toString() ?? ''; } } } @@ -1562,9 +1648,11 @@ Future _sendMessageInternal( // Fallback to history map if (content.isEmpty) { final history = chatObj['history']; - if (history is Map && history['messages'] is Map) { + if (history is Map && + history['messages'] is Map) { final Map messagesMap = - (history['messages'] as Map).cast(); + (history['messages'] as Map) + .cast(); final msg = messagesMap[msgId]; if (msg is Map) { final rawContent = msg['content']; @@ -1576,7 +1664,8 @@ Future _sendMessageInternal( orElse: () => null, ); if (textItem != null) { - content = textItem['text']?.toString() ?? ''; + content = + textItem['text']?.toString() ?? ''; } } } @@ -1592,7 +1681,9 @@ Future _sendMessageInternal( } catch (_) { // Swallow; we'll still finish streaming } finally { - ref.read(chatMessagesProvider.notifier).finishStreaming(); + ref + .read(chatMessagesProvider.notifier) + .finishStreaming(); } }); return; // Defer finish to microtask @@ -1612,25 +1703,31 @@ Future _sendMessageInternal( usingDynamicChannel = true; usingDynamicChannel = true; if (kSocketVerboseLogging) { - DebugLogger.stream('Socket request:chat:completion channel=$channel'); + DebugLogger.stream( + 'Socket request:chat:completion channel=$channel', + ); } void channelLineHandler(dynamic line) { try { if (line is String) { final s = line.trim(); - DebugLogger.stream('Socket [$channel] line=${s.length > 160 ? '${s.substring(0, 160)}…' : s}'); + DebugLogger.stream( + 'Socket [$channel] line=${s.length > 160 ? '${s.substring(0, 160)}…' : s}', + ); if (s == '[DONE]' || s == 'DONE') { socketService.offEvent(channel); // Channel completed try { - unawaited(api.sendChatCompleted( - chatId: activeConversation?.id ?? '', - messageId: assistantMessageId, - messages: const [], - model: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - )); + unawaited( + api.sendChatCompleted( + chatId: activeConversation?.id ?? '', + messageId: assistantMessageId, + messages: const [], + model: selectedModel.id, + modelItem: modelItem, + sessionId: sessionId, + ), + ); } catch (_) {} ref.read(chatMessagesProvider.notifier).finishStreaming(); return; @@ -1640,16 +1737,20 @@ Future _sendMessageInternal( if (dataStr == '[DONE]') { socketService.offEvent(channel); try { - unawaited(api.sendChatCompleted( - chatId: activeConversation?.id ?? '', - messageId: assistantMessageId, - messages: const [], - model: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - )); + unawaited( + api.sendChatCompleted( + chatId: activeConversation?.id ?? '', + messageId: assistantMessageId, + messages: const [], + model: selectedModel.id, + modelItem: modelItem, + sessionId: sessionId, + ), + ); } catch (_) {} - ref.read(chatMessagesProvider.notifier).finishStreaming(); + ref + .read(chatMessagesProvider.notifier) + .finishStreaming(); return; } // Try to parse OpenAI-style delta JSON @@ -1663,31 +1764,45 @@ Future _sendMessageInternal( if (delta.containsKey('content')) { final c = delta['content']?.toString() ?? ''; if (c.isNotEmpty) { - DebugLogger.stream('Socket [$channel] delta.content len=${c.length}'); + DebugLogger.stream( + 'Socket [$channel] delta.content len=${c.length}', + ); } } // Surface tool_calls status if (delta.containsKey('tool_calls')) { if (kSocketVerboseLogging) { - DebugLogger.stream('Socket [$channel] delta.tool_calls detected'); + DebugLogger.stream( + 'Socket [$channel] delta.tool_calls detected', + ); } final tc = delta['tool_calls']; if (tc is List) { for (final call in tc) { if (call is Map) { final fn = call['function']; - final name = (fn is Map && fn['name'] is String) + final name = + (fn is Map && fn['name'] is String) ? fn['name'] as String : null; if (name is String && name.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - final exists = (msgs.isNotEmpty) && RegExp( - r']*\\bname=\"' + RegExp.escape(name) + r'\"', + final msgs = ref.read( + chatMessagesProvider, + ); + final exists = + (msgs.isNotEmpty) && + RegExp( + r']*\\bname=\"' + + RegExp.escape(name) + + r'\"', multiLine: true, ).hasMatch(msgs.last.content); if (!exists) { - final status = '\n
Executing...\n
\n'; - ref.read(chatMessagesProvider.notifier).appendToLastMessage(status); + final status = + '\n
Executing...\n
\n'; + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(status); } } } @@ -1697,23 +1812,29 @@ Future _sendMessageInternal( // Append streamed content final content = delta['content']?.toString() ?? ''; if (content.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); - _updateImagesFromCurrentContent(); + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(content); + _updateImagesFromCurrentContent(ref); } } } } catch (_) { // Non-JSON line: append as-is if (s.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(s); - _updateImagesFromCurrentContent(); + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(s); + _updateImagesFromCurrentContent(ref); } } } else { // Plain text line if (s.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(s); - _updateImagesFromCurrentContent(); + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(s); + _updateImagesFromCurrentContent(ref); } } } else if (line is Map) { @@ -1722,14 +1843,16 @@ Future _sendMessageInternal( if (done) { socketService.offEvent(channel); try { - unawaited(api.sendChatCompleted( - chatId: activeConversation?.id ?? '', - messageId: assistantMessageId, - messages: const [], - model: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - )); + unawaited( + api.sendChatCompleted( + chatId: activeConversation?.id ?? '', + messageId: assistantMessageId, + messages: const [], + model: selectedModel.id, + modelItem: modelItem, + sessionId: sessionId, + ), + ); } catch (_) {} ref.read(chatMessagesProvider.notifier).finishStreaming(); return; @@ -1748,8 +1871,11 @@ Future _sendMessageInternal( try { final name = payload['name']?.toString() ?? 'tool'; DebugLogger.stream('Socket execute:tool name=$name'); - final status = '\n
Executing...\n
\n'; - ref.read(chatMessagesProvider.notifier).appendToLastMessage(status); + final status = + '\n
Executing...\n
\n'; + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(status); // If tool payload already carries files/result, try to extract images for grid try { final files = _extractFilesFromResult(payload['files']); @@ -1758,7 +1884,8 @@ Future _sendMessageInternal( if (all.isNotEmpty) { final msgs = ref.read(chatMessagesProvider); if (msgs.isNotEmpty && msgs.last.role == 'assistant') { - final existing = msgs.last.files ?? >[]; + final existing = + msgs.last.files ?? >[]; final seen = { for (final f in existing) if (f['url'] is String) (f['url'] as String) else '', @@ -1766,13 +1893,17 @@ Future _sendMessageInternal( final merged = >[...existing]; for (final f in all) { final url = f['url'] as String?; - if (url != null && url.isNotEmpty && !seen.contains(url)) { + if (url != null && + url.isNotEmpty && + !seen.contains(url)) { merged.add({'type': 'image', 'url': url}); seen.add(url); } } if (merged.length != existing.length) { - ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction( + ref + .read(chatMessagesProvider.notifier) + .updateLastMessageWithFunction( (m) => m.copyWith(files: merged), ); } @@ -1783,7 +1914,9 @@ Future _sendMessageInternal( } else if (type == 'files' && payload != null) { // Handle files event from socket (image generation results) try { - DebugLogger.stream('Socket files event received: ${payload.toString()}'); + DebugLogger.stream( + 'Socket files event received: ${payload.toString()}', + ); final files = _extractFilesFromResult(payload); if (files.isNotEmpty) { final msgs = ref.read(chatMessagesProvider); @@ -1802,10 +1935,19 @@ Future _sendMessageInternal( } } if (merged.length != existing.length) { - DebugLogger.stream('Socket files: Adding ${merged.length - existing.length} new images'); - final updatedMessage = ref.read(chatMessagesProvider).last.copyWith(files: merged); - DebugLogger.stream('Socket files: Updated message files count: ${updatedMessage.files?.length}'); - ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction( + DebugLogger.stream( + 'Socket files: Adding ${merged.length - existing.length} new images', + ); + final updatedMessage = ref + .read(chatMessagesProvider) + .last + .copyWith(files: merged); + DebugLogger.stream( + 'Socket files: Updated message files count: ${updatedMessage.files?.length}', + ); + ref + .read(chatMessagesProvider.notifier) + .updateLastMessageWithFunction( (ChatMessage m) => m.copyWith(files: merged), ); } @@ -1831,12 +1973,15 @@ Future _sendMessageInternal( if (type == 'message' && payload is Map) { final content = payload['content']?.toString() ?? ''; if (content.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); - _updateImagesFromCurrentContent(); + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(content); + _updateImagesFromCurrentContent(ref); } } } catch (_) {} } + socketService.onChannelEvents(channelEventsHandler); Future.delayed(const Duration(seconds: 90), () { try { @@ -1939,9 +2084,8 @@ Future _sendMessageInternal( ref .read(chatMessagesProvider.notifier) .updateLastMessageWithFunction( - (message) => message.copyWith( - metadata: {'webSearchActive': false}, - ), + (message) => + message.copyWith(metadata: {'webSearchActive': false}), ); // Strip markers from this chunk and continue processing effectiveChunk = effectiveChunk @@ -1954,7 +2098,7 @@ Future _sendMessageInternal( ref .read(chatMessagesProvider.notifier) .appendToLastMessage(effectiveChunk); - _updateImagesFromCurrentContent(); + _updateImagesFromCurrentContent(ref); } }, @@ -1963,7 +2107,9 @@ Future _sendMessageInternal( persistentService.unregisterStream(streamId); // Stop socket events now that streaming finished only for SSE-driven streams if (socketService != null && suppressSocketContent == true) { - try { socketService.offChatEvents(); } catch (_) {} + try { + socketService.offChatEvents(); + } catch (_) {} } // Allow socket content again for future sessions (harmless if already false) suppressSocketContent = false; @@ -2091,7 +2237,9 @@ Future _sendMessageInternal( ref.read(chatMessagesProvider.notifier).finishStreaming(); // Stop socket events to avoid duplicates after error (only for SSE-driven) if (socketService != null && suppressSocketContent == true) { - try { socketService.offChatEvents(); } catch (_) {} + try { + socketService.offChatEvents(); + } catch (_) {} } // Special handling for Socket.IO streaming failures @@ -2520,13 +2668,13 @@ final regenerateLastMessageProvider = Provider Function()>((ref) { if (lastAssistantHadImages) { final prev = ref.read(imageGenerationEnabledProvider); try { + // Force image generation enabled during regeneration ref.read(imageGenerationEnabledProvider.notifier).state = true; - final activeConv = ref.read(activeConversationProvider); - await ref.read(taskQueueProvider.notifier).enqueueSendText( - conversationId: activeConv?.id, - text: lastUserMessage.content, - attachments: lastUserMessage.attachmentIds, - ); + await regenerateMessage( + ref, + lastUserMessage.content, + lastUserMessage.attachmentIds, + ); } finally { // restore previous state ref.read(imageGenerationEnabledProvider.notifier).state = prev; @@ -2534,13 +2682,12 @@ final regenerateLastMessageProvider = Provider Function()>((ref) { return; } - // Resend the message via task queue (unified flow) - final activeConv = ref.read(activeConversationProvider); - await ref.read(taskQueueProvider.notifier).enqueueSendText( - conversationId: activeConv?.id, - text: lastUserMessage.content, - attachments: lastUserMessage.attachmentIds, - ); + // Text regeneration without duplicating user message + await regenerateMessage( + ref, + lastUserMessage.content, + lastUserMessage.attachmentIds, + ); }; }); @@ -2579,7 +2726,9 @@ final stopGenerationProvider = Provider((ref) { try { final ids = await api.getTaskIdsByChat(activeConv.id); for (final t in ids) { - try { await api.stopTask(t); } catch (_) {} + try { + await api.stopTask(t); + } catch (_) {} } } catch (_) {} }()); @@ -2600,6 +2749,588 @@ final stopGenerationProvider = Provider((ref) { }; }); +// ========== Shared Streaming Utilities ========== + +List> _extractFilesFromResult(dynamic resp) { + final results = >[]; + if (resp == null) return results; + dynamic r = resp; + if (r is String) { + try { + r = jsonDecode(r); + } catch (_) {} + } + if (r is List) { + for (final item in r) { + if (item is String && item.isNotEmpty) { + results.add({'type': 'image', 'url': item}); + } else if (item is Map) { + final url = item['url']; + final b64 = item['b64_json'] ?? item['b64']; + if (url is String && url.isNotEmpty) { + results.add({'type': 'image', 'url': url}); + } else if (b64 is String && b64.isNotEmpty) { + results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); + } + } + } + return results; + } + if (r is! Map) return results; + final data = r['data']; + if (data is List) { + for (final item in data) { + if (item is Map) { + final url = item['url']; + final b64 = item['b64_json'] ?? item['b64']; + if (url is String && url.isNotEmpty) { + results.add({'type': 'image', 'url': url}); + } else if (b64 is String && b64.isNotEmpty) { + results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); + } + } else if (item is String && item.isNotEmpty) { + results.add({'type': 'image', 'url': item}); + } + } + } + final images = r['images']; + if (images is List) { + for (final item in images) { + if (item is String && item.isNotEmpty) { + results.add({'type': 'image', 'url': item}); + } else if (item is Map) { + final url = item['url']; + final b64 = item['b64_json'] ?? item['b64']; + if (url is String && url.isNotEmpty) { + results.add({'type': 'image', 'url': url}); + } else if (b64 is String && b64.isNotEmpty) { + results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); + } + } + } + } + final files = r['files']; + if (files is List) { + results.addAll(_extractFilesFromResult(files)); + } + final singleUrl = r['url']; + if (singleUrl is String && singleUrl.isNotEmpty) { + results.add({'type': 'image', 'url': singleUrl}); + } + final singleB64 = r['b64_json'] ?? r['b64']; + if (singleB64 is String && singleB64.isNotEmpty) { + results.add({'type': 'image', 'url': 'data:image/png;base64,$singleB64'}); + } + return results; +} + +void _updateImagesFromCurrentContent(dynamic ref) { + try { + final msgs = ref.read(chatMessagesProvider); + if (msgs.isEmpty || msgs.last.role != 'assistant') return; + final content = msgs.last.content; + if (content.isEmpty) return; + + final collected = >[]; + + if (content.contains('\"]+\.(jpg|jpeg|png|gif|webp)', + caseSensitive: false, + ); + final urlMatches = urlPattern.allMatches(content); + for (final match in urlMatches) { + final url = match.group(0); + if (url != null && url.isNotEmpty) { + collected.add({'type': 'image', 'url': url}); + } + } + + final jsonPattern = RegExp( + r'\{[^}]*"url"[^}]*:[^}]*"(data:image/[^"]+|https?://[^"]+\.(jpg|jpeg|png|gif|webp))"[^}]*\}', + caseSensitive: false, + ); + final jsonMatches = jsonPattern.allMatches(content); + for (final match in jsonMatches) { + final url = RegExp( + r'"url"[^:]*:[^"]*"([^"]+)"', + ).firstMatch(match.group(0) ?? '')?.group(1); + if (url != null && url.isNotEmpty) { + collected.add({'type': 'image', 'url': url}); + } + } + + final partialResultsPattern = RegExp( + r'(result|files)="([^"]*(?:data:image/[^"]*|https?://[^"]*\.(jpg|jpeg|png|gif|webp))[^"]*)"', + caseSensitive: false, + ); + final partialMatches = partialResultsPattern.allMatches(content); + for (final match in partialMatches) { + final attrValue = match.group(2); + if (attrValue != null) { + try { + final decoded = json.decode(attrValue); + collected.addAll(_extractFilesFromResult(decoded)); + } catch (_) { + if (attrValue.startsWith('data:image/') || + RegExp( + r'https?://[^\s]+\.(jpg|jpeg|png|gif|webp)$', + caseSensitive: false, + ).hasMatch(attrValue)) { + collected.add({'type': 'image', 'url': attrValue}); + } + } + } + } + } + + if (collected.isEmpty) return; + + final existing = msgs.last.files ?? >[]; + final seen = { + for (final f in existing) + if (f['url'] is String) (f['url'] as String) else '', + }..removeWhere((e) => e.isEmpty); + + final merged = >[...existing]; + for (final f in collected) { + final url = f['url'] as String?; + if (url != null && url.isNotEmpty && !seen.contains(url)) { + merged.add({'type': 'image', 'url': url}); + seen.add(url); + } + } + + if (merged.length != existing.length) { + ref + .read(chatMessagesProvider.notifier) + .updateLastMessageWithFunction((m) => m.copyWith(files: merged)); + } + } catch (_) {} +} + +void _attachSocketStreamingHandlers({ + required dynamic ref, + required dynamic socketService, + required String assistantMessageId, + required String modelId, + required Map modelItem, + required String sessionId, + required bool isBackgroundFlow, + required bool suppressSocketContentInitially, + String? activeConversationId, +}) { + bool suppressSocketContent = suppressSocketContentInitially; + + final api = ref.read(apiServiceProvider); + + void channelLineHandlerFactory(String channel) { + void handler(dynamic line) { + try { + if (line is String) { + final s = line.trim(); + if (s == '[DONE]' || s == 'DONE') { + try { + socketService.offEvent(channel); + } catch (_) {} + try { + unawaited( + api?.sendChatCompleted( + chatId: activeConversationId ?? '', + messageId: assistantMessageId, + messages: const [], + model: modelId, + modelItem: modelItem, + sessionId: sessionId, + ), + ); + } catch (_) {} + ref.read(chatMessagesProvider.notifier).finishStreaming(); + return; + } + if (s.startsWith('data:')) { + final dataStr = s.substring(5).trim(); + if (dataStr == '[DONE]') { + try { + socketService.offEvent(channel); + } catch (_) {} + try { + unawaited( + api?.sendChatCompleted( + chatId: activeConversationId ?? '', + messageId: assistantMessageId, + messages: const [], + model: modelId, + modelItem: modelItem, + sessionId: sessionId, + ), + ); + } catch (_) {} + ref.read(chatMessagesProvider.notifier).finishStreaming(); + return; + } + try { + final Map j = jsonDecode(dataStr); + final choices = j['choices']; + if (choices is List && choices.isNotEmpty) { + final choice = choices.first; + final delta = choice is Map ? choice['delta'] : null; + if (delta is Map) { + if (delta.containsKey('tool_calls')) { + final tc = delta['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; + if (name is String && name.isNotEmpty) { + final msgs = ref.read(chatMessagesProvider); + final exists = + (msgs.isNotEmpty) && + RegExp( + r']*\bname=\"' + + RegExp.escape(name) + + r'\"', + multiLine: true, + ).hasMatch(msgs.last.content); + if (!exists) { + final status = + '\n
Executing...\n
\n'; + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(status); + } + } + } + } + } + } + final content = delta['content']?.toString() ?? ''; + if (content.isNotEmpty) { + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(content); + _updateImagesFromCurrentContent(ref); + } + } + } + } catch (_) { + if (s.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(s); + _updateImagesFromCurrentContent(ref); + } + } + } else { + if (s.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(s); + _updateImagesFromCurrentContent(ref); + } + } + } else if (line is Map) { + if (line['done'] == true) { + try { + socketService.offEvent(channel); + } catch (_) {} + ref.read(chatMessagesProvider.notifier).finishStreaming(); + return; + } + } + } catch (_) {} + } + + socketService.onEvent(channel, handler); + Future.delayed(const Duration(minutes: 3), () { + try { + socketService.offEvent(channel); + } catch (_) {} + }); + } + + void chatHandler(Map ev) { + try { + final data = ev['data']; + if (data == null) return; + final type = data['type']; + final payload = data['data']; + + if (type == 'chat:completion' && payload != null) { + if (payload is Map) { + if (payload.containsKey('tool_calls')) { + final tc = payload['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; + if (name is String && name.isNotEmpty) { + final msgs = ref.read(chatMessagesProvider); + final exists = + (msgs.isNotEmpty) && + RegExp( + r']*\bname=\"' + + RegExp.escape(name) + + r'\"', + multiLine: true, + ).hasMatch(msgs.last.content); + if (!exists) { + final status = + '\n
Executing...\n
\n'; + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(status); + } + } + } + } + } + } + if (!suppressSocketContent && payload.containsKey('choices')) { + final choices = payload['choices']; + if (choices is List && choices.isNotEmpty) { + final choice = choices.first; + final delta = choice is Map ? choice['delta'] : null; + if (delta is Map) { + if (delta.containsKey('tool_calls')) { + final tc = delta['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) + ? fn['name'] as String + : null; + if (name is String && name.isNotEmpty) { + final msgs = ref.read(chatMessagesProvider); + final exists = + (msgs.isNotEmpty) && + RegExp( + r']*\bname=\"' + + RegExp.escape(name) + + r'\"', + multiLine: true, + ).hasMatch(msgs.last.content); + if (!exists) { + final status = + '\n
Executing...\n
\n'; + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(status); + } + } + } + } + } + } + final content = delta['content']?.toString() ?? ''; + if (content.isNotEmpty) { + ref + .read(chatMessagesProvider.notifier) + .appendToLastMessage(content); + _updateImagesFromCurrentContent(ref); + } + } + } + } + if (payload['done'] == true) { + try { + socketService.offChatEvents(); + } catch (_) {} + try { + unawaited( + api + ?.sendChatCompleted( + chatId: activeConversationId ?? '', + messageId: assistantMessageId, + messages: const [], + model: modelId, + modelItem: modelItem, + sessionId: sessionId, + ) + ?.timeout(const Duration(seconds: 3)) + .catchError((_) {}), + ); + } catch (_) {} + + final msgs = ref.read(chatMessagesProvider); + if (msgs.isNotEmpty && msgs.last.role == 'assistant') { + final lastContent = msgs.last.content.trim(); + if (lastContent.isEmpty) { + Future.microtask(() async { + try { + final chatId = activeConversationId; + if (chatId != null && chatId.isNotEmpty) { + final resp = await api?.dio.get('/api/v1/chats/$chatId'); + final data = resp?.data as Map?; + String content = ''; + final chatObj = data?['chat'] as Map?; + if (chatObj != null) { + final list = chatObj['messages']; + if (list is List) { + final target = list.firstWhere( + (m) => + (m is Map && + (m['id']?.toString() == assistantMessageId)), + orElse: () => null, + ); + if (target != null) { + final rawContent = (target as Map)['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + } + } + if (content.isEmpty) { + final history = chatObj['history']; + if (history is Map && history['messages'] is Map) { + final Map messagesMap = + (history['messages'] as Map) + .cast(); + final msg = messagesMap[assistantMessageId]; + if (msg is Map) { + final rawContent = msg['content']; + if (rawContent is String) { + content = rawContent; + } else if (rawContent is List) { + final textItem = rawContent.firstWhere( + (i) => i is Map && i['type'] == 'text', + orElse: () => null, + ); + if (textItem != null) { + content = textItem['text']?.toString() ?? ''; + } + } + } + } + } + } + if (content.isNotEmpty) { + ref + .read(chatMessagesProvider.notifier) + .replaceLastMessageContent(content); + } + } + } catch (_) { + } finally { + ref.read(chatMessagesProvider.notifier).finishStreaming(); + } + }); + return; + } + } + ref.read(chatMessagesProvider.notifier).finishStreaming(); + } + } + } else if (type == 'request:chat:completion' && payload != null) { + final channel = payload['channel']; + if (channel is String && channel.isNotEmpty) { + suppressSocketContent = true; + channelLineHandlerFactory(channel); + } + } else if (type == 'event:status' && payload != null) { + final status = payload['status']?.toString() ?? ''; + if (status.isNotEmpty) { + ref + .read(chatMessagesProvider.notifier) + .updateLastMessageWithFunction( + (m) => m.copyWith(metadata: {...?m.metadata, 'status': status}), + ); + } + } else if (type == 'event:tool' && payload != null) { + final files = _extractFilesFromResult(payload['result']); + if (files.isNotEmpty) { + final msgs = ref.read(chatMessagesProvider); + if (msgs.isNotEmpty && msgs.last.role == 'assistant') { + final existing = msgs.last.files ?? >[]; + final merged = [...existing, ...files]; + ref + .read(chatMessagesProvider.notifier) + .updateLastMessageWithFunction( + (m) => m.copyWith(files: merged), + ); + } + } + } else if (type == 'event:message:delta' && payload != null) { + if (suppressSocketContent) return; + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); + _updateImagesFromCurrentContent(ref); + } + } + } catch (_) {} + } + + void channelEventsHandler(Map ev) { + try { + final data = ev['data']; + if (data == null) return; + final type = data['type']; + final payload = data['data']; + if (type == 'message' && payload is Map) { + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); + _updateImagesFromCurrentContent(ref); + } + } + } catch (_) {} + } + + socketService.onChatEvents(chatHandler); + socketService.onChannelEvents(channelEventsHandler); + Future.delayed(const Duration(seconds: 90), () { + try { + socketService.offChatEvents(); + socketService.offChannelEvents(); + } catch (_) {} + try { + final msgs = ref.read(chatMessagesProvider); + if (msgs.isNotEmpty && + msgs.last.role == 'assistant' && + msgs.last.isStreaming) { + ref.read(chatMessagesProvider.notifier).finishStreaming(); + } + } catch (_) {} + }); +} + // ========== Tool Servers (OpenAPI) Helpers ========== Future>> _resolveToolServers( @@ -2658,7 +3389,10 @@ Future>> _resolveToolServers( return resolved; } -Map? _resolveRef(String ref, Map? components) { +Map? _resolveRef( + String ref, + Map? components, +) { // e.g., #/components/schemas/MySchema if (!ref.startsWith('#/')) return null; final parts = ref.split('/'); @@ -2668,7 +3402,8 @@ Map? _resolveRef(String ref, Map? components) final section = components?[type]; if (section is Map) { final schema = section[name]; - if (schema is Map) return Map.from(schema); + if (schema is Map) + return Map.from(schema); } return null; } @@ -2687,10 +3422,12 @@ Map _resolveSchemaSimple( final out = {}; if (type is String) { out['type'] = type; - if (schema['description'] != null) out['description'] = schema['description']; + if (schema['description'] != null) + out['description'] = schema['description']; if (type == 'object') { out['properties'] = {}; - if (schema['required'] is List) out['required'] = List.from(schema['required']); + if (schema['required'] is List) + out['required'] = List.from(schema['required']); final props = schema['properties']; if (props is Map) { props.forEach((k, v) { @@ -2706,7 +3443,9 @@ Map _resolveSchemaSimple( return {}; } -List> _convertOpenApiToToolPayload(Map openApi) { +List> _convertOpenApiToToolPayload( + Map openApi, +) { final tools = >[]; final paths = openApi['paths']; if (paths is! Map) return tools; @@ -2716,7 +3455,10 @@ List> _convertOpenApiToToolPayload(Map ope if (operation is Map && operation['operationId'] != null) { final tool = { 'name': operation['operationId'], - 'description': operation['description'] ?? operation['summary'] ?? 'No description available.', + 'description': + operation['description'] ?? + operation['summary'] ?? + 'No description available.', 'parameters': { 'type': 'object', 'properties': {}, @@ -2731,9 +3473,11 @@ List> _convertOpenApiToToolPayload(Map ope final name = p['name']; final schema = p['schema'] as Map?; if (name != null && schema != null) { - String desc = (schema['description'] ?? p['description'] ?? '').toString(); + String desc = (schema['description'] ?? p['description'] ?? '') + .toString(); if (schema['enum'] is List) { - desc = '$desc. Possible values: ${(schema['enum'] as List).join(', ')}'; + desc = + '$desc. Possible values: ${(schema['enum'] as List).join(', ')}'; } tool['parameters']['properties'][name] = { 'type': schema['type'], @@ -2752,7 +3496,10 @@ List> _convertOpenApiToToolPayload(Map ope final content = reqBody['content']; if (content is Map && content['application/json'] is Map) { final schema = content['application/json']['schema']; - final resolved = _resolveSchemaSimple(schema, openApi['components'] as Map?); + final resolved = _resolveSchemaSimple( + schema, + openApi['components'] as Map?, + ); if (resolved['properties'] is Map) { tool['parameters']['properties'] = { ...tool['parameters']['properties'],