From 637274133f58a4c43e8b02d6f97e820125fe1896 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Thu, 25 Sep 2025 12:28:02 +0530 Subject: [PATCH] refactor: removing legacy socket code --- lib/core/services/streaming_helper.dart | 35 + .../chat/providers/chat_providers.dart | 1576 +---------------- 2 files changed, 65 insertions(+), 1546 deletions(-) diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 58b62d1..debe2c2 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -6,6 +6,7 @@ import 'package:flutter/foundation.dart'; import '../../core/models/chat_message.dart'; import '../../core/services/persistent_streaming_service.dart'; import '../../core/services/socket_service.dart'; +import '../../core/utils/inactivity_watchdog.dart'; import '../../core/utils/stream_chunker.dart'; import '../../core/utils/tool_calls_parser.dart'; @@ -68,6 +69,27 @@ StreamSubscription attachUnifiedChunkedStreaming({ }, ); + InactivityWatchdog? socketWatchdog; + if (socketService != null) { + socketWatchdog = InactivityWatchdog( + window: const Duration(minutes: 5), + onTimeout: () { + try { + socketService.offChatEvents(); + socketService.offChannelEvents(); + } catch (_) {} + try { + final msgs = getMessages(); + if (msgs.isNotEmpty && + msgs.last.role == 'assistant' && + msgs.last.isStreaming) { + finishStreaming(); + } + } catch (_) {} + }, + )..start(); + } + bool isSearching = false; bool suppressSocketContent = suppressSocketContentInitially; bool usingDynamicChannel = usingDynamicChannelInitially; @@ -185,6 +207,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ try { if (line is String) { final s = line.trim(); + socketWatchdog?.ping(); if (s == '[DONE]' || s == 'DONE') { try { socketService?.offEvent(channel); @@ -202,6 +225,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ ); } catch (_) {} finishStreaming(); + socketWatchdog?.stop(); return; } if (s.startsWith('data:')) { @@ -222,6 +246,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ ); } catch (_) {} finishStreaming(); + socketWatchdog?.stop(); return; } try { @@ -280,11 +305,13 @@ StreamSubscription attachUnifiedChunkedStreaming({ } } } else if (line is Map) { + socketWatchdog?.ping(); if (line['done'] == true) { try { socketService?.offEvent(channel); } catch (_) {} finishStreaming(); + socketWatchdog?.stop(); return; } } @@ -294,10 +321,12 @@ StreamSubscription attachUnifiedChunkedStreaming({ try { socketService?.onEvent(channel, handler); } catch (_) {} + socketWatchdog?.ping(); Future.delayed(const Duration(minutes: 3), () { try { socketService?.offEvent(channel); } catch (_) {} + socketWatchdog?.stop(); }); } @@ -307,6 +336,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ if (data == null) return; final type = data['type']; final payload = data['data']; + socketWatchdog?.ping(); if (type == 'chat:completion' && payload != null) { if (payload is Map) { @@ -471,6 +501,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ } } finishStreaming(); + socketWatchdog?.stop(); } } } else if (type == 'chat:message:error' && payload != null) { @@ -497,6 +528,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ } catch (_) {} // Ensure UI exits streaming state finishStreaming(); + socketWatchdog?.stop(); } else if ((type == 'chat:message:delta' || type == 'message') && payload != null) { // Incremental message content over socket; respect suppression on SSE-driven flows @@ -672,6 +704,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ finishStreaming(); } } catch (_) {} + socketWatchdog?.stop(); }); } @@ -727,6 +760,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ if (!usingDynamicChannel && !isBackgroundFlow) { finishStreaming(); } + socketWatchdog?.stop(); }, onError: (error) async { try { @@ -738,6 +772,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ socketService.offChatEvents(); } catch (_) {} } + socketWatchdog?.stop(); }, ); diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 892cd45..115c486 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -10,9 +10,6 @@ import '../../../core/models/chat_message.dart'; import '../../../core/models/conversation.dart'; import '../../../core/providers/app_providers.dart'; import '../../../core/auth/auth_state_manager.dart'; -import '../../../core/utils/stream_chunker.dart'; -import '../../../core/services/persistent_streaming_service.dart'; -import '../../../core/utils/debug_logger.dart'; import '../../../core/utils/inactivity_watchdog.dart'; import '../services/reviewer_mode_service.dart'; import '../../../shared/services/tasks/task_queue.dart'; @@ -1694,34 +1691,12 @@ Future _sendMessageInternal( final stream = response.stream; final sessionId = response.sessionId; - // (socket handlers attached below after flow flags are set) - - // If socket is available, start listening for chat-events immediately - // Background-tools flow OR any session-bound flow relies on socket/dynamic channel for - // streaming content. Allow socket TEXT in those modes. For pure SSE/polling flows, suppress - // socket TEXT to avoid duplicates (still surface tool_call status). + // Use unified streaming helper for SSE/WebSocket handling final bool isBackgroundFlow = isBackgroundToolsFlowPre || isBackgroundWebSearchPre || wantSessionBinding; - bool suppressSocketContent = - !isBackgroundFlow; // allow socket text when session-bound or tools - bool usingDynamicChannel = false; // set true when server provides a channel - // Attach socket handlers for background flows/dynamic channels - if (socketService != null) { - _attachSocketStreamingHandlers( - ref: ref, - socketService: socketService, - assistantMessageId: assistantMessageId, - modelId: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - isBackgroundFlow: isBackgroundFlow, - suppressSocketContentInitially: suppressSocketContent, - activeConversationId: activeConversation?.id, - ); - } - // Enrich the assistant placeholder metadata so the typing guard can use longer timeouts + try { ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction(( m, @@ -1736,940 +1711,33 @@ Future _sendMessageInternal( }); } catch (_) {} - if (socketService != null) { - // Activity-based watchdog for chat/channel events (resets on activity) - final chatWatchdog = InactivityWatchdog( - window: const Duration(minutes: 5), - onTimeout: () { - try { - socketService.offChatEvents(); - socketService.offChannelEvents(); - } catch (_) {} - // As a final safeguard, if we're still in streaming state, finish it - try { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && - msgs.last.role == 'assistant' && - msgs.last.isStreaming) { - ref.read(chatMessagesProvider.notifier).finishStreaming(); - } - } catch (_) {} - }, - )..start(); - - void chatHandler(Map ev) { - try { - final data = ev['data']; - if (data == null) return; - final type = data['type']; - final payload = data['data']; - DebugLogger.stream('Socket chat-events: type=$type'); - // Any chat event indicates activity; reset inactivity watchdog - // (watchdog defined below, near handler registration) - chatWatchdog.ping(); - if (type == 'chat:completion' && payload != null) { - if (payload is Map) { - // Provider may emit tool_calls at the top level - // Always surface tool_calls status from socket for instant tiles - if (payload.containsKey('tool_calls')) { - final tc = payload['tool_calls']; - if (tc is List) { - for (final call in tc) { - if (call is Map) { - final fn = call['function']; - final name = (fn is Map && fn['name'] is String) - ? fn['name'] as String - : null; - if (name is String && name.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - final exists = - (msgs.isNotEmpty) && - RegExp( - ']*\\bname="${RegExp.escape(name)}"', - multiLine: true, - ).hasMatch(msgs.last.content); - if (!exists) { - final status = - '\n
Executing...\n
\n'; - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(status); - } - } - } - } - } - } - if (!suppressSocketContent && payload.containsKey('choices')) { - final choices = payload['choices']; - if (choices is List && choices.isNotEmpty) { - final choice = choices.first; - final delta = choice is Map ? choice['delta'] : null; - if (delta is Map) { - // Surface tool_calls status like SSE path - if (delta.containsKey('tool_calls')) { - final tc = delta['tool_calls']; - if (tc is List) { - for (final call in tc) { - if (call is Map) { - final fn = call['function']; - final name = (fn is Map && fn['name'] is String) - ? fn['name'] as String - : null; - if (name is String && name.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - final exists = - (msgs.isNotEmpty) && - RegExp( - ']*\\bname="${RegExp.escape(name)}"', - multiLine: true, - ).hasMatch(msgs.last.content); - if (!exists) { - final status = - '\n
Executing...\n
\n'; - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(status); - } - } - } - } - } - } - final content = delta['content']?.toString() ?? ''; - if (content.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(content); - _updateImagesFromCurrentContent(ref); - } - } - } - } - if (!suppressSocketContent && payload.containsKey('content')) { - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && msgs.last.role == 'assistant') { - final prev = msgs.last.content; - if (prev.isEmpty || prev == '[TYPING_INDICATOR]') { - ref - .read(chatMessagesProvider.notifier) - .replaceLastMessageContent(content); - } else if (content.startsWith(prev)) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(content.substring(prev.length)); - } else { - ref - .read(chatMessagesProvider.notifier) - .replaceLastMessageContent(content); - } - _updateImagesFromCurrentContent(ref); - } else { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(content); - _updateImagesFromCurrentContent(ref); - } - } - } - if (payload['done'] == true) { - // Stop listening to further socket events for this session. - try { - socketService.offChatEvents(); - } catch (_) {} - try { - chatWatchdog.ping(); // ensure timer exists - chatWatchdog.stop(); - } catch (_) {} - - // Notify server that chat is completed (mirrors web client) - try { - final apiSvc = ref.read(apiServiceProvider); - final chatId = activeConversation?.id ?? ''; - if (apiSvc != null && chatId.isNotEmpty) { - unawaited( - apiSvc - .sendChatCompleted( - chatId: chatId, - messageId: assistantMessageId, - messages: const [], - model: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - ) - .timeout(const Duration(seconds: 3)) - .catchError((_) {}), - ); - } - } catch (_) {} - - // If no content was rendered yet, fetch final assistant message from server - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && msgs.last.role == 'assistant') { - final lastContent = msgs.last.content.trim(); - if (lastContent.isEmpty) { - final apiSvc = ref.read(apiServiceProvider); - final chatId = activeConversation?.id; - final msgId = assistantMessageId; - if (apiSvc != null && chatId != null && chatId.isNotEmpty) { - Future.microtask(() async { - try { - final resp = await apiSvc.dio.get( - '/api/v1/chats/$chatId', - ); - final data = resp.data as Map; - String content = ''; - final chatObj = data['chat'] as Map?; - if (chatObj != null) { - // Prefer chat.messages list - final list = chatObj['messages']; - if (list is List) { - final target = list.firstWhere( - (m) => - (m is Map && - (m['id']?.toString() == msgId)), - orElse: () => null, - ); - if (target != null) { - final rawContent = (target as Map)['content']; - if (rawContent is String) { - content = rawContent; - } else if (rawContent is List) { - final textItem = rawContent.firstWhere( - (i) => i is Map && i['type'] == 'text', - orElse: () => null, - ); - if (textItem != null) { - content = - textItem['text']?.toString() ?? ''; - } - } - } - } - // Fallback to history map - if (content.isEmpty) { - final history = chatObj['history']; - if (history is Map && - history['messages'] is Map) { - final Map messagesMap = - (history['messages'] as Map) - .cast(); - final msg = messagesMap[msgId]; - if (msg is Map) { - final rawContent = msg['content']; - if (rawContent is String) { - content = rawContent; - } else if (rawContent is List) { - final textItem = rawContent.firstWhere( - (i) => i is Map && i['type'] == 'text', - orElse: () => null, - ); - if (textItem != null) { - content = - textItem['text']?.toString() ?? ''; - } - } - } - } - } - } - - if (content.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .replaceLastMessageContent(content); - } - } catch (_) { - // Swallow; we'll still finish streaming - } finally { - ref - .read(chatMessagesProvider.notifier) - .finishStreaming(); - } - }); - return; // Defer finish to microtask - } - } - } - // Normal path: finish now - ref.read(chatMessagesProvider.notifier).finishStreaming(); - try { - chatWatchdog.stop(); - } catch (_) {} - } - } - } else if (type == 'request:chat:completion' && payload != null) { - // Mirror web client's execute path: listen on provided dynamic channel - final channel = payload['channel']; - if (channel is String && channel.isNotEmpty) { - // Prefer dynamic channel for streaming content; suppress chat-events text to avoid duplicates - suppressSocketContent = true; - usingDynamicChannel = true; - usingDynamicChannel = true; - if (kSocketVerboseLogging) { - DebugLogger.stream( - 'Socket request:chat:completion channel=$channel', - ); - } - void channelLineHandler(dynamic line) { - try { - if (line is String) { - final s = line.trim(); - // Dynamic channel activity - try { - chatWatchdog.ping(); - } catch (_) {} - DebugLogger.stream( - 'Socket [$channel] line=${s.length > 160 ? '${s.substring(0, 160)}…' : s}', - ); - if (s == '[DONE]' || s == 'DONE') { - socketService.offEvent(channel); - // Channel completed - try { - unawaited( - api.sendChatCompleted( - chatId: activeConversation?.id ?? '', - messageId: assistantMessageId, - messages: const [], - model: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - ), - ); - } catch (_) {} - ref.read(chatMessagesProvider.notifier).finishStreaming(); - return; - } - if (s.startsWith('data:')) { - final dataStr = s.substring(5).trim(); - if (dataStr == '[DONE]') { - socketService.offEvent(channel); - try { - unawaited( - api.sendChatCompleted( - chatId: activeConversation?.id ?? '', - messageId: assistantMessageId, - messages: const [], - model: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - ), - ); - } catch (_) {} - ref - .read(chatMessagesProvider.notifier) - .finishStreaming(); - return; - } - // Try to parse OpenAI-style delta JSON - try { - final Map j = jsonDecode(dataStr); - final choices = j['choices']; - if (choices is List && choices.isNotEmpty) { - final choice = choices.first; - final delta = choice is Map ? choice['delta'] : null; - if (delta is Map) { - if (delta.containsKey('content')) { - final c = delta['content']?.toString() ?? ''; - if (c.isNotEmpty) { - DebugLogger.stream( - 'Socket [$channel] delta.content len=${c.length}', - ); - } - } - // Surface tool_calls status - if (delta.containsKey('tool_calls')) { - if (kSocketVerboseLogging) { - DebugLogger.stream( - 'Socket [$channel] delta.tool_calls detected', - ); - } - final tc = delta['tool_calls']; - if (tc is List) { - for (final call in tc) { - if (call is Map) { - final fn = call['function']; - final name = - (fn is Map && fn['name'] is String) - ? fn['name'] as String - : null; - if (name is String && name.isNotEmpty) { - final msgs = ref.read( - chatMessagesProvider, - ); - final exists = - (msgs.isNotEmpty) && - RegExp( - ']*\\bname="${RegExp.escape(name)}"', - multiLine: true, - ).hasMatch(msgs.last.content); - if (!exists) { - final status = - '\n
Executing...\n
\n'; - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(status); - } - } - } - } - } - } - // Append streamed content - final content = delta['content']?.toString() ?? ''; - if (content.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(content); - _updateImagesFromCurrentContent(ref); - } - } - } - } catch (_) { - // Non-JSON line: append as-is - if (s.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(s); - _updateImagesFromCurrentContent(ref); - } - } - } else { - // Plain text line - if (s.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(s); - _updateImagesFromCurrentContent(ref); - } - } - } else if (line is Map) { - // If server sends { done: true } via channel - final done = line['done'] == true; - if (done) { - socketService.offEvent(channel); - try { - unawaited( - api.sendChatCompleted( - chatId: activeConversation?.id ?? '', - messageId: assistantMessageId, - messages: const [], - model: selectedModel.id, - modelItem: modelItem, - sessionId: sessionId, - ), - ); - } catch (_) {} - ref.read(chatMessagesProvider.notifier).finishStreaming(); - return; - } - } - } catch (_) {} - } - - // Register dynamic channel listener - try { - socketService.onEvent(channel, channelLineHandler); - } catch (_) {} - } - } else if (type == 'chat:message:error' && payload != null) { - // Surface error associated with the current assistant message - try { - dynamic err = payload is Map ? payload['error'] : null; - String content = ''; - if (err is Map) { - final c = err['content']; - if (c is String) { - content = c; - } else if (c != null) { - content = c.toString(); - } - } else if (err is String) { - content = err; - } else if (payload is Map && payload['message'] is String) { - content = payload['message']; - } - if (content.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .replaceLastMessageContent('⚠️ $content'); - } - } catch (_) {} - ref.read(chatMessagesProvider.notifier).finishStreaming(); - } else if (type == 'execute:tool' && payload != null) { - // Show an executing tile immediately using provided tool info - try { - final name = payload['name']?.toString() ?? 'tool'; - DebugLogger.stream('Socket execute:tool name=$name'); - final status = - '\n
Executing...\n
\n'; - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(status); - // If tool payload already carries files/result, try to extract images for grid - try { - final files = _extractFilesFromResult(payload['files']); - final resultFiles = _extractFilesFromResult(payload['result']); - final all = [...files, ...resultFiles]; - if (all.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && msgs.last.role == 'assistant') { - final existing = - msgs.last.files ?? >[]; - final seen = { - for (final f in existing) - if (f['url'] is String) (f['url'] as String) else '', - }..removeWhere((e) => e.isEmpty); - final merged = >[...existing]; - for (final f in all) { - final url = f['url'] as String?; - if (url != null && - url.isNotEmpty && - !seen.contains(url)) { - merged.add({'type': 'image', 'url': url}); - seen.add(url); - } - } - if (merged.length != existing.length) { - ref - .read(chatMessagesProvider.notifier) - .updateLastMessageWithFunction( - (m) => m.copyWith(files: merged), - ); - } - } - } - } catch (_) {} - } catch (_) {} - } else if ((type == 'files' || type == 'chat:message:files') && - payload != null) { - // Handle files event from socket (image generation results) - try { - DebugLogger.stream( - 'Socket files event received: ${payload.toString()}', - ); - final files = _extractFilesFromResult(payload); - if (files.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && msgs.last.role == 'assistant') { - final existing = msgs.last.files ?? >[]; - final seen = { - for (final f in existing) - if (f['url'] is String) (f['url'] as String) else '', - }..removeWhere((e) => e.isEmpty); - final merged = >[...existing]; - for (final f in files) { - final url = f['url'] as String?; - if (url != null && url.isNotEmpty && !seen.contains(url)) { - merged.add({'type': 'image', 'url': url}); - seen.add(url); - } - } - if (merged.length != existing.length) { - DebugLogger.stream( - 'Socket files: Adding ${merged.length - existing.length} new images', - ); - final updatedMessage = ref - .read(chatMessagesProvider) - .last - .copyWith(files: merged); - DebugLogger.stream( - 'Socket files: Updated message files count: ${updatedMessage.files?.length}', - ); - ref - .read(chatMessagesProvider.notifier) - .updateLastMessageWithFunction( - (ChatMessage m) => m.copyWith(files: merged), - ); - } - } - } - } catch (e) { - DebugLogger.stream('Socket files event error: $e'); - } - } - } catch (_) {} - } - - socketService.onChatEvents(chatHandler); - // Also mirror channel-events like the web client - void channelEventsHandler(Map ev) { - try { - final data = ev['data']; - if (data == null) return; - final type = data['type']; - final payload = data['data']; - DebugLogger.stream('Socket channel-events: type=$type'); - // Handle generic channel progress messages if needed - if (type == 'message' && payload is Map) { - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(content); - _updateImagesFromCurrentContent(ref); - chatWatchdog.ping(); - } - } - } catch (_) {} - } - - socketService.onChannelEvents(channelEventsHandler); - // Start activity watchdog - chatWatchdog.ping(); - } - - // Prepare streaming and background handling - final chunkedStream = StreamChunker.chunkStream( - stream, - enableChunking: true, - minChunkSize: 5, - maxChunkLength: 3, - delayBetweenChunks: const Duration(milliseconds: 15), + final sendStreamSub = attachUnifiedChunkedStreaming( + stream: stream, + webSearchEnabled: webSearchEnabled, + isBackgroundFlow: isBackgroundFlow, + suppressSocketContentInitially: !isBackgroundFlow, + usingDynamicChannelInitially: false, + assistantMessageId: assistantMessageId, + modelId: selectedModel.id, + modelItem: modelItem, + sessionId: sessionId, + activeConversationId: activeConversation?.id, + api: api, + socketService: socketService, + appendToLastMessage: (c) => + ref.read(chatMessagesProvider.notifier).appendToLastMessage(c), + replaceLastMessageContent: (c) => + ref.read(chatMessagesProvider.notifier).replaceLastMessageContent(c), + updateLastMessageWith: (updater) => ref + .read(chatMessagesProvider.notifier) + .updateLastMessageWithFunction(updater), + finishStreaming: () => + ref.read(chatMessagesProvider.notifier).finishStreaming(), + getMessages: () => ref.read(chatMessagesProvider), ); - // Create a stream controller for persistent handling - final persistentController = StreamController.broadcast(); - - // Register stream with persistent service for app lifecycle handling - final persistentService = PersistentStreamingService(); - - final streamId = persistentService.registerStream( - subscription: chunkedStream.listen( - (chunk) { - persistentController.add(chunk); - }, - onDone: () { - persistentController.close(); - }, - onError: (error) { - persistentController.addError(error); - }, - ), - controller: persistentController, - recoveryCallback: () async { - // Recovery callback to restart streaming if interrupted - debugPrint('DEBUG: Attempting to recover interrupted stream'); - // TODO: Implement stream recovery logic - }, - metadata: { - 'conversationId': activeConversation?.id, - 'messageId': assistantMessageId, - 'modelId': selectedModel.id, - }, - ); - - // Image generation handled server-side via tools; no client pre-request - - // For built-in web search, the status will be updated when function calls are detected - // in the streaming response. Manual status update is not needed here. - - // (moved above) streaming registration is already set up - - // Track web search status - bool isSearching = false; - - // Helpers were defined above - - int chunkSeq = 0; - final streamSubscription = persistentController.stream.listen( - (chunk) { - chunkSeq += 1; - try { - persistentService.updateStreamProgress( - streamId, - chunkSequence: chunkSeq, - appendedContent: chunk, - ); - } catch (_) {} - var effectiveChunk = chunk; - // Check for web search indicators in the stream - if (webSearchEnabled && !isSearching) { - // Check if this is the start of web search - if (chunk.contains('[SEARCHING]') || - chunk.contains('Searching the web') || - chunk.contains('web search')) { - isSearching = true; - // Update the message to show search status - ref - .read(chatMessagesProvider.notifier) - .updateLastMessageWithFunction( - (message) => message.copyWith( - content: '🔍 Searching the web...', - metadata: {'webSearchActive': true}, - ), - ); - return; // Don't append this chunk - } - } - - // Check if web search is complete - if (isSearching && - (chunk.contains('[/SEARCHING]') || - chunk.contains('Search complete'))) { - isSearching = false; - // Only update metadata; keep content to avoid flicker/indicator reappearing - ref - .read(chatMessagesProvider.notifier) - .updateLastMessageWithFunction( - (message) => - message.copyWith(metadata: {'webSearchActive': false}), - ); - // Strip markers from this chunk and continue processing - effectiveChunk = effectiveChunk - .replaceAll('[SEARCHING]', '') - .replaceAll('[/SEARCHING]', ''); - } - - // Regular content - append to message (markers removed above) - if (effectiveChunk.trim().isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(effectiveChunk); - _updateImagesFromCurrentContent(ref); - } - }, - - onDone: () async { - // Unregister from persistent service - persistentService.unregisterStream(streamId); - // Stop socket events now that streaming finished only for SSE-driven streams - if (socketService != null && suppressSocketContent == true) { - try { - socketService.offChatEvents(); - } catch (_) {} - } - // Allow socket content again for future sessions (harmless if already false) - suppressSocketContent = false; - // If this path was SSE-driven (no background tools/dynamic channel), finish now. - // Otherwise keep streaming state until socket/dynamic channel signals done. - if (!usingDynamicChannel && !isBackgroundFlow) { - ref.read(chatMessagesProvider.notifier).finishStreaming(); - } - - // Send chat completed notification to OpenWebUI - final messages = ref.read(chatMessagesProvider); - if (messages.isNotEmpty && activeConversation != null) { - final lastMessage = messages.last; - if (lastMessage.role == 'assistant') { - try { - // Convert messages to the format expected by /api/chat/completed - final List> formattedMessages = []; - - for (final msg in messages) { - final messageMap = { - 'id': msg.id, - 'role': msg.role, - 'content': msg.content, - 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, - }; - - // For assistant messages, add completion details - if (msg.role == 'assistant') { - messageMap['model'] = selectedModel.id; - - // Add mock usage data if not available (OpenWebUI expects this) - if (msg.usage != null) { - messageMap['usage'] = msg.usage; - } else if (msg == messages.last) { - // Add basic usage for the last assistant message - messageMap['usage'] = { - 'prompt_tokens': 10, - 'completion_tokens': msg.content.split(' ').length, - 'total_tokens': 10 + msg.content.split(' ').length, - }; - } - } - - formattedMessages.add(messageMap); - } - - // Only notify completion immediately for non-background SSE flows. - // For background tools/dynamic-channel flows, defer completion - // until the socket/dynamic channel signals done. - if (!isBackgroundFlow && !usingDynamicChannel) { - try { - unawaited( - api - .sendChatCompleted( - chatId: activeConversation.id, - messageId: - assistantMessageId, // Use message ID from response - messages: formattedMessages, - model: selectedModel.id, - modelItem: modelItem, // Include model metadata - sessionId: sessionId, // Include session ID - ) - .timeout(const Duration(seconds: 3)) - .catchError((_) {}), - ); - } catch (_) { - // Ignore - } - } - - // Fetch the latest conversation state - try { - // Quick fetch to get the current state - no waiting for title generation - final updatedConv = await api.getConversation( - activeConversation.id, - ); - - // Check if we should update the title (only on first response and if server has one) - final shouldUpdateTitle = - messages.length <= 2 && - updatedConv.title != 'New Chat' && - updatedConv.title.isNotEmpty; - - if (shouldUpdateTitle) { - // Ensure the title is reasonable (not too long) - final cleanTitle = updatedConv.title.length > 100 - ? '${updatedConv.title.substring(0, 100)}...' - : updatedConv.title; - - // Update the conversation with title and combined messages - final updatedConversation = activeConversation.copyWith( - title: cleanTitle, - updatedAt: DateTime.now(), - ); - - ref - .read(activeConversationProvider.notifier) - .set(updatedConversation); - } else { - // Keep local messages and only refresh conversations list - ref.invalidate(conversationsProvider); - } - - // Streaming already marked as complete when stream ended - // Removed post-assistant title trigger/background check; handled right after user message - } catch (e) { - // Streaming already marked as complete when stream ended - } - } catch (e) { - // Continue without failing the entire process - // Note: Conversation still syncs via _saveConversationToServer - // Streaming already marked as complete when stream ended - } - } - } - - // Do not persist conversation to server here. Server manages chat state. - // Keep local save only for quick resume. - await Future.delayed(const Duration(milliseconds: 50)); - await _saveConversationLocally(ref); - - // Removed post-assistant image generation; images are handled immediately after user message - }, - onError: (error) { - // Mark streaming as complete on error - ref.read(chatMessagesProvider.notifier).finishStreaming(); - // Stop socket events to avoid duplicates after error (only for SSE-driven) - if (socketService != null && suppressSocketContent == true) { - try { - socketService.offChatEvents(); - } catch (_) {} - } - - // Special handling for Socket.IO streaming failures - // These indicate the server generated a response but we couldn't stream it - if (error.toString().contains( - 'Socket.IO streaming not fully implemented', - )) { - // Don't remove the message - let the server content replacement handle it - // The onDone callback will fetch the actual response from the server - return; // Exit early to avoid removing the message - } - - // Handle streaming error - remove the assistant message placeholder for other errors - ref.read(chatMessagesProvider.notifier).removeLastMessage(); - - // Handle different types of errors - if (error.toString().contains('400')) { - // Bad request errors - likely malformed request format - final errorMessage = ChatMessage( - id: const Uuid().v4(), - role: 'assistant', - content: '''⚠️ **Message Format Error** - -This might be because: -• Image attachment couldn't be processed -• Request format incompatible with selected model -• Message contains unsupported content - -**💡 Solutions:** -• Long press this message and select "Retry" -• Try removing attachments and resending -• Switch to a different model and retry - -*Long press this message to access retry options.*''', - timestamp: DateTime.now(), - isStreaming: false, - ); - ref.read(chatMessagesProvider.notifier).addMessage(errorMessage); - } else if (error.toString().contains('401') || - error.toString().contains('403')) { - // Authentication errors - clear auth state and redirect to login - ref.invalidate(authStateManagerProvider); - } else if (error.toString().contains('500')) { - // Server errors - add user-friendly error message - final errorMessage = ChatMessage( - id: const Uuid().v4(), - role: 'assistant', - content: '''⚠️ **Server Error** - -This usually means: -• OpenWebUI server is experiencing issues -• Selected model might be unavailable -• Temporary connection problem - -**💡 Solutions:** -• Long press this message and select "Retry" -• Wait a moment and try again -• Switch to a different model -• Check with your server administrator - -*Long press this message to access retry options.*''', - timestamp: DateTime.now(), - isStreaming: false, - ); - ref.read(chatMessagesProvider.notifier).addMessage(errorMessage); - } else if (error.toString().contains('timeout')) { - // Timeout errors - final errorMessage = ChatMessage( - id: const Uuid().v4(), - role: 'assistant', - content: '''⏱️ **Request Timeout** - -This might be because: -• Server taking too long to respond -• Internet connection is slow -• Model processing a complex request - -**💡 Solutions:** -• Long press this message and select "Retry" -• Try a shorter message -• Check your internet connection -• Switch to a faster model - -*Long press this message to access retry options.*''', - timestamp: DateTime.now(), - isStreaming: false, - ); - ref.read(chatMessagesProvider.notifier).addMessage(errorMessage); - } - - // Don't throw the error to prevent unhandled exceptions - // The error message has been added to the chat - }, - ); - - // Register the stream subscription for proper cleanup - ref - .read(chatMessagesProvider.notifier) - .setMessageStream(streamSubscription); + ref.read(chatMessagesProvider.notifier).setMessageStream(sendStreamSub); + return; } catch (e) { // Handle error - remove the assistant message placeholder ref.read(chatMessagesProvider.notifier).removeLastMessage(); @@ -2691,6 +1759,9 @@ Please try sending the message again, or try without attachments.''', isStreaming: false, ); ref.read(chatMessagesProvider.notifier).addMessage(errorMessage); + } else if (e.toString().contains('401') || e.toString().contains('403')) { + // Authentication errors - clear auth state and redirect to login + ref.invalidate(authStateManagerProvider); } else if (e.toString().contains('500')) { final errorMessage = ChatMessage( id: const Uuid().v4(), @@ -3083,593 +2154,6 @@ final stopGenerationProvider = Provider((ref) { // ========== Shared Streaming Utilities ========== -List> _extractFilesFromResult(dynamic resp) { - final results = >[]; - if (resp == null) return results; - dynamic r = resp; - if (r is String) { - try { - r = jsonDecode(r); - } catch (_) {} - } - if (r is List) { - for (final item in r) { - if (item is String && item.isNotEmpty) { - results.add({'type': 'image', 'url': item}); - } else if (item is Map) { - final url = item['url']; - final b64 = item['b64_json'] ?? item['b64']; - if (url is String && url.isNotEmpty) { - results.add({'type': 'image', 'url': url}); - } else if (b64 is String && b64.isNotEmpty) { - results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); - } - } - } - return results; - } - if (r is! Map) return results; - final data = r['data']; - if (data is List) { - for (final item in data) { - if (item is Map) { - final url = item['url']; - final b64 = item['b64_json'] ?? item['b64']; - if (url is String && url.isNotEmpty) { - results.add({'type': 'image', 'url': url}); - } else if (b64 is String && b64.isNotEmpty) { - results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); - } - } else if (item is String && item.isNotEmpty) { - results.add({'type': 'image', 'url': item}); - } - } - } - final images = r['images']; - if (images is List) { - for (final item in images) { - if (item is String && item.isNotEmpty) { - results.add({'type': 'image', 'url': item}); - } else if (item is Map) { - final url = item['url']; - final b64 = item['b64_json'] ?? item['b64']; - if (url is String && url.isNotEmpty) { - results.add({'type': 'image', 'url': url}); - } else if (b64 is String && b64.isNotEmpty) { - results.add({'type': 'image', 'url': 'data:image/png;base64,$b64'}); - } - } - } - } - final files = r['files']; - if (files is List) { - results.addAll(_extractFilesFromResult(files)); - } - final singleUrl = r['url']; - if (singleUrl is String && singleUrl.isNotEmpty) { - results.add({'type': 'image', 'url': singleUrl}); - } - final singleB64 = r['b64_json'] ?? r['b64']; - if (singleB64 is String && singleB64.isNotEmpty) { - results.add({'type': 'image', 'url': 'data:image/png;base64,$singleB64'}); - } - return results; -} - -void _updateImagesFromCurrentContent(dynamic ref) { - try { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isEmpty || msgs.last.role != 'assistant') return; - final content = msgs.last.content; - if (content.isEmpty) return; - - final collected = >[]; - - if (content.contains('\"]+\.(jpg|jpeg|png|gif|webp)', - caseSensitive: false, - ); - final urlMatches = urlPattern.allMatches(content); - for (final match in urlMatches) { - final url = match.group(0); - if (url != null && url.isNotEmpty) { - collected.add({'type': 'image', 'url': url}); - } - } - - final jsonPattern = RegExp( - r'\{[^}]*"url"[^}]*:[^}]*"(data:image/[^"]+|https?://[^"]+\.(jpg|jpeg|png|gif|webp))"[^}]*\}', - caseSensitive: false, - ); - final jsonMatches = jsonPattern.allMatches(content); - for (final match in jsonMatches) { - final url = RegExp( - r'"url"[^:]*:[^"]*"([^"]+)"', - ).firstMatch(match.group(0) ?? '')?.group(1); - if (url != null && url.isNotEmpty) { - collected.add({'type': 'image', 'url': url}); - } - } - - final partialResultsPattern = RegExp( - r'(result|files)="([^"]*(?:data:image/[^"]*|https?://[^"]*\.(jpg|jpeg|png|gif|webp))[^"]*)"', - caseSensitive: false, - ); - final partialMatches = partialResultsPattern.allMatches(content); - for (final match in partialMatches) { - final attrValue = match.group(2); - if (attrValue != null) { - try { - final decoded = json.decode(attrValue); - collected.addAll(_extractFilesFromResult(decoded)); - } catch (_) { - if (attrValue.startsWith('data:image/') || - RegExp( - r'https?://[^\s]+\.(jpg|jpeg|png|gif|webp)$', - caseSensitive: false, - ).hasMatch(attrValue)) { - collected.add({'type': 'image', 'url': attrValue}); - } - } - } - } - } - - if (collected.isEmpty) return; - - final existing = msgs.last.files ?? >[]; - final seen = { - for (final f in existing) - if (f['url'] is String) (f['url'] as String) else '', - }..removeWhere((e) => e.isEmpty); - - final merged = >[...existing]; - for (final f in collected) { - final url = f['url'] as String?; - if (url != null && url.isNotEmpty && !seen.contains(url)) { - merged.add({'type': 'image', 'url': url}); - seen.add(url); - } - } - - if (merged.length != existing.length) { - ref - .read(chatMessagesProvider.notifier) - .updateLastMessageWithFunction((m) => m.copyWith(files: merged)); - } - } catch (_) {} -} - -void _attachSocketStreamingHandlers({ - required dynamic ref, - required dynamic socketService, - required String assistantMessageId, - required String modelId, - required Map modelItem, - required String sessionId, - required bool isBackgroundFlow, - required bool suppressSocketContentInitially, - String? activeConversationId, -}) { - bool suppressSocketContent = suppressSocketContentInitially; - - final api = ref.read(apiServiceProvider); - - // Activity-based watchdog for socket-driven streaming (resets on activity) - final socketWatchdog = InactivityWatchdog( - window: const Duration(minutes: 5), - onTimeout: () { - try { - socketService.offChatEvents(); - socketService.offChannelEvents(); - } catch (_) {} - try { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && - msgs.last.role == 'assistant' && - msgs.last.isStreaming) { - ref.read(chatMessagesProvider.notifier).finishStreaming(); - } - } catch (_) {} - }, - )..start(); - - void channelLineHandlerFactory(String channel) { - void handler(dynamic line) { - try { - if (line is String) { - final s = line.trim(); - // Any socket line is activity - socketWatchdog.ping(); - if (s == '[DONE]' || s == 'DONE') { - try { - socketService.offEvent(channel); - } catch (_) {} - try { - unawaited( - api?.sendChatCompleted( - chatId: activeConversationId ?? '', - messageId: assistantMessageId, - messages: const [], - model: modelId, - modelItem: modelItem, - sessionId: sessionId, - ), - ); - } catch (_) {} - ref.read(chatMessagesProvider.notifier).finishStreaming(); - socketWatchdog.stop(); - return; - } - if (s.startsWith('data:')) { - final dataStr = s.substring(5).trim(); - if (dataStr == '[DONE]') { - try { - socketService.offEvent(channel); - } catch (_) {} - try { - unawaited( - api?.sendChatCompleted( - chatId: activeConversationId ?? '', - messageId: assistantMessageId, - messages: const [], - model: modelId, - modelItem: modelItem, - sessionId: sessionId, - ), - ); - } catch (_) {} - ref.read(chatMessagesProvider.notifier).finishStreaming(); - socketWatchdog.stop(); - return; - } - try { - final Map j = jsonDecode(dataStr); - final choices = j['choices']; - if (choices is List && choices.isNotEmpty) { - final choice = choices.first; - final delta = choice is Map ? choice['delta'] : null; - if (delta is Map) { - if (delta.containsKey('tool_calls')) { - final tc = delta['tool_calls']; - if (tc is List) { - for (final call in tc) { - if (call is Map) { - final fn = call['function']; - final name = (fn is Map && fn['name'] is String) - ? fn['name'] as String - : null; - if (name is String && name.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - final exists = - (msgs.isNotEmpty) && - RegExp( - ']*\\bname="${RegExp.escape(name)}"', - multiLine: true, - ).hasMatch(msgs.last.content); - if (!exists) { - final status = - '\n
Executing...\n
\n'; - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(status); - } - } - } - } - } - } - final content = delta['content']?.toString() ?? ''; - if (content.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(content); - _updateImagesFromCurrentContent(ref); - } - } - } - } catch (_) { - if (s.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(s); - _updateImagesFromCurrentContent(ref); - } - } - } else { - if (s.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(s); - _updateImagesFromCurrentContent(ref); - } - } - } else if (line is Map) { - socketWatchdog.ping(); - if (line['done'] == true) { - try { - socketService.offEvent(channel); - } catch (_) {} - ref.read(chatMessagesProvider.notifier).finishStreaming(); - socketWatchdog.stop(); - return; - } - } - } catch (_) {} - } - - socketService.onEvent(channel, handler); - // Start activity watchdog now that handler is attached - socketWatchdog.ping(); - } - - void chatHandler(Map ev) { - try { - final data = ev['data']; - if (data == null) return; - final type = data['type']; - final payload = data['data']; - - if (type == 'chat:completion' && payload != null) { - if (payload is Map) { - if (payload.containsKey('tool_calls')) { - final tc = payload['tool_calls']; - if (tc is List) { - for (final call in tc) { - if (call is Map) { - final fn = call['function']; - final name = (fn is Map && fn['name'] is String) - ? fn['name'] as String - : null; - if (name is String && name.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - final exists = - (msgs.isNotEmpty) && - RegExp( - ']*\\bname="${RegExp.escape(name)}"', - multiLine: true, - ).hasMatch(msgs.last.content); - if (!exists) { - final status = - '\n
Executing...\n
\n'; - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(status); - } - } - } - } - } - } - if (!suppressSocketContent && payload.containsKey('choices')) { - final choices = payload['choices']; - if (choices is List && choices.isNotEmpty) { - final choice = choices.first; - final delta = choice is Map ? choice['delta'] : null; - if (delta is Map) { - if (delta.containsKey('tool_calls')) { - final tc = delta['tool_calls']; - if (tc is List) { - for (final call in tc) { - if (call is Map) { - final fn = call['function']; - final name = (fn is Map && fn['name'] is String) - ? fn['name'] as String - : null; - if (name is String && name.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - final exists = - (msgs.isNotEmpty) && - RegExp( - ']*\\bname="${RegExp.escape(name)}"', - multiLine: true, - ).hasMatch(msgs.last.content); - if (!exists) { - final status = - '\n
Executing...\n
\n'; - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(status); - } - } - } - } - } - } - final content = delta['content']?.toString() ?? ''; - if (content.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .appendToLastMessage(content); - _updateImagesFromCurrentContent(ref); - } - } - } - } - if (payload['done'] == true) { - try { - socketService.offChatEvents(); - } catch (_) {} - try { - socketWatchdog.stop(); - } catch (_) {} - try { - unawaited( - api - ?.sendChatCompleted( - chatId: activeConversationId ?? '', - messageId: assistantMessageId, - messages: const [], - model: modelId, - modelItem: modelItem, - sessionId: sessionId, - ) - ?.timeout(const Duration(seconds: 3)) - .catchError((_) {}), - ); - } catch (_) {} - - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && msgs.last.role == 'assistant') { - final lastContent = msgs.last.content.trim(); - if (lastContent.isEmpty) { - Future.microtask(() async { - try { - final chatId = activeConversationId; - if (chatId != null && chatId.isNotEmpty) { - final resp = await api?.dio.get('/api/v1/chats/$chatId'); - final data = resp?.data as Map?; - String content = ''; - final chatObj = data?['chat'] as Map?; - if (chatObj != null) { - final list = chatObj['messages']; - if (list is List) { - final target = list.firstWhere( - (m) => - (m is Map && - (m['id']?.toString() == assistantMessageId)), - orElse: () => null, - ); - if (target != null) { - final rawContent = (target as Map)['content']; - if (rawContent is String) { - content = rawContent; - } else if (rawContent is List) { - final textItem = rawContent.firstWhere( - (i) => i is Map && i['type'] == 'text', - orElse: () => null, - ); - if (textItem != null) { - content = textItem['text']?.toString() ?? ''; - } - } - } - } - if (content.isEmpty) { - final history = chatObj['history']; - if (history is Map && history['messages'] is Map) { - final Map messagesMap = - (history['messages'] as Map) - .cast(); - final msg = messagesMap[assistantMessageId]; - if (msg is Map) { - final rawContent = msg['content']; - if (rawContent is String) { - content = rawContent; - } else if (rawContent is List) { - final textItem = rawContent.firstWhere( - (i) => i is Map && i['type'] == 'text', - orElse: () => null, - ); - if (textItem != null) { - content = textItem['text']?.toString() ?? ''; - } - } - } - } - } - } - if (content.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .replaceLastMessageContent(content); - } - } - } catch (_) { - } finally { - ref.read(chatMessagesProvider.notifier).finishStreaming(); - } - }); - return; - } - } - ref.read(chatMessagesProvider.notifier).finishStreaming(); - } - } - } else if (type == 'request:chat:completion' && payload != null) { - final channel = payload['channel']; - if (channel is String && channel.isNotEmpty) { - suppressSocketContent = true; - channelLineHandlerFactory(channel); - } - } else if (type == 'event:status' && payload != null) { - final status = payload['status']?.toString() ?? ''; - if (status.isNotEmpty) { - ref - .read(chatMessagesProvider.notifier) - .updateLastMessageWithFunction( - (m) => m.copyWith(metadata: {...?m.metadata, 'status': status}), - ); - } - } else if (type == 'event:tool' && payload != null) { - final files = _extractFilesFromResult(payload['result']); - if (files.isNotEmpty) { - final msgs = ref.read(chatMessagesProvider); - if (msgs.isNotEmpty && msgs.last.role == 'assistant') { - final existing = msgs.last.files ?? >[]; - final merged = [...existing, ...files]; - ref - .read(chatMessagesProvider.notifier) - .updateLastMessageWithFunction( - (m) => m.copyWith(files: merged), - ); - } - } - } else if (type == 'event:message:delta' && payload != null) { - if (suppressSocketContent) return; - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); - _updateImagesFromCurrentContent(ref); - } - } - } catch (_) {} - } - - void channelEventsHandler(Map ev) { - try { - final data = ev['data']; - if (data == null) return; - final type = data['type']; - final payload = data['data']; - if (type == 'message' && payload is Map) { - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); - _updateImagesFromCurrentContent(ref); - } - } - } catch (_) {} - } - - socketService.onChatEvents(chatHandler); - socketService.onChannelEvents(channelEventsHandler); - // Start activity watchdog for chat/channel events - socketWatchdog.ping(); -} - // ========== Tool Servers (OpenAPI) Helpers ========== Future>> _resolveToolServers(