diff --git a/lib/core/providers/app_providers.dart b/lib/core/providers/app_providers.dart index 2c488b2..5da5957 100644 --- a/lib/core/providers/app_providers.dart +++ b/lib/core/providers/app_providers.dart @@ -254,6 +254,32 @@ final apiTokenUpdaterProvider = Provider((ref) { data: {'length': length}, ); } + + // When the token transitions from empty -> present, force-refresh models + final hadToken = previous != null && previous.isNotEmpty; + final hasToken = next != null && next.isNotEmpty; + + if (!hadToken && hasToken) { + // New valid token acquired (e.g., re-login). Invalidate caches that + // depend on authentication so next reads refetch from server. + DebugLogger.log('invalidate-on-auth', scope: 'models'); + ref.invalidate(modelsProvider); + ref.invalidate(defaultModelProvider); + // Refresh permissions to enable gated features promptly + ref.invalidate(userPermissionsProvider); + // Kick background model load to warm caches without blocking UI + Future.microtask(() { + // Accessing the provider is enough to schedule its work + ref.read(backgroundModelLoadProvider); + }); + } + + if (hadToken && !hasToken) { + // Token was cleared/invalidated; clear model selection and caches + ref.read(selectedModelProvider.notifier).clear(); + ref.invalidate(modelsProvider); + ref.invalidate(defaultModelProvider); + } }); }); diff --git a/lib/core/services/socket_service.dart b/lib/core/services/socket_service.dart index c1e9862..80aefb9 100644 --- a/lib/core/services/socket_service.dart +++ b/lib/core/services/socket_service.dart @@ -288,6 +288,7 @@ class SocketService with WidgetsBindingObserver { final ackFn = _wrapAck(ack); final sessionId = _extractSessionId(map); final chatId = map['chat_id']?.toString(); + final channelId = _extractChannelId(map); for (final registration in List<_ChatEventRegistration>.from( _chatEventHandlers.values, @@ -298,6 +299,7 @@ class SocketService with WidgetsBindingObserver { chatId, sessionId, registration.requireFocus, + incomingChannelId: channelId, )) { continue; } @@ -315,6 +317,7 @@ class SocketService with WidgetsBindingObserver { final ackFn = _wrapAck(ack); final sessionId = _extractSessionId(map); final chatId = map['chat_id']?.toString(); + final channelId = _extractChannelId(map); for (final registration in List<_ChannelEventRegistration>.from( _channelEventHandlers.values, @@ -325,6 +328,7 @@ class SocketService with WidgetsBindingObserver { chatId, sessionId, registration.requireFocus, + incomingChannelId: channelId, )) { continue; } @@ -340,18 +344,21 @@ class SocketService with WidgetsBindingObserver { String? registeredSessionId, String? incomingConversationId, String? incomingSessionId, - bool requireFocus, - ) { - final matchesChat = + bool requireFocus, { + String? incomingChannelId, + }) { + final matchesConversation = registeredConversationId == null || (incomingConversationId != null && - registeredConversationId == incomingConversationId); + registeredConversationId == incomingConversationId) || + (incomingChannelId != null && + registeredConversationId == incomingChannelId); final matchesSession = registeredSessionId != null && incomingSessionId != null && registeredSessionId == incomingSessionId; - if (!matchesChat && !matchesSession) { + if (!matchesConversation && !matchesSession) { return false; } @@ -421,6 +428,38 @@ class SocketService with WidgetsBindingObserver { return candidate; } + String? _extractChannelId(Map event) { + String? candidate; + + if (event['channel_id'] != null) { + candidate = event['channel_id'].toString(); + } + if (candidate == null && event['channelId'] != null) { + candidate = event['channelId'].toString(); + } + + final data = event['data']; + if (data is Map) { + if (candidate == null && data['channel_id'] != null) { + candidate = data['channel_id'].toString(); + } + if (candidate == null && data['channelId'] != null) { + candidate = data['channelId'].toString(); + } + final inner = data['data']; + if (inner is Map) { + if (candidate == null && inner['channel_id'] != null) { + candidate = inner['channel_id'].toString(); + } + if (candidate == null && inner['channelId'] != null) { + candidate = inner['channelId'].toString(); + } + } + } + + return candidate; + } + String _nextHandlerId() { _handlerSeed += 1; return _handlerSeed.toString(); diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 7022987..0d66d88 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -31,14 +31,12 @@ class ActiveSocketStream { /// Unified streaming helper for chat send/regenerate flows. /// -/// This attaches chunked SSE streaming handlers, optional WebSocket event handlers, +/// This attaches chunked polling streams (fallback) plus WebSocket event handlers, /// and manages background search/image-gen UI updates. It operates via callbacks to /// avoid tight coupling with provider files for easier reuse and testing. ActiveSocketStream attachUnifiedChunkedStreaming({ required Stream stream, required bool webSearchEnabled, - required bool isBackgroundFlow, - required bool suppressSocketContentInitially, required String assistantMessageId, required String modelId, required Map modelItem, @@ -140,7 +138,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } bool isSearching = false; - bool suppressSocketContent = suppressSocketContentInitially; void updateImagesFromCurrentContent() { try { @@ -443,7 +440,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ if (kSocketVerboseLogging && payload is Map) { DebugLogger.log( - 'socket delta type=$type suppress=$suppressSocketContent session=$sessionId message=$messageId keys=${payload.keys.toList()}', + 'socket delta type=$type session=$sessionId message=$messageId keys=${payload.keys.toList()}', scope: 'socket/chat', ); } @@ -479,7 +476,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } } } - if (!suppressSocketContent && payload.containsKey('choices')) { + if (payload.containsKey('choices')) { final choices = payload['choices']; if (choices is List && choices.isNotEmpty) { final choice = choices.first; @@ -522,7 +519,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } } } - if (!suppressSocketContent && payload.containsKey('content')) { + if (payload.containsKey('content')) { final raw = payload['content']?.toString() ?? ''; if (raw.isNotEmpty) { replaceLastMessageContent(raw); @@ -763,22 +760,18 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ socketWatchdog?.stop(); } else if ((type == 'chat:message:delta' || type == 'message') && payload != null) { - // Incremental message content over socket; respect suppression on SSE-driven flows - if (!suppressSocketContent) { - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - appendToLastMessage(content); - updateImagesFromCurrentContent(); - } + // Incremental message content over socket + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + appendToLastMessage(content); + updateImagesFromCurrentContent(); } } else if ((type == 'chat:message' || type == 'replace') && payload != null) { - // Full message replacement over socket; respect suppression on SSE-driven flows - if (!suppressSocketContent) { - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - replaceLastMessageContent(content); - } + // Full message replacement over socket + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + replaceLastMessageContent(content); } } else if ((type == 'chat:message:files') && payload != null) { // Alias for files event used by web client @@ -809,7 +802,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } else if (type == 'request:chat:completion' && payload != null) { final channel = payload['channel']; if (channel is String && channel.isNotEmpty) { - suppressSocketContent = true; channelLineHandlerFactory(channel); } } else if (type == 'execute:tool' && payload != null) { @@ -902,7 +894,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } } } else if (type == 'event:message:delta' && payload != null) { - if (suppressSocketContent) return; final content = payload['content']?.toString() ?? ''; if (content.isNotEmpty) { appendToLastMessage(content); @@ -988,11 +979,8 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ // Unregister from persistent service persistentService.unregisterStream(streamId); - // Allow socket-delivered follow-ups/title updates after SSE completes - suppressSocketContent = false; - - // If SSE-driven (no dynamic channel/background flow), clean up sockets - if (!isBackgroundFlow) { + // If no socket subscriptions are active, treat this as a poll-driven flow + if (socketSubscriptions.isEmpty) { finishStreaming(); Future.microtask(refreshConversationSnapshot); } @@ -1001,7 +989,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ try { persistentService.unregisterStream(streamId); } catch (_) {} - suppressSocketContent = false; disposeSocketSubscriptions(); finishStreaming(); Future.microtask(refreshConversationSnapshot); diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 6c418e4..f2a169a 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1291,8 +1291,6 @@ Future regenerateMessage( final activeStream = attachUnifiedChunkedStreaming( stream: stream, webSearchEnabled: webSearchEnabled, - isBackgroundFlow: isBackgroundFlow, - suppressSocketContentInitially: !isBackgroundFlow, assistantMessageId: assistantMessageId, modelId: selectedModel.id, modelItem: modelItem, @@ -1828,8 +1826,6 @@ Future _sendMessageInternal( final activeStream = attachUnifiedChunkedStreaming( stream: stream, webSearchEnabled: webSearchEnabled, - isBackgroundFlow: isBackgroundFlow, - suppressSocketContentInitially: !isBackgroundFlow, assistantMessageId: assistantMessageId, modelId: selectedModel.id, modelItem: modelItem,