Merge pull request #206 from cogwheel0/improve-server-message-sync

fix(chat): Improve server message synchronization and streaming recovery
This commit is contained in:
cogwheel
2025-12-02 16:21:14 +05:30
committed by GitHub
4 changed files with 357 additions and 122 deletions

View File

@@ -176,7 +176,10 @@ class SocketService with WidgetsBindingObserver {
requireFocus: requireFocus,
handler: handler,
);
return SocketEventSubscription(() => _chatEventHandlers.remove(id));
return SocketEventSubscription(
() => _chatEventHandlers.remove(id),
handlerId: id,
);
}
SocketEventSubscription addChannelEventHandler({
@@ -193,7 +196,10 @@ class SocketService with WidgetsBindingObserver {
requireFocus: requireFocus,
handler: handler,
);
return SocketEventSubscription(() => _channelEventHandlers.remove(id));
return SocketEventSubscription(
() => _channelEventHandlers.remove(id),
handlerId: id,
);
}
void clearChatEventHandlers() {
@@ -204,6 +210,66 @@ class SocketService with WidgetsBindingObserver {
_channelEventHandlers.clear();
}
/// Rebinds the chat event handler registered under [handlerId] to
/// [newSessionId].
///
/// Meant to be called after the socket reconnects and receives a new session
/// ID. Every other field of the registration is carried over unchanged. Does
/// nothing when no chat handler is registered under [handlerId].
void updateChatHandlerSessionId(String handlerId, String newSessionId) {
  final current = _chatEventHandlers[handlerId];
  if (current == null) return;

  _chatEventHandlers[handlerId] = _ChatEventRegistration(
    id: current.id,
    conversationId: current.conversationId,
    sessionId: newSessionId,
    requireFocus: current.requireFocus,
    handler: current.handler,
  );
}
/// Rebinds the channel event handler registered under [handlerId] to
/// [newSessionId].
///
/// Meant to be called after the socket reconnects and receives a new session
/// ID. Every other field of the registration is carried over unchanged. Does
/// nothing when no channel handler is registered under [handlerId].
void updateChannelHandlerSessionId(String handlerId, String newSessionId) {
  final current = _channelEventHandlers[handlerId];
  if (current == null) return;

  _channelEventHandlers[handlerId] = _ChannelEventRegistration(
    id: current.id,
    conversationId: current.conversationId,
    sessionId: newSessionId,
    requireFocus: current.requireFocus,
    handler: current.handler,
  );
}
/// Updates the session ID of every chat and channel handler registered for
/// [conversationId].
///
/// Called after socket reconnection so that existing handlers carry
/// [newSessionId]. Handlers registered for other conversations are left
/// untouched.
///
/// The per-registration copy is delegated to [updateChatHandlerSessionId] and
/// [updateChannelHandlerSessionId] so that logic lives in one place per
/// registration type.
void updateSessionIdForConversation(
  String conversationId,
  String newSessionId,
) {
  // Collect matching keys up front so neither map is written to while it is
  // being iterated.
  final chatHandlerIds = [
    for (final entry in _chatEventHandlers.entries)
      if (entry.value.conversationId == conversationId) entry.key,
  ];
  for (final handlerId in chatHandlerIds) {
    updateChatHandlerSessionId(handlerId, newSessionId);
  }

  final channelHandlerIds = [
    for (final entry in _channelEventHandlers.entries)
      if (entry.value.conversationId == conversationId) entry.key,
  ];
  for (final handlerId in channelHandlerIds) {
    updateChannelHandlerSessionId(handlerId, newSessionId);
  }
}
// Subscribe to an arbitrary socket.io event (used for dynamic tool channels)
void onEvent(String eventName, void Function(dynamic data) handler) {
_socket?.on(eventName, handler);
@@ -378,19 +444,29 @@ class SocketService with WidgetsBindingObserver {
incomingSessionId != null &&
registeredSessionId == incomingSessionId;
// Must match either conversation or session to be considered
if (!matchesConversation && !matchesSession) {
return false;
}
// If no focus requirement, always deliver
if (!requireFocus) {
return true;
}
// Session-targeted messages always bypass focus check (critical for
// background streaming - done/delta events must arrive even when backgrounded)
if (matchesSession) {
// Session-targeted messages should always pass through even if unfocused
return true;
}
// FIX for issue #172: If conversation matches (even without session match),
// still deliver when app is in foreground. This handles socket reconnection
// where session_id changes but chat_id stays the same.
if (matchesConversation && registeredConversationId != null) {
return _isAppForeground;
}
return _isAppForeground;
}
@@ -487,9 +563,10 @@ class SocketService with WidgetsBindingObserver {
}
class SocketEventSubscription {
SocketEventSubscription(this._dispose);
SocketEventSubscription(this._dispose, {this.handlerId});
final VoidCallback _dispose;
final String? handlerId;
bool _isDisposed = false;
void dispose() {

View File

@@ -123,6 +123,55 @@ class ActiveSocketStream {
final VoidCallback disposeWatchdog;
}
/// Runs the post-reconnect recovery check as a single awaitable unit.
///
/// The work lives in its own `Future`-returning function so that a `Timer`
/// callback can start it explicitly instead of being an `async` closure whose
/// future would be dropped.
///
/// Steps:
/// 1. Bail out if [hasFinished] reports true, or if the last message from
///    [getMessages] is not an assistant message that is still streaming.
/// 2. Await [pollServerForMessage]; bail out if streaming finished in the
///    meantime or if the poll returned `null`.
/// 3. Pass the polled content to [applyServerContent] with
///    `finishIfDone: true`, and call [syncImages] when it reports that the
///    content was applied.
///
/// Any error is logged and swallowed: recovery is best-effort.
Future<void> _handleReconnectRecovery({
  required bool Function() hasFinished,
  required List<ChatMessage> Function() getMessages,
  required Future<({String content, List<String> followUps, bool isDone})?>
  Function()
  pollServerForMessage,
  required bool Function(
    String,
    List<String>, {
    required bool finishIfDone,
    required bool isDone,
    required String source,
  })
  applyServerContent,
  required void Function() syncImages,
}) async {
  try {
    if (hasFinished()) return;

    final messages = getMessages();
    final assistantStillStreaming =
        messages.isNotEmpty &&
        messages.last.role == 'assistant' &&
        messages.last.isStreaming;
    if (!assistantStillStreaming) return;

    final polled = await pollServerForMessage();
    // State may have changed while the request was in flight.
    if (hasFinished()) return;
    if (polled == null) return;

    final contentApplied = applyServerContent(
      polled.content,
      polled.followUps,
      finishIfDone: true,
      isDone: polled.isDone,
      source: 'Reconnect recovery',
    );
    if (contentApplied) {
      syncImages();
    }
  } catch (e) {
    // Best-effort: log and carry on rather than surfacing the failure.
    DebugLogger.log('Reconnect recovery failed: $e', scope: 'streaming/helper');
  }
}
/// Unified streaming helper for chat send/regenerate flows.
///
/// This attaches WebSocket event handlers and manages background search/image-gen
@@ -203,19 +252,124 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
final socketSubscriptions = <VoidCallback>[];
final hasSocketSignals =
socketService != null || registerDeltaListener != null;
// Reference to image sync function - initialized to no-op, reassigned after definition.
// Must not be `late` to avoid LateInitializationError if callbacks fire early.
void Function() syncImages = () {};
// Fetches the active conversation from the server and extracts the state of
// the assistant message identified by assistantMessageId.
//
// Returns a (content, followUps, isDone) record, or null when there is no
// active conversation ID, the response does not have the expected shape, the
// message is not present, or the request/parsing throws.
Future<({String content, List<String> followUps, bool isDone})?>
pollServerForMessage() async {
  try {
    final chatId = activeConversationId;
    if (chatId == null || chatId.isEmpty) return null;

    final response = await api.dio.get('/api/v1/chats/$chatId');
    final payload = response.data as Map<String, dynamic>?;
    final chat = payload?['chat'] as Map<String, dynamic>?;
    // A missing chat object yields null here and falls into the same exit.
    final messages = chat?['messages'];
    if (messages is! List) return null;

    // Locate the first entry whose id matches the assistant message.
    Map? serverMessage;
    for (final candidate in messages) {
      if (candidate is Map &&
          candidate['id']?.toString() == assistantMessageId) {
        serverMessage = candidate;
        break;
      }
    }
    if (serverMessage == null) return null;

    // Content is either a plain string or a list of parts; for a list, the
    // first part with type == 'text' supplies the text.
    final rawContent = serverMessage['content'];
    var content = '';
    if (rawContent is String) {
      content = rawContent;
    } else if (rawContent is List) {
      for (final part in rawContent) {
        if (part is Map && part['type'] == 'text') {
          content = part['text']?.toString() ?? '';
          break;
        }
      }
    }

    // Follow-ups may arrive under a camelCase or a snake_case key.
    final followUps = _parseFollowUpsField(
      serverMessage['followUps'] ?? serverMessage['follow_ups'],
    );

    // Done when flagged explicitly, or when there is content and the message
    // is not flagged as streaming.
    // NOTE(review): a partially written message without an 'isStreaming' key
    // also counts as done under this rule — confirm against the server schema.
    final explicitlyDone = serverMessage['done'] == true;
    final isDone =
        explicitlyDone ||
        (serverMessage['isStreaming'] != true && content.isNotEmpty);

    return (content: content, followUps: followUps, isDone: isDone);
  } catch (e) {
    DebugLogger.log('Server poll failed: $e', scope: 'streaming/helper');
    return null;
  }
}
// Adopts server-provided content for the last local message when that message
// is an assistant message and the server content is non-empty and at least as
// long as the local content.
//
// On adoption: replaces the last message content, stores any follow-ups for
// assistantMessageId, and finishes streaming when finishIfDone and isDone are
// both set and the last message is still streaming.
//
// Returns true when the content was adopted, so the caller can trigger an
// image sync; false otherwise.
//
// NOTE(review): the last message is not compared against assistantMessageId
// here — confirm it cannot be a different assistant message.
bool applyServerContent(
  String content,
  List<String> followUps, {
  required bool finishIfDone,
  required bool isDone,
  required String source,
}) {
  final messages = getMessages();
  final lastIsAssistant =
      messages.isNotEmpty && messages.last.role == 'assistant';
  if (!lastIsAssistant) return false;

  final serverIsUsable =
      content.isNotEmpty && content.length >= messages.last.content.length;
  if (!serverIsUsable) return false;

  DebugLogger.log(
    '$source: adopting server content (${content.length} chars)',
    scope: 'streaming/helper',
  );
  replaceLastMessageContent(content);

  if (followUps.isNotEmpty) {
    setFollowUps(assistantMessageId, followUps);
  }

  // The streaming flag is read from the list fetched above, after the
  // content replacement has run.
  final shouldFinish = finishIfDone && isDone && messages.last.isStreaming;
  if (shouldFinish) {
    wrappedFinishStreaming();
  }
  return true;
}
if (hasSocketSignals) {
// Use a short inactivity timeout - if no data arrives for 10 seconds,
// something is likely wrong with the connection. Combined with 1-second
// polling and server state sync, this provides fast recovery.
// Inactivity timeout - if no data arrives for 10 seconds, poll server
// and finish streaming. This handles stuck connections (issue #172).
socketWatchdog = InactivityWatchdog(
window: const Duration(seconds: 10),
// Absolute cap ensures streaming never gets stuck indefinitely
absoluteCap: const Duration(minutes: 5),
onTimeout: () {
onTimeout: () async {
DebugLogger.log(
'Socket watchdog timeout - finishing streaming gracefully',
'Socket watchdog timeout - polling server',
scope: 'streaming/helper',
);
final result = await pollServerForMessage();
if (result != null) {
final applied = applyServerContent(
result.content,
result.followUps,
finishIfDone: false, // We always finish on timeout
isDone: result.isDone,
source: 'Watchdog',
);
if (applied) {
syncImages();
}
}
// Clean up and finish
try {
for (final dispose in socketSubscriptions) {
try {
@@ -224,6 +378,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}
socketSubscriptions.clear();
} catch (_) {}
try {
final msgs = getMessages();
if (msgs.isNotEmpty &&
@@ -236,103 +391,43 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
},
)..start();
// Subscribe to socket reconnection events to sync state after reconnect.
// This catches cases where the done signal was missed due to disconnection.
// Handle socket reconnection - update session IDs and check for missed events
if (socketService != null) {
StreamSubscription<void>? reconnectSub;
Timer? reconnectDelayTimer;
var reconnectSubDisposed = false;
reconnectSub = socketService.onReconnect.listen((_) {
DebugLogger.log(
'Socket reconnected - checking server state for missed signals',
'Socket reconnected - updating session ID',
scope: 'streaming/helper',
);
// Cancel any pending timer from a previous reconnect
// Update handler registrations with new session ID (issue #172 fix)
final newSessionId = socketService.sessionId;
final convId = activeConversationId;
if (newSessionId != null && convId != null && convId.isNotEmpty) {
socketService.updateSessionIdForConversation(convId, newSessionId);
}
// Brief delay then check server for missed completion
reconnectDelayTimer?.cancel();
// After reconnection, give a brief moment for any queued events
// then check server state to catch any missed completion signals.
// Use Timer instead of Future.delayed so it can be cancelled on dispose.
reconnectDelayTimer = Timer(const Duration(milliseconds: 500), () async {
// Check if disposed before executing
if (reconnectSubDisposed || hasFinished) return;
// Check current state before making the async call
var msgs = getMessages();
if (msgs.isEmpty || msgs.last.role != 'assistant') return;
if (!msgs.last.isStreaming) return;
// Fetch conversation from server to check if streaming actually completed
try {
final chatId = activeConversationId;
if (chatId != null && chatId.isNotEmpty) {
final resp = await api.dio.get('/api/v1/chats/$chatId');
// Re-check state after async call - it may have changed or been disposed
if (reconnectSubDisposed || hasFinished) return;
msgs = getMessages();
if (msgs.isEmpty || msgs.last.role != 'assistant') return;
if (!msgs.last.isStreaming) return;
final data = resp.data as Map<String, dynamic>?;
final chatObj = data?['chat'] as Map<String, dynamic>?;
if (chatObj != null) {
// Check if server has the completed message
final list = chatObj['messages'];
if (list is List) {
final serverMsg = list.firstWhere(
(m) =>
m is Map && m['id']?.toString() == assistantMessageId,
orElse: () => null,
);
if (serverMsg != null && serverMsg is Map) {
final serverContent = serverMsg['content'];
String content = '';
if (serverContent is String) {
content = serverContent;
} else if (serverContent is List) {
final textItem = serverContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
}
// If server has content, adopt it and finish streaming
// Use current msgs (re-fetched after await) for comparison
if (content.isNotEmpty &&
content.length >= msgs.last.content.length) {
DebugLogger.log(
'Reconnect recovery: adopting server content (${content.length} chars)',
scope: 'streaming/helper',
);
replaceLastMessageContent(content);
wrappedFinishStreaming();
}
}
}
}
}
} catch (e) {
DebugLogger.log(
'Reconnect recovery fetch failed: $e',
scope: 'streaming/helper',
);
}
reconnectDelayTimer = Timer(const Duration(milliseconds: 500), () {
// Wrap async work in unawaited to handle errors properly
unawaited(
_handleReconnectRecovery(
hasFinished: () => hasFinished,
getMessages: getMessages,
pollServerForMessage: pollServerForMessage,
applyServerContent: applyServerContent,
syncImages: syncImages,
),
);
});
});
socketSubscriptions.add(() {
reconnectSubDisposed = true;
reconnectDelayTimer?.cancel();
reconnectDelayTimer = null;
reconnectSub?.cancel();
reconnectSub = null;
});
}
}
@@ -474,6 +569,9 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} catch (_) {}
}
// Point the placeholder syncImages reference at the real implementation now
// that updateImagesFromCurrentContent is defined
syncImages = updateImagesFromCurrentContent;
bool refreshingSnapshot = false;
Future<void> refreshConversationSnapshot() async {
if (refreshingSnapshot) return;
@@ -642,12 +740,17 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
socketWatchdog?.ping();
// Increased timeout to match our more generous streaming timeouts
// OpenWebUI doesn't have such aggressive channel timeouts
Future.delayed(const Duration(minutes: 12), () {
// Use Timer instead of Future.delayed so it can be cancelled on cleanup
final channelTimeoutTimer = Timer(const Duration(minutes: 12), () {
try {
socketService?.offEvent(channel);
} catch (_) {}
socketWatchdog?.stop();
});
// Register cleanup so the timer is cancelled if streaming completes early
socketSubscriptions.add(() {
channelTimeoutTimer.cancel();
});
}
void chatHandler(
@@ -1311,12 +1414,30 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}
},
onComplete: () {
// HTTP stream completed - cleanup already done in onDone handler.
// For WebSocket flows, actual completion is handled by socket events (done: true).
// Only finish streaming here if there are no socket subscriptions (simple/legacy flow).
// HTTP stream completed.
// With WebSocket-based streaming, HTTP closes immediately after returning task_id.
// All actual content comes via WebSocket events, so we should NOT finish streaming
// here if socket subscriptions are active - the socket done:true event will finish it.
DebugLogger.log(
'HTTP stream complete '
'(socketSubs=${socketSubscriptions.length}, socketConnected=${socketService?.isConnected})',
scope: 'streaming/helper',
);
// Only finish streaming if no socket subscriptions are active.
// If sockets are active, they will handle the completion via done:true event.
if (socketSubscriptions.isEmpty) {
DebugLogger.log(
'No socket subscriptions - finishing streaming on HTTP complete',
scope: 'streaming/helper',
);
wrappedFinishStreaming();
Future.microtask(refreshConversationSnapshot);
} else {
DebugLogger.log(
'Socket subscriptions active - waiting for socket done signal',
scope: 'streaming/helper',
);
}
},
onError: (error, stackTrace) async {
@@ -1334,10 +1455,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
// Check if this is a recoverable error (network issues, etc.)
final errorText = error.toString();
final isRecoverable =
(error is! FormatException &&
errorText.contains('SocketException')) ||
errorText.contains('TimeoutException') ||
errorText.contains('HandshakeException');
error is! FormatException &&
(errorText.contains('SocketException') ||
errorText.contains('TimeoutException') ||
errorText.contains('HandshakeException'));
if (isRecoverable && socketService != null) {
// Try to recover via socket connection if available

View File

@@ -5,6 +5,9 @@ import 'dart:async';
/// Call [ping] whenever activity occurs. If no activity happens
/// within [window], [onTimeout] fires. Optionally, an [absoluteCap]
/// enforces a maximum total duration regardless of activity.
///
/// The [onTimeout] callback can be sync or async - if async, it will be
/// awaited before the watchdog considers itself fully stopped.
class InactivityWatchdog {
InactivityWatchdog({
required Duration window,
@@ -13,16 +16,20 @@ class InactivityWatchdog {
}) : _window = window,
_absoluteCap = absoluteCap;
final void Function() onTimeout;
final FutureOr<void> Function() onTimeout;
Duration _window;
Duration? _absoluteCap;
Timer? _timer;
Timer? _absoluteTimer;
bool _started = false;
bool _firing = false;
Duration get window => _window;
/// Whether the timeout callback is currently executing.
bool get isFiring => _firing;
void setWindow(Duration newWindow) {
_window = newWindow;
if (_started) {
@@ -43,6 +50,8 @@ class InactivityWatchdog {
void start() {
if (_started) return;
// Prevent restart while callback is still executing to avoid double-fire
if (_firing) return;
_started = true;
_restart();
if (_absoluteCap != null) {
@@ -51,6 +60,8 @@ class InactivityWatchdog {
}
void ping() {
// Prevent restart while callback is still executing to avoid double-fire
if (_firing) return;
if (!_started) {
start();
return;
@@ -73,10 +84,24 @@ class InactivityWatchdog {
_timer = Timer(_window, _fire);
}
/// Synchronous entry point called by [Timer]; starts the async timeout work.
///
/// Re-entry is blocked while a previous timeout callback is still running.
/// Otherwise the watchdog is flagged as firing, [stop] is called, and
/// [_executeCallback] is started without being awaited.
void _fire() {
  if (_firing) return; // A previous timeout callback is still executing.
  _firing = true;
  stop();
  // [Timer] expects a synchronous callback, so the async work is launched
  // here and explicitly marked fire-and-forget. [_executeCallback] catches
  // its own errors and resets [_firing] when it completes.
  unawaited(_executeCallback());
}
/// Executes the timeout callback, awaiting it when it returns a future.
///
/// [onTimeout] is invoked exactly once. Anything it throws is swallowed so a
/// failing callback cannot surface as an unhandled async error, and
/// [_firing] is always reset afterwards, whether the callback succeeded or
/// threw.
Future<void> _executeCallback() async {
  try {
    await onTimeout();
  } catch (_) {
    // Swallow errors to prevent unhandled exceptions
  } finally {
    _firing = false;
  }
}
}

View File

@@ -355,29 +355,41 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
final serverMessages = serverConversation.messages;
if (serverMessages.isNotEmpty && state.isNotEmpty) {
final serverLast = serverMessages.last;
final localLast = state.last;
// If server has the same message but says it's not streaming,
// or server has more content, sync from server
if (serverLast.id == localLast.id &&
serverLast.role == 'assistant') {
final serverDone = !serverLast.isStreaming;
final serverHasMoreContent =
serverLast.content.length > localLast.content.length;
// Case 1: Server has more messages than local - streaming must be done
if (serverMessages.length > state.length) {
DebugLogger.log(
'Server sync: server has more messages '
'(${serverMessages.length} vs ${state.length})',
scope: 'chat/providers',
);
state = serverMessages;
_cancelMessageStream();
return;
}
if (serverDone || serverHasMoreContent) {
DebugLogger.log(
'Server sync: adopting server state '
'(serverDone=$serverDone, serverHasMore=$serverHasMoreContent)',
scope: 'chat/providers',
);
state = serverMessages;
// Always cancel local streaming when adopting server state.
// If serverDone, streaming is complete. If serverHasMoreContent,
// we've adopted server content and continuing local streaming
// could cause conflicts or duplicate content.
_cancelMessageStream();
// Case 2: Find the local streaming message in server messages by ID
// This handles cases where last messages differ
if (localLast.role == 'assistant' && localLast.isStreaming) {
final serverVersion = serverMessages
.where((m) => m.id == localLast.id)
.firstOrNull;
if (serverVersion != null) {
final serverDone = !serverVersion.isStreaming;
final serverHasMoreContent =
serverVersion.content.length > localLast.content.length;
if (serverDone || serverHasMoreContent) {
DebugLogger.log(
'Server sync: adopting server state '
'(serverDone=$serverDone, serverHasMore=$serverHasMoreContent)',
scope: 'chat/providers',
);
state = serverMessages;
_cancelMessageStream();
}
}
}
}