feat: inactivity watchdog for sockets

This commit is contained in:
cogwheel0
2025-09-07 23:17:26 +05:30
parent a850a567a1
commit 30f1650faf
3 changed files with 242 additions and 110 deletions

View File

@@ -333,7 +333,9 @@ class PersistentStreamingService with WidgetsBindingObserver {
final lastUpdate = metadata['lastUpdate'] as DateTime?;
if (lastUpdate != null) {
final timeSinceUpdate = DateTime.now().difference(lastUpdate);
return timeSinceUpdate > const Duration(minutes: 1);
// Align with app-side watchdogs: be less aggressive than UI guard
// but still attempt recovery before server timeouts become likely.
return timeSinceUpdate > const Duration(minutes: 2);
}
return false;

View File

@@ -0,0 +1,83 @@
import 'dart:async';
/// A simple activity-based watchdog.
///
/// Call [ping] whenever activity occurs. If no activity happens
/// within [window], [onTimeout] fires. Optionally, an [absoluteCap]
/// enforces a maximum total duration regardless of activity.
class InactivityWatchdog {
InactivityWatchdog({
required Duration window,
required this.onTimeout,
Duration? absoluteCap,
}) : _window = window,
_absoluteCap = absoluteCap;
final void Function() onTimeout;
Duration _window;
Duration? _absoluteCap;
Timer? _timer;
Timer? _absoluteTimer;
bool _started = false;
Duration get window => _window;
void setWindow(Duration newWindow) {
_window = newWindow;
if (_started) {
// Restart timer with new window
_restart();
}
}
void setAbsoluteCap(Duration? cap) {
_absoluteCap = cap;
if (_started) {
_absoluteTimer?.cancel();
if (_absoluteCap != null) {
_absoluteTimer = Timer(_absoluteCap!, _fire);
}
}
}
void start() {
if (_started) return;
_started = true;
_restart();
if (_absoluteCap != null) {
_absoluteTimer = Timer(_absoluteCap!, _fire);
}
}
void ping() {
if (!_started) {
start();
return;
}
_restart();
}
void stop() {
_timer?.cancel();
_timer = null;
_absoluteTimer?.cancel();
_absoluteTimer = null;
_started = false;
}
void dispose() => stop();
void _restart() {
_timer?.cancel();
_timer = Timer(_window, _fire);
}
void _fire() {
stop();
try {
onTimeout();
} catch (_) {}
}
}

View File

@@ -13,6 +13,7 @@ import '../../../core/auth/auth_state_manager.dart';
import '../../../core/utils/stream_chunker.dart';
import '../../../core/services/persistent_streaming_service.dart';
import '../../../core/utils/debug_logger.dart';
import '../../../core/utils/inactivity_watchdog.dart';
import '../services/reviewer_mode_service.dart';
import '../../../shared/services/tasks/task_queue.dart';
import '../../tools/providers/tools_providers.dart';
@@ -40,8 +41,8 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
StreamSubscription? _messageStream;
ProviderSubscription? _conversationListener;
final List<StreamSubscription> _subscriptions = [];
// Inactivity watchdog to prevent stuck typing indicator
Timer? _typingStuckGuard;
// Activity-based watchdog to prevent stuck typing indicator
InactivityWatchdog? _typingWatchdog;
ChatMessagesNotifier(this._ref) : super([]) {
// Load messages when conversation changes with proper cleanup
@@ -115,15 +116,16 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
}
void _cancelTypingGuard() {
_typingStuckGuard?.cancel();
_typingStuckGuard = null;
_typingWatchdog?.stop();
_typingWatchdog = null;
}
void _scheduleTypingGuard({Duration? timeout}) {
// Default timeout tuned to balance long tool gaps and UX
final effectiveTimeout = timeout ?? const Duration(seconds: 25);
_typingStuckGuard?.cancel();
_typingStuckGuard = Timer(effectiveTimeout, () async {
_typingWatchdog ??= InactivityWatchdog(
window: effectiveTimeout,
onTimeout: () async {
try {
if (state.isEmpty) return;
final last = state.last;
@@ -167,7 +169,8 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
final history = chatObj['history'];
if (history is Map && history['messages'] is Map) {
final Map<String, dynamic> messagesMap =
(history['messages'] as Map).cast<String, dynamic>();
(history['messages'] as Map)
.cast<String, dynamic>();
final msg = messagesMap[msgId];
if (msg is Map) {
final rawContent = msg['content'];
@@ -199,7 +202,10 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
} finally {
_cancelTypingGuard();
}
});
},
);
_typingWatchdog!.setWindow(effectiveTimeout);
_typingWatchdog!.ping();
}
void _touchStreamingActivity() {
@@ -222,20 +228,17 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
final webSearchAvailable = _ref.read(webSearchAvailableProvider);
final globalImageGen = _ref.read(imageGenerationEnabledProvider);
// Extend guard windows to tolerate long reasoning/tools (> 1 min)
if (isWebSearchFlow || (globalWebSearch && webSearchAvailable)) {
timeout = Duration(
milliseconds: timeout.inMilliseconds.clamp(0, 45000),
);
// If current < 45s, bump to 45s
if (timeout.inSeconds < 45) timeout = const Duration(seconds: 45);
if (timeout.inSeconds < 60) timeout = const Duration(seconds: 60);
}
if (isBgFlow) {
// Background tools/dynamic channel can be longer
if (timeout.inSeconds < 60) timeout = const Duration(seconds: 60);
// Background tools/dynamic channel can be much longer
if (timeout.inSeconds < 120) timeout = const Duration(seconds: 120);
}
if (isImageGenFlow || globalImageGen) {
// Image generation tends to be the longest
if (timeout.inSeconds < 90) timeout = const Duration(seconds: 90);
if (timeout.inSeconds < 180) timeout = const Duration(seconds: 180);
}
} catch (_) {}
@@ -762,7 +765,7 @@ Future<void> regenerateMessage(
final bool isLastUser = (i == messages.length - 1) && msg.role == 'user';
final List<String> messageAttachments =
(isLastUser && (attachments != null && attachments.isNotEmpty))
? List<String>.from(attachments!)
? List<String>.from(attachments)
: (msg.attachmentIds ?? const <String>[]);
if (messageAttachments.isNotEmpty) {
@@ -1468,6 +1471,26 @@ Future<void> _sendMessageInternal(
} catch (_) {}
if (socketService != null) {
// Activity-based watchdog for chat/channel events (resets on activity)
final _chatWatchdog = InactivityWatchdog(
window: const Duration(minutes: 5),
onTimeout: () {
try {
socketService.offChatEvents();
socketService.offChannelEvents();
} catch (_) {}
// As a final safeguard, if we're still in streaming state, finish it
try {
final msgs = ref.read(chatMessagesProvider);
if (msgs.isNotEmpty &&
msgs.last.role == 'assistant' &&
msgs.last.isStreaming) {
ref.read(chatMessagesProvider.notifier).finishStreaming();
}
} catch (_) {}
},
)..start();
void chatHandler(Map<String, dynamic> ev) {
try {
final data = ev['data'];
@@ -1475,6 +1498,9 @@ Future<void> _sendMessageInternal(
final type = data['type'];
final payload = data['data'];
DebugLogger.stream('Socket chat-events: type=$type');
// Any chat event indicates activity; reset inactivity watchdog
// (watchdog defined below, near handler registration)
_chatWatchdog.ping();
if (type == 'chat:completion' && payload != null) {
if (payload is Map<String, dynamic>) {
// Provider may emit tool_calls at the top level
@@ -1591,6 +1617,10 @@ Future<void> _sendMessageInternal(
try {
socketService.offChatEvents();
} catch (_) {}
try {
_chatWatchdog.ping(); // ensure timer exists
_chatWatchdog.stop();
} catch (_) {}
// Notify server that chat is completed (mirrors web client)
try {
@@ -1703,6 +1733,9 @@ Future<void> _sendMessageInternal(
}
// Normal path: finish now
ref.read(chatMessagesProvider.notifier).finishStreaming();
try {
_chatWatchdog.stop();
} catch (_) {}
}
}
} else if (type == 'request:chat:completion' && payload != null) {
@@ -1722,6 +1755,10 @@ Future<void> _sendMessageInternal(
try {
if (line is String) {
final s = line.trim();
// Dynamic channel activity
try {
_chatWatchdog.ping();
} catch (_) {}
DebugLogger.stream(
'Socket [$channel] line=${s.length > 160 ? '${s.substring(0, 160)}' : s}',
);
@@ -1988,27 +2025,15 @@ Future<void> _sendMessageInternal(
.read(chatMessagesProvider.notifier)
.appendToLastMessage(content);
_updateImagesFromCurrentContent(ref);
_chatWatchdog.ping();
}
}
} catch (_) {}
}
socketService.onChannelEvents(channelEventsHandler);
Future.delayed(const Duration(seconds: 90), () {
try {
socketService.offChatEvents();
socketService.offChannelEvents();
} catch (_) {}
// As a final safeguard, if we're still in streaming state, finish it to avoid stuck UI
try {
final msgs = ref.read(chatMessagesProvider);
if (msgs.isNotEmpty &&
msgs.last.role == 'assistant' &&
msgs.last.isStreaming) {
ref.read(chatMessagesProvider.notifier).finishStreaming();
}
} catch (_) {}
});
// Start activity watchdog
_chatWatchdog.ping();
}
// Prepare streaming and background handling
@@ -2063,8 +2088,17 @@ Future<void> _sendMessageInternal(
// Helpers were defined above
int _chunkSeq = 0;
final streamSubscription = persistentController.stream.listen(
(chunk) {
_chunkSeq += 1;
try {
persistentService.updateStreamProgress(
streamId,
chunkSequence: _chunkSeq,
appendedContent: chunk,
);
} catch (_) {}
var effectiveChunk = chunk;
// Check for web search indicators in the stream
if (webSearchEnabled && !isSearching) {
@@ -2960,11 +2994,32 @@ void _attachSocketStreamingHandlers({
final api = ref.read(apiServiceProvider);
// Activity-based watchdog for socket-driven streaming (resets on activity)
final _socketWatchdog = InactivityWatchdog(
window: const Duration(minutes: 5),
onTimeout: () {
try {
socketService.offChatEvents();
socketService.offChannelEvents();
} catch (_) {}
try {
final msgs = ref.read(chatMessagesProvider);
if (msgs.isNotEmpty &&
msgs.last.role == 'assistant' &&
msgs.last.isStreaming) {
ref.read(chatMessagesProvider.notifier).finishStreaming();
}
} catch (_) {}
},
)..start();
void channelLineHandlerFactory(String channel) {
void handler(dynamic line) {
try {
if (line is String) {
final s = line.trim();
// Any socket line is activity
_socketWatchdog.ping();
if (s == '[DONE]' || s == 'DONE') {
try {
socketService.offEvent(channel);
@@ -2982,6 +3037,7 @@ void _attachSocketStreamingHandlers({
);
} catch (_) {}
ref.read(chatMessagesProvider.notifier).finishStreaming();
_socketWatchdog.stop();
return;
}
if (s.startsWith('data:')) {
@@ -3003,6 +3059,7 @@ void _attachSocketStreamingHandlers({
);
} catch (_) {}
ref.read(chatMessagesProvider.notifier).finishStreaming();
_socketWatchdog.stop();
return;
}
try {
@@ -3065,11 +3122,13 @@ void _attachSocketStreamingHandlers({
}
}
} else if (line is Map) {
_socketWatchdog.ping();
if (line['done'] == true) {
try {
socketService.offEvent(channel);
} catch (_) {}
ref.read(chatMessagesProvider.notifier).finishStreaming();
_socketWatchdog.stop();
return;
}
}
@@ -3077,11 +3136,8 @@ void _attachSocketStreamingHandlers({
}
socketService.onEvent(channel, handler);
Future.delayed(const Duration(minutes: 3), () {
try {
socketService.offEvent(channel);
} catch (_) {}
});
// Start activity watchdog now that handler is attached
_socketWatchdog.ping();
}
void chatHandler(Map<String, dynamic> ev) {
@@ -3175,6 +3231,9 @@ void _attachSocketStreamingHandlers({
try {
socketService.offChatEvents();
} catch (_) {}
try {
_socketWatchdog.stop();
} catch (_) {}
try {
unawaited(
api
@@ -3326,20 +3385,8 @@ void _attachSocketStreamingHandlers({
socketService.onChatEvents(chatHandler);
socketService.onChannelEvents(channelEventsHandler);
Future.delayed(const Duration(seconds: 90), () {
try {
socketService.offChatEvents();
socketService.offChannelEvents();
} catch (_) {}
try {
final msgs = ref.read(chatMessagesProvider);
if (msgs.isNotEmpty &&
msgs.last.role == 'assistant' &&
msgs.last.isStreaming) {
ref.read(chatMessagesProvider.notifier).finishStreaming();
}
} catch (_) {}
});
// Start activity watchdog for chat/channel events
_socketWatchdog.ping();
}
// ========== Tool Servers (OpenAPI) Helpers ==========