refactor: login and streaming issues

This commit is contained in:
cogwheel0
2025-09-26 20:57:54 +05:30
parent 3c959c83bf
commit 0c2bd5b4f9
4 changed files with 85 additions and 37 deletions

View File

@@ -254,6 +254,32 @@ final apiTokenUpdaterProvider = Provider<void>((ref) {
data: {'length': length}, data: {'length': length},
); );
} }
// When the token transitions from empty -> present, force-refresh models
final hadToken = previous != null && previous.isNotEmpty;
final hasToken = next != null && next.isNotEmpty;
if (!hadToken && hasToken) {
// New valid token acquired (e.g., re-login). Invalidate caches that
// depend on authentication so next reads refetch from server.
DebugLogger.log('invalidate-on-auth', scope: 'models');
ref.invalidate(modelsProvider);
ref.invalidate(defaultModelProvider);
// Refresh permissions to enable gated features promptly
ref.invalidate(userPermissionsProvider);
// Kick background model load to warm caches without blocking UI
Future.microtask(() {
// Accessing the provider is enough to schedule its work
ref.read(backgroundModelLoadProvider);
});
}
if (hadToken && !hasToken) {
// Token was cleared/invalidated; clear model selection and caches
ref.read(selectedModelProvider.notifier).clear();
ref.invalidate(modelsProvider);
ref.invalidate(defaultModelProvider);
}
}); });
}); });

View File

@@ -288,6 +288,7 @@ class SocketService with WidgetsBindingObserver {
final ackFn = _wrapAck(ack); final ackFn = _wrapAck(ack);
final sessionId = _extractSessionId(map); final sessionId = _extractSessionId(map);
final chatId = map['chat_id']?.toString(); final chatId = map['chat_id']?.toString();
final channelId = _extractChannelId(map);
for (final registration in List<_ChatEventRegistration>.from( for (final registration in List<_ChatEventRegistration>.from(
_chatEventHandlers.values, _chatEventHandlers.values,
@@ -298,6 +299,7 @@ class SocketService with WidgetsBindingObserver {
chatId, chatId,
sessionId, sessionId,
registration.requireFocus, registration.requireFocus,
incomingChannelId: channelId,
)) { )) {
continue; continue;
} }
@@ -315,6 +317,7 @@ class SocketService with WidgetsBindingObserver {
final ackFn = _wrapAck(ack); final ackFn = _wrapAck(ack);
final sessionId = _extractSessionId(map); final sessionId = _extractSessionId(map);
final chatId = map['chat_id']?.toString(); final chatId = map['chat_id']?.toString();
final channelId = _extractChannelId(map);
for (final registration in List<_ChannelEventRegistration>.from( for (final registration in List<_ChannelEventRegistration>.from(
_channelEventHandlers.values, _channelEventHandlers.values,
@@ -325,6 +328,7 @@ class SocketService with WidgetsBindingObserver {
chatId, chatId,
sessionId, sessionId,
registration.requireFocus, registration.requireFocus,
incomingChannelId: channelId,
)) { )) {
continue; continue;
} }
@@ -340,18 +344,21 @@ class SocketService with WidgetsBindingObserver {
String? registeredSessionId, String? registeredSessionId,
String? incomingConversationId, String? incomingConversationId,
String? incomingSessionId, String? incomingSessionId,
bool requireFocus, bool requireFocus, {
) { String? incomingChannelId,
final matchesChat = }) {
final matchesConversation =
registeredConversationId == null || registeredConversationId == null ||
(incomingConversationId != null && (incomingConversationId != null &&
registeredConversationId == incomingConversationId); registeredConversationId == incomingConversationId) ||
(incomingChannelId != null &&
registeredConversationId == incomingChannelId);
final matchesSession = final matchesSession =
registeredSessionId != null && registeredSessionId != null &&
incomingSessionId != null && incomingSessionId != null &&
registeredSessionId == incomingSessionId; registeredSessionId == incomingSessionId;
if (!matchesChat && !matchesSession) { if (!matchesConversation && !matchesSession) {
return false; return false;
} }
@@ -421,6 +428,38 @@ class SocketService with WidgetsBindingObserver {
return candidate; return candidate;
} }
String? _extractChannelId(Map<String, dynamic> event) {
String? candidate;
if (event['channel_id'] != null) {
candidate = event['channel_id'].toString();
}
if (candidate == null && event['channelId'] != null) {
candidate = event['channelId'].toString();
}
final data = event['data'];
if (data is Map) {
if (candidate == null && data['channel_id'] != null) {
candidate = data['channel_id'].toString();
}
if (candidate == null && data['channelId'] != null) {
candidate = data['channelId'].toString();
}
final inner = data['data'];
if (inner is Map) {
if (candidate == null && inner['channel_id'] != null) {
candidate = inner['channel_id'].toString();
}
if (candidate == null && inner['channelId'] != null) {
candidate = inner['channelId'].toString();
}
}
}
return candidate;
}
String _nextHandlerId() { String _nextHandlerId() {
_handlerSeed += 1; _handlerSeed += 1;
return _handlerSeed.toString(); return _handlerSeed.toString();

View File

@@ -31,14 +31,12 @@ class ActiveSocketStream {
/// Unified streaming helper for chat send/regenerate flows. /// Unified streaming helper for chat send/regenerate flows.
/// ///
/// This attaches chunked SSE streaming handlers, optional WebSocket event handlers, /// This attaches chunked polling streams (fallback) plus WebSocket event handlers,
/// and manages background search/image-gen UI updates. It operates via callbacks to /// and manages background search/image-gen UI updates. It operates via callbacks to
/// avoid tight coupling with provider files for easier reuse and testing. /// avoid tight coupling with provider files for easier reuse and testing.
ActiveSocketStream attachUnifiedChunkedStreaming({ ActiveSocketStream attachUnifiedChunkedStreaming({
required Stream<String> stream, required Stream<String> stream,
required bool webSearchEnabled, required bool webSearchEnabled,
required bool isBackgroundFlow,
required bool suppressSocketContentInitially,
required String assistantMessageId, required String assistantMessageId,
required String modelId, required String modelId,
required Map<String, dynamic> modelItem, required Map<String, dynamic> modelItem,
@@ -140,7 +138,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
bool isSearching = false; bool isSearching = false;
bool suppressSocketContent = suppressSocketContentInitially;
void updateImagesFromCurrentContent() { void updateImagesFromCurrentContent() {
try { try {
@@ -443,7 +440,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
if (kSocketVerboseLogging && payload is Map) { if (kSocketVerboseLogging && payload is Map) {
DebugLogger.log( DebugLogger.log(
'socket delta type=$type suppress=$suppressSocketContent session=$sessionId message=$messageId keys=${payload.keys.toList()}', 'socket delta type=$type session=$sessionId message=$messageId keys=${payload.keys.toList()}',
scope: 'socket/chat', scope: 'socket/chat',
); );
} }
@@ -479,7 +476,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
} }
} }
if (!suppressSocketContent && payload.containsKey('choices')) { if (payload.containsKey('choices')) {
final choices = payload['choices']; final choices = payload['choices'];
if (choices is List && choices.isNotEmpty) { if (choices is List && choices.isNotEmpty) {
final choice = choices.first; final choice = choices.first;
@@ -522,7 +519,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
} }
} }
if (!suppressSocketContent && payload.containsKey('content')) { if (payload.containsKey('content')) {
final raw = payload['content']?.toString() ?? ''; final raw = payload['content']?.toString() ?? '';
if (raw.isNotEmpty) { if (raw.isNotEmpty) {
replaceLastMessageContent(raw); replaceLastMessageContent(raw);
@@ -763,22 +760,18 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
socketWatchdog?.stop(); socketWatchdog?.stop();
} else if ((type == 'chat:message:delta' || type == 'message') && } else if ((type == 'chat:message:delta' || type == 'message') &&
payload != null) { payload != null) {
// Incremental message content over socket; respect suppression on SSE-driven flows // Incremental message content over socket
if (!suppressSocketContent) { final content = payload['content']?.toString() ?? '';
final content = payload['content']?.toString() ?? ''; if (content.isNotEmpty) {
if (content.isNotEmpty) { appendToLastMessage(content);
appendToLastMessage(content); updateImagesFromCurrentContent();
updateImagesFromCurrentContent();
}
} }
} else if ((type == 'chat:message' || type == 'replace') && } else if ((type == 'chat:message' || type == 'replace') &&
payload != null) { payload != null) {
// Full message replacement over socket; respect suppression on SSE-driven flows // Full message replacement over socket
if (!suppressSocketContent) { final content = payload['content']?.toString() ?? '';
final content = payload['content']?.toString() ?? ''; if (content.isNotEmpty) {
if (content.isNotEmpty) { replaceLastMessageContent(content);
replaceLastMessageContent(content);
}
} }
} else if ((type == 'chat:message:files') && payload != null) { } else if ((type == 'chat:message:files') && payload != null) {
// Alias for files event used by web client // Alias for files event used by web client
@@ -809,7 +802,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} else if (type == 'request:chat:completion' && payload != null) { } else if (type == 'request:chat:completion' && payload != null) {
final channel = payload['channel']; final channel = payload['channel'];
if (channel is String && channel.isNotEmpty) { if (channel is String && channel.isNotEmpty) {
suppressSocketContent = true;
channelLineHandlerFactory(channel); channelLineHandlerFactory(channel);
} }
} else if (type == 'execute:tool' && payload != null) { } else if (type == 'execute:tool' && payload != null) {
@@ -902,7 +894,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} }
} }
} else if (type == 'event:message:delta' && payload != null) { } else if (type == 'event:message:delta' && payload != null) {
if (suppressSocketContent) return;
final content = payload['content']?.toString() ?? ''; final content = payload['content']?.toString() ?? '';
if (content.isNotEmpty) { if (content.isNotEmpty) {
appendToLastMessage(content); appendToLastMessage(content);
@@ -988,11 +979,8 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
// Unregister from persistent service // Unregister from persistent service
persistentService.unregisterStream(streamId); persistentService.unregisterStream(streamId);
// Allow socket-delivered follow-ups/title updates after SSE completes // If no socket subscriptions are active, treat this as a poll-driven flow
suppressSocketContent = false; if (socketSubscriptions.isEmpty) {
// If SSE-driven (no dynamic channel/background flow), clean up sockets
if (!isBackgroundFlow) {
finishStreaming(); finishStreaming();
Future.microtask(refreshConversationSnapshot); Future.microtask(refreshConversationSnapshot);
} }
@@ -1001,7 +989,6 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
try { try {
persistentService.unregisterStream(streamId); persistentService.unregisterStream(streamId);
} catch (_) {} } catch (_) {}
suppressSocketContent = false;
disposeSocketSubscriptions(); disposeSocketSubscriptions();
finishStreaming(); finishStreaming();
Future.microtask(refreshConversationSnapshot); Future.microtask(refreshConversationSnapshot);

View File

@@ -1291,8 +1291,6 @@ Future<void> regenerateMessage(
final activeStream = attachUnifiedChunkedStreaming( final activeStream = attachUnifiedChunkedStreaming(
stream: stream, stream: stream,
webSearchEnabled: webSearchEnabled, webSearchEnabled: webSearchEnabled,
isBackgroundFlow: isBackgroundFlow,
suppressSocketContentInitially: !isBackgroundFlow,
assistantMessageId: assistantMessageId, assistantMessageId: assistantMessageId,
modelId: selectedModel.id, modelId: selectedModel.id,
modelItem: modelItem, modelItem: modelItem,
@@ -1828,8 +1826,6 @@ Future<void> _sendMessageInternal(
final activeStream = attachUnifiedChunkedStreaming( final activeStream = attachUnifiedChunkedStreaming(
stream: stream, stream: stream,
webSearchEnabled: webSearchEnabled, webSearchEnabled: webSearchEnabled,
isBackgroundFlow: isBackgroundFlow,
suppressSocketContentInitially: !isBackgroundFlow,
assistantMessageId: assistantMessageId, assistantMessageId: assistantMessageId,
modelId: selectedModel.id, modelId: selectedModel.id,
modelItem: modelItem, modelItem: modelItem,