feat(streaming): Improve socket reconnection and timeout handling
This commit is contained in:
@@ -30,6 +30,13 @@ class SocketService with WidgetsBindingObserver {
|
|||||||
final Map<String, _ChannelEventRegistration> _channelEventHandlers = {};
|
final Map<String, _ChannelEventRegistration> _channelEventHandlers = {};
|
||||||
int _handlerSeed = 0;
|
int _handlerSeed = 0;
|
||||||
|
|
||||||
|
/// Stream controller that emits when a socket reconnection occurs.
|
||||||
|
/// Listeners can use this to sync state after a reconnect.
|
||||||
|
final _reconnectController = StreamController<void>.broadcast();
|
||||||
|
|
||||||
|
/// Stream that emits when a socket reconnection occurs.
|
||||||
|
Stream<void> get onReconnect => _reconnectController.stream;
|
||||||
|
|
||||||
SocketService({
|
SocketService({
|
||||||
required this.serverConfig,
|
required this.serverConfig,
|
||||||
String? authToken,
|
String? authToken,
|
||||||
@@ -214,6 +221,7 @@ class SocketService with WidgetsBindingObserver {
|
|||||||
WidgetsBinding.instance.removeObserver(this);
|
WidgetsBinding.instance.removeObserver(this);
|
||||||
_chatEventHandlers.clear();
|
_chatEventHandlers.clear();
|
||||||
_channelEventHandlers.clear();
|
_channelEventHandlers.clear();
|
||||||
|
_reconnectController.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Best-effort: ensure there is an active connection and wait briefly.
|
// Best-effort: ensure there is an active connection and wait briefly.
|
||||||
@@ -279,6 +287,10 @@ class SocketService with WidgetsBindingObserver {
|
|||||||
'auth': {'token': _authToken},
|
'auth': {'token': _authToken},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
// Notify listeners that a reconnection occurred so they can refresh state
|
||||||
|
if (!_reconnectController.isClosed) {
|
||||||
|
_reconnectController.add(null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void _handleConnectError(dynamic err) {}
|
void _handleConnectError(dynamic err) {}
|
||||||
|
|||||||
@@ -204,9 +204,12 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
|||||||
final hasSocketSignals =
|
final hasSocketSignals =
|
||||||
socketService != null || registerDeltaListener != null;
|
socketService != null || registerDeltaListener != null;
|
||||||
if (hasSocketSignals) {
|
if (hasSocketSignals) {
|
||||||
// Increase timeout to match OpenWebUI's more generous timeouts for long responses
|
// Use a reasonable inactivity timeout - if no data arrives for 45 seconds,
|
||||||
|
// something is likely wrong with the connection
|
||||||
socketWatchdog = InactivityWatchdog(
|
socketWatchdog = InactivityWatchdog(
|
||||||
window: const Duration(minutes: 15), // Increased from 5 to 15 minutes
|
window: const Duration(seconds: 45),
|
||||||
|
// Absolute cap ensures streaming never gets stuck indefinitely
|
||||||
|
absoluteCap: const Duration(minutes: 10),
|
||||||
onTimeout: () {
|
onTimeout: () {
|
||||||
DebugLogger.log(
|
DebugLogger.log(
|
||||||
'Socket watchdog timeout - finishing streaming gracefully',
|
'Socket watchdog timeout - finishing streaming gracefully',
|
||||||
@@ -231,6 +234,106 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
|||||||
socketWatchdog?.stop();
|
socketWatchdog?.stop();
|
||||||
},
|
},
|
||||||
)..start();
|
)..start();
|
||||||
|
|
||||||
|
// Subscribe to socket reconnection events to sync state after reconnect.
|
||||||
|
// This catches cases where the done signal was missed due to disconnection.
|
||||||
|
if (socketService != null) {
|
||||||
|
StreamSubscription<void>? reconnectSub;
|
||||||
|
Timer? reconnectDelayTimer;
|
||||||
|
var reconnectSubDisposed = false;
|
||||||
|
|
||||||
|
reconnectSub = socketService.onReconnect.listen((_) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Socket reconnected - checking server state for missed signals',
|
||||||
|
scope: 'streaming/helper',
|
||||||
|
);
|
||||||
|
|
||||||
|
// Cancel any pending timer from a previous reconnect
|
||||||
|
reconnectDelayTimer?.cancel();
|
||||||
|
|
||||||
|
// After reconnection, give a brief moment for any queued events
|
||||||
|
// then check server state to catch any missed completion signals.
|
||||||
|
// Use Timer instead of Future.delayed so it can be cancelled on dispose.
|
||||||
|
reconnectDelayTimer = Timer(const Duration(milliseconds: 500), () async {
|
||||||
|
// Check if disposed before executing
|
||||||
|
if (reconnectSubDisposed || hasFinished) return;
|
||||||
|
|
||||||
|
// Check current state before making the async call
|
||||||
|
var msgs = getMessages();
|
||||||
|
if (msgs.isEmpty || msgs.last.role != 'assistant') return;
|
||||||
|
if (!msgs.last.isStreaming) return;
|
||||||
|
|
||||||
|
// Fetch conversation from server to check if streaming actually completed
|
||||||
|
try {
|
||||||
|
final chatId = activeConversationId;
|
||||||
|
if (chatId != null && chatId.isNotEmpty) {
|
||||||
|
final resp = await api.dio.get('/api/v1/chats/$chatId');
|
||||||
|
|
||||||
|
// Re-check state after async call - it may have changed or been disposed
|
||||||
|
if (reconnectSubDisposed || hasFinished) return;
|
||||||
|
msgs = getMessages();
|
||||||
|
if (msgs.isEmpty || msgs.last.role != 'assistant') return;
|
||||||
|
if (!msgs.last.isStreaming) return;
|
||||||
|
|
||||||
|
final data = resp.data as Map<String, dynamic>?;
|
||||||
|
final chatObj = data?['chat'] as Map<String, dynamic>?;
|
||||||
|
|
||||||
|
if (chatObj != null) {
|
||||||
|
// Check if server has the completed message
|
||||||
|
final list = chatObj['messages'];
|
||||||
|
if (list is List) {
|
||||||
|
final serverMsg = list.firstWhere(
|
||||||
|
(m) =>
|
||||||
|
m is Map && m['id']?.toString() == assistantMessageId,
|
||||||
|
orElse: () => null,
|
||||||
|
);
|
||||||
|
if (serverMsg != null && serverMsg is Map) {
|
||||||
|
final serverContent = serverMsg['content'];
|
||||||
|
String content = '';
|
||||||
|
if (serverContent is String) {
|
||||||
|
content = serverContent;
|
||||||
|
} else if (serverContent is List) {
|
||||||
|
final textItem = serverContent.firstWhere(
|
||||||
|
(i) => i is Map && i['type'] == 'text',
|
||||||
|
orElse: () => null,
|
||||||
|
);
|
||||||
|
if (textItem != null) {
|
||||||
|
content = textItem['text']?.toString() ?? '';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If server has content, adopt it and finish streaming
|
||||||
|
// Use current msgs (re-fetched after await) for comparison
|
||||||
|
if (content.isNotEmpty &&
|
||||||
|
content.length >= msgs.last.content.length) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Reconnect recovery: adopting server content (${content.length} chars)',
|
||||||
|
scope: 'streaming/helper',
|
||||||
|
);
|
||||||
|
replaceLastMessageContent(content);
|
||||||
|
wrappedFinishStreaming();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Reconnect recovery fetch failed: $e',
|
||||||
|
scope: 'streaming/helper',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
socketSubscriptions.add(() {
|
||||||
|
reconnectSubDisposed = true;
|
||||||
|
reconnectDelayTimer?.cancel();
|
||||||
|
reconnectDelayTimer = null;
|
||||||
|
reconnectSub?.cancel();
|
||||||
|
reconnectSub = null;
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Timer? imageCollectionDebounce;
|
Timer? imageCollectionDebounce;
|
||||||
|
|||||||
@@ -135,7 +135,12 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
|
|||||||
final serverMessages = next?.messages ?? const [];
|
final serverMessages = next?.messages ?? const [];
|
||||||
// Primary rule: adopt server messages when there are strictly more of them.
|
// Primary rule: adopt server messages when there are strictly more of them.
|
||||||
if (serverMessages.length > state.length) {
|
if (serverMessages.length > state.length) {
|
||||||
|
// Check streaming state BEFORE updating state
|
||||||
|
final needsCleanup = _shouldCleanupStreamingFromServer(
|
||||||
|
serverMessages,
|
||||||
|
);
|
||||||
state = serverMessages;
|
state = serverMessages;
|
||||||
|
if (needsCleanup) _cancelMessageStream();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -152,10 +157,20 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
|
|||||||
serverText.isNotEmpty && serverText.length > localText.length;
|
serverText.isNotEmpty && serverText.length > localText.length;
|
||||||
final localEmptyButServerHas =
|
final localEmptyButServerHas =
|
||||||
localText.isEmpty && serverText.isNotEmpty;
|
localText.isEmpty && serverText.isNotEmpty;
|
||||||
|
// Also recover if server says streaming is done but local still streaming
|
||||||
|
final serverDoneButLocalStreaming =
|
||||||
|
!serverLast.isStreaming && localLast.isStreaming;
|
||||||
if (sameLastId &&
|
if (sameLastId &&
|
||||||
isAssistant &&
|
isAssistant &&
|
||||||
(serverHasMore || localEmptyButServerHas)) {
|
(serverHasMore ||
|
||||||
|
localEmptyButServerHas ||
|
||||||
|
serverDoneButLocalStreaming)) {
|
||||||
|
// Check streaming state BEFORE updating state
|
||||||
|
final needsCleanup = _shouldCleanupStreamingFromServer(
|
||||||
|
serverMessages,
|
||||||
|
);
|
||||||
state = serverMessages;
|
state = serverMessages;
|
||||||
|
if (needsCleanup) _cancelMessageStream();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -217,6 +232,44 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
|
|||||||
_activeStreamingMessageId = null;
|
_activeStreamingMessageId = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Checks if streaming cleanup is needed when adopting server messages.
|
||||||
|
/// Must be called BEFORE updating state, as it compares current local state
|
||||||
|
/// with incoming server state.
|
||||||
|
bool _shouldCleanupStreamingFromServer(List<ChatMessage> serverMessages) {
|
||||||
|
if (serverMessages.isEmpty) return false;
|
||||||
|
if (!_hasStreamingAssistant) return false;
|
||||||
|
|
||||||
|
// Find the local streaming assistant message
|
||||||
|
final localStreamingMsg = state.lastWhere(
|
||||||
|
(m) => m.role == 'assistant' && m.isStreaming,
|
||||||
|
orElse: () => state.last,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Find the same message in server messages by ID
|
||||||
|
final serverMsg = serverMessages.where((m) => m.id == localStreamingMsg.id);
|
||||||
|
if (serverMsg.isNotEmpty && !serverMsg.first.isStreaming) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Server indicates streaming complete for message ${localStreamingMsg.id}',
|
||||||
|
scope: 'chat/providers',
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also check if server has MORE messages than local - if so, streaming must be done
|
||||||
|
// (e.g., server has [assistant(done), user] but local only has [assistant(streaming)])
|
||||||
|
if (serverMessages.length > state.length) {
|
||||||
|
// Server has additional messages, so any local streaming must have completed
|
||||||
|
DebugLogger.log(
|
||||||
|
'Server has more messages (${serverMessages.length} vs ${state.length}) - '
|
||||||
|
'streaming must be complete',
|
||||||
|
scope: 'chat/providers',
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
bool get _hasStreamingAssistant {
|
bool get _hasStreamingAssistant {
|
||||||
if (state.isEmpty) return false;
|
if (state.isEmpty) return false;
|
||||||
final last = state.last;
|
final last = state.last;
|
||||||
@@ -262,16 +315,63 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
|
|||||||
|
|
||||||
_taskStatusCheckInFlight = true;
|
_taskStatusCheckInFlight = true;
|
||||||
try {
|
try {
|
||||||
|
// Check both task status and server message state
|
||||||
final taskIds = await api.getTaskIdsByChat(activeConversation.id);
|
final taskIds = await api.getTaskIdsByChat(activeConversation.id);
|
||||||
if (taskIds.isEmpty) {
|
final hasActiveTasks = taskIds.isNotEmpty;
|
||||||
if (_observedRemoteTask && _hasStreamingAssistant) {
|
|
||||||
finishStreaming();
|
if (hasActiveTasks) {
|
||||||
} else if (!_observedRemoteTask) {
|
|
||||||
// No tasks reported yet; keep monitoring to allow registration.
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
_observedRemoteTask = true;
|
_observedRemoteTask = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When no active tasks and we previously observed tasks, streaming should be done.
|
||||||
|
// Run secondary check to sync any missed content from server.
|
||||||
|
final tasksDone = _observedRemoteTask && !hasActiveTasks;
|
||||||
|
|
||||||
|
// Secondary check: fetch conversation from server and compare message state
|
||||||
|
// This catches cases where the done signal was missed AND syncs any missed content.
|
||||||
|
// Only run when tasks have actually completed (were observed and are now gone),
|
||||||
|
// not on every poll before tasks are registered.
|
||||||
|
if (_hasStreamingAssistant && tasksDone) {
|
||||||
|
try {
|
||||||
|
final serverConversation = await api.getConversation(
|
||||||
|
activeConversation.id,
|
||||||
|
);
|
||||||
|
final serverMessages = serverConversation.messages;
|
||||||
|
|
||||||
|
if (serverMessages.isNotEmpty && state.isNotEmpty) {
|
||||||
|
final serverLast = serverMessages.last;
|
||||||
|
final localLast = state.last;
|
||||||
|
|
||||||
|
// If server has the same message but says it's not streaming,
|
||||||
|
// or server has more content, sync from server
|
||||||
|
if (serverLast.id == localLast.id &&
|
||||||
|
serverLast.role == 'assistant') {
|
||||||
|
final serverDone = !serverLast.isStreaming;
|
||||||
|
final serverHasMoreContent =
|
||||||
|
serverLast.content.length > localLast.content.length;
|
||||||
|
|
||||||
|
if (serverDone || serverHasMoreContent) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Server sync: adopting server state '
|
||||||
|
'(serverDone=$serverDone, serverHasMore=$serverHasMoreContent)',
|
||||||
|
scope: 'chat/providers',
|
||||||
|
);
|
||||||
|
state = serverMessages;
|
||||||
|
// Always cancel local streaming when adopting server state.
|
||||||
|
// If serverDone, streaming is complete. If serverHasMoreContent,
|
||||||
|
// we've adopted server content and continuing local streaming
|
||||||
|
// could cause conflicts or duplicate content.
|
||||||
|
_cancelMessageStream();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Server conversation fetch failed: $e',
|
||||||
|
scope: 'chat/providers',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (err, stack) {
|
} catch (err, stack) {
|
||||||
DebugLogger.log('Task status poll failed: $err', scope: 'chat/provider');
|
DebugLogger.log('Task status poll failed: $err', scope: 'chat/provider');
|
||||||
debugPrintStack(stackTrace: stack);
|
debugPrintStack(stackTrace: stack);
|
||||||
|
|||||||
Reference in New Issue
Block a user