Merge pull request #202 from cogwheel0/streaming-task-status-polling
feat(streaming): Improve task status polling and socket recovery
This commit is contained in:
@@ -204,12 +204,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
final hasSocketSignals =
|
||||
socketService != null || registerDeltaListener != null;
|
||||
if (hasSocketSignals) {
|
||||
// Use a reasonable inactivity timeout - if no data arrives for 45 seconds,
|
||||
// something is likely wrong with the connection
|
||||
// Use a short inactivity timeout - if no data arrives for 10 seconds,
|
||||
// something is likely wrong with the connection. Combined with 1-second
|
||||
// polling and server state sync, this provides fast recovery.
|
||||
socketWatchdog = InactivityWatchdog(
|
||||
window: const Duration(seconds: 45),
|
||||
window: const Duration(seconds: 10),
|
||||
// Absolute cap ensures streaming never gets stuck indefinitely
|
||||
absoluteCap: const Duration(minutes: 10),
|
||||
absoluteCap: const Duration(minutes: 5),
|
||||
onTimeout: () {
|
||||
DebugLogger.log(
|
||||
'Socket watchdog timeout - finishing streaming gracefully',
|
||||
|
||||
@@ -109,6 +109,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
|
||||
bool _taskStatusCheckInFlight = false;
|
||||
bool _observedRemoteTask = false;
|
||||
|
||||
/// Counts consecutive polls where no tasks were observed.
|
||||
/// Used to trigger fallback server check if task registration was missed.
|
||||
int _noTaskPollCount = 0;
|
||||
|
||||
MarkdownStreamFormatter? _markdownFormatter;
|
||||
String? _activeStreamingMessageId;
|
||||
|
||||
@@ -280,7 +284,9 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
|
||||
if (_taskStatusTimer != null) {
|
||||
return;
|
||||
}
|
||||
_taskStatusTimer = Timer.periodic(const Duration(seconds: 5), (_) {
|
||||
// Poll every second for fast recovery from missed socket events.
|
||||
// This is a lightweight API call and provides the best UX for stuck streaming.
|
||||
_taskStatusTimer = Timer.periodic(const Duration(seconds: 1), (_) {
|
||||
if (!_taskStatusCheckInFlight) {
|
||||
unawaited(_syncRemoteTaskStatus());
|
||||
}
|
||||
@@ -295,6 +301,7 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
|
||||
_taskStatusTimer = null;
|
||||
_taskStatusCheckInFlight = false;
|
||||
_observedRemoteTask = false;
|
||||
_noTaskPollCount = 0;
|
||||
}
|
||||
|
||||
Future<void> _syncRemoteTaskStatus() async {
|
||||
@@ -321,17 +328,26 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
|
||||
|
||||
if (hasActiveTasks) {
|
||||
_observedRemoteTask = true;
|
||||
_noTaskPollCount = 0;
|
||||
} else {
|
||||
_noTaskPollCount++;
|
||||
}
|
||||
|
||||
// When no active tasks and we previously observed tasks, streaming should be done.
|
||||
// Run secondary check to sync any missed content from server.
|
||||
final tasksDone = _observedRemoteTask && !hasActiveTasks;
|
||||
|
||||
// Fallback: If we've polled exactly 3 times without ever seeing tasks,
|
||||
// something may be wrong - check server state directly. This catches cases
|
||||
// where task registration was completely missed due to socket issues.
|
||||
// Using == 3 instead of >= 3 ensures this only triggers once, not every poll.
|
||||
final fallbackCheck = !_observedRemoteTask && _noTaskPollCount == 3;
|
||||
|
||||
// Secondary check: fetch conversation from server and compare message state
|
||||
// This catches cases where the done signal was missed AND syncs any missed content.
|
||||
// Only run when tasks have actually completed (were observed and are now gone),
|
||||
// not on every poll before tasks are registered.
|
||||
if (_hasStreamingAssistant && tasksDone) {
|
||||
// Runs when:
|
||||
// 1. Tasks completed (were observed and are now gone), OR
|
||||
// 2. Fallback: No tasks ever observed after exactly 3 polls (one-time check)
|
||||
if (_hasStreamingAssistant && (tasksDone || fallbackCheck)) {
|
||||
try {
|
||||
final serverConversation = await api.getConversation(
|
||||
activeConversation.id,
|
||||
|
||||
Reference in New Issue
Block a user