fix: streaming issues

This commit is contained in:
cogwheel0
2025-09-27 16:34:37 +05:30
parent 8ebf17f712
commit 0d5fcabea8
3 changed files with 245 additions and 13 deletions

View File

@@ -3073,8 +3073,9 @@ class ApiService {
bool containsDone(String s) =>
s.contains('<details type="tool_calls"') && s.contains('done="true"');
// Allow longer time for large completions (e.g., long stories)
while (DateTime.now().difference(started).inSeconds < 180) {
// Allow much longer time for large completions, matching OpenWebUI's generous timeouts
while (DateTime.now().difference(started).inSeconds < 600) {
// Increased from 180 to 600 seconds (10 minutes)
try {
// Small delay between polls
await Future.delayed(const Duration(milliseconds: 900));
@@ -3236,15 +3237,22 @@ class ApiService {
break;
}
// If content hasn't changed for a few polls, assume completion,
// but do not early-exit while content is still empty.
// If content hasn't changed for several polls, assume completion,
// but be more conservative to avoid cutting off long responses.
// OpenWebUI relies more on explicit done signals than stability checks.
final prev = last;
if (content == prev && content.isNotEmpty) {
stableCount++;
} else if (content != prev) {
stableCount = 0;
}
if (content.isNotEmpty && stableCount >= 3) {
// Increased threshold from 3 to 8 polls to be more conservative
// This gives ~7-8 seconds of stability before assuming completion
if (content.isNotEmpty && stableCount >= 8) {
DebugLogger.log(
'Content stable for $stableCount polls, assuming completion',
scope: 'api/polling',
);
break;
}

View File

@@ -66,10 +66,11 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
required void Function() finishStreaming,
required List<ChatMessage> Function() getMessages,
}) {
// Chunk the incoming stream for smoother UI updates
// Temporarily disable chunking to debug second turn issues
// OpenWebUI doesn't use complex chunking like this
final chunkedStream = StreamChunker.chunkStream(
stream,
enableChunking: true,
enableChunking: false, // Disabled for debugging
minChunkSize: 5,
maxChunkLength: 3,
delayBetweenChunks: const Duration(milliseconds: 15),
@@ -102,9 +103,14 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
InactivityWatchdog? socketWatchdog;
final socketSubscriptions = <SocketEventSubscription>[];
if (socketService != null) {
// Increase timeout to match OpenWebUI's more generous timeouts for long responses
socketWatchdog = InactivityWatchdog(
window: const Duration(minutes: 5),
window: const Duration(minutes: 15), // Increased from 5 to 15 minutes
onTimeout: () {
DebugLogger.log(
'Socket watchdog timeout - finishing streaming gracefully',
scope: 'streaming/helper',
);
try {
for (final sub in socketSubscriptions) {
sub.dispose();
@@ -301,11 +307,24 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
void channelLineHandlerFactory(String channel) {
void handler(dynamic line) {
try {
DebugLogger.log(
'Channel $channel received line: ${line.toString().length > 50 ? line.toString().substring(0, 50) + "..." : line.toString()}',
scope: 'streaming/helper',
);
if (line is String) {
final s = line.trim();
socketWatchdog?.ping();
if (s == '[DONE]' || s == 'DONE') {
// Enhanced completion detection matching OpenWebUI patterns
if (s == '[DONE]' || s == 'DONE' || s == 'data: [DONE]') {
DebugLogger.log(
'Received completion signal: $s',
scope: 'streaming/helper',
);
try {
DebugLogger.log(
'Unregistering channel handler for: $channel (completion)',
scope: 'streaming/helper',
);
socketService?.offEvent(channel);
} catch (_) {}
try {
@@ -415,11 +434,25 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}
try {
DebugLogger.log(
'Registering channel handler for: $channel',
scope: 'streaming/helper',
);
socketService?.onEvent(channel, handler);
} catch (_) {}
socketWatchdog?.ping();
Future.delayed(const Duration(minutes: 3), () {
// Increased timeout to match our more generous streaming timeouts
// OpenWebUI doesn't have such aggressive channel timeouts
Future.delayed(const Duration(minutes: 12), () {
DebugLogger.log(
'Channel handler timeout reached for $channel',
scope: 'streaming/helper',
);
try {
DebugLogger.log(
'Unregistering channel handler for: $channel (timeout)',
scope: 'streaming/helper',
);
socketService?.offEvent(channel);
} catch (_) {}
socketWatchdog?.stop();
@@ -431,6 +464,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
void Function(dynamic response)? ack,
) {
try {
DebugLogger.log(
'Received chat event: ${ev.keys.join(", ")}',
scope: 'streaming/helper',
);
final data = ev['data'];
if (data == null) return;
final type = data['type'];
@@ -763,15 +800,33 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
// Incremental message content over socket
final content = payload['content']?.toString() ?? '';
if (content.isNotEmpty) {
DebugLogger.log(
'Appending socket content: "${content.length > 30 ? content.substring(0, 30) + "..." : content}"',
scope: 'streaming/helper',
);
appendToLastMessage(content);
updateImagesFromCurrentContent();
} else {
DebugLogger.log(
'Socket delta event with empty content',
scope: 'streaming/helper',
);
}
} else if ((type == 'chat:message' || type == 'replace') &&
payload != null) {
// Full message replacement over socket
final content = payload['content']?.toString() ?? '';
if (content.isNotEmpty) {
DebugLogger.log(
'Replacing socket content: "${content.length > 30 ? content.substring(0, 30) + "..." : content}"',
scope: 'streaming/helper',
);
replaceLastMessageContent(content);
} else {
DebugLogger.log(
'Socket replace event with empty content',
scope: 'streaming/helper',
);
}
} else if ((type == 'chat:message:files') && payload != null) {
// Alias for files event used by web client
@@ -908,6 +963,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
void Function(dynamic response)? ack,
) {
try {
DebugLogger.log(
'Received channel event: ${ev.keys.join(", ")}',
scope: 'streaming/helper',
);
final data = ev['data'];
if (data == null) return;
final type = data['type'];
@@ -923,6 +982,10 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}
if (socketService != null) {
DebugLogger.log(
'Creating socket subscriptions for conversationId: $activeConversationId, sessionId: $sessionId',
scope: 'streaming/helper',
);
final chatSub = socketService.addChatEventHandler(
conversationId: activeConversationId,
sessionId: sessionId,
@@ -938,10 +1001,23 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
handler: channelEventsHandler,
);
socketSubscriptions.add(channelSub);
DebugLogger.log(
'Created ${socketSubscriptions.length} socket subscriptions',
scope: 'streaming/helper',
);
} else {
DebugLogger.log(
'No socket service available - using polling only',
scope: 'streaming/helper',
);
}
final subscription = persistentController.stream.listen(
(chunk) {
DebugLogger.log(
'Received chunk: "${chunk.length > 100 ? chunk.substring(0, 100) + "..." : chunk}"',
scope: 'streaming/helper',
);
var effectiveChunk = chunk;
if (webSearchEnabled && !isSearching) {
if (chunk.contains('[SEARCHING]') ||
@@ -976,19 +1052,74 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}
},
onDone: () async {
DebugLogger.log('Stream completed normally', scope: 'streaming/helper');
// Unregister from persistent service
persistentService.unregisterStream(streamId);
// If no socket subscriptions are active, treat this as a poll-driven flow
// Only finish streaming if no socket subscriptions are active
// This indicates a polling-driven flow where the stream ending means completion
// For socket flows, completion should be handled by socket events (done: true)
DebugLogger.log(
'Stream onDone - socketSubscriptions.length: ${socketSubscriptions.length}',
scope: 'streaming/helper',
);
if (socketSubscriptions.isEmpty) {
DebugLogger.log(
'No socket subscriptions - finishing polling-based stream',
scope: 'streaming/helper',
);
finishStreaming();
Future.microtask(refreshConversationSnapshot);
} else {
DebugLogger.log(
'Socket subscriptions active - keeping stream alive for socket events',
scope: 'streaming/helper',
);
}
},
onError: (error) async {
DebugLogger.error(
'Stream error occurred',
scope: 'streaming/helper',
error: error,
data: {
'conversationId': activeConversationId,
'messageId': assistantMessageId,
'modelId': modelId,
},
);
try {
persistentService.unregisterStream(streamId);
} catch (_) {}
// Check if this is a recoverable error (network issues, etc.)
final isRecoverable =
error is! FormatException &&
error.toString().contains('SocketException') ||
error.toString().contains('TimeoutException') ||
error.toString().contains('HandshakeException');
if (isRecoverable && socketService != null) {
DebugLogger.log(
'Attempting to recover from recoverable stream error',
scope: 'streaming/helper',
);
// Try to recover via socket connection if available
try {
await socketService.ensureConnected(
timeout: const Duration(seconds: 5),
);
// Don't finish streaming immediately - let socket recovery handle it
socketWatchdog?.stop();
return;
} catch (_) {
// Socket recovery failed, fall through to cleanup
}
}
disposeSocketSubscriptions();
finishStreaming();
Future.microtask(refreshConversationSnapshot);

View File

@@ -93,6 +93,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
VoidCallback? _socketTeardown;
// Activity-based watchdog to prevent stuck typing indicator
InactivityWatchdog? _typingWatchdog;
DateTime? _lastStreamingActivity;
bool _initialized = false;
@@ -284,6 +285,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
void _touchStreamingActivity() {
_lastStreamingActivity = DateTime.now();
// Keep guard alive while streaming
if (state.isNotEmpty) {
final last = state.last;
@@ -322,6 +324,30 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
}
// Enhanced streaming recovery method similar to OpenWebUI's approach
void recoverStreamingIfNeeded() {
if (state.isEmpty) return;
final lastMessage = state.last;
if (lastMessage.role != 'assistant' || !lastMessage.isStreaming) return;
// Check if streaming has been inactive for too long
final now = DateTime.now();
if (_lastStreamingActivity != null) {
final inactiveTime = now.difference(_lastStreamingActivity!);
// If inactive for more than 3 minutes, consider recovery
if (inactiveTime > const Duration(minutes: 3)) {
DebugLogger.log(
'Streaming inactive for ${inactiveTime.inSeconds}s, attempting recovery',
scope: 'chat/provider',
);
// Try to gracefully finish the streaming state
finishStreaming();
}
}
}
// Public wrapper to cancel the currently active stream (used by Stop)
void cancelActiveMessageStream() {
_cancelMessageStream();
@@ -363,6 +389,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
void setMessageStream(StreamSubscription stream) {
DebugLogger.log(
'Setting new message stream, cancelling previous',
scope: 'chat/provider',
);
_cancelMessageStream();
_messageStream = stream;
@@ -374,6 +404,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
List<SocketEventSubscription> subscriptions, {
VoidCallback? onDispose,
}) {
DebugLogger.log(
'Setting ${subscriptions.length} socket subscriptions, cancelling previous',
scope: 'chat/provider',
);
cancelSocketSubscriptions();
_socketSubscriptions.addAll(subscriptions);
_socketTeardown = onDispose;
@@ -396,6 +430,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
void addMessage(ChatMessage message) {
DebugLogger.log(
'addMessage: ${message.role} message (id: ${message.id}, streaming: ${message.isStreaming})',
scope: 'chat/provider',
);
state = [...state, message];
if (message.role == 'assistant' && message.isStreaming) {
_touchStreamingActivity();
@@ -518,15 +556,32 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
void appendToLastMessage(String content) {
DebugLogger.log(
'appendToLastMessage called with: "${content.length > 30 ? content.substring(0, 30) + "..." : content}"',
scope: 'chat/provider',
);
if (state.isEmpty) {
DebugLogger.log(
'appendToLastMessage: state is empty',
scope: 'chat/provider',
);
return;
}
final lastMessage = state.last;
if (lastMessage.role != 'assistant') {
DebugLogger.log(
'appendToLastMessage: last message is not assistant (${lastMessage.role})',
scope: 'chat/provider',
);
return;
}
if (!lastMessage.isStreaming) {
DebugLogger.log(
'appendToLastMessage: last message is not streaming',
scope: 'chat/provider',
);
// Ignore late chunks when streaming already finished
return;
}
@@ -543,6 +598,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
final newContent = current.isEmpty ? content : current + content;
DebugLogger.log(
'appendToLastMessage: updating UI with new content length: ${newContent.length}',
scope: 'chat/provider',
);
state = [
...state.sublist(0, state.length - 1),
lastMessage.copyWith(content: newContent),
@@ -551,12 +610,25 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
void replaceLastMessageContent(String content) {
DebugLogger.log(
'replaceLastMessageContent called with: "${content.length > 30 ? content.substring(0, 30) + "..." : content}"',
scope: 'chat/provider',
);
if (state.isEmpty) {
DebugLogger.log(
'replaceLastMessageContent: state is empty',
scope: 'chat/provider',
);
return;
}
final lastMessage = state.last;
if (lastMessage.role != 'assistant') {
DebugLogger.log(
'replaceLastMessageContent: last message is not assistant (${lastMessage.role})',
scope: 'chat/provider',
);
return;
}
@@ -570,6 +642,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
if (sanitized.startsWith(searchBanner)) {
sanitized = sanitized.substring(searchBanner.length);
}
DebugLogger.log(
'replaceLastMessageContent: updating UI with sanitized content length: ${sanitized.length}',
scope: 'chat/provider',
);
state = [
...state.sublist(0, state.length - 1),
lastMessage.copyWith(content: sanitized),
@@ -578,10 +654,23 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
void finishStreaming() {
if (state.isEmpty) return;
DebugLogger.log('finishStreaming called', scope: 'chat/provider');
if (state.isEmpty) {
DebugLogger.log(
'finishStreaming: state is empty',
scope: 'chat/provider',
);
return;
}
final lastMessage = state.last;
if (lastMessage.role != 'assistant' || !lastMessage.isStreaming) return;
if (lastMessage.role != 'assistant' || !lastMessage.isStreaming) {
DebugLogger.log(
'finishStreaming: last message is not streaming assistant (role: ${lastMessage.role}, streaming: ${lastMessage.isStreaming})',
scope: 'chat/provider',
);
return;
}
// Also strip any leftover typing indicator before finalizing
const ti = '[TYPING_INDICATOR]';
@@ -594,6 +683,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
cleaned = cleaned.substring(searchBanner.length);
}
DebugLogger.log(
'finishStreaming: setting isStreaming=false and content length: ${cleaned.length}',
scope: 'chat/provider',
);
state = [
...state.sublist(0, state.length - 1),
lastMessage.copyWith(isStreaming: false, content: cleaned),