From 5f03610f359f1db8c8f63ded1703b7bbea2fb7f0 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Fri, 26 Sep 2025 01:38:00 +0530 Subject: [PATCH] fix: streaming --- lib/core/services/api_service.dart | 20 +- lib/core/services/socket_service.dart | 429 +++++++++++++----- lib/core/services/streaming_helper.dart | 96 ++-- .../chat/providers/chat_providers.dart | 68 ++- 4 files changed, 467 insertions(+), 146 deletions(-) diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 552bc33..63b879e 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -2798,7 +2798,13 @@ class ApiService { // Send message with SSE streaming // Returns a record with (stream, messageId, sessionId) - ({Stream stream, String messageId, String sessionId}) sendMessage({ + ({ + Stream stream, + String messageId, + String sessionId, + String? socketSessionId, + }) + sendMessage({ required List> messages, required String model, String? conversationId, @@ -2807,6 +2813,7 @@ class ApiService { bool enableImageGeneration = false, Map? modelItem, String? sessionIdOverride, + String? socketSessionId, List>? toolServers, Map? backgroundTasks, String? responseMessageId, @@ -2904,6 +2911,16 @@ class ApiService { }; } + if (backgroundTasks != null && backgroundTasks.isNotEmpty) { + data['background_tasks'] = backgroundTasks; + } + + if (socketSessionId != null && socketSessionId.isNotEmpty) { + data['session_id'] = socketSessionId; + } + + data['id'] = messageId; + // No default reasoning parameters included; providers handle thinking UIs natively. // Add tool_ids if provided (Open-WebUI expects tool_ids as array of strings) @@ -3132,6 +3149,7 @@ class ApiService { stream: streamController.stream, messageId: messageId, sessionId: sessionId, + socketSessionId: socketSessionId, ); } diff --git a/lib/core/services/socket_service.dart b/lib/core/services/socket_service.dart index 9f38f01..c1e9862 100644 --- a/lib/core/services/socket_service.dart +++ b/lib/core/services/socket_service.dart @@ -1,24 +1,55 @@ +import 'package:flutter/widgets.dart'; import 'package:socket_io_client/socket_io_client.dart' as io; + import '../models/server_config.dart'; import '../utils/debug_logger.dart'; -class SocketService { +typedef SocketChatEventHandler = + void Function( + Map event, + void Function(dynamic response)? ack, + ); + +typedef SocketChannelEventHandler = + void Function( + Map event, + void Function(dynamic response)? ack, + ); + +class SocketService with WidgetsBindingObserver { final ServerConfig serverConfig; final bool websocketOnly; io.Socket? _socket; String? _authToken; + bool _isAppForeground = true; + + final Map _chatEventHandlers = {}; + final Map _channelEventHandlers = {}; + int _handlerSeed = 0; SocketService({ required this.serverConfig, String? authToken, this.websocketOnly = false, - }) : _authToken = authToken; + }) : _authToken = authToken { + final binding = WidgetsBinding.instance; + final lifecycle = binding.lifecycleState; + _isAppForeground = + lifecycle == null || lifecycle == AppLifecycleState.resumed; + binding.addObserver(this); + } + + @override + void didChangeAppLifecycleState(AppLifecycleState state) { + _isAppForeground = state == AppLifecycleState.resumed; + } String? get sessionId => _socket?.id; io.Socket? get socket => _socket; String? get authToken => _authToken; bool get isConnected => _socket?.connected == true; + bool get isAppForeground => _isAppForeground; Future connect({bool force = false}) async { if (_socket != null && _socket!.connected && !force) return; @@ -91,43 +122,7 @@ class SocketService { _socket = io.io(base, builder.build()); - _socket!.on('connect', (_) { - DebugLogger.log('Socket connected: ${_socket!.id}', scope: 'socket'); - if (_authToken != null && _authToken!.isNotEmpty) { - _socket!.emit('user-join', { - 'auth': {'token': _authToken}, - }); - } - }); - - _socket!.on('connect_error', (err) { - DebugLogger.log('Socket connect_error: $err', scope: 'socket'); - }); - - _socket!.on('reconnect_attempt', (attempt) { - DebugLogger.log('Socket reconnect_attempt: $attempt', scope: 'socket'); - }); - - _socket!.on('reconnect', (attempt) { - DebugLogger.log( - 'Socket reconnected after $attempt attempts', - scope: 'socket', - ); - if (_authToken != null && _authToken!.isNotEmpty) { - // Best-effort rejoin - _socket!.emit('user-join', { - 'auth': {'token': _authToken}, - }); - } - }); - - _socket!.on('reconnect_failed', (_) { - DebugLogger.log('Socket reconnect_failed', scope: 'socket'); - }); - - _socket!.on('disconnect', (reason) { - DebugLogger.log('Socket disconnected: $reason', scope: 'socket'); - }); + _bindCoreSocketHandlers(); } /// Update the auth token used by the socket service. @@ -145,77 +140,48 @@ class SocketService { } } - void onChatEvents( - void Function( - Map event, - void Function(dynamic response)? ack, - ) - handler, - ) { - _socket?.on('chat-events', (dynamic data, [dynamic ack]) { - try { - Map? map; - if (data is Map) { - map = data; - } else if (data is Map) { - map = Map.from(data); - } - if (map == null) return; - final ackFn = ack is Function - ? (dynamic payload) { - if (payload is List) { - Function.apply(ack, payload); - } else if (payload == null) { - Function.apply(ack, const []); - } else { - Function.apply(ack, [payload]); - } - } - : null; - handler(map, ackFn); - } catch (_) {} - }); + SocketEventSubscription addChatEventHandler({ + String? conversationId, + String? sessionId, + bool requireFocus = true, + required SocketChatEventHandler handler, + }) { + final id = _nextHandlerId(); + _chatEventHandlers[id] = _ChatEventRegistration( + id: id, + conversationId: conversationId, + sessionId: sessionId, + requireFocus: requireFocus, + handler: handler, + ); + _bindCoreSocketHandlers(); + return SocketEventSubscription(() => _chatEventHandlers.remove(id)); } - // Subscribe to general channel events (server-broadcasted channel updates) - void onChannelEvents( - void Function( - Map event, - void Function(dynamic response)? ack, - ) - handler, - ) { - _socket?.on('channel-events', (dynamic data, [dynamic ack]) { - try { - Map? map; - if (data is Map) { - map = data; - } else if (data is Map) { - map = Map.from(data); - } - if (map == null) return; - final ackFn = ack is Function - ? (dynamic payload) { - if (payload is List) { - Function.apply(ack, payload); - } else if (payload == null) { - Function.apply(ack, const []); - } else { - Function.apply(ack, [payload]); - } - } - : null; - handler(map, ackFn); - } catch (_) {} - }); + SocketEventSubscription addChannelEventHandler({ + String? conversationId, + String? sessionId, + bool requireFocus = true, + required SocketChannelEventHandler handler, + }) { + final id = _nextHandlerId(); + _channelEventHandlers[id] = _ChannelEventRegistration( + id: id, + conversationId: conversationId, + sessionId: sessionId, + requireFocus: requireFocus, + handler: handler, + ); + _bindCoreSocketHandlers(); + return SocketEventSubscription(() => _channelEventHandlers.remove(id)); } - void offChatEvents() { - _socket?.off('chat-events'); + void clearChatEventHandlers() { + _chatEventHandlers.clear(); } - void offChannelEvents() { - _socket?.off('channel-events'); + void clearChannelEventHandlers() { + _channelEventHandlers.clear(); } // Subscribe to an arbitrary socket.io event (used for dynamic tool channels) @@ -232,6 +198,9 @@ class SocketService { _socket?.dispose(); } catch (_) {} _socket = null; + WidgetsBinding.instance.removeObserver(this); + _chatEventHandlers.clear(); + _channelEventHandlers.clear(); } // Best-effort: ensure there is an active connection and wait briefly. @@ -249,4 +218,256 @@ class SocketService { } return isConnected; } + + void _bindCoreSocketHandlers() { + final socket = _socket; + if (socket == null) return; + + socket + ..off('chat-events', _handleChatEvent) + ..off('channel-events', _handleChannelEvent) + ..off('connect', _handleConnect) + ..off('connect_error', _handleConnectError) + ..off('reconnect_attempt', _handleReconnectAttempt) + ..off('reconnect', _handleReconnect) + ..off('reconnect_failed', _handleReconnectFailed) + ..off('disconnect', _handleDisconnect); + + socket + ..on('chat-events', _handleChatEvent) + ..on('channel-events', _handleChannelEvent) + ..on('connect', _handleConnect) + ..on('connect_error', _handleConnectError) + ..on('reconnect_attempt', _handleReconnectAttempt) + ..on('reconnect', _handleReconnect) + ..on('reconnect_failed', _handleReconnectFailed) + ..on('disconnect', _handleDisconnect); + } + + void _handleConnect(dynamic _) { + DebugLogger.log('Socket connected: ${_socket?.id}', scope: 'socket'); + if (_authToken != null && _authToken!.isNotEmpty) { + _socket?.emit('user-join', { + 'auth': {'token': _authToken}, + }); + } + } + + void _handleReconnectAttempt(dynamic attempt) { + DebugLogger.log('Socket reconnect_attempt: $attempt', scope: 'socket'); + } + + void _handleReconnect(dynamic attempt) { + DebugLogger.log( + 'Socket reconnected after $attempt attempts', + scope: 'socket', + ); + if (_authToken != null && _authToken!.isNotEmpty) { + _socket?.emit('user-join', { + 'auth': {'token': _authToken}, + }); + } + } + + void _handleConnectError(dynamic err) { + DebugLogger.log('Socket connect_error: $err', scope: 'socket'); + } + + void _handleReconnectFailed(dynamic _) { + DebugLogger.log('Socket reconnect_failed', scope: 'socket'); + } + + void _handleDisconnect(dynamic reason) { + DebugLogger.log('Socket disconnected: $reason', scope: 'socket'); + } + + void _handleChatEvent(dynamic data, [dynamic ack]) { + final map = _coerceToMap(data); + if (map == null) return; + + final ackFn = _wrapAck(ack); + final sessionId = _extractSessionId(map); + final chatId = map['chat_id']?.toString(); + + for (final registration in List<_ChatEventRegistration>.from( + _chatEventHandlers.values, + )) { + if (!_shouldDeliver( + registration.conversationId, + registration.sessionId, + chatId, + sessionId, + registration.requireFocus, + )) { + continue; + } + + try { + registration.handler(map, ackFn); + } catch (_) {} + } + } + + void _handleChannelEvent(dynamic data, [dynamic ack]) { + final map = _coerceToMap(data); + if (map == null) return; + + final ackFn = _wrapAck(ack); + final sessionId = _extractSessionId(map); + final chatId = map['chat_id']?.toString(); + + for (final registration in List<_ChannelEventRegistration>.from( + _channelEventHandlers.values, + )) { + if (!_shouldDeliver( + registration.conversationId, + registration.sessionId, + chatId, + sessionId, + registration.requireFocus, + )) { + continue; + } + + try { + registration.handler(map, ackFn); + } catch (_) {} + } + } + + bool _shouldDeliver( + String? registeredConversationId, + String? registeredSessionId, + String? incomingConversationId, + String? incomingSessionId, + bool requireFocus, + ) { + final matchesChat = + registeredConversationId == null || + (incomingConversationId != null && + registeredConversationId == incomingConversationId); + final matchesSession = + registeredSessionId != null && + incomingSessionId != null && + registeredSessionId == incomingSessionId; + + if (!matchesChat && !matchesSession) { + return false; + } + + if (!requireFocus) { + return true; + } + + if (matchesSession) { + // Session-targeted messages should always pass through even if unfocused + return true; + } + + return _isAppForeground; + } + + Map? _coerceToMap(dynamic data) { + if (data is Map) { + return data; + } + if (data is Map) { + return Map.from(data); + } + return null; + } + + void Function(dynamic response)? _wrapAck(dynamic ack) { + if (ack is! Function) return null; + return (dynamic payload) { + try { + if (payload is List) { + Function.apply(ack, payload); + } else if (payload == null) { + Function.apply(ack, const []); + } else { + Function.apply(ack, [payload]); + } + } catch (_) {} + }; + } + + String? _extractSessionId(Map event) { + String? candidate; + + if (event['session_id'] != null) { + candidate = event['session_id'].toString(); + } + + final data = event['data']; + if (data is Map) { + if (candidate == null && data['session_id'] != null) { + candidate = data['session_id'].toString(); + } + if (candidate == null && data['sessionId'] != null) { + candidate = data['sessionId'].toString(); + } + final inner = data['data']; + if (inner is Map) { + if (candidate == null && inner['session_id'] != null) { + candidate = inner['session_id'].toString(); + } + if (candidate == null && inner['sessionId'] != null) { + candidate = inner['sessionId'].toString(); + } + } + } + + return candidate; + } + + String _nextHandlerId() { + _handlerSeed += 1; + return _handlerSeed.toString(); + } +} + +class SocketEventSubscription { + SocketEventSubscription(this._dispose); + + final VoidCallback _dispose; + bool _isDisposed = false; + + void dispose() { + if (_isDisposed) return; + _isDisposed = true; + _dispose(); + } +} + +class _ChatEventRegistration { + _ChatEventRegistration({ + required this.id, + required this.handler, + this.conversationId, + this.sessionId, + this.requireFocus = true, + }); + + final String id; + final String? conversationId; + final String? sessionId; + final bool requireFocus; + final SocketChatEventHandler handler; +} + +class _ChannelEventRegistration { + _ChannelEventRegistration({ + required this.id, + required this.handler, + this.conversationId, + this.sessionId, + this.requireFocus = true, + }); + + final String id; + final String? conversationId; + final String? sessionId; + final bool requireFocus; + final SocketChannelEventHandler handler; } diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 8aa5560..7022987 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -17,17 +17,28 @@ import '../utils/debug_logger.dart'; // Keep local verbosity toggle for socket logs const bool kSocketVerboseLogging = false; +class ActiveSocketStream { + ActiveSocketStream({ + required this.streamSubscription, + required this.socketSubscriptions, + required this.disposeWatchdog, + }); + + final StreamSubscription streamSubscription; + final List socketSubscriptions; + final VoidCallback disposeWatchdog; +} + /// Unified streaming helper for chat send/regenerate flows. /// /// This attaches chunked SSE streaming handlers, optional WebSocket event handlers, /// and manages background search/image-gen UI updates. It operates via callbacks to /// avoid tight coupling with provider files for easier reuse and testing. -StreamSubscription attachUnifiedChunkedStreaming({ +ActiveSocketStream attachUnifiedChunkedStreaming({ required Stream stream, required bool webSearchEnabled, required bool isBackgroundFlow, required bool suppressSocketContentInitially, - required bool usingDynamicChannelInitially, required String assistantMessageId, required String modelId, required Map modelItem, @@ -91,13 +102,16 @@ StreamSubscription attachUnifiedChunkedStreaming({ ); InactivityWatchdog? socketWatchdog; + final socketSubscriptions = []; if (socketService != null) { socketWatchdog = InactivityWatchdog( window: const Duration(minutes: 5), onTimeout: () { try { - socketService.offChatEvents(); - socketService.offChannelEvents(); + for (final sub in socketSubscriptions) { + sub.dispose(); + } + socketSubscriptions.clear(); } catch (_) {} try { final msgs = getMessages(); @@ -107,13 +121,26 @@ StreamSubscription attachUnifiedChunkedStreaming({ finishStreaming(); } } catch (_) {} + socketWatchdog?.stop(); }, )..start(); } + void disposeSocketSubscriptions() { + if (socketSubscriptions.isEmpty) { + return; + } + for (final sub in socketSubscriptions) { + try { + sub.dispose(); + } catch (_) {} + } + socketSubscriptions.clear(); + socketWatchdog?.stop(); + } + bool isSearching = false; bool suppressSocketContent = suppressSocketContentInitially; - bool usingDynamicChannel = usingDynamicChannelInitially; void updateImagesFromCurrentContent() { try { @@ -414,6 +441,13 @@ StreamSubscription attachUnifiedChunkedStreaming({ final messageId = ev['message_id']?.toString(); socketWatchdog?.ping(); + if (kSocketVerboseLogging && payload is Map) { + DebugLogger.log( + 'socket delta type=$type suppress=$suppressSocketContent session=$sessionId message=$messageId keys=${payload.keys.toList()}', + scope: 'socket/chat', + ); + } + if (type == 'chat:completion' && payload != null) { if (payload is Map) { if (payload.containsKey('tool_calls')) { @@ -488,6 +522,13 @@ StreamSubscription attachUnifiedChunkedStreaming({ } } } + if (!suppressSocketContent && payload.containsKey('content')) { + final raw = payload['content']?.toString() ?? ''; + if (raw.isNotEmpty) { + replaceLastMessageContent(raw); + updateImagesFromCurrentContent(); + } + } if (payload['done'] == true) { try { // ignore: unawaited_futures @@ -603,6 +644,7 @@ StreamSubscription attachUnifiedChunkedStreaming({ return current.copyWith(metadata: metadata, isStreaming: false); }); } + disposeSocketSubscriptions(); finishStreaming(); } else if (type == 'chat:message:follow_ups' && payload != null) { final followMap = _asStringMap(payload); @@ -890,23 +932,21 @@ StreamSubscription attachUnifiedChunkedStreaming({ } if (socketService != null) { - socketService.onChatEvents(chatHandler); - socketService.onChannelEvents(channelEventsHandler); - Future.delayed(const Duration(seconds: 90), () { - try { - socketService.offChatEvents(); - socketService.offChannelEvents(); - } catch (_) {} - try { - final msgs = getMessages(); - if (msgs.isNotEmpty && - msgs.last.role == 'assistant' && - msgs.last.isStreaming) { - finishStreaming(); - } - } catch (_) {} - socketWatchdog?.stop(); - }); + final chatSub = socketService.addChatEventHandler( + conversationId: activeConversationId, + sessionId: sessionId, + requireFocus: false, + handler: chatHandler, + ); + socketSubscriptions.add(chatSub); + + final channelSub = socketService.addChannelEventHandler( + conversationId: activeConversationId, + sessionId: sessionId, + requireFocus: false, + handler: channelEventsHandler, + ); + socketSubscriptions.add(channelSub); } final subscription = persistentController.stream.listen( @@ -951,25 +991,29 @@ StreamSubscription attachUnifiedChunkedStreaming({ // Allow socket-delivered follow-ups/title updates after SSE completes suppressSocketContent = false; - // If SSE-driven (no dynamic channel/background flow), finish now - if (!usingDynamicChannel && !isBackgroundFlow) { + // If SSE-driven (no dynamic channel/background flow), clean up sockets + if (!isBackgroundFlow) { finishStreaming(); Future.microtask(refreshConversationSnapshot); } - socketWatchdog?.stop(); }, onError: (error) async { try { persistentService.unregisterStream(streamId); } catch (_) {} suppressSocketContent = false; + disposeSocketSubscriptions(); finishStreaming(); Future.microtask(refreshConversationSnapshot); socketWatchdog?.stop(); }, ); - return subscription; + return ActiveSocketStream( + streamSubscription: subscription, + socketSubscriptions: socketSubscriptions, + disposeWatchdog: () => socketWatchdog?.stop(), + ); } List> _extractFilesFromResult(dynamic resp) { diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index e890028..68ab9c8 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1,10 +1,12 @@ import 'dart:convert'; import 'package:yaml/yaml.dart' as yaml; +import 'package:flutter/foundation.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:uuid/uuid.dart'; import '../../../core/utils/tool_calls_parser.dart'; import '../../../core/services/streaming_helper.dart'; +import '../../../core/services/socket_service.dart'; import '../../../core/models/chat_message.dart'; import '../../../core/models/conversation.dart'; import '../../../core/providers/app_providers.dart'; @@ -87,6 +89,8 @@ class ChatMessagesNotifier extends Notifier> { StreamSubscription? _messageStream; ProviderSubscription? _conversationListener; final List _subscriptions = []; + final List _socketSubscriptions = []; + VoidCallback? _socketTeardown; // Activity-based watchdog to prevent stuck typing indicator InactivityWatchdog? _typingWatchdog; @@ -163,6 +167,7 @@ class ChatMessagesNotifier extends Notifier> { _subscriptions.clear(); _cancelMessageStream(); + cancelSocketSubscriptions(); _cancelTypingGuard(); _conversationListener?.close(); @@ -181,6 +186,7 @@ class ChatMessagesNotifier extends Notifier> { void _cancelMessageStream() { _messageStream?.cancel(); _messageStream = null; + cancelSocketSubscriptions(); } void _cancelTypingGuard() { @@ -364,6 +370,31 @@ class ChatMessagesNotifier extends Notifier> { _addSubscription(stream); } + void setSocketSubscriptions( + List subscriptions, { + VoidCallback? onDispose, + }) { + cancelSocketSubscriptions(); + _socketSubscriptions.addAll(subscriptions); + _socketTeardown = onDispose; + } + + void cancelSocketSubscriptions() { + if (_socketSubscriptions.isEmpty) { + _socketTeardown?.call(); + _socketTeardown = null; + return; + } + for (final sub in _socketSubscriptions) { + try { + sub.dispose(); + } catch (_) {} + } + _socketSubscriptions.clear(); + _socketTeardown?.call(); + _socketTeardown = null; + } + void addMessage(ChatMessage message) { state = [...state, message]; if (message.role == 'assistant' && message.isStreaming) { @@ -1230,6 +1261,7 @@ Future regenerateMessage( enableImageGeneration: imageGenerationEnabled, modelItem: modelItem, sessionIdOverride: passSocketSession ? socketSessionId : null, + socketSessionId: socketSessionId, toolServers: toolServers, backgroundTasks: bgTasks, responseMessageId: assistantMessageId, @@ -1237,6 +1269,8 @@ Future regenerateMessage( final stream = response.stream; final sessionId = response.sessionId; + final effectiveSessionId = + response.socketSessionId ?? socketSessionId ?? sessionId; // New unified streaming path via helper; bypass old inline socket block final bool isBackgroundFlow = @@ -1258,16 +1292,15 @@ Future regenerateMessage( }); } catch (_) {} - final sendStreamSub = attachUnifiedChunkedStreaming( + final activeStream = attachUnifiedChunkedStreaming( stream: stream, webSearchEnabled: webSearchEnabled, isBackgroundFlow: isBackgroundFlow, suppressSocketContentInitially: !isBackgroundFlow, - usingDynamicChannelInitially: false, assistantMessageId: assistantMessageId, modelId: selectedModel.id, modelItem: modelItem, - sessionId: sessionId, + sessionId: effectiveSessionId, activeConversationId: activeConversation.id, api: api, socketService: socketService, @@ -1319,7 +1352,12 @@ Future regenerateMessage( ref.read(chatMessagesProvider.notifier).finishStreaming(), getMessages: () => ref.read(chatMessagesProvider), ); - ref.read(chatMessagesProvider.notifier).setMessageStream(sendStreamSub); + ref.read(chatMessagesProvider.notifier) + ..setMessageStream(activeStream.streamSubscription) + ..setSocketSubscriptions( + activeStream.socketSubscriptions, + onDispose: activeStream.disposeWatchdog, + ); return; } catch (e) { rethrow; @@ -1756,6 +1794,7 @@ Future _sendMessageInternal( // Bind to Socket session whenever available so the server can push // streaming updates to this client (improves first-turn streaming). sessionIdOverride: wantSessionBinding ? socketSessionId : null, + socketSessionId: socketSessionId, toolServers: toolServers, backgroundTasks: bgTasks, responseMessageId: assistantMessageId, @@ -1763,6 +1802,8 @@ Future _sendMessageInternal( final stream = response.stream; final sessionId = response.sessionId; + final effectiveSessionId = + response.socketSessionId ?? socketSessionId ?? sessionId; // Use unified streaming helper for SSE/WebSocket handling final bool isBackgroundFlow = @@ -1784,16 +1825,15 @@ Future _sendMessageInternal( }); } catch (_) {} - final sendStreamSub = attachUnifiedChunkedStreaming( + final activeStream = attachUnifiedChunkedStreaming( stream: stream, webSearchEnabled: webSearchEnabled, isBackgroundFlow: isBackgroundFlow, suppressSocketContentInitially: !isBackgroundFlow, - usingDynamicChannelInitially: false, assistantMessageId: assistantMessageId, modelId: selectedModel.id, modelItem: modelItem, - sessionId: sessionId, + sessionId: effectiveSessionId, activeConversationId: activeConversation?.id, api: api, socketService: socketService, @@ -1846,7 +1886,12 @@ Future _sendMessageInternal( getMessages: () => ref.read(chatMessagesProvider), ); - ref.read(chatMessagesProvider.notifier).setMessageStream(sendStreamSub); + ref.read(chatMessagesProvider.notifier) + ..setMessageStream(activeStream.streamSubscription) + ..setSocketSubscriptions( + activeStream.socketSubscriptions, + onDispose: activeStream.disposeWatchdog, + ); return; } catch (e) { // Handle error - remove the assistant message placeholder @@ -2160,13 +2205,6 @@ final stopGenerationProvider = Provider((ref) { final api = ref.read(apiServiceProvider); api?.cancelStreamingMessage(lastId); - // Stop any active socket listeners for chat/channel events - try { - final socketService = ref.read(socketServiceProvider); - socketService?.offChatEvents(); - socketService?.offChannelEvents(); - } catch (_) {} - // Cancel local stream subscription to stop propagating further chunks ref.read(chatMessagesProvider.notifier).cancelActiveMessageStream(); }