From 0d5fcabea829ab4c1f72309c859251a26aef6e89 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sat, 27 Sep 2025 16:34:37 +0530 Subject: [PATCH] fix: streaming issues --- lib/core/services/api_service.dart | 18 ++- lib/core/services/streaming_helper.dart | 143 +++++++++++++++++- .../chat/providers/chat_providers.dart | 97 +++++++++++- 3 files changed, 245 insertions(+), 13 deletions(-) diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 8814f0f..4dcab27 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -3073,8 +3073,9 @@ class ApiService { bool containsDone(String s) => s.contains('
= 3) { + // Increased threshold from 3 to 8 polls to be more conservative + // This gives ~7-8 seconds of stability before assuming completion + if (content.isNotEmpty && stableCount >= 8) { + DebugLogger.log( + 'Content stable for $stableCount polls, assuming completion', + scope: 'api/polling', + ); break; } diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 0d66d88..00da4d9 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -66,10 +66,11 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ required void Function() finishStreaming, required List Function() getMessages, }) { - // Chunk the incoming stream for smoother UI updates + // Temporarily disable chunking to debug second turn issues + // OpenWebUI doesn't use complex chunking like this final chunkedStream = StreamChunker.chunkStream( stream, - enableChunking: true, + enableChunking: false, // Disabled for debugging minChunkSize: 5, maxChunkLength: 3, delayBetweenChunks: const Duration(milliseconds: 15), @@ -102,9 +103,14 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ InactivityWatchdog? socketWatchdog; final socketSubscriptions = []; if (socketService != null) { + // Increase timeout to match OpenWebUI's more generous timeouts for long responses socketWatchdog = InactivityWatchdog( - window: const Duration(minutes: 5), + window: const Duration(minutes: 15), // Increased from 5 to 15 minutes onTimeout: () { + DebugLogger.log( + 'Socket watchdog timeout - finishing streaming gracefully', + scope: 'streaming/helper', + ); try { for (final sub in socketSubscriptions) { sub.dispose(); @@ -301,11 +307,24 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ void channelLineHandlerFactory(String channel) { void handler(dynamic line) { try { + DebugLogger.log( + 'Channel $channel received line: ${line.toString().length > 50 ? line.toString().substring(0, 50) + "..." : line.toString()}', + scope: 'streaming/helper', + ); if (line is String) { final s = line.trim(); socketWatchdog?.ping(); - if (s == '[DONE]' || s == 'DONE') { + // Enhanced completion detection matching OpenWebUI patterns + if (s == '[DONE]' || s == 'DONE' || s == 'data: [DONE]') { + DebugLogger.log( + 'Received completion signal: $s', + scope: 'streaming/helper', + ); try { + DebugLogger.log( + 'Unregistering channel handler for: $channel (completion)', + scope: 'streaming/helper', + ); socketService?.offEvent(channel); } catch (_) {} try { @@ -415,11 +434,25 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } try { + DebugLogger.log( + 'Registering channel handler for: $channel', + scope: 'streaming/helper', + ); socketService?.onEvent(channel, handler); } catch (_) {} socketWatchdog?.ping(); - Future.delayed(const Duration(minutes: 3), () { + // Increased timeout to match our more generous streaming timeouts + // OpenWebUI doesn't have such aggressive channel timeouts + Future.delayed(const Duration(minutes: 12), () { + DebugLogger.log( + 'Channel handler timeout reached for $channel', + scope: 'streaming/helper', + ); try { + DebugLogger.log( + 'Unregistering channel handler for: $channel (timeout)', + scope: 'streaming/helper', + ); socketService?.offEvent(channel); } catch (_) {} socketWatchdog?.stop(); @@ -431,6 +464,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ void Function(dynamic response)? ack, ) { try { + DebugLogger.log( + 'Received chat event: ${ev.keys.join(", ")}', + scope: 'streaming/helper', + ); final data = ev['data']; if (data == null) return; final type = data['type']; @@ -763,15 +800,33 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ // Incremental message content over socket final content = payload['content']?.toString() ?? ''; if (content.isNotEmpty) { + DebugLogger.log( + 'Appending socket content: "${content.length > 30 ? content.substring(0, 30) + "..." : content}"', + scope: 'streaming/helper', + ); appendToLastMessage(content); updateImagesFromCurrentContent(); + } else { + DebugLogger.log( + 'Socket delta event with empty content', + scope: 'streaming/helper', + ); } } else if ((type == 'chat:message' || type == 'replace') && payload != null) { // Full message replacement over socket final content = payload['content']?.toString() ?? ''; if (content.isNotEmpty) { + DebugLogger.log( + 'Replacing socket content: "${content.length > 30 ? content.substring(0, 30) + "..." : content}"', + scope: 'streaming/helper', + ); replaceLastMessageContent(content); + } else { + DebugLogger.log( + 'Socket replace event with empty content', + scope: 'streaming/helper', + ); } } else if ((type == 'chat:message:files') && payload != null) { // Alias for files event used by web client @@ -908,6 +963,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ void Function(dynamic response)? ack, ) { try { + DebugLogger.log( + 'Received channel event: ${ev.keys.join(", ")}', + scope: 'streaming/helper', + ); final data = ev['data']; if (data == null) return; final type = data['type']; @@ -923,6 +982,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } if (socketService != null) { + DebugLogger.log( + 'Creating socket subscriptions for conversationId: $activeConversationId, sessionId: $sessionId', + scope: 'streaming/helper', + ); final chatSub = socketService.addChatEventHandler( conversationId: activeConversationId, sessionId: sessionId, @@ -938,10 +1001,23 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ handler: channelEventsHandler, ); socketSubscriptions.add(channelSub); + DebugLogger.log( + 'Created ${socketSubscriptions.length} socket subscriptions', + scope: 'streaming/helper', + ); + } else { + DebugLogger.log( + 'No socket service available - using polling only', + scope: 'streaming/helper', + ); } final subscription = persistentController.stream.listen( (chunk) { + DebugLogger.log( + 'Received chunk: "${chunk.length > 100 ? chunk.substring(0, 100) + "..." : chunk}"', + scope: 'streaming/helper', + ); var effectiveChunk = chunk; if (webSearchEnabled && !isSearching) { if (chunk.contains('[SEARCHING]') || @@ -976,19 +1052,74 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } }, onDone: () async { + DebugLogger.log('Stream completed normally', scope: 'streaming/helper'); + // Unregister from persistent service persistentService.unregisterStream(streamId); - // If no socket subscriptions are active, treat this as a poll-driven flow + // Only finish streaming if no socket subscriptions are active + // This indicates a polling-driven flow where the stream ending means completion + // For socket flows, completion should be handled by socket events (done: true) + DebugLogger.log( + 'Stream onDone - socketSubscriptions.length: ${socketSubscriptions.length}', + scope: 'streaming/helper', + ); if (socketSubscriptions.isEmpty) { + DebugLogger.log( + 'No socket subscriptions - finishing polling-based stream', + scope: 'streaming/helper', + ); finishStreaming(); Future.microtask(refreshConversationSnapshot); + } else { + DebugLogger.log( + 'Socket subscriptions active - keeping stream alive for socket events', + scope: 'streaming/helper', + ); } }, onError: (error) async { + DebugLogger.error( + 'Stream error occurred', + scope: 'streaming/helper', + error: error, + data: { + 'conversationId': activeConversationId, + 'messageId': assistantMessageId, + 'modelId': modelId, + }, + ); + try { persistentService.unregisterStream(streamId); } catch (_) {} + + // Check if this is a recoverable error (network issues, etc.) + final isRecoverable = + error is! FormatException && + error.toString().contains('SocketException') || + error.toString().contains('TimeoutException') || + error.toString().contains('HandshakeException'); + + if (isRecoverable && socketService != null) { + DebugLogger.log( + 'Attempting to recover from recoverable stream error', + scope: 'streaming/helper', + ); + + // Try to recover via socket connection if available + try { + await socketService.ensureConnected( + timeout: const Duration(seconds: 5), + ); + // Don't finish streaming immediately - let socket recovery handle it + socketWatchdog?.stop(); + return; + } catch (_) { + // Socket recovery failed, fall through to cleanup + } + } + disposeSocketSubscriptions(); finishStreaming(); Future.microtask(refreshConversationSnapshot); diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index f2a169a..373577c 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -93,6 +93,7 @@ class ChatMessagesNotifier extends Notifier> { VoidCallback? _socketTeardown; // Activity-based watchdog to prevent stuck typing indicator InactivityWatchdog? _typingWatchdog; + DateTime? _lastStreamingActivity; bool _initialized = false; @@ -284,6 +285,7 @@ class ChatMessagesNotifier extends Notifier> { } void _touchStreamingActivity() { + _lastStreamingActivity = DateTime.now(); // Keep guard alive while streaming if (state.isNotEmpty) { final last = state.last; @@ -322,6 +324,30 @@ class ChatMessagesNotifier extends Notifier> { } } + // Enhanced streaming recovery method similar to OpenWebUI's approach + void recoverStreamingIfNeeded() { + if (state.isEmpty) return; + + final lastMessage = state.last; + if (lastMessage.role != 'assistant' || !lastMessage.isStreaming) return; + + // Check if streaming has been inactive for too long + final now = DateTime.now(); + if (_lastStreamingActivity != null) { + final inactiveTime = now.difference(_lastStreamingActivity!); + // If inactive for more than 3 minutes, consider recovery + if (inactiveTime > const Duration(minutes: 3)) { + DebugLogger.log( + 'Streaming inactive for ${inactiveTime.inSeconds}s, attempting recovery', + scope: 'chat/provider', + ); + + // Try to gracefully finish the streaming state + finishStreaming(); + } + } + } + // Public wrapper to cancel the currently active stream (used by Stop) void cancelActiveMessageStream() { _cancelMessageStream(); @@ -363,6 +389,10 @@ class ChatMessagesNotifier extends Notifier> { } void setMessageStream(StreamSubscription stream) { + DebugLogger.log( + 'Setting new message stream, cancelling previous', + scope: 'chat/provider', + ); _cancelMessageStream(); _messageStream = stream; @@ -374,6 +404,10 @@ class ChatMessagesNotifier extends Notifier> { List subscriptions, { VoidCallback? onDispose, }) { + DebugLogger.log( + 'Setting ${subscriptions.length} socket subscriptions, cancelling previous', + scope: 'chat/provider', + ); cancelSocketSubscriptions(); _socketSubscriptions.addAll(subscriptions); _socketTeardown = onDispose; @@ -396,6 +430,10 @@ class ChatMessagesNotifier extends Notifier> { } void addMessage(ChatMessage message) { + DebugLogger.log( + 'addMessage: ${message.role} message (id: ${message.id}, streaming: ${message.isStreaming})', + scope: 'chat/provider', + ); state = [...state, message]; if (message.role == 'assistant' && message.isStreaming) { _touchStreamingActivity(); @@ -518,15 +556,32 @@ class ChatMessagesNotifier extends Notifier> { } void appendToLastMessage(String content) { + DebugLogger.log( + 'appendToLastMessage called with: "${content.length > 30 ? content.substring(0, 30) + "..." : content}"', + scope: 'chat/provider', + ); + if (state.isEmpty) { + DebugLogger.log( + 'appendToLastMessage: state is empty', + scope: 'chat/provider', + ); return; } final lastMessage = state.last; if (lastMessage.role != 'assistant') { + DebugLogger.log( + 'appendToLastMessage: last message is not assistant (${lastMessage.role})', + scope: 'chat/provider', + ); return; } if (!lastMessage.isStreaming) { + DebugLogger.log( + 'appendToLastMessage: last message is not streaming', + scope: 'chat/provider', + ); // Ignore late chunks when streaming already finished return; } @@ -543,6 +598,10 @@ class ChatMessagesNotifier extends Notifier> { } final newContent = current.isEmpty ? content : current + content; + DebugLogger.log( + 'appendToLastMessage: updating UI with new content length: ${newContent.length}', + scope: 'chat/provider', + ); state = [ ...state.sublist(0, state.length - 1), lastMessage.copyWith(content: newContent), @@ -551,12 +610,25 @@ class ChatMessagesNotifier extends Notifier> { } void replaceLastMessageContent(String content) { + DebugLogger.log( + 'replaceLastMessageContent called with: "${content.length > 30 ? content.substring(0, 30) + "..." : content}"', + scope: 'chat/provider', + ); + if (state.isEmpty) { + DebugLogger.log( + 'replaceLastMessageContent: state is empty', + scope: 'chat/provider', + ); return; } final lastMessage = state.last; if (lastMessage.role != 'assistant') { + DebugLogger.log( + 'replaceLastMessageContent: last message is not assistant (${lastMessage.role})', + scope: 'chat/provider', + ); return; } @@ -570,6 +642,10 @@ class ChatMessagesNotifier extends Notifier> { if (sanitized.startsWith(searchBanner)) { sanitized = sanitized.substring(searchBanner.length); } + DebugLogger.log( + 'replaceLastMessageContent: updating UI with sanitized content length: ${sanitized.length}', + scope: 'chat/provider', + ); state = [ ...state.sublist(0, state.length - 1), lastMessage.copyWith(content: sanitized), @@ -578,10 +654,23 @@ class ChatMessagesNotifier extends Notifier> { } void finishStreaming() { - if (state.isEmpty) return; + DebugLogger.log('finishStreaming called', scope: 'chat/provider'); + if (state.isEmpty) { + DebugLogger.log( + 'finishStreaming: state is empty', + scope: 'chat/provider', + ); + return; + } final lastMessage = state.last; - if (lastMessage.role != 'assistant' || !lastMessage.isStreaming) return; + if (lastMessage.role != 'assistant' || !lastMessage.isStreaming) { + DebugLogger.log( + 'finishStreaming: last message is not streaming assistant (role: ${lastMessage.role}, streaming: ${lastMessage.isStreaming})', + scope: 'chat/provider', + ); + return; + } // Also strip any leftover typing indicator before finalizing const ti = '[TYPING_INDICATOR]'; @@ -594,6 +683,10 @@ class ChatMessagesNotifier extends Notifier> { cleaned = cleaned.substring(searchBanner.length); } + DebugLogger.log( + 'finishStreaming: setting isStreaming=false and content length: ${cleaned.length}', + scope: 'chat/provider', + ); state = [ ...state.sublist(0, state.length - 1), lastMessage.copyWith(isStreaming: false, content: cleaned),