fix(chat): Improve server message synchronization and streaming recovery

This commit is contained in:
cogwheel0
2025-12-02 16:15:16 +05:30
parent 384df15514
commit c59a46d568
4 changed files with 357 additions and 122 deletions

View File

@@ -176,7 +176,10 @@ class SocketService with WidgetsBindingObserver {
requireFocus: requireFocus, requireFocus: requireFocus,
handler: handler, handler: handler,
); );
return SocketEventSubscription(() => _chatEventHandlers.remove(id)); return SocketEventSubscription(
() => _chatEventHandlers.remove(id),
handlerId: id,
);
} }
SocketEventSubscription addChannelEventHandler({ SocketEventSubscription addChannelEventHandler({
@@ -193,7 +196,10 @@ class SocketService with WidgetsBindingObserver {
requireFocus: requireFocus, requireFocus: requireFocus,
handler: handler, handler: handler,
); );
return SocketEventSubscription(() => _channelEventHandlers.remove(id)); return SocketEventSubscription(
() => _channelEventHandlers.remove(id),
handlerId: id,
);
} }
void clearChatEventHandlers() { void clearChatEventHandlers() {
@@ -204,6 +210,66 @@ class SocketService with WidgetsBindingObserver {
_channelEventHandlers.clear(); _channelEventHandlers.clear();
} }
/// Rebinds a single chat event handler to a new socket session.
///
/// Looks up the registration stored under [handlerId] and, when one exists,
/// replaces it with a copy that carries [newSessionId]; every other field is
/// carried over unchanged. Unknown handler IDs are ignored. Used when the
/// socket reconnects and is given a new session ID.
void updateChatHandlerSessionId(String handlerId, String newSessionId) {
  final current = _chatEventHandlers[handlerId];
  if (current == null) return;

  _chatEventHandlers[handlerId] = _ChatEventRegistration(
    id: current.id,
    conversationId: current.conversationId,
    sessionId: newSessionId,
    requireFocus: current.requireFocus,
    handler: current.handler,
  );
}
/// Rebinds a single channel event handler to a new socket session.
///
/// Looks up the registration stored under [handlerId] and, when one exists,
/// replaces it with a copy that carries [newSessionId]; every other field is
/// carried over unchanged. Unknown handler IDs are ignored. Used when the
/// socket reconnects and is given a new session ID.
void updateChannelHandlerSessionId(String handlerId, String newSessionId) {
  final current = _channelEventHandlers[handlerId];
  if (current == null) return;

  _channelEventHandlers[handlerId] = _ChannelEventRegistration(
    id: current.id,
    conversationId: current.conversationId,
    sessionId: newSessionId,
    requireFocus: current.requireFocus,
    handler: current.handler,
  );
}
/// Rebinds every handler registered for [conversationId] to [newSessionId].
///
/// Walks both the chat and the channel handler tables and replaces each
/// matching registration with a copy that differs only in its session ID.
/// Called after socket reconnection so existing handlers carry the new
/// session.
void updateSessionIdForConversation(
  String conversationId,
  String newSessionId,
) {
  // Iterate over a snapshot of the keys so a table is never being walked
  // while its values are replaced.
  for (final key in _chatEventHandlers.keys.toList()) {
    final current = _chatEventHandlers[key];
    if (current == null || current.conversationId != conversationId) {
      continue;
    }
    _chatEventHandlers[key] = _ChatEventRegistration(
      id: current.id,
      conversationId: current.conversationId,
      sessionId: newSessionId,
      requireFocus: current.requireFocus,
      handler: current.handler,
    );
  }

  for (final key in _channelEventHandlers.keys.toList()) {
    final current = _channelEventHandlers[key];
    if (current == null || current.conversationId != conversationId) {
      continue;
    }
    _channelEventHandlers[key] = _ChannelEventRegistration(
      id: current.id,
      conversationId: current.conversationId,
      sessionId: newSessionId,
      requireFocus: current.requireFocus,
      handler: current.handler,
    );
  }
}
// Subscribe to an arbitrary socket.io event (used for dynamic tool channels) // Subscribe to an arbitrary socket.io event (used for dynamic tool channels)
void onEvent(String eventName, void Function(dynamic data) handler) { void onEvent(String eventName, void Function(dynamic data) handler) {
_socket?.on(eventName, handler); _socket?.on(eventName, handler);
@@ -378,19 +444,29 @@ class SocketService with WidgetsBindingObserver {
incomingSessionId != null && incomingSessionId != null &&
registeredSessionId == incomingSessionId; registeredSessionId == incomingSessionId;
// Must match either conversation or session to be considered
if (!matchesConversation && !matchesSession) { if (!matchesConversation && !matchesSession) {
return false; return false;
} }
// If no focus requirement, always deliver
if (!requireFocus) { if (!requireFocus) {
return true; return true;
} }
// Session-targeted messages always bypass focus check (critical for
// background streaming - done/delta events must arrive even when backgrounded)
if (matchesSession) { if (matchesSession) {
// Session-targeted messages should always pass through even if unfocused
return true; return true;
} }
// FIX for issue #172: If conversation matches (even without session match),
// still deliver when app is in foreground. This handles socket reconnection
// where session_id changes but chat_id stays the same.
if (matchesConversation && registeredConversationId != null) {
return _isAppForeground;
}
return _isAppForeground; return _isAppForeground;
} }
@@ -487,9 +563,10 @@ class SocketService with WidgetsBindingObserver {
} }
class SocketEventSubscription { class SocketEventSubscription {
SocketEventSubscription(this._dispose); SocketEventSubscription(this._dispose, {this.handlerId});
final VoidCallback _dispose; final VoidCallback _dispose;
final String? handlerId;
bool _isDisposed = false; bool _isDisposed = false;
void dispose() { void dispose() {

View File

@@ -123,6 +123,55 @@ class ActiveSocketStream {
final VoidCallback disposeWatchdog; final VoidCallback disposeWatchdog;
} }
/// Runs the post-reconnect recovery check as a standalone async function.
///
/// Kept outside the `Timer` callback so the returned future is not silently
/// dropped. The steps are:
///
/// 1. Bail out if [hasFinished] reports true, or if the last message from
///    [getMessages] is not an assistant message that is still streaming.
/// 2. Await [pollServerForMessage] and re-check [hasFinished].
/// 3. If a result came back, pass it to [applyServerContent] with
///    `finishIfDone: true`; when that reports the content was applied, call
///    [syncImages].
///
/// Any thrown error is logged and swallowed - recovery is best-effort.
Future<void> _handleReconnectRecovery({
  required bool Function() hasFinished,
  required List<ChatMessage> Function() getMessages,
  required Future<({String content, List<String> followUps, bool isDone})?>
  Function()
  pollServerForMessage,
  required bool Function(
    String,
    List<String>, {
    required bool finishIfDone,
    required bool isDone,
    required String source,
  })
  applyServerContent,
  required void Function() syncImages,
}) async {
  try {
    if (hasFinished()) return;

    // Only an assistant message that is still streaming needs recovery.
    final messages = getMessages();
    final awaitingCompletion =
        messages.isNotEmpty &&
        messages.last.role == 'assistant' &&
        messages.last.isStreaming;
    if (!awaitingCompletion) return;

    final polled = await pollServerForMessage();
    // State may have changed while the request was in flight.
    if (hasFinished() || polled == null) return;

    final adopted = applyServerContent(
      polled.content,
      polled.followUps,
      finishIfDone: true,
      isDone: polled.isDone,
      source: 'Reconnect recovery',
    );
    if (adopted) syncImages();
  } catch (e) {
    // Best-effort: record the failure and carry on.
    DebugLogger.log('Reconnect recovery failed: $e', scope: 'streaming/helper');
  }
}
/// Unified streaming helper for chat send/regenerate flows. /// Unified streaming helper for chat send/regenerate flows.
/// ///
/// This attaches WebSocket event handlers and manages background search/image-gen /// This attaches WebSocket event handlers and manages background search/image-gen
@@ -203,19 +252,124 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
final socketSubscriptions = <VoidCallback>[]; final socketSubscriptions = <VoidCallback>[];
final hasSocketSignals = final hasSocketSignals =
socketService != null || registerDeltaListener != null; socketService != null || registerDeltaListener != null;
// Reference to image sync function - initialized to no-op, reassigned after definition.
// Must not be `late` to avoid LateInitializationError if callbacks fire early.
void Function() syncImages = () {};
// Fetches the active conversation from the server and extracts the state of
// the assistant message identified by assistantMessageId.
//
// Shared by the watchdog timeout and the reconnection handler to recover from
// missed events. Resolves to (content, followUps, isDone), or null when there
// is no active conversation, the response lacks the expected shape, the
// message is not in the list, or anything throws.
Future<({String content, List<String> followUps, bool isDone})?>
pollServerForMessage() async {
  try {
    final chatId = activeConversationId;
    if (chatId == null || chatId.isEmpty) return null;

    final response = await api.dio.get('/api/v1/chats/$chatId');
    final payload = response.data as Map<String, dynamic>?;
    final chat = payload?['chat'] as Map<String, dynamic>?;
    if (chat == null) return null;

    final messages = chat['messages'];
    if (messages is! List) return null;

    // Locate the first entry whose id matches the assistant message.
    Map? target;
    for (final candidate in messages) {
      if (candidate is Map &&
          candidate['id']?.toString() == assistantMessageId) {
        target = candidate;
        break;
      }
    }
    if (target == null) return null;

    // Content is either a plain string or a list of parts; for a list, the
    // first part typed 'text' supplies the text.
    var content = '';
    final rawContent = target['content'];
    if (rawContent is String) {
      content = rawContent;
    } else if (rawContent is List) {
      for (final part in rawContent) {
        if (part is Map && part['type'] == 'text') {
          content = part['text']?.toString() ?? '';
          break;
        }
      }
    }

    // Follow-ups may arrive under a camelCase or a snake_case key; parsing is
    // delegated to _parseFollowUpsField.
    final followUps = _parseFollowUpsField(
      target['followUps'] ?? target['follow_ups'],
    );

    // Done when flagged explicitly, or when the message is not marked as
    // streaming and already has content.
    final flaggedDone = target['done'] == true;
    final settledWithContent =
        target['isStreaming'] != true && content.isNotEmpty;

    return (
      content: content,
      followUps: followUps,
      isDone: flaggedDone || settledWithContent,
    );
  } catch (e) {
    DebugLogger.log('Server poll failed: $e', scope: 'streaming/helper');
    return null;
  }
}
// Adopts server-provided content when it is at least as long as the local
// copy of the last assistant message.
//
// Returns true when the content was applied (so the caller can trigger an
// image sync) and false when there is no trailing assistant message or the
// server content is empty or shorter than the local content. Non-empty
// follow-ups are stored for assistantMessageId. Streaming is finished only
// when finishIfDone and isDone are both set and the last message is still
// streaming.
bool applyServerContent(
  String content,
  List<String> followUps, {
  required bool finishIfDone,
  required bool isDone,
  required String source,
}) {
  final messages = getMessages();
  if (messages.isEmpty) return false;
  if (messages.last.role != 'assistant') return false;

  final serverIsUsable =
      content.isNotEmpty && content.length >= messages.last.content.length;
  if (!serverIsUsable) return false;

  DebugLogger.log(
    '$source: adopting server content (${content.length} chars)',
    scope: 'streaming/helper',
  );
  replaceLastMessageContent(content);

  if (followUps.isNotEmpty) {
    setFollowUps(assistantMessageId, followUps);
  }

  final shouldFinish = finishIfDone && isDone && messages.last.isStreaming;
  if (shouldFinish) {
    wrappedFinishStreaming();
  }
  return true;
}
if (hasSocketSignals) { if (hasSocketSignals) {
// Use a short inactivity timeout - if no data arrives for 10 seconds, // Inactivity timeout - if no data arrives for 10 seconds, poll server
// something is likely wrong with the connection. Combined with 1-second // and finish streaming. This handles stuck connections (issue #172).
// polling and server state sync, this provides fast recovery.
socketWatchdog = InactivityWatchdog( socketWatchdog = InactivityWatchdog(
window: const Duration(seconds: 10), window: const Duration(seconds: 10),
// Absolute cap ensures streaming never gets stuck indefinitely
absoluteCap: const Duration(minutes: 5), absoluteCap: const Duration(minutes: 5),
onTimeout: () { onTimeout: () async {
DebugLogger.log( DebugLogger.log(
'Socket watchdog timeout - finishing streaming gracefully', 'Socket watchdog timeout - polling server',
scope: 'streaming/helper', scope: 'streaming/helper',
); );
final result = await pollServerForMessage();
if (result != null) {
final applied = applyServerContent(
result.content,
result.followUps,
finishIfDone: false, // We always finish on timeout
isDone: result.isDone,
source: 'Watchdog',
);
if (applied) {
syncImages();
}
}
// Clean up and finish
try { try {
for (final dispose in socketSubscriptions) { for (final dispose in socketSubscriptions) {
try { try {
@@ -224,6 +378,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
socketSubscriptions.clear(); socketSubscriptions.clear();
} catch (_) {} } catch (_) {}
try { try {
final msgs = getMessages(); final msgs = getMessages();
if (msgs.isNotEmpty && if (msgs.isNotEmpty &&
@@ -236,103 +391,43 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}, },
)..start(); )..start();
// Subscribe to socket reconnection events to sync state after reconnect. // Handle socket reconnection - update session IDs and check for missed events
// This catches cases where the done signal was missed due to disconnection.
if (socketService != null) { if (socketService != null) {
StreamSubscription<void>? reconnectSub; StreamSubscription<void>? reconnectSub;
Timer? reconnectDelayTimer; Timer? reconnectDelayTimer;
var reconnectSubDisposed = false;
reconnectSub = socketService.onReconnect.listen((_) { reconnectSub = socketService.onReconnect.listen((_) {
DebugLogger.log( DebugLogger.log(
'Socket reconnected - checking server state for missed signals', 'Socket reconnected - updating session ID',
scope: 'streaming/helper', scope: 'streaming/helper',
); );
// Cancel any pending timer from a previous reconnect // Update handler registrations with new session ID (issue #172 fix)
final newSessionId = socketService.sessionId;
final convId = activeConversationId;
if (newSessionId != null && convId != null && convId.isNotEmpty) {
socketService.updateSessionIdForConversation(convId, newSessionId);
}
// Brief delay then check server for missed completion
reconnectDelayTimer?.cancel(); reconnectDelayTimer?.cancel();
reconnectDelayTimer = Timer(const Duration(milliseconds: 500), () {
// After reconnection, give a brief moment for any queued events // Wrap async work in unawaited to handle errors properly
// then check server state to catch any missed completion signals. unawaited(
// Use Timer instead of Future.delayed so it can be cancelled on dispose. _handleReconnectRecovery(
reconnectDelayTimer = Timer(const Duration(milliseconds: 500), () async { hasFinished: () => hasFinished,
// Check if disposed before executing getMessages: getMessages,
if (reconnectSubDisposed || hasFinished) return; pollServerForMessage: pollServerForMessage,
applyServerContent: applyServerContent,
// Check current state before making the async call syncImages: syncImages,
var msgs = getMessages(); ),
if (msgs.isEmpty || msgs.last.role != 'assistant') return; );
if (!msgs.last.isStreaming) return;
// Fetch conversation from server to check if streaming actually completed
try {
final chatId = activeConversationId;
if (chatId != null && chatId.isNotEmpty) {
final resp = await api.dio.get('/api/v1/chats/$chatId');
// Re-check state after async call - it may have changed or been disposed
if (reconnectSubDisposed || hasFinished) return;
msgs = getMessages();
if (msgs.isEmpty || msgs.last.role != 'assistant') return;
if (!msgs.last.isStreaming) return;
final data = resp.data as Map<String, dynamic>?;
final chatObj = data?['chat'] as Map<String, dynamic>?;
if (chatObj != null) {
// Check if server has the completed message
final list = chatObj['messages'];
if (list is List) {
final serverMsg = list.firstWhere(
(m) =>
m is Map && m['id']?.toString() == assistantMessageId,
orElse: () => null,
);
if (serverMsg != null && serverMsg is Map) {
final serverContent = serverMsg['content'];
String content = '';
if (serverContent is String) {
content = serverContent;
} else if (serverContent is List) {
final textItem = serverContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
}
// If server has content, adopt it and finish streaming
// Use current msgs (re-fetched after await) for comparison
if (content.isNotEmpty &&
content.length >= msgs.last.content.length) {
DebugLogger.log(
'Reconnect recovery: adopting server content (${content.length} chars)',
scope: 'streaming/helper',
);
replaceLastMessageContent(content);
wrappedFinishStreaming();
}
}
}
}
}
} catch (e) {
DebugLogger.log(
'Reconnect recovery fetch failed: $e',
scope: 'streaming/helper',
);
}
}); });
}); });
socketSubscriptions.add(() { socketSubscriptions.add(() {
reconnectSubDisposed = true;
reconnectDelayTimer?.cancel(); reconnectDelayTimer?.cancel();
reconnectDelayTimer = null;
reconnectSub?.cancel(); reconnectSub?.cancel();
reconnectSub = null;
}); });
} }
} }
@@ -474,6 +569,9 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} catch (_) {} } catch (_) {}
} }
// Bind the real implementation now that updateImagesFromCurrentContent is defined
syncImages = updateImagesFromCurrentContent;
bool refreshingSnapshot = false; bool refreshingSnapshot = false;
Future<void> refreshConversationSnapshot() async { Future<void> refreshConversationSnapshot() async {
if (refreshingSnapshot) return; if (refreshingSnapshot) return;
@@ -642,12 +740,17 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
socketWatchdog?.ping(); socketWatchdog?.ping();
// Increased timeout to match our more generous streaming timeouts // Increased timeout to match our more generous streaming timeouts
// OpenWebUI doesn't have such aggressive channel timeouts // OpenWebUI doesn't have such aggressive channel timeouts
Future.delayed(const Duration(minutes: 12), () { // Use Timer instead of Future.delayed so it can be cancelled on cleanup
final channelTimeoutTimer = Timer(const Duration(minutes: 12), () {
try { try {
socketService?.offEvent(channel); socketService?.offEvent(channel);
} catch (_) {} } catch (_) {}
socketWatchdog?.stop(); socketWatchdog?.stop();
}); });
// Register cleanup so the timer is cancelled if streaming completes early
socketSubscriptions.add(() {
channelTimeoutTimer.cancel();
});
} }
void chatHandler( void chatHandler(
@@ -1311,12 +1414,30 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
}, },
onComplete: () { onComplete: () {
// HTTP stream completed - cleanup already done in onDone handler. // HTTP stream completed.
// For WebSocket flows, actual completion is handled by socket events (done: true). // With WebSocket-based streaming, HTTP closes immediately after returning task_id.
// Only finish streaming here if there are no socket subscriptions (simple/legacy flow). // All actual content comes via WebSocket events, so we should NOT finish streaming
// here if socket subscriptions are active - the socket done:true event will finish it.
DebugLogger.log(
'HTTP stream complete '
'(socketSubs=${socketSubscriptions.length}, socketConnected=${socketService?.isConnected})',
scope: 'streaming/helper',
);
// Only finish streaming if no socket subscriptions are active.
// If sockets are active, they will handle the completion via done:true event.
if (socketSubscriptions.isEmpty) { if (socketSubscriptions.isEmpty) {
DebugLogger.log(
'No socket subscriptions - finishing streaming on HTTP complete',
scope: 'streaming/helper',
);
wrappedFinishStreaming(); wrappedFinishStreaming();
Future.microtask(refreshConversationSnapshot); Future.microtask(refreshConversationSnapshot);
} else {
DebugLogger.log(
'Socket subscriptions active - waiting for socket done signal',
scope: 'streaming/helper',
);
} }
}, },
onError: (error, stackTrace) async { onError: (error, stackTrace) async {
@@ -1334,10 +1455,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
// Check if this is a recoverable error (network issues, etc.) // Check if this is a recoverable error (network issues, etc.)
final errorText = error.toString(); final errorText = error.toString();
final isRecoverable = final isRecoverable =
(error is! FormatException && error is! FormatException &&
errorText.contains('SocketException')) || (errorText.contains('SocketException') ||
errorText.contains('TimeoutException') || errorText.contains('TimeoutException') ||
errorText.contains('HandshakeException'); errorText.contains('HandshakeException'));
if (isRecoverable && socketService != null) { if (isRecoverable && socketService != null) {
// Try to recover via socket connection if available // Try to recover via socket connection if available

View File

@@ -5,6 +5,9 @@ import 'dart:async';
/// Call [ping] whenever activity occurs. If no activity happens /// Call [ping] whenever activity occurs. If no activity happens
/// within [window], [onTimeout] fires. Optionally, an [absoluteCap] /// within [window], [onTimeout] fires. Optionally, an [absoluteCap]
/// enforces a maximum total duration regardless of activity. /// enforces a maximum total duration regardless of activity.
///
/// The [onTimeout] callback can be sync or async - if async, it is awaited, and
/// [isFiring] stays true (so [start] and [ping] are ignored) until it completes.
class InactivityWatchdog { class InactivityWatchdog {
InactivityWatchdog({ InactivityWatchdog({
required Duration window, required Duration window,
@@ -13,16 +16,20 @@ class InactivityWatchdog {
}) : _window = window, }) : _window = window,
_absoluteCap = absoluteCap; _absoluteCap = absoluteCap;
final void Function() onTimeout; final FutureOr<void> Function() onTimeout;
Duration _window; Duration _window;
Duration? _absoluteCap; Duration? _absoluteCap;
Timer? _timer; Timer? _timer;
Timer? _absoluteTimer; Timer? _absoluteTimer;
bool _started = false; bool _started = false;
bool _firing = false;
Duration get window => _window; Duration get window => _window;
/// Whether the timeout callback is currently executing.
bool get isFiring => _firing;
void setWindow(Duration newWindow) { void setWindow(Duration newWindow) {
_window = newWindow; _window = newWindow;
if (_started) { if (_started) {
@@ -43,6 +50,8 @@ class InactivityWatchdog {
void start() { void start() {
if (_started) return; if (_started) return;
// Prevent restart while callback is still executing to avoid double-fire
if (_firing) return;
_started = true; _started = true;
_restart(); _restart();
if (_absoluteCap != null) { if (_absoluteCap != null) {
@@ -51,6 +60,8 @@ class InactivityWatchdog {
} }
void ping() { void ping() {
// Prevent restart while callback is still executing to avoid double-fire
if (_firing) return;
if (!_started) { if (!_started) {
start(); start();
return; return;
@@ -73,10 +84,24 @@ class InactivityWatchdog {
_timer = Timer(_window, _fire); _timer = Timer(_window, _fire);
} }
/// Synchronous entry point called by Timer. Kicks off async work.
void _fire() { void _fire() {
if (_firing) return; // Prevent re-entry
_firing = true;
stop(); stop();
// Execute the callback asynchronously. We don't await because Timer
// expects a sync callback, but the async work will complete in background.
_executeCallback();
}
/// Executes the timeout callback asynchronously.
Future<void> _executeCallback() async {
try { try {
onTimeout(); await onTimeout();
} catch (_) {} } catch (_) {
// Swallow errors to prevent unhandled exceptions
} finally {
_firing = false;
}
} }
} }

View File

@@ -355,29 +355,41 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
final serverMessages = serverConversation.messages; final serverMessages = serverConversation.messages;
if (serverMessages.isNotEmpty && state.isNotEmpty) { if (serverMessages.isNotEmpty && state.isNotEmpty) {
final serverLast = serverMessages.last;
final localLast = state.last; final localLast = state.last;
// If server has the same message but says it's not streaming, // Case 1: Server has more messages than local - streaming must be done
// or server has more content, sync from server if (serverMessages.length > state.length) {
if (serverLast.id == localLast.id && DebugLogger.log(
serverLast.role == 'assistant') { 'Server sync: server has more messages '
final serverDone = !serverLast.isStreaming; '(${serverMessages.length} vs ${state.length})',
final serverHasMoreContent = scope: 'chat/providers',
serverLast.content.length > localLast.content.length; );
state = serverMessages;
_cancelMessageStream();
return;
}
if (serverDone || serverHasMoreContent) { // Case 2: Find the local streaming message in server messages by ID
DebugLogger.log( // This handles cases where last messages differ
'Server sync: adopting server state ' if (localLast.role == 'assistant' && localLast.isStreaming) {
'(serverDone=$serverDone, serverHasMore=$serverHasMoreContent)', final serverVersion = serverMessages
scope: 'chat/providers', .where((m) => m.id == localLast.id)
); .firstOrNull;
state = serverMessages;
// Always cancel local streaming when adopting server state. if (serverVersion != null) {
// If serverDone, streaming is complete. If serverHasMoreContent, final serverDone = !serverVersion.isStreaming;
// we've adopted server content and continuing local streaming final serverHasMoreContent =
// could cause conflicts or duplicate content. serverVersion.content.length > localLast.content.length;
_cancelMessageStream();
if (serverDone || serverHasMoreContent) {
DebugLogger.log(
'Server sync: adopting server state '
'(serverDone=$serverDone, serverHasMore=$serverHasMoreContent)',
scope: 'chat/providers',
);
state = serverMessages;
_cancelMessageStream();
}
} }
} }
} }