Merge pull request #124 from cogwheel0/refactor-streaming-socket-management

refactor-streaming-socket-management
This commit is contained in:
cogwheel
2025-11-01 22:52:43 +05:30
committed by GitHub
4 changed files with 23 additions and 174 deletions

View File

@@ -390,119 +390,6 @@ final socketServiceProvider = Provider<SocketService?>((ref) {
return asyncService.maybeWhen(data: (service) => service, orElse: () => null);
});
enum SocketConnectionState { disconnected, connecting, connected }
@Riverpod(keepAlive: true)
class SocketConnectionStream extends _$SocketConnectionStream {
StreamController<SocketConnectionState>? _controller;
ProviderSubscription<AsyncValue<SocketService?>>? _serviceSubscription;
VoidCallback? _cancelConnectListener;
VoidCallback? _cancelDisconnectListener;
SocketConnectionState _latestState = SocketConnectionState.connecting;
@override
Stream<SocketConnectionState> build() {
final controller = StreamController<SocketConnectionState>.broadcast(
sync: true,
);
controller
..onListen = _primeState
..onCancel = _maybeNotifyDisconnected;
_controller = controller;
final initialService = ref
.watch(socketServiceManagerProvider)
.maybeWhen(data: (service) => service, orElse: () => null);
_handleServiceChange(initialService);
_serviceSubscription = ref.listen<AsyncValue<SocketService?>>(
socketServiceManagerProvider,
(_, next) => _handleServiceChange(
next.maybeWhen(data: (service) => service, orElse: () => null),
),
);
ref.onDispose(() {
_serviceSubscription?.close();
_serviceSubscription = null;
_unbindSocket();
_controller?.close();
_controller = null;
});
return controller.stream;
}
/// Publishes a disconnected state when the final listener cancels.
void _maybeNotifyDisconnected() {
try {
_controller?.add(SocketConnectionState.disconnected);
_latestState = SocketConnectionState.disconnected;
} catch (_) {}
}
/// Replays the cached state to new listeners.
void _primeState() {
try {
_controller?.add(_latestState);
} catch (_) {}
}
void _handleServiceChange(SocketService? service) {
if (service == null) {
_unbindSocket();
_emit(SocketConnectionState.connecting);
return;
}
_emit(
service.isConnected
? SocketConnectionState.connected
: SocketConnectionState.connecting,
);
_bindSocket(service);
}
void _bindSocket(SocketService service) {
_unbindSocket();
void handleConnect(dynamic _) {
_emit(SocketConnectionState.connected);
}
void handleDisconnect(dynamic _) {
_emit(SocketConnectionState.disconnected);
}
service.socket?.on('connect', handleConnect);
service.socket?.on('disconnect', handleDisconnect);
_cancelConnectListener = () {
service.socket?.off('connect', handleConnect);
};
_cancelDisconnectListener = () {
service.socket?.off('disconnect', handleDisconnect);
};
}
void _emit(SocketConnectionState next) {
if (_latestState == next) {
return;
}
_latestState = next;
try {
_controller?.add(next);
} catch (_) {}
}
void _unbindSocket() {
_cancelConnectListener?.call();
_cancelDisconnectListener?.call();
_cancelConnectListener = null;
_cancelDisconnectListener = null;
}
}
@Riverpod(keepAlive: true)
class ConversationDeltaStream extends _$ConversationDeltaStream {
StreamController<ConversationDelta>? _controller;

View File

@@ -1170,9 +1170,7 @@ class ApiService {
data,
debugLabel: 'parse_file_search',
);
return normalized
.map(FileInfo.fromJson)
.toList(growable: false);
return normalized.map(FileInfo.fromJson).toList(growable: false);
}
return const [];
}
@@ -1186,9 +1184,7 @@ class ApiService {
data,
debugLabel: 'parse_file_all',
);
return normalized
.map(FileInfo.fromJson)
.toList(growable: false);
return normalized.map(FileInfo.fromJson).toList(growable: false);
}
return const [];
}
@@ -1599,10 +1595,7 @@ class ApiService {
if (data is Map<String, dynamic>) {
final voices = data['voices'];
if (voices is List) {
return _normalizeList(
voices,
debugLabel: 'parse_voice_list',
);
return _normalizeList(voices, debugLabel: 'parse_voice_list');
}
}
if (data is List) {
@@ -1705,10 +1698,7 @@ class ApiService {
final response = await _dio.get('/api/v1/images/models');
final data = response.data;
if (data is List) {
return _normalizeList(
data,
debugLabel: 'parse_image_models',
);
return _normalizeList(data, debugLabel: 'parse_image_models');
}
return [];
}
@@ -2406,7 +2396,7 @@ class ApiService {
({
Stream<String> stream,
String messageId,
String sessionId,
String? sessionId,
String? socketSessionId,
bool isBackgroundFlow,
})
@@ -2432,10 +2422,9 @@ class ApiService {
(responseMessageId != null && responseMessageId.isNotEmpty)
? responseMessageId
: const Uuid().v4();
final sessionId =
(sessionIdOverride != null && sessionIdOverride.isNotEmpty)
? sessionIdOverride
: const Uuid().v4().substring(0, 20);
final bool hasSocketBinding =
sessionIdOverride != null && sessionIdOverride.isNotEmpty;
final String? sessionId = hasSocketBinding ? sessionIdOverride : null;
// NOTE: Previously used to branch for Gemini-specific handling; not needed now.
@@ -2586,14 +2575,20 @@ class ApiService {
);
// Attach identifiers to trigger background task processing on the server
data['session_id'] = sessionId;
if (sessionId != null) {
data['session_id'] = sessionId;
}
data['id'] = messageId;
if (conversationId != null) {
data['chat_id'] = conversationId;
}
// Attach background_tasks if provided
if (backgroundTasks != null && backgroundTasks.isNotEmpty) {
final bool includeBackgroundTasks =
backgroundTasks != null &&
backgroundTasks.isNotEmpty &&
sessionId != null;
if (includeBackgroundTasks) {
data['background_tasks'] = backgroundTasks;
}
@@ -2717,7 +2712,7 @@ class ApiService {
messageId: messageId,
sessionId: sessionId,
socketSessionId: socketSessionId,
isBackgroundFlow: true,
isBackgroundFlow: includeBackgroundTasks,
);
}
@@ -3123,10 +3118,7 @@ class ApiService {
final data = response.data;
if (data is List) {
return _normalizeList(
data,
debugLabel: 'parse_message_search',
);
return _normalizeList(data, debugLabel: 'parse_message_search');
}
if (data is Map<String, dynamic>) {
final list = (data['items'] ?? data['results'] ?? data['messages']);

View File

@@ -2,10 +2,7 @@ import 'package:flutter/widgets.dart';
import 'package:socket_io_client/socket_io_client.dart' as io;
import '../models/server_config.dart';
import '../../l10n/app_localizations.dart';
import 'navigation_service.dart';
import 'socket_tls_override.dart';
import '../../shared/utils/ui_utils.dart';
typedef SocketChatEventHandler =
void Function(
@@ -282,37 +279,9 @@ class SocketService with WidgetsBindingObserver {
}
}
void _handleConnectError(dynamic err) {
// Show user-facing error notification
final context = NavigationService.context;
if (context != null) {
final l10n = AppLocalizations.of(context);
if (l10n != null) {
UiUtils.showMessage(
context,
l10n.websocketConnectionError,
isError: true,
duration: const Duration(seconds: 5),
);
}
}
}
void _handleConnectError(dynamic err) {}
void _handleReconnectFailed(dynamic _) {
// Show user-facing error notification
final context = NavigationService.context;
if (context != null) {
final l10n = AppLocalizations.of(context);
if (l10n != null) {
UiUtils.showMessage(
context,
l10n.websocketReconnectFailed,
isError: true,
duration: const Duration(seconds: 5),
);
}
}
}
void _handleReconnectFailed(dynamic _) {}
void _handleDisconnect(dynamic reason) {
// Silent disconnect

View File

@@ -135,7 +135,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
required String assistantMessageId,
required String modelId,
required Map<String, dynamic> modelItem,
required String sessionId,
required String? sessionId,
required String? activeConversationId,
required ApiService api,
required SocketService? socketService,
@@ -238,6 +238,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
'conversationId': activeConversationId,
'messageId': assistantMessageId,
'modelId': modelId,
if (sessionId != null) 'sessionId': sessionId,
},
);
api.registerPersistentStreamForMessage(assistantMessageId, streamId);