refactor: improve chat message handling and remote task monitoring

- Removed the InactivityWatchdog implementation to streamline typing indicator management.
- Introduced a Timer-based remote task monitoring system to check task status periodically.
- Enhanced the logic for managing streaming assistant states and task synchronization.
- Improved overall code clarity and maintainability by consolidating task monitoring logic.
This commit is contained in:
cogwheel0
2025-10-03 01:02:29 +05:30
parent 349806f313
commit f8098670f9

View File

@@ -15,7 +15,6 @@ import '../../../core/services/conversation_delta_listener.dart';
import '../../../core/services/streaming_helper.dart'; import '../../../core/services/streaming_helper.dart';
import '../../../core/services/streaming_response_controller.dart'; import '../../../core/services/streaming_response_controller.dart';
import '../../../core/utils/debug_logger.dart'; import '../../../core/utils/debug_logger.dart';
import '../../../core/utils/inactivity_watchdog.dart';
import '../../../core/utils/markdown_stream_formatter.dart'; import '../../../core/utils/markdown_stream_formatter.dart';
import '../../../core/utils/tool_calls_parser.dart'; import '../../../core/utils/tool_calls_parser.dart';
import '../../../shared/services/tasks/task_queue.dart'; import '../../../shared/services/tasks/task_queue.dart';
@@ -83,9 +82,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
final List<StreamSubscription> _subscriptions = []; final List<StreamSubscription> _subscriptions = [];
final List<VoidCallback> _socketSubscriptions = []; final List<VoidCallback> _socketSubscriptions = [];
VoidCallback? _socketTeardown; VoidCallback? _socketTeardown;
// Activity-based watchdog to prevent stuck typing indicator
InactivityWatchdog? _typingWatchdog;
DateTime? _lastStreamingActivity; DateTime? _lastStreamingActivity;
Timer? _taskStatusTimer;
bool _taskStatusCheckInFlight = false;
bool _observedRemoteTask = false;
MarkdownStreamFormatter? _markdownFormatter; MarkdownStreamFormatter? _markdownFormatter;
String? _activeStreamingMessageId; String? _activeStreamingMessageId;
@@ -143,16 +143,20 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
// Cancel any existing message stream when switching conversations // Cancel any existing message stream when switching conversations
_cancelMessageStream(); _cancelMessageStream();
// Also cancel typing guard on conversation switch _stopRemoteTaskMonitor();
_cancelTypingGuard();
if (next != null) { if (next != null) {
state = next.messages; state = next.messages;
// Update selected model if conversation has a different model // Update selected model if conversation has a different model
_updateModelForConversation(next); _updateModelForConversation(next);
if (_hasStreamingAssistant) {
_ensureRemoteTaskMonitor();
}
} else { } else {
state = []; state = [];
_stopRemoteTaskMonitor();
} }
}); });
@@ -163,8 +167,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
_subscriptions.clear(); _subscriptions.clear();
_cancelMessageStream(); _cancelMessageStream();
cancelSocketSubscriptions(); _stopRemoteTaskMonitor();
_cancelTypingGuard();
_conversationListener?.close(); _conversationListener?.close();
_conversationListener = null; _conversationListener = null;
@@ -183,11 +186,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
} }
_clearStreamingFormatter(); _clearStreamingFormatter();
cancelSocketSubscriptions(); cancelSocketSubscriptions();
} _stopRemoteTaskMonitor();
void _cancelTypingGuard() {
_typingWatchdog?.stop();
_typingWatchdog = null;
} }
void _clearStreamingFormatter() { void _clearStreamingFormatter() {
@@ -195,6 +194,69 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
_activeStreamingMessageId = null; _activeStreamingMessageId = null;
} }
bool get _hasStreamingAssistant {
if (state.isEmpty) return false;
final last = state.last;
return last.role == 'assistant' && last.isStreaming;
}
void _ensureRemoteTaskMonitor() {
if (_taskStatusTimer != null) {
return;
}
_taskStatusTimer = Timer.periodic(const Duration(seconds: 5), (_) {
if (!_taskStatusCheckInFlight) {
unawaited(_syncRemoteTaskStatus());
}
});
if (!_taskStatusCheckInFlight) {
unawaited(_syncRemoteTaskStatus());
}
}
void _stopRemoteTaskMonitor() {
_taskStatusTimer?.cancel();
_taskStatusTimer = null;
_taskStatusCheckInFlight = false;
_observedRemoteTask = false;
}
Future<void> _syncRemoteTaskStatus() async {
if (_taskStatusCheckInFlight) {
return;
}
if (!_hasStreamingAssistant) {
_stopRemoteTaskMonitor();
return;
}
final api = ref.read(apiServiceProvider);
final activeConversation = ref.read(activeConversationProvider);
if (api == null || activeConversation == null) {
_stopRemoteTaskMonitor();
return;
}
_taskStatusCheckInFlight = true;
try {
final taskIds = await api.getTaskIdsByChat(activeConversation.id);
if (taskIds.isEmpty) {
if (_observedRemoteTask && _hasStreamingAssistant) {
finishStreaming();
} else if (!_observedRemoteTask) {
// No tasks reported yet; keep monitoring to allow registration.
}
} else {
_observedRemoteTask = true;
}
} catch (err, stack) {
DebugLogger.log('Task status poll failed: $err', scope: 'chat/provider');
debugPrintStack(stackTrace: stack);
} finally {
_taskStatusCheckInFlight = false;
}
}
void _ensureFormatterForMessage(ChatMessage message) { void _ensureFormatterForMessage(ChatMessage message) {
if (_markdownFormatter != null && _activeStreamingMessageId == message.id) { if (_markdownFormatter != null && _activeStreamingMessageId == message.id) {
return; return;
@@ -231,132 +293,16 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
return fallback; return fallback;
} }
void _scheduleTypingGuard({Duration? timeout}) {
// Default timeout tuned to balance long tool gaps and UX
final effectiveTimeout = timeout ?? const Duration(seconds: 25);
_typingWatchdog ??= InactivityWatchdog(
window: effectiveTimeout,
onTimeout: () async {
try {
if (state.isEmpty) return;
final last = state.last;
// Still the same streaming message and no finish signal
if (last.role == 'assistant' && last.isStreaming) {
// Attempt a soft recovery: if content is still empty, try fetching final content from server
if ((last.content).trim().isEmpty) {
try {
final apiSvc = ref.read(apiServiceProvider);
final activeConv = ref.read(activeConversationProvider);
final msgId = last.id;
final chatId = activeConv?.id;
if (apiSvc != null && chatId != null && chatId.isNotEmpty) {
final resp = await apiSvc.dio.get('/api/v1/chats/$chatId');
final data = resp.data as Map<String, dynamic>;
String content = '';
final chatObj = data['chat'] as Map<String, dynamic>?;
if (chatObj != null) {
final list = chatObj['messages'];
if (list is List) {
final target = list.firstWhere(
(m) => (m is Map && (m['id']?.toString() == msgId)),
orElse: () => null,
);
if (target != null) {
final rawContent = (target as Map)['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content =
(textItem as Map)['text']?.toString() ?? '';
}
}
}
}
if (content.isEmpty) {
final history = chatObj['history'];
if (history is Map && history['messages'] is Map) {
final Map<String, dynamic> messagesMap =
(history['messages'] as Map)
.cast<String, dynamic>();
final msg = messagesMap[msgId];
if (msg is Map) {
final rawContent = msg['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content =
(textItem as Map)['text']?.toString() ?? '';
}
}
}
}
}
}
if (content.isNotEmpty) {
replaceLastMessageContent(content);
}
}
} catch (_) {}
}
// Regardless of fetch result, ensure UI is not stuck
finishStreaming();
}
} finally {
_cancelTypingGuard();
}
},
);
_typingWatchdog!.setWindow(effectiveTimeout);
_typingWatchdog!.ping();
}
void _touchStreamingActivity() { void _touchStreamingActivity() {
_lastStreamingActivity = DateTime.now(); _lastStreamingActivity = DateTime.now();
// Keep guard alive while streaming if (_hasStreamingAssistant) {
if (state.isNotEmpty) { // Reset observed flag each time a new streaming session starts.
final last = state.last; if (_taskStatusTimer == null) {
if (last.role == 'assistant' && last.isStreaming) { _observedRemoteTask = false;
// Compute a dynamic timeout based on flow type
Duration timeout = const Duration(seconds: 25);
try {
final meta = last.metadata ?? const <String, dynamic>{};
final isBgFlow = (meta['backgroundFlow'] == true);
final isWebSearchFlow =
(meta['webSearchFlow'] == true) ||
(meta['webSearchActive'] == true);
final isImageGenFlow = (meta['imageGenerationFlow'] == true);
// Also consult global toggles if metadata not present
final globalWebSearch = ref.read(webSearchEnabledProvider);
final webSearchAvailable = ref.read(webSearchAvailableProvider);
final globalImageGen = ref.read(imageGenerationEnabledProvider);
// Extend guard windows to tolerate long reasoning/tools (> 1 min)
if (isWebSearchFlow || (globalWebSearch && webSearchAvailable)) {
if (timeout.inSeconds < 60) timeout = const Duration(seconds: 60);
}
if (isBgFlow) {
// Background tools/dynamic channel can be much longer
if (timeout.inSeconds < 120) timeout = const Duration(seconds: 120);
}
if (isImageGenFlow || globalImageGen) {
// Image generation tends to be the longest
if (timeout.inSeconds < 180) timeout = const Duration(seconds: 180);
}
} catch (_) {}
_scheduleTypingGuard(timeout: timeout);
} }
_ensureRemoteTaskMonitor();
} else {
_stopRemoteTaskMonitor();
} }
} }
@@ -641,7 +587,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
lastMessage.copyWith(isStreaming: false, content: cleaned), lastMessage.copyWith(isStreaming: false, content: cleaned),
]; ];
_messageStream = null; _messageStream = null;
_cancelTypingGuard(); _stopRemoteTaskMonitor();
// Trigger a refresh of the conversations list so UI like the Chats Drawer // Trigger a refresh of the conversations list so UI like the Chats Drawer
// can pick up updated titles and ordering once streaming completes. // can pick up updated titles and ordering once streaming completes.