refactor: removing legacy socket code

This commit is contained in:
cogwheel0
2025-09-25 12:28:02 +05:30
parent 5f013b1b73
commit 637274133f
2 changed files with 65 additions and 1546 deletions

View File

@@ -6,6 +6,7 @@ import 'package:flutter/foundation.dart';
import '../../core/models/chat_message.dart';
import '../../core/services/persistent_streaming_service.dart';
import '../../core/services/socket_service.dart';
import '../../core/utils/inactivity_watchdog.dart';
import '../../core/utils/stream_chunker.dart';
import '../../core/utils/tool_calls_parser.dart';
@@ -68,6 +69,27 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
},
);
InactivityWatchdog? socketWatchdog;
if (socketService != null) {
socketWatchdog = InactivityWatchdog(
window: const Duration(minutes: 5),
onTimeout: () {
try {
socketService.offChatEvents();
socketService.offChannelEvents();
} catch (_) {}
try {
final msgs = getMessages();
if (msgs.isNotEmpty &&
msgs.last.role == 'assistant' &&
msgs.last.isStreaming) {
finishStreaming();
}
} catch (_) {}
},
)..start();
}
bool isSearching = false;
bool suppressSocketContent = suppressSocketContentInitially;
bool usingDynamicChannel = usingDynamicChannelInitially;
@@ -185,6 +207,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
try {
if (line is String) {
final s = line.trim();
socketWatchdog?.ping();
if (s == '[DONE]' || s == 'DONE') {
try {
socketService?.offEvent(channel);
@@ -202,6 +225,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
);
} catch (_) {}
finishStreaming();
socketWatchdog?.stop();
return;
}
if (s.startsWith('data:')) {
@@ -222,6 +246,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
);
} catch (_) {}
finishStreaming();
socketWatchdog?.stop();
return;
}
try {
@@ -280,11 +305,13 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
}
}
} else if (line is Map) {
socketWatchdog?.ping();
if (line['done'] == true) {
try {
socketService?.offEvent(channel);
} catch (_) {}
finishStreaming();
socketWatchdog?.stop();
return;
}
}
@@ -294,10 +321,12 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
try {
socketService?.onEvent(channel, handler);
} catch (_) {}
socketWatchdog?.ping();
Future.delayed(const Duration(minutes: 3), () {
try {
socketService?.offEvent(channel);
} catch (_) {}
socketWatchdog?.stop();
});
}
@@ -307,6 +336,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
if (data == null) return;
final type = data['type'];
final payload = data['data'];
socketWatchdog?.ping();
if (type == 'chat:completion' && payload != null) {
if (payload is Map<String, dynamic>) {
@@ -471,6 +501,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
}
}
finishStreaming();
socketWatchdog?.stop();
}
}
} else if (type == 'chat:message:error' && payload != null) {
@@ -497,6 +528,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
} catch (_) {}
// Ensure UI exits streaming state
finishStreaming();
socketWatchdog?.stop();
} else if ((type == 'chat:message:delta' || type == 'message') &&
payload != null) {
// Incremental message content over socket; respect suppression on SSE-driven flows
@@ -672,6 +704,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
finishStreaming();
}
} catch (_) {}
socketWatchdog?.stop();
});
}
@@ -727,6 +760,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
if (!usingDynamicChannel && !isBackgroundFlow) {
finishStreaming();
}
socketWatchdog?.stop();
},
onError: (error) async {
try {
@@ -738,6 +772,7 @@ StreamSubscription<String> attachUnifiedChunkedStreaming({
socketService.offChatEvents();
} catch (_) {}
}
socketWatchdog?.stop();
},
);