feat(streaming): Improve task status polling and socket recovery

This commit is contained in:
cogwheel0
2025-12-01 11:52:04 +05:30
parent a245c4248f
commit 8005cbf6e0
2 changed files with 26 additions and 9 deletions

View File

@@ -204,12 +204,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
final hasSocketSignals = final hasSocketSignals =
socketService != null || registerDeltaListener != null; socketService != null || registerDeltaListener != null;
if (hasSocketSignals) { if (hasSocketSignals) {
// Use a reasonable inactivity timeout - if no data arrives for 45 seconds, // Use a short inactivity timeout - if no data arrives for 10 seconds,
// something is likely wrong with the connection // something is likely wrong with the connection. Combined with 1-second
// polling and server state sync, this provides fast recovery.
socketWatchdog = InactivityWatchdog( socketWatchdog = InactivityWatchdog(
window: const Duration(seconds: 45), window: const Duration(seconds: 10),
// Absolute cap ensures streaming never gets stuck indefinitely // Absolute cap ensures streaming never gets stuck indefinitely
absoluteCap: const Duration(minutes: 10), absoluteCap: const Duration(minutes: 5),
onTimeout: () { onTimeout: () {
DebugLogger.log( DebugLogger.log(
'Socket watchdog timeout - finishing streaming gracefully', 'Socket watchdog timeout - finishing streaming gracefully',

View File

@@ -109,6 +109,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
bool _taskStatusCheckInFlight = false; bool _taskStatusCheckInFlight = false;
bool _observedRemoteTask = false; bool _observedRemoteTask = false;
/// Counts consecutive polls where no tasks were observed.
/// Used to trigger fallback server check if task registration was missed.
int _noTaskPollCount = 0;
MarkdownStreamFormatter? _markdownFormatter; MarkdownStreamFormatter? _markdownFormatter;
String? _activeStreamingMessageId; String? _activeStreamingMessageId;
@@ -280,7 +284,9 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
if (_taskStatusTimer != null) { if (_taskStatusTimer != null) {
return; return;
} }
_taskStatusTimer = Timer.periodic(const Duration(seconds: 5), (_) { // Poll every second for fast recovery from missed socket events.
// This is a lightweight API call and provides the best UX for stuck streaming.
_taskStatusTimer = Timer.periodic(const Duration(seconds: 1), (_) {
if (!_taskStatusCheckInFlight) { if (!_taskStatusCheckInFlight) {
unawaited(_syncRemoteTaskStatus()); unawaited(_syncRemoteTaskStatus());
} }
@@ -295,6 +301,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
_taskStatusTimer = null; _taskStatusTimer = null;
_taskStatusCheckInFlight = false; _taskStatusCheckInFlight = false;
_observedRemoteTask = false; _observedRemoteTask = false;
_noTaskPollCount = 0;
} }
Future<void> _syncRemoteTaskStatus() async { Future<void> _syncRemoteTaskStatus() async {
@@ -321,17 +328,26 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
if (hasActiveTasks) { if (hasActiveTasks) {
_observedRemoteTask = true; _observedRemoteTask = true;
_noTaskPollCount = 0;
} else {
_noTaskPollCount++;
} }
// When no active tasks and we previously observed tasks, streaming should be done. // When no active tasks and we previously observed tasks, streaming should be done.
// Run secondary check to sync any missed content from server.
final tasksDone = _observedRemoteTask && !hasActiveTasks; final tasksDone = _observedRemoteTask && !hasActiveTasks;
// Fallback: If we've polled exactly 3 times without ever seeing tasks,
// something may be wrong - check server state directly. This catches cases
// where task registration was completely missed due to socket issues.
// Using == 3 instead of >= 3 ensures this only triggers once, not every poll.
final fallbackCheck = !_observedRemoteTask && _noTaskPollCount == 3;
// Secondary check: fetch conversation from server and compare message state // Secondary check: fetch conversation from server and compare message state
// This catches cases where the done signal was missed AND syncs any missed content. // This catches cases where the done signal was missed AND syncs any missed content.
// Only run when tasks have actually completed (were observed and are now gone), // Runs when:
// not on every poll before tasks are registered. // 1. Tasks completed (were observed and are now gone), OR
if (_hasStreamingAssistant && tasksDone) { // 2. Fallback: No tasks ever observed after exactly 3 polls (one-time check)
if (_hasStreamingAssistant && (tasksDone || fallbackCheck)) {
try { try {
final serverConversation = await api.getConversation( final serverConversation = await api.getConversation(
activeConversation.id, activeConversation.id,