refactor: socket fallbacks
This commit is contained in:
@@ -38,6 +38,8 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
StreamSubscription? _messageStream;
|
StreamSubscription? _messageStream;
|
||||||
ProviderSubscription? _conversationListener;
|
ProviderSubscription? _conversationListener;
|
||||||
final List<StreamSubscription> _subscriptions = [];
|
final List<StreamSubscription> _subscriptions = [];
|
||||||
|
// Inactivity watchdog to prevent stuck typing indicator
|
||||||
|
Timer? _typingStuckGuard;
|
||||||
|
|
||||||
ChatMessagesNotifier(this._ref) : super([]) {
|
ChatMessagesNotifier(this._ref) : super([]) {
|
||||||
// Load messages when conversation changes with proper cleanup
|
// Load messages when conversation changes with proper cleanup
|
||||||
@@ -64,6 +66,8 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
|
|
||||||
// Cancel any existing message stream when switching conversations
|
// Cancel any existing message stream when switching conversations
|
||||||
_cancelMessageStream();
|
_cancelMessageStream();
|
||||||
|
// Also cancel typing guard on conversation switch
|
||||||
|
_cancelTypingGuard();
|
||||||
|
|
||||||
if (next != null) {
|
if (next != null) {
|
||||||
state = next.messages;
|
state = next.messages;
|
||||||
@@ -87,6 +91,132 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
_messageStream = null;
|
_messageStream = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void _cancelTypingGuard() {
|
||||||
|
_typingStuckGuard?.cancel();
|
||||||
|
_typingStuckGuard = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
void _scheduleTypingGuard({Duration? timeout}) {
|
||||||
|
// Default timeout tuned to balance long tool gaps and UX
|
||||||
|
final effectiveTimeout = timeout ?? const Duration(seconds: 25);
|
||||||
|
_typingStuckGuard?.cancel();
|
||||||
|
_typingStuckGuard = Timer(effectiveTimeout, () async {
|
||||||
|
try {
|
||||||
|
if (state.isEmpty) return;
|
||||||
|
final last = state.last;
|
||||||
|
// Still the same streaming message and no finish signal
|
||||||
|
if (last.role == 'assistant' && last.isStreaming) {
|
||||||
|
// Attempt a soft recovery: if content is still empty, try fetching final content from server
|
||||||
|
if ((last.content).trim().isEmpty) {
|
||||||
|
try {
|
||||||
|
final apiSvc = _ref.read(apiServiceProvider);
|
||||||
|
final activeConv = _ref.read(activeConversationProvider);
|
||||||
|
final msgId = last.id;
|
||||||
|
final chatId = activeConv?.id;
|
||||||
|
if (apiSvc != null && chatId != null && chatId.isNotEmpty) {
|
||||||
|
final resp = await apiSvc.dio.get('/api/v1/chats/' + chatId);
|
||||||
|
final data = resp.data as Map<String, dynamic>;
|
||||||
|
String content = '';
|
||||||
|
final chatObj = data['chat'] as Map<String, dynamic>?;
|
||||||
|
if (chatObj != null) {
|
||||||
|
final list = chatObj['messages'];
|
||||||
|
if (list is List) {
|
||||||
|
final target = list.firstWhere(
|
||||||
|
(m) => (m is Map && (m['id']?.toString() == msgId)),
|
||||||
|
orElse: () => null,
|
||||||
|
);
|
||||||
|
if (target != null) {
|
||||||
|
final rawContent = (target as Map)['content'];
|
||||||
|
if (rawContent is String) {
|
||||||
|
content = rawContent;
|
||||||
|
} else if (rawContent is List) {
|
||||||
|
final textItem = rawContent.firstWhere(
|
||||||
|
(i) => i is Map && i['type'] == 'text',
|
||||||
|
orElse: () => null,
|
||||||
|
);
|
||||||
|
if (textItem != null) {
|
||||||
|
content = (textItem as Map)['text']?.toString() ?? '';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (content.isEmpty) {
|
||||||
|
final history = chatObj['history'];
|
||||||
|
if (history is Map && history['messages'] is Map) {
|
||||||
|
final Map<String, dynamic> messagesMap =
|
||||||
|
(history['messages'] as Map).cast<String, dynamic>();
|
||||||
|
final msg = messagesMap[msgId];
|
||||||
|
if (msg is Map) {
|
||||||
|
final rawContent = msg['content'];
|
||||||
|
if (rawContent is String) {
|
||||||
|
content = rawContent;
|
||||||
|
} else if (rawContent is List) {
|
||||||
|
final textItem = rawContent.firstWhere(
|
||||||
|
(i) => i is Map && i['type'] == 'text',
|
||||||
|
orElse: () => null,
|
||||||
|
);
|
||||||
|
if (textItem != null) {
|
||||||
|
content = (textItem as Map)['text']?.toString() ?? '';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (content.isNotEmpty) {
|
||||||
|
replaceLastMessageContent(content);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (_) {}
|
||||||
|
}
|
||||||
|
// Regardless of fetch result, ensure UI is not stuck
|
||||||
|
finishStreaming();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
_cancelTypingGuard();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void _touchStreamingActivity() {
|
||||||
|
// Keep guard alive while streaming
|
||||||
|
if (state.isNotEmpty) {
|
||||||
|
final last = state.last;
|
||||||
|
if (last.role == 'assistant' && last.isStreaming) {
|
||||||
|
// Compute a dynamic timeout based on flow type
|
||||||
|
Duration timeout = const Duration(seconds: 25);
|
||||||
|
try {
|
||||||
|
final meta = last.metadata ?? const <String, dynamic>{};
|
||||||
|
final isBgFlow = (meta['backgroundFlow'] == true);
|
||||||
|
final isWebSearchFlow =
|
||||||
|
(meta['webSearchFlow'] == true) || (meta['webSearchActive'] == true);
|
||||||
|
final isImageGenFlow = (meta['imageGenerationFlow'] == true);
|
||||||
|
|
||||||
|
// Also consult global toggles if metadata not present
|
||||||
|
final globalWebSearch = _ref.read(webSearchEnabledProvider);
|
||||||
|
final webSearchAvailable = _ref.read(webSearchAvailableProvider);
|
||||||
|
final globalImageGen = _ref.read(imageGenerationEnabledProvider);
|
||||||
|
|
||||||
|
if (isWebSearchFlow || (globalWebSearch && webSearchAvailable)) {
|
||||||
|
timeout = Duration(milliseconds: timeout.inMilliseconds.clamp(0, 45000));
|
||||||
|
// If current < 45s, bump to 45s
|
||||||
|
if (timeout.inSeconds < 45) timeout = const Duration(seconds: 45);
|
||||||
|
}
|
||||||
|
if (isBgFlow) {
|
||||||
|
// Background tools/dynamic channel can be longer
|
||||||
|
if (timeout.inSeconds < 60) timeout = const Duration(seconds: 60);
|
||||||
|
}
|
||||||
|
if (isImageGenFlow || globalImageGen) {
|
||||||
|
// Image generation tends to be the longest
|
||||||
|
if (timeout.inSeconds < 90) timeout = const Duration(seconds: 90);
|
||||||
|
}
|
||||||
|
} catch (_) {}
|
||||||
|
|
||||||
|
_scheduleTypingGuard(timeout: timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Public wrapper to cancel the currently active stream (used by Stop)
|
// Public wrapper to cancel the currently active stream (used by Stop)
|
||||||
void cancelActiveMessageStream() {
|
void cancelActiveMessageStream() {
|
||||||
_cancelMessageStream();
|
_cancelMessageStream();
|
||||||
@@ -137,6 +267,9 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
|
|
||||||
void addMessage(ChatMessage message) {
|
void addMessage(ChatMessage message) {
|
||||||
state = [...state, message];
|
state = [...state, message];
|
||||||
|
if (message.role == 'assistant' && message.isStreaming) {
|
||||||
|
_touchStreamingActivity();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void removeLastMessage() {
|
void removeLastMessage() {
|
||||||
@@ -176,6 +309,7 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
...state.sublist(0, state.length - 1),
|
...state.sublist(0, state.length - 1),
|
||||||
lastMessage.copyWith(content: sanitized(content)),
|
lastMessage.copyWith(content: sanitized(content)),
|
||||||
];
|
];
|
||||||
|
_touchStreamingActivity();
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateLastMessageWithFunction(
|
void updateLastMessageWithFunction(
|
||||||
@@ -185,8 +319,11 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
|
|
||||||
final lastMessage = state.last;
|
final lastMessage = state.last;
|
||||||
if (lastMessage.role != 'assistant') return;
|
if (lastMessage.role != 'assistant') return;
|
||||||
|
final updated = updater(lastMessage);
|
||||||
state = [...state.sublist(0, state.length - 1), updater(lastMessage)];
|
state = [...state.sublist(0, state.length - 1), updated];
|
||||||
|
if (updated.isStreaming) {
|
||||||
|
_touchStreamingActivity();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void appendToLastMessage(String content) {
|
void appendToLastMessage(String content) {
|
||||||
@@ -219,6 +356,7 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
...state.sublist(0, state.length - 1),
|
...state.sublist(0, state.length - 1),
|
||||||
lastMessage.copyWith(content: newContent),
|
lastMessage.copyWith(content: newContent),
|
||||||
];
|
];
|
||||||
|
_touchStreamingActivity();
|
||||||
}
|
}
|
||||||
|
|
||||||
void replaceLastMessageContent(String content) {
|
void replaceLastMessageContent(String content) {
|
||||||
@@ -245,6 +383,7 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
...state.sublist(0, state.length - 1),
|
...state.sublist(0, state.length - 1),
|
||||||
lastMessage.copyWith(content: sanitized),
|
lastMessage.copyWith(content: sanitized),
|
||||||
];
|
];
|
||||||
|
_touchStreamingActivity();
|
||||||
}
|
}
|
||||||
|
|
||||||
void finishStreaming() {
|
void finishStreaming() {
|
||||||
@@ -268,6 +407,7 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
...state.sublist(0, state.length - 1),
|
...state.sublist(0, state.length - 1),
|
||||||
lastMessage.copyWith(isStreaming: false, content: cleaned),
|
lastMessage.copyWith(isStreaming: false, content: cleaned),
|
||||||
];
|
];
|
||||||
|
_cancelTypingGuard();
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
@@ -284,6 +424,8 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
|
|||||||
|
|
||||||
// Cancel message stream specifically
|
// Cancel message stream specifically
|
||||||
_cancelMessageStream();
|
_cancelMessageStream();
|
||||||
|
// Cancel any active typing guard
|
||||||
|
_cancelTypingGuard();
|
||||||
|
|
||||||
// Cancel conversation listener specifically
|
// Cancel conversation listener specifically
|
||||||
_conversationListener?.close();
|
_conversationListener?.close();
|
||||||
@@ -1220,6 +1362,19 @@ Future<void> _sendMessageInternal(
|
|||||||
isBackgroundToolsFlowPre || isBackgroundWebSearchPre || wantSessionBinding;
|
isBackgroundToolsFlowPre || isBackgroundWebSearchPre || wantSessionBinding;
|
||||||
bool suppressSocketContent = !isBackgroundFlow; // allow socket text when session-bound or tools
|
bool suppressSocketContent = !isBackgroundFlow; // allow socket text when session-bound or tools
|
||||||
bool usingDynamicChannel = false; // set true when server provides a channel
|
bool usingDynamicChannel = false; // set true when server provides a channel
|
||||||
|
// Enrich the assistant placeholder metadata so the typing guard can use longer timeouts
|
||||||
|
try {
|
||||||
|
ref.read(chatMessagesProvider.notifier).updateLastMessageWithFunction((m) {
|
||||||
|
final mergedMeta = {
|
||||||
|
if (m.metadata != null) ...m.metadata!,
|
||||||
|
'backgroundFlow': isBackgroundFlow,
|
||||||
|
if (isBackgroundWebSearchPre) 'webSearchFlow': true,
|
||||||
|
if (imageGenerationEnabled) 'imageGenerationFlow': true,
|
||||||
|
};
|
||||||
|
return m.copyWith(metadata: mergedMeta);
|
||||||
|
});
|
||||||
|
} catch (_) {}
|
||||||
|
|
||||||
if (socketService != null) {
|
if (socketService != null) {
|
||||||
void chatHandler(Map<String, dynamic> ev) {
|
void chatHandler(Map<String, dynamic> ev) {
|
||||||
try {
|
try {
|
||||||
@@ -1664,6 +1819,15 @@ Future<void> _sendMessageInternal(
|
|||||||
socketService.offChatEvents();
|
socketService.offChatEvents();
|
||||||
socketService.offChannelEvents();
|
socketService.offChannelEvents();
|
||||||
} catch (_) {}
|
} catch (_) {}
|
||||||
|
// As a final safeguard, if we're still in streaming state, finish it to avoid stuck UI
|
||||||
|
try {
|
||||||
|
final msgs = ref.read(chatMessagesProvider);
|
||||||
|
if (msgs.isNotEmpty &&
|
||||||
|
msgs.last.role == 'assistant' &&
|
||||||
|
msgs.last.isStreaming) {
|
||||||
|
ref.read(chatMessagesProvider.notifier).finishStreaming();
|
||||||
|
}
|
||||||
|
} catch (_) {}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user