fix: streaming

This commit is contained in:
cogwheel0
2025-09-26 01:38:00 +05:30
parent 748f2a43a8
commit 5f03610f35
4 changed files with 467 additions and 146 deletions

View File

@@ -2798,7 +2798,13 @@ class ApiService {
// Send message with SSE streaming // Send message with SSE streaming
// Returns a record with (stream, messageId, sessionId) // Returns a record with (stream, messageId, sessionId)
({Stream<String> stream, String messageId, String sessionId}) sendMessage({ ({
Stream<String> stream,
String messageId,
String sessionId,
String? socketSessionId,
})
sendMessage({
required List<Map<String, dynamic>> messages, required List<Map<String, dynamic>> messages,
required String model, required String model,
String? conversationId, String? conversationId,
@@ -2807,6 +2813,7 @@ class ApiService {
bool enableImageGeneration = false, bool enableImageGeneration = false,
Map<String, dynamic>? modelItem, Map<String, dynamic>? modelItem,
String? sessionIdOverride, String? sessionIdOverride,
String? socketSessionId,
List<Map<String, dynamic>>? toolServers, List<Map<String, dynamic>>? toolServers,
Map<String, dynamic>? backgroundTasks, Map<String, dynamic>? backgroundTasks,
String? responseMessageId, String? responseMessageId,
@@ -2904,6 +2911,16 @@ class ApiService {
}; };
} }
if (backgroundTasks != null && backgroundTasks.isNotEmpty) {
data['background_tasks'] = backgroundTasks;
}
if (socketSessionId != null && socketSessionId.isNotEmpty) {
data['session_id'] = socketSessionId;
}
data['id'] = messageId;
// No default reasoning parameters included; providers handle thinking UIs natively. // No default reasoning parameters included; providers handle thinking UIs natively.
// Add tool_ids if provided (Open-WebUI expects tool_ids as array of strings) // Add tool_ids if provided (Open-WebUI expects tool_ids as array of strings)
@@ -3132,6 +3149,7 @@ class ApiService {
stream: streamController.stream, stream: streamController.stream,
messageId: messageId, messageId: messageId,
sessionId: sessionId, sessionId: sessionId,
socketSessionId: socketSessionId,
); );
} }

View File

@@ -1,24 +1,55 @@
import 'package:flutter/widgets.dart';
import 'package:socket_io_client/socket_io_client.dart' as io; import 'package:socket_io_client/socket_io_client.dart' as io;
import '../models/server_config.dart'; import '../models/server_config.dart';
import '../utils/debug_logger.dart'; import '../utils/debug_logger.dart';
class SocketService { typedef SocketChatEventHandler =
void Function(
Map<String, dynamic> event,
void Function(dynamic response)? ack,
);
typedef SocketChannelEventHandler =
void Function(
Map<String, dynamic> event,
void Function(dynamic response)? ack,
);
class SocketService with WidgetsBindingObserver {
final ServerConfig serverConfig; final ServerConfig serverConfig;
final bool websocketOnly; final bool websocketOnly;
io.Socket? _socket; io.Socket? _socket;
String? _authToken; String? _authToken;
bool _isAppForeground = true;
final Map<String, _ChatEventRegistration> _chatEventHandlers = {};
final Map<String, _ChannelEventRegistration> _channelEventHandlers = {};
int _handlerSeed = 0;
SocketService({ SocketService({
required this.serverConfig, required this.serverConfig,
String? authToken, String? authToken,
this.websocketOnly = false, this.websocketOnly = false,
}) : _authToken = authToken; }) : _authToken = authToken {
final binding = WidgetsBinding.instance;
final lifecycle = binding.lifecycleState;
_isAppForeground =
lifecycle == null || lifecycle == AppLifecycleState.resumed;
binding.addObserver(this);
}
@override
void didChangeAppLifecycleState(AppLifecycleState state) {
_isAppForeground = state == AppLifecycleState.resumed;
}
String? get sessionId => _socket?.id; String? get sessionId => _socket?.id;
io.Socket? get socket => _socket; io.Socket? get socket => _socket;
String? get authToken => _authToken; String? get authToken => _authToken;
bool get isConnected => _socket?.connected == true; bool get isConnected => _socket?.connected == true;
bool get isAppForeground => _isAppForeground;
Future<void> connect({bool force = false}) async { Future<void> connect({bool force = false}) async {
if (_socket != null && _socket!.connected && !force) return; if (_socket != null && _socket!.connected && !force) return;
@@ -91,43 +122,7 @@ class SocketService {
_socket = io.io(base, builder.build()); _socket = io.io(base, builder.build());
_socket!.on('connect', (_) { _bindCoreSocketHandlers();
DebugLogger.log('Socket connected: ${_socket!.id}', scope: 'socket');
if (_authToken != null && _authToken!.isNotEmpty) {
_socket!.emit('user-join', {
'auth': {'token': _authToken},
});
}
});
_socket!.on('connect_error', (err) {
DebugLogger.log('Socket connect_error: $err', scope: 'socket');
});
_socket!.on('reconnect_attempt', (attempt) {
DebugLogger.log('Socket reconnect_attempt: $attempt', scope: 'socket');
});
_socket!.on('reconnect', (attempt) {
DebugLogger.log(
'Socket reconnected after $attempt attempts',
scope: 'socket',
);
if (_authToken != null && _authToken!.isNotEmpty) {
// Best-effort rejoin
_socket!.emit('user-join', {
'auth': {'token': _authToken},
});
}
});
_socket!.on('reconnect_failed', (_) {
DebugLogger.log('Socket reconnect_failed', scope: 'socket');
});
_socket!.on('disconnect', (reason) {
DebugLogger.log('Socket disconnected: $reason', scope: 'socket');
});
} }
/// Update the auth token used by the socket service. /// Update the auth token used by the socket service.
@@ -145,77 +140,48 @@ class SocketService {
} }
} }
void onChatEvents( SocketEventSubscription addChatEventHandler({
void Function( String? conversationId,
Map<String, dynamic> event, String? sessionId,
void Function(dynamic response)? ack, bool requireFocus = true,
) required SocketChatEventHandler handler,
handler, }) {
) { final id = _nextHandlerId();
_socket?.on('chat-events', (dynamic data, [dynamic ack]) { _chatEventHandlers[id] = _ChatEventRegistration(
try { id: id,
Map<String, dynamic>? map; conversationId: conversationId,
if (data is Map<String, dynamic>) { sessionId: sessionId,
map = data; requireFocus: requireFocus,
} else if (data is Map) { handler: handler,
map = Map<String, dynamic>.from(data); );
} _bindCoreSocketHandlers();
if (map == null) return; return SocketEventSubscription(() => _chatEventHandlers.remove(id));
final ackFn = ack is Function
? (dynamic payload) {
if (payload is List) {
Function.apply(ack, payload);
} else if (payload == null) {
Function.apply(ack, const []);
} else {
Function.apply(ack, [payload]);
}
}
: null;
handler(map, ackFn);
} catch (_) {}
});
} }
// Subscribe to general channel events (server-broadcasted channel updates) SocketEventSubscription addChannelEventHandler({
void onChannelEvents( String? conversationId,
void Function( String? sessionId,
Map<String, dynamic> event, bool requireFocus = true,
void Function(dynamic response)? ack, required SocketChannelEventHandler handler,
) }) {
handler, final id = _nextHandlerId();
) { _channelEventHandlers[id] = _ChannelEventRegistration(
_socket?.on('channel-events', (dynamic data, [dynamic ack]) { id: id,
try { conversationId: conversationId,
Map<String, dynamic>? map; sessionId: sessionId,
if (data is Map<String, dynamic>) { requireFocus: requireFocus,
map = data; handler: handler,
} else if (data is Map) { );
map = Map<String, dynamic>.from(data); _bindCoreSocketHandlers();
} return SocketEventSubscription(() => _channelEventHandlers.remove(id));
if (map == null) return;
final ackFn = ack is Function
? (dynamic payload) {
if (payload is List) {
Function.apply(ack, payload);
} else if (payload == null) {
Function.apply(ack, const []);
} else {
Function.apply(ack, [payload]);
}
}
: null;
handler(map, ackFn);
} catch (_) {}
});
} }
void offChatEvents() { void clearChatEventHandlers() {
_socket?.off('chat-events'); _chatEventHandlers.clear();
} }
void offChannelEvents() { void clearChannelEventHandlers() {
_socket?.off('channel-events'); _channelEventHandlers.clear();
} }
// Subscribe to an arbitrary socket.io event (used for dynamic tool channels) // Subscribe to an arbitrary socket.io event (used for dynamic tool channels)
@@ -232,6 +198,9 @@ class SocketService {
_socket?.dispose(); _socket?.dispose();
} catch (_) {} } catch (_) {}
_socket = null; _socket = null;
WidgetsBinding.instance.removeObserver(this);
_chatEventHandlers.clear();
_channelEventHandlers.clear();
} }
// Best-effort: ensure there is an active connection and wait briefly. // Best-effort: ensure there is an active connection and wait briefly.
@@ -249,4 +218,256 @@ class SocketService {
} }
return isConnected; return isConnected;
} }
void _bindCoreSocketHandlers() {
final socket = _socket;
if (socket == null) return;
socket
..off('chat-events', _handleChatEvent)
..off('channel-events', _handleChannelEvent)
..off('connect', _handleConnect)
..off('connect_error', _handleConnectError)
..off('reconnect_attempt', _handleReconnectAttempt)
..off('reconnect', _handleReconnect)
..off('reconnect_failed', _handleReconnectFailed)
..off('disconnect', _handleDisconnect);
socket
..on('chat-events', _handleChatEvent)
..on('channel-events', _handleChannelEvent)
..on('connect', _handleConnect)
..on('connect_error', _handleConnectError)
..on('reconnect_attempt', _handleReconnectAttempt)
..on('reconnect', _handleReconnect)
..on('reconnect_failed', _handleReconnectFailed)
..on('disconnect', _handleDisconnect);
}
void _handleConnect(dynamic _) {
DebugLogger.log('Socket connected: ${_socket?.id}', scope: 'socket');
if (_authToken != null && _authToken!.isNotEmpty) {
_socket?.emit('user-join', {
'auth': {'token': _authToken},
});
}
}
void _handleReconnectAttempt(dynamic attempt) {
DebugLogger.log('Socket reconnect_attempt: $attempt', scope: 'socket');
}
void _handleReconnect(dynamic attempt) {
DebugLogger.log(
'Socket reconnected after $attempt attempts',
scope: 'socket',
);
if (_authToken != null && _authToken!.isNotEmpty) {
_socket?.emit('user-join', {
'auth': {'token': _authToken},
});
}
}
void _handleConnectError(dynamic err) {
DebugLogger.log('Socket connect_error: $err', scope: 'socket');
}
void _handleReconnectFailed(dynamic _) {
DebugLogger.log('Socket reconnect_failed', scope: 'socket');
}
void _handleDisconnect(dynamic reason) {
DebugLogger.log('Socket disconnected: $reason', scope: 'socket');
}
void _handleChatEvent(dynamic data, [dynamic ack]) {
final map = _coerceToMap(data);
if (map == null) return;
final ackFn = _wrapAck(ack);
final sessionId = _extractSessionId(map);
final chatId = map['chat_id']?.toString();
for (final registration in List<_ChatEventRegistration>.from(
_chatEventHandlers.values,
)) {
if (!_shouldDeliver(
registration.conversationId,
registration.sessionId,
chatId,
sessionId,
registration.requireFocus,
)) {
continue;
}
try {
registration.handler(map, ackFn);
} catch (_) {}
}
}
void _handleChannelEvent(dynamic data, [dynamic ack]) {
final map = _coerceToMap(data);
if (map == null) return;
final ackFn = _wrapAck(ack);
final sessionId = _extractSessionId(map);
final chatId = map['chat_id']?.toString();
for (final registration in List<_ChannelEventRegistration>.from(
_channelEventHandlers.values,
)) {
if (!_shouldDeliver(
registration.conversationId,
registration.sessionId,
chatId,
sessionId,
registration.requireFocus,
)) {
continue;
}
try {
registration.handler(map, ackFn);
} catch (_) {}
}
}
bool _shouldDeliver(
String? registeredConversationId,
String? registeredSessionId,
String? incomingConversationId,
String? incomingSessionId,
bool requireFocus,
) {
final matchesChat =
registeredConversationId == null ||
(incomingConversationId != null &&
registeredConversationId == incomingConversationId);
final matchesSession =
registeredSessionId != null &&
incomingSessionId != null &&
registeredSessionId == incomingSessionId;
if (!matchesChat && !matchesSession) {
return false;
}
if (!requireFocus) {
return true;
}
if (matchesSession) {
// Session-targeted messages should always pass through even if unfocused
return true;
}
return _isAppForeground;
}
Map<String, dynamic>? _coerceToMap(dynamic data) {
if (data is Map<String, dynamic>) {
return data;
}
if (data is Map) {
return Map<String, dynamic>.from(data);
}
return null;
}
void Function(dynamic response)? _wrapAck(dynamic ack) {
if (ack is! Function) return null;
return (dynamic payload) {
try {
if (payload is List) {
Function.apply(ack, payload);
} else if (payload == null) {
Function.apply(ack, const []);
} else {
Function.apply(ack, [payload]);
}
} catch (_) {}
};
}
String? _extractSessionId(Map<String, dynamic> event) {
String? candidate;
if (event['session_id'] != null) {
candidate = event['session_id'].toString();
}
final data = event['data'];
if (data is Map) {
if (candidate == null && data['session_id'] != null) {
candidate = data['session_id'].toString();
}
if (candidate == null && data['sessionId'] != null) {
candidate = data['sessionId'].toString();
}
final inner = data['data'];
if (inner is Map) {
if (candidate == null && inner['session_id'] != null) {
candidate = inner['session_id'].toString();
}
if (candidate == null && inner['sessionId'] != null) {
candidate = inner['sessionId'].toString();
}
}
}
return candidate;
}
String _nextHandlerId() {
_handlerSeed += 1;
return _handlerSeed.toString();
}
}
class SocketEventSubscription {
SocketEventSubscription(this._dispose);
final VoidCallback _dispose;
bool _isDisposed = false;
void dispose() {
if (_isDisposed) return;
_isDisposed = true;
_dispose();
}
}
class _ChatEventRegistration {
_ChatEventRegistration({
required this.id,
required this.handler,
this.conversationId,
this.sessionId,
this.requireFocus = true,
});
final String id;
final String? conversationId;
final String? sessionId;
final bool requireFocus;
final SocketChatEventHandler handler;
}
class _ChannelEventRegistration {
_ChannelEventRegistration({
required this.id,
required this.handler,
this.conversationId,
this.sessionId,
this.requireFocus = true,
});
final String id;
final String? conversationId;
final String? sessionId;
final bool requireFocus;
final SocketChannelEventHandler handler;
} }

View File

@@ -17,17 +17,28 @@ import '../utils/debug_logger.dart';
// Keep local verbosity toggle for socket logs // Keep local verbosity toggle for socket logs
const bool kSocketVerboseLogging = false; const bool kSocketVerboseLogging = false;
class ActiveSocketStream {
ActiveSocketStream({
required this.streamSubscription,
required this.socketSubscriptions,
required this.disposeWatchdog,
});
final StreamSubscription<String> streamSubscription;
final List<SocketEventSubscription> socketSubscriptions;
final VoidCallback disposeWatchdog;
}
/// Unified streaming helper for chat send/regenerate flows. /// Unified streaming helper for chat send/regenerate flows.
/// ///
/// This attaches chunked SSE streaming handlers, optional WebSocket event handlers, /// This attaches chunked SSE streaming handlers, optional WebSocket event handlers,
/// and manages background search/image-gen UI updates. It operates via callbacks to /// and manages background search/image-gen UI updates. It operates via callbacks to
/// avoid tight coupling with provider files for easier reuse and testing. /// avoid tight coupling with provider files for easier reuse and testing.
StreamSubscription<String> attachUnifiedChunkedStreaming({ ActiveSocketStream attachUnifiedChunkedStreaming({
required Stream<String> stream, required Stream<String> stream,
required bool webSearchEnabled, required bool webSearchEnabled,
required bool isBackgroundFlow, required bool isBackgroundFlow,
required bool suppressSocketContentInitially, required bool suppressSocketContentInitially,
required bool usingDynamicChannelInitially,
required String assistantMessageId, required String assistantMessageId,
required String modelId, required String modelId,
required Map<String, dynamic> modelItem, required Map<String, dynamic> modelItem,
@@ -91,13 +102,16 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
); );
InactivityWatchdog? socketWatchdog; InactivityWatchdog? socketWatchdog;
final socketSubscriptions = <SocketEventSubscription>[];
if (socketService != null) { if (socketService != null) {
socketWatchdog = InactivityWatchdog( socketWatchdog = InactivityWatchdog(
window: const Duration(minutes: 5), window: const Duration(minutes: 5),
onTimeout: () { onTimeout: () {
try { try {
socketService.offChatEvents(); for (final sub in socketSubscriptions) {
socketService.offChannelEvents(); sub.dispose();
}
socketSubscriptions.clear();
} catch (_) {} } catch (_) {}
try { try {
final msgs = getMessages(); final msgs = getMessages();
@@ -107,13 +121,26 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
finishStreaming(); finishStreaming();
} }
} catch (_) {} } catch (_) {}
socketWatchdog?.stop();
}, },
)..start(); )..start();
} }
void disposeSocketSubscriptions() {
if (socketSubscriptions.isEmpty) {
return;
}
for (final sub in socketSubscriptions) {
try {
sub.dispose();
} catch (_) {}
}
socketSubscriptions.clear();
socketWatchdog?.stop();
}
bool isSearching = false; bool isSearching = false;
bool suppressSocketContent = suppressSocketContentInitially; bool suppressSocketContent = suppressSocketContentInitially;
bool usingDynamicChannel = usingDynamicChannelInitially;
void updateImagesFromCurrentContent() { void updateImagesFromCurrentContent() {
try { try {
@@ -414,6 +441,13 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
final messageId = ev['message_id']?.toString(); final messageId = ev['message_id']?.toString();
socketWatchdog?.ping(); socketWatchdog?.ping();
if (kSocketVerboseLogging && payload is Map) {
DebugLogger.log(
'socket delta type=$type suppress=$suppressSocketContent session=$sessionId message=$messageId keys=${payload.keys.toList()}',
scope: 'socket/chat',
);
}
if (type == 'chat:completion' && payload != null) { if (type == 'chat:completion' && payload != null) {
if (payload is Map<String, dynamic>) { if (payload is Map<String, dynamic>) {
if (payload.containsKey('tool_calls')) { if (payload.containsKey('tool_calls')) {
@@ -488,6 +522,13 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
} }
} }
} }
if (!suppressSocketContent && payload.containsKey('content')) {
final raw = payload['content']?.toString() ?? '';
if (raw.isNotEmpty) {
replaceLastMessageContent(raw);
updateImagesFromCurrentContent();
}
}
if (payload['done'] == true) { if (payload['done'] == true) {
try { try {
// ignore: unawaited_futures // ignore: unawaited_futures
@@ -603,6 +644,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
return current.copyWith(metadata: metadata, isStreaming: false); return current.copyWith(metadata: metadata, isStreaming: false);
}); });
} }
disposeSocketSubscriptions();
finishStreaming(); finishStreaming();
} else if (type == 'chat:message:follow_ups' && payload != null) { } else if (type == 'chat:message:follow_ups' && payload != null) {
final followMap = _asStringMap(payload); final followMap = _asStringMap(payload);
@@ -890,23 +932,21 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
} }
if (socketService != null) { if (socketService != null) {
socketService.onChatEvents(chatHandler); final chatSub = socketService.addChatEventHandler(
socketService.onChannelEvents(channelEventsHandler); conversationId: activeConversationId,
Future.delayed(const Duration(seconds: 90), () { sessionId: sessionId,
try { requireFocus: false,
socketService.offChatEvents(); handler: chatHandler,
socketService.offChannelEvents(); );
} catch (_) {} socketSubscriptions.add(chatSub);
try {
final msgs = getMessages(); final channelSub = socketService.addChannelEventHandler(
if (msgs.isNotEmpty && conversationId: activeConversationId,
msgs.last.role == 'assistant' && sessionId: sessionId,
msgs.last.isStreaming) { requireFocus: false,
finishStreaming(); handler: channelEventsHandler,
} );
} catch (_) {} socketSubscriptions.add(channelSub);
socketWatchdog?.stop();
});
} }
final subscription = persistentController.stream.listen( final subscription = persistentController.stream.listen(
@@ -951,25 +991,29 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
// Allow socket-delivered follow-ups/title updates after SSE completes // Allow socket-delivered follow-ups/title updates after SSE completes
suppressSocketContent = false; suppressSocketContent = false;
// If SSE-driven (no dynamic channel/background flow), finish now // If SSE-driven (no dynamic channel/background flow), clean up sockets
if (!usingDynamicChannel && !isBackgroundFlow) { if (!isBackgroundFlow) {
finishStreaming(); finishStreaming();
Future.microtask(refreshConversationSnapshot); Future.microtask(refreshConversationSnapshot);
} }
socketWatchdog?.stop();
}, },
onError: (error) async { onError: (error) async {
try { try {
persistentService.unregisterStream(streamId); persistentService.unregisterStream(streamId);
} catch (_) {} } catch (_) {}
suppressSocketContent = false; suppressSocketContent = false;
disposeSocketSubscriptions();
finishStreaming(); finishStreaming();
Future.microtask(refreshConversationSnapshot); Future.microtask(refreshConversationSnapshot);
socketWatchdog?.stop(); socketWatchdog?.stop();
}, },
); );
return subscription; return ActiveSocketStream(
streamSubscription: subscription,
socketSubscriptions: socketSubscriptions,
disposeWatchdog: () => socketWatchdog?.stop(),
);
} }
List<Map<String, dynamic>> _extractFilesFromResult(dynamic resp) { List<Map<String, dynamic>> _extractFilesFromResult(dynamic resp) {

View File

@@ -1,10 +1,12 @@
import 'dart:convert'; import 'dart:convert';
import 'package:yaml/yaml.dart' as yaml; import 'package:yaml/yaml.dart' as yaml;
import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:uuid/uuid.dart'; import 'package:uuid/uuid.dart';
import '../../../core/utils/tool_calls_parser.dart'; import '../../../core/utils/tool_calls_parser.dart';
import '../../../core/services/streaming_helper.dart'; import '../../../core/services/streaming_helper.dart';
import '../../../core/services/socket_service.dart';
import '../../../core/models/chat_message.dart'; import '../../../core/models/chat_message.dart';
import '../../../core/models/conversation.dart'; import '../../../core/models/conversation.dart';
import '../../../core/providers/app_providers.dart'; import '../../../core/providers/app_providers.dart';
@@ -87,6 +89,8 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
StreamSubscription? _messageStream; StreamSubscription? _messageStream;
ProviderSubscription? _conversationListener; ProviderSubscription? _conversationListener;
final List<StreamSubscription> _subscriptions = []; final List<StreamSubscription> _subscriptions = [];
final List<SocketEventSubscription> _socketSubscriptions = [];
VoidCallback? _socketTeardown;
// Activity-based watchdog to prevent stuck typing indicator // Activity-based watchdog to prevent stuck typing indicator
InactivityWatchdog? _typingWatchdog; InactivityWatchdog? _typingWatchdog;
@@ -163,6 +167,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
_subscriptions.clear(); _subscriptions.clear();
_cancelMessageStream(); _cancelMessageStream();
cancelSocketSubscriptions();
_cancelTypingGuard(); _cancelTypingGuard();
_conversationListener?.close(); _conversationListener?.close();
@@ -181,6 +186,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
void _cancelMessageStream() { void _cancelMessageStream() {
_messageStream?.cancel(); _messageStream?.cancel();
_messageStream = null; _messageStream = null;
cancelSocketSubscriptions();
} }
void _cancelTypingGuard() { void _cancelTypingGuard() {
@@ -364,6 +370,31 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
_addSubscription(stream); _addSubscription(stream);
} }
void setSocketSubscriptions(
List<SocketEventSubscription> subscriptions, {
VoidCallback? onDispose,
}) {
cancelSocketSubscriptions();
_socketSubscriptions.addAll(subscriptions);
_socketTeardown = onDispose;
}
void cancelSocketSubscriptions() {
if (_socketSubscriptions.isEmpty) {
_socketTeardown?.call();
_socketTeardown = null;
return;
}
for (final sub in _socketSubscriptions) {
try {
sub.dispose();
} catch (_) {}
}
_socketSubscriptions.clear();
_socketTeardown?.call();
_socketTeardown = null;
}
void addMessage(ChatMessage message) { void addMessage(ChatMessage message) {
state = [...state, message]; state = [...state, message];
if (message.role == 'assistant' && message.isStreaming) { if (message.role == 'assistant' && message.isStreaming) {
@@ -1230,6 +1261,7 @@ Future<void> regenerateMessage(
enableImageGeneration: imageGenerationEnabled, enableImageGeneration: imageGenerationEnabled,
modelItem: modelItem, modelItem: modelItem,
sessionIdOverride: passSocketSession ? socketSessionId : null, sessionIdOverride: passSocketSession ? socketSessionId : null,
socketSessionId: socketSessionId,
toolServers: toolServers, toolServers: toolServers,
backgroundTasks: bgTasks, backgroundTasks: bgTasks,
responseMessageId: assistantMessageId, responseMessageId: assistantMessageId,
@@ -1237,6 +1269,8 @@ Future<void> regenerateMessage(
final stream = response.stream; final stream = response.stream;
final sessionId = response.sessionId; final sessionId = response.sessionId;
final effectiveSessionId =
response.socketSessionId ?? socketSessionId ?? sessionId;
// New unified streaming path via helper; bypass old inline socket block // New unified streaming path via helper; bypass old inline socket block
final bool isBackgroundFlow = final bool isBackgroundFlow =
@@ -1258,16 +1292,15 @@ Future<void> regenerateMessage(
}); });
} catch (_) {} } catch (_) {}
final sendStreamSub = attachUnifiedChunkedStreaming( final activeStream = attachUnifiedChunkedStreaming(
stream: stream, stream: stream,
webSearchEnabled: webSearchEnabled, webSearchEnabled: webSearchEnabled,
isBackgroundFlow: isBackgroundFlow, isBackgroundFlow: isBackgroundFlow,
suppressSocketContentInitially: !isBackgroundFlow, suppressSocketContentInitially: !isBackgroundFlow,
usingDynamicChannelInitially: false,
assistantMessageId: assistantMessageId, assistantMessageId: assistantMessageId,
modelId: selectedModel.id, modelId: selectedModel.id,
modelItem: modelItem, modelItem: modelItem,
sessionId: sessionId, sessionId: effectiveSessionId,
activeConversationId: activeConversation.id, activeConversationId: activeConversation.id,
api: api, api: api,
socketService: socketService, socketService: socketService,
@@ -1319,7 +1352,12 @@ Future<void> regenerateMessage(
ref.read(chatMessagesProvider.notifier).finishStreaming(), ref.read(chatMessagesProvider.notifier).finishStreaming(),
getMessages: () => ref.read(chatMessagesProvider), getMessages: () => ref.read(chatMessagesProvider),
); );
ref.read(chatMessagesProvider.notifier).setMessageStream(sendStreamSub); ref.read(chatMessagesProvider.notifier)
..setMessageStream(activeStream.streamSubscription)
..setSocketSubscriptions(
activeStream.socketSubscriptions,
onDispose: activeStream.disposeWatchdog,
);
return; return;
} catch (e) { } catch (e) {
rethrow; rethrow;
@@ -1756,6 +1794,7 @@ Future<void> _sendMessageInternal(
// Bind to Socket session whenever available so the server can push // Bind to Socket session whenever available so the server can push
// streaming updates to this client (improves first-turn streaming). // streaming updates to this client (improves first-turn streaming).
sessionIdOverride: wantSessionBinding ? socketSessionId : null, sessionIdOverride: wantSessionBinding ? socketSessionId : null,
socketSessionId: socketSessionId,
toolServers: toolServers, toolServers: toolServers,
backgroundTasks: bgTasks, backgroundTasks: bgTasks,
responseMessageId: assistantMessageId, responseMessageId: assistantMessageId,
@@ -1763,6 +1802,8 @@ Future<void> _sendMessageInternal(
final stream = response.stream; final stream = response.stream;
final sessionId = response.sessionId; final sessionId = response.sessionId;
final effectiveSessionId =
response.socketSessionId ?? socketSessionId ?? sessionId;
// Use unified streaming helper for SSE/WebSocket handling // Use unified streaming helper for SSE/WebSocket handling
final bool isBackgroundFlow = final bool isBackgroundFlow =
@@ -1784,16 +1825,15 @@ Future<void> _sendMessageInternal(
}); });
} catch (_) {} } catch (_) {}
final sendStreamSub = attachUnifiedChunkedStreaming( final activeStream = attachUnifiedChunkedStreaming(
stream: stream, stream: stream,
webSearchEnabled: webSearchEnabled, webSearchEnabled: webSearchEnabled,
isBackgroundFlow: isBackgroundFlow, isBackgroundFlow: isBackgroundFlow,
suppressSocketContentInitially: !isBackgroundFlow, suppressSocketContentInitially: !isBackgroundFlow,
usingDynamicChannelInitially: false,
assistantMessageId: assistantMessageId, assistantMessageId: assistantMessageId,
modelId: selectedModel.id, modelId: selectedModel.id,
modelItem: modelItem, modelItem: modelItem,
sessionId: sessionId, sessionId: effectiveSessionId,
activeConversationId: activeConversation?.id, activeConversationId: activeConversation?.id,
api: api, api: api,
socketService: socketService, socketService: socketService,
@@ -1846,7 +1886,12 @@ Future<void> _sendMessageInternal(
getMessages: () => ref.read(chatMessagesProvider), getMessages: () => ref.read(chatMessagesProvider),
); );
ref.read(chatMessagesProvider.notifier).setMessageStream(sendStreamSub); ref.read(chatMessagesProvider.notifier)
..setMessageStream(activeStream.streamSubscription)
..setSocketSubscriptions(
activeStream.socketSubscriptions,
onDispose: activeStream.disposeWatchdog,
);
return; return;
} catch (e) { } catch (e) {
// Handle error - remove the assistant message placeholder // Handle error - remove the assistant message placeholder
@@ -2160,13 +2205,6 @@ final stopGenerationProvider = Provider<void Function()>((ref) {
final api = ref.read(apiServiceProvider); final api = ref.read(apiServiceProvider);
api?.cancelStreamingMessage(lastId); api?.cancelStreamingMessage(lastId);
// Stop any active socket listeners for chat/channel events
try {
final socketService = ref.read(socketServiceProvider);
socketService?.offChatEvents();
socketService?.offChannelEvents();
} catch (_) {}
// Cancel local stream subscription to stop propagating further chunks // Cancel local stream subscription to stop propagating further chunks
ref.read(chatMessagesProvider.notifier).cancelActiveMessageStream(); ref.read(chatMessagesProvider.notifier).cancelActiveMessageStream();
} }