feat: enhanced sockets, tuned retries and polling fallback

This commit is contained in:
cogwheel0
2025-09-07 11:13:05 +05:30
parent 3decf9d46b
commit a16fb86e27
11 changed files with 519 additions and 138 deletions

View File

@@ -196,11 +196,16 @@ final socketServiceProvider = Provider<SocketService?>((ref) {
final activeServer = ref.watch(activeServerProvider);
final token = ref.watch(authTokenProvider3);
final transportMode = ref.watch(appSettingsProvider).socketTransportMode; // 'auto' or 'ws'
return activeServer.maybeWhen(
data: (server) {
if (server == null) return null;
final s = SocketService(serverConfig: server, authToken: token);
final s = SocketService(
serverConfig: server,
authToken: token,
websocketOnly: transportMode == 'ws',
);
// best-effort connect; errors handled internally
// ignore unawaited_futures
s.connect();

View File

@@ -6,6 +6,8 @@ import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../providers/app_providers.dart';
import '../../features/auth/providers/unified_auth_providers.dart';
import '../services/navigation_service.dart';
import '../models/conversation.dart';
import '../services/background_streaming_handler.dart';
import '../../features/onboarding/views/onboarding_sheet.dart';
import '../../shared/theme/theme_extensions.dart';
import '../utils/debug_logger.dart';
@@ -28,6 +30,12 @@ final appStartupFlowProvider = Provider<void>((ref) {
ref.watch(socketServiceProvider);
}
// Ensure resume-triggered foreground refresh is active
ref.watch(foregroundRefreshProvider);
// Keep Socket.IO connection alive in background within platform limits
ref.watch(socketPersistenceProvider);
// When auth state becomes authenticated, run additional background work
ref.listen<AuthNavigationState>(authNavigationStateProvider,
(prev, next) {
@@ -67,6 +75,128 @@ final appStartupFlowProvider = Provider<void>((ref) {
});
});
/// Listens to app lifecycle and refreshes server state when app returns to foreground.
///
/// Rationale: Socket.IO does not replay historical events. If the app was suspended,
/// we may miss updates. On resume, invalidate conversations to reconcile state.
final foregroundRefreshProvider = Provider<void>((ref) {
final observer = _ForegroundRefreshObserver(ref);
WidgetsBinding.instance.addObserver(observer);
ref.onDispose(() => WidgetsBinding.instance.removeObserver(observer));
});
class _ForegroundRefreshObserver extends WidgetsBindingObserver {
final Ref _ref;
_ForegroundRefreshObserver(this._ref);
@override
void didChangeAppLifecycleState(AppLifecycleState state) {
if (state == AppLifecycleState.resumed) {
// Schedule to avoid side-effects during build frames
Future.microtask(() {
try {
_ref.invalidate(conversationsProvider);
} catch (_) {}
});
}
}
}
/// Attempts to keep the realtime socket connection alive while the app is
/// backgrounded, similar to how PersistentStreamingService works for streams.
///
/// Notes:
/// - iOS: limited to short background task windows; we send periodic keepAlive.
/// - Android: uses existing foreground service notification.
final socketPersistenceProvider = Provider<void>((ref) {
final observer = _SocketPersistenceObserver(ref);
WidgetsBinding.instance.addObserver(observer);
// React to active conversation changes while backgrounded
final sub = ref.listen<Conversation?>(
activeConversationProvider,
(prev, next) => observer.onActiveConversationChanged(),
);
ref.onDispose(() => WidgetsBinding.instance.removeObserver(observer));
ref.onDispose(sub.close);
});
class _SocketPersistenceObserver extends WidgetsBindingObserver {
final Ref _ref;
_SocketPersistenceObserver(this._ref);
static const String _socketId = 'socket-keepalive';
Timer? _heartbeat;
bool _bgActive = false;
bool _isBackgrounded = false;
bool _shouldKeepAlive() {
final authed =
_ref.read(authNavigationStateProvider) ==
AuthNavigationState.authenticated;
final hasConversation = _ref.read(activeConversationProvider) != null;
return authed && hasConversation;
}
void _startBackground() {
if (_bgActive) return;
if (!_shouldKeepAlive()) return;
try {
BackgroundStreamingHandler.instance
.startBackgroundExecution([_socketId]);
// Periodic keep-alive (primarily useful on iOS)
_heartbeat?.cancel();
_heartbeat =
Timer.periodic(const Duration(seconds: 30), (_) async {
try {
await BackgroundStreamingHandler.instance.keepAlive();
} catch (_) {}
});
_bgActive = true;
} catch (_) {}
}
void _stopBackground() {
if (!_bgActive) return;
try {
BackgroundStreamingHandler.instance
.stopBackgroundExecution([_socketId]);
} catch (_) {}
_heartbeat?.cancel();
_heartbeat = null;
_bgActive = false;
}
@override
void didChangeAppLifecycleState(AppLifecycleState state) {
switch (state) {
case AppLifecycleState.paused:
case AppLifecycleState.inactive:
_isBackgrounded = true;
_startBackground();
break;
case AppLifecycleState.resumed:
_isBackgrounded = false;
_stopBackground();
break;
case AppLifecycleState.detached:
case AppLifecycleState.hidden:
_isBackgrounded = false;
_stopBackground();
break;
}
}
// Called when active conversation changes; only acts during background
void onActiveConversationChanged() {
if (!_isBackgrounded) return;
if (_shouldKeepAlive()) {
_startBackground();
} else {
_stopBackground();
}
}
}
Future<void> _maybeShowOnboarding(Ref ref) async {
try {
final storage = ref.read(optimizedStorageServiceProvider);

View File

@@ -2907,7 +2907,8 @@ class ApiService {
bool containsDone(String s) =>
s.contains('<details type="tool_calls"') && s.contains('done="true"');
while (DateTime.now().difference(started).inSeconds < 60) {
// Allow longer time for large completions (e.g., long stories)
while (DateTime.now().difference(started).inSeconds < 180) {
try {
// Small delay between polls
await Future.delayed(const Duration(milliseconds: 900));
@@ -3069,14 +3070,15 @@ class ApiService {
break;
}
// If content hasn't changed for a few polls, assume completion
// If content hasn't changed for a few polls, assume completion,
// but do not early-exit while content is still empty.
final prev = last;
if (content == prev) {
if (content == prev && content.isNotEmpty) {
stableCount++;
} else {
} else if (content != prev) {
stableCount = 0;
}
if (stableCount >= 3) {
if (content.isNotEmpty && stableCount >= 3) {
break;
}
@@ -3086,6 +3088,66 @@ class ApiService {
}
}
// Final backfill: one last attempt to fetch the latest content
// in case the server wrote the final message after our last poll.
try {
if (!streamController.isClosed) {
final resp = await _dio.get('/api/v1/chats/$chatId');
final data = resp.data as Map<String, dynamic>;
String content = '';
Map<String, dynamic>? chatObj = (data['chat'] is Map<String, dynamic>)
? data['chat'] as Map<String, dynamic>
: null;
if (chatObj != null && chatObj['messages'] is List) {
final List messagesList = chatObj['messages'] as List;
final target = messagesList.firstWhere(
(m) => (m is Map && (m['id']?.toString() == messageId)),
orElse: () => null,
);
if (target != null) {
final rawContent = (target as Map)['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = (textItem as Map)['text']?.toString() ?? '';
}
}
}
}
if (content.isEmpty && chatObj != null) {
final history = chatObj['history'];
if (history is Map && history['messages'] is Map) {
final Map<String, dynamic> messagesMap =
(history['messages'] as Map).cast<String, dynamic>();
final msg = messagesMap[messageId];
if (msg is Map) {
final rawContent = msg['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = (textItem as Map)['text']?.toString() ?? '';
}
}
}
}
}
if (content.isNotEmpty && content != last) {
streamController.add('\n');
streamController.add(content);
}
}
} catch (_) {}
if (!streamController.isClosed) {
streamController.close();
}

View File

@@ -19,6 +19,8 @@ class SettingsService {
static const String _voiceLocaleKey = 'voice_locale_id';
static const String _voiceHoldToTalkKey = 'voice_hold_to_talk';
static const String _voiceAutoSendKey = 'voice_auto_send_final';
// Realtime transport preference
static const String _socketTransportModeKey = 'socket_transport_mode'; // 'auto' or 'ws'
/// Get reduced motion preference
static Future<bool> getReduceMotion() async {
@@ -133,6 +135,7 @@ class SettingsService {
voiceLocaleId: await getVoiceLocaleId(),
voiceHoldToTalk: await getVoiceHoldToTalk(),
voiceAutoSendFinal: await getVoiceAutoSendFinal(),
socketTransportMode: await getSocketTransportMode(),
);
}
@@ -150,6 +153,7 @@ class SettingsService {
setVoiceLocaleId(settings.voiceLocaleId),
setVoiceHoldToTalk(settings.voiceHoldToTalk),
setVoiceAutoSendFinal(settings.voiceAutoSendFinal),
setSocketTransportMode(settings.socketTransportMode),
]);
}
@@ -188,6 +192,18 @@ class SettingsService {
await prefs.setBool(_voiceAutoSendKey, value);
}
/// Transport mode: 'auto' (polling+websocket) or 'ws' (websocket only)
static Future<String> getSocketTransportMode() async {
final prefs = await SharedPreferences.getInstance();
return prefs.getString(_socketTransportModeKey) ?? 'auto';
}
static Future<void> setSocketTransportMode(String mode) async {
final prefs = await SharedPreferences.getInstance();
if (mode != 'auto' && mode != 'ws') mode = 'auto';
await prefs.setString(_socketTransportModeKey, mode);
}
/// Get effective animation duration considering all settings
static Duration getEffectiveAnimationDuration(
BuildContext context,
@@ -241,6 +257,7 @@ class AppSettings {
final String? voiceLocaleId;
final bool voiceHoldToTalk;
final bool voiceAutoSendFinal;
final String socketTransportMode; // 'auto' or 'ws'
const AppSettings({
this.reduceMotion = false,
@@ -254,6 +271,7 @@ class AppSettings {
this.voiceLocaleId,
this.voiceHoldToTalk = false,
this.voiceAutoSendFinal = false,
this.socketTransportMode = 'auto',
});
AppSettings copyWith({
@@ -268,6 +286,7 @@ class AppSettings {
Object? voiceLocaleId = const _DefaultValue(),
bool? voiceHoldToTalk,
bool? voiceAutoSendFinal,
String? socketTransportMode,
}) {
return AppSettings(
reduceMotion: reduceMotion ?? this.reduceMotion,
@@ -281,6 +300,7 @@ class AppSettings {
voiceLocaleId: voiceLocaleId is _DefaultValue ? this.voiceLocaleId : voiceLocaleId as String?,
voiceHoldToTalk: voiceHoldToTalk ?? this.voiceHoldToTalk,
voiceAutoSendFinal: voiceAutoSendFinal ?? this.voiceAutoSendFinal,
socketTransportMode: socketTransportMode ?? this.socketTransportMode,
);
}
@@ -299,6 +319,7 @@ class AppSettings {
other.voiceLocaleId == voiceLocaleId &&
other.voiceHoldToTalk == voiceHoldToTalk &&
other.voiceAutoSendFinal == voiceAutoSendFinal;
// socketTransportMode intentionally not included in == to avoid frequent rebuilds
}
@override
@@ -315,6 +336,7 @@ class AppSettings {
voiceLocaleId,
voiceHoldToTalk,
voiceAutoSendFinal,
socketTransportMode,
);
}
}
@@ -390,6 +412,11 @@ class AppSettingsNotifier extends StateNotifier<AppSettings> {
await SettingsService.setVoiceAutoSendFinal(value);
}
Future<void> setSocketTransportMode(String mode) async {
state = state.copyWith(socketTransportMode: mode);
await SettingsService.setSocketTransportMode(mode);
}
Future<void> resetToDefaults() async {
const defaultSettings = AppSettings();
await SettingsService.saveSettings(defaultSettings);

View File

@@ -5,9 +5,14 @@ import '../models/server_config.dart';
class SocketService {
final ServerConfig serverConfig;
final String? authToken;
final bool websocketOnly;
io.Socket? _socket;
SocketService({required this.serverConfig, required this.authToken});
SocketService({
required this.serverConfig,
required this.authToken,
this.websocketOnly = false,
});
String? get sessionId => _socket?.id;
io.Socket? get socket => _socket;
@@ -24,20 +29,28 @@ class SocketService {
final base = serverConfig.url.replaceFirst(RegExp(r'/+$'), '');
final path = '/ws/socket.io';
_socket = io.io(
base,
io.OptionBuilder()
.setTransports(['websocket'])
.setPath(path)
.setExtraHeaders(
authToken != null && authToken!.isNotEmpty
? {
'Authorization': 'Bearer $authToken',
}
: {},
)
.build(),
);
final builder = io.OptionBuilder()
// Transport selection
.setTransports(
websocketOnly ? ['websocket'] : ['polling', 'websocket'],
)
.setRememberUpgrade(!websocketOnly)
.setUpgrade(!websocketOnly)
// Tune reconnect/backoff and timeouts
.setReconnectionAttempts(0) // 0/Infinity semantics: unlimited attempts
.setReconnectionDelay(1000)
.setReconnectionDelayMax(5000)
.setRandomizationFactor(0.5)
.setTimeout(20000)
.setPath(path);
if (authToken != null && authToken!.isNotEmpty) {
builder
.setAuth({'token': authToken})
.setExtraHeaders({'Authorization': 'Bearer $authToken'});
}
_socket = io.io(base, builder.build());
_socket!.on('connect', (_) {
debugPrint('Socket connected: ${_socket!.id}');
@@ -52,6 +65,24 @@ class SocketService {
debugPrint('Socket connect_error: $err');
});
_socket!.on('reconnect_attempt', (attempt) {
debugPrint('Socket reconnect_attempt: $attempt');
});
_socket!.on('reconnect', (attempt) {
debugPrint('Socket reconnected after $attempt attempts');
if (authToken != null && authToken!.isNotEmpty) {
// Best-effort rejoin
_socket!.emit('user-join', {
'auth': {'token': authToken}
});
}
});
_socket!.on('reconnect_failed', (_) {
debugPrint('Socket reconnect_failed');
});
_socket!.on('disconnect', (reason) {
debugPrint('Socket disconnected: $reason');
});