Review notes (commentary ahead of the first diff header, not part of any hunk).

Only added ("+") lines were changed; context and removed lines are reproduced
as found, with line breaks restored.
  1. _prefetchServerAudio: results and failures of fetches that were started
     before the last _resetServerAudio() are ignored, so they can no longer
     corrupt the pending-fetch counter or reset the session that replaced them.
  2. _startNextSpeechChunk / _playServerAudioChunk: _isSpeaking is claimed
     before the first await and the session is re-checked after it, so two
     chunks cannot start on top of each other and a chunk made stale during
     the await is dropped.
  3. _handleSocketEvent: a "done" event with empty content now flushes the
     held-back last segment before the reply is marked complete.
  4. _processSpeakableSegments: two unreachable branches removed.
  5. Type arguments restored on added declarations; doc comments added.
Hunk headers were recomputed for the new line counts. The "index" post-image
hashes are left as found. The AndroidManifest.xml hunk arrived with its XML
lines stripped and is reproduced as-is.

diff --git a/android/app/src/main/AndroidManifest.xml b/android/app/src/main/AndroidManifest.xml
index c3b95a5..38a833e 100644
--- a/android/app/src/main/AndroidManifest.xml
+++ b/android/app/src/main/AndroidManifest.xml
@@ -2,6 +2,7 @@
+
diff --git a/lib/features/chat/services/text_to_speech_service.dart b/lib/features/chat/services/text_to_speech_service.dart
index 7c8f4b4..0fe7240 100644
--- a/lib/features/chat/services/text_to_speech_service.dart
+++ b/lib/features/chat/services/text_to_speech_service.dart
@@ -11,6 +11,14 @@ import '../../../core/services/settings_service.dart';
 
 typedef _SpeechChunk = ({Uint8List bytes, String mimeType});
 
+/// One synthesized speech clip: raw audio bytes plus their MIME type.
+class SpeechAudioChunk {
+  const SpeechAudioChunk({required this.bytes, required this.mimeType});
+
+  final Uint8List bytes;
+  final String mimeType;
+}
+
 /// Lightweight wrapper around FlutterTts to centralize configuration
 class TextToSpeechService {
   final FlutterTts _tts = FlutterTts();
@@ -45,6 +53,8 @@ class TextToSpeechService {
   bool get isAvailable => _available;
   bool get deviceEngineAvailable => _deviceEngineAvailable;
   bool get serverEngineAvailable => _api != null;
+  /// Exposes `_shouldUseServer()` to callers outside this class.
+  bool get prefersServerEngine => _shouldUseServer();
 
   TextToSpeechService({ApiService? api}) : _api = api {
     // Wire minimal player events to callbacks
@@ -277,6 +287,35 @@ class TextToSpeechService {
     _onSentenceIndex?.call(0);
   }
 
+  /// Synthesizes [text] through the server API and returns the audio
+  /// without playing it.
+  ///
+  /// Runs [initialize] first when the service is not initialized yet. Throws
+  /// [ArgumentError] for blank text and [StateError] when no API client was
+  /// supplied.
+  Future<SpeechAudioChunk> synthesizeServerSpeechChunk(String text) async {
+    if (text.trim().isEmpty) {
+      throw ArgumentError('Cannot synthesize empty text');
+    }
+    if (_api == null) {
+      throw StateError('Server text-to-speech is unavailable');
+    }
+    if (!_initialized) {
+      await initialize(
+        deviceVoice: _preferredVoice,
+        serverVoice: _serverPreferredVoice,
+        engine: _engine,
+      );
+    }
+    final voice = await _resolveServerVoice();
+    final chunk = await _api.generateSpeech(
+      text: text,
+      voice: voice,
+      speed: _speechRate,
+    );
+    return SpeechAudioChunk(bytes: chunk.bytes, mimeType: chunk.mimeType);
+  }
+
   Future pause() async {
     if (!_initialized) return;
     try {
@@ -572,6 +611,20 @@ class TextToSpeechService {
     }
   }
 
+  /// Best-effort warm-up that calls `_getServerDefaultVoice()` ahead of the
+  /// first synthesis request (presumably so its result is already cached).
+  /// Does nothing without an API client; failures are ignored.
+  Future<void> preloadServerDefaults() async {
+    if (_api == null) {
+      return;
+    }
+    try {
+      await _getServerDefaultVoice();
+    } catch (_) {
+      // Deliberately ignored: this call is only a warm-up.
+    }
+  }
+
   // ===== Server chunked playback =====
 
   Future _startServerChunkedPlayback(String text) async {
diff --git a/lib/features/chat/services/voice_call_service.dart b/lib/features/chat/services/voice_call_service.dart
index 1fb4123..fa32b90 100644
--- a/lib/features/chat/services/voice_call_service.dart
+++ b/lib/features/chat/services/voice_call_service.dart
@@ -1,5 +1,7 @@
 import 'dart:async';
+import 'dart:collection';
 
+import 'package:audioplayers/audioplayers.dart';
 import 'package:riverpod_annotation/riverpod_annotation.dart';
 import 'package:wakelock_plus/wakelock_plus.dart';
 
@@ -49,6 +51,21 @@ class VoiceCallService {
   final Set _pauseReasons = {};
   SocketEventSubscription? _socketSubscription;
   Timer? _keepAliveTimer;
+  // Device-engine pipeline: segments waiting to be spoken, in order.
+  final ListQueue<String> _speechQueue = ListQueue<String>();
+  int _enqueuedSentenceCount = 0;
+  String? _activeAssistantMessageId;
+  bool _responseCompleted = false;
+  bool _listeningSuspendedForSpeech = false;
+  // Server-engine pipeline: fetched clips keyed by chunk id until their turn.
+  final Map<int, SpeechAudioChunk> _serverAudioBuffer = {};
+  final AudioPlayer _serverAudioPlayer = AudioPlayer();
+  // Bumped on every reset so in-flight async work can tell it is stale.
+  int _serverAudioSession = 0;
+  int _pendingServerAudioFetches = 0;
+  bool _serverPipelineActive = false;
+  int _nextServerChunkId = 0;
+  int _nextServerPlaybackId = 0;
 
   final StreamController _stateController =
       StreamController.broadcast();
@@ -75,6 +92,12 @@ class VoiceCallService {
       // sentence/word callbacks are not required for call UI, but harmless
     );
 
+    _serverAudioPlayer.onPlayerComplete.listen((_) {
+      _handleServerAudioComplete();
+    });
+
+    unawaited(_tts.preloadServerDefaults());
+
     // Set up notification action handler
     _notificationService.onActionPressed = _handleNotificationAction;
   }
@@ -197,6 +220,13 @@ class VoiceCallService {
     if (_isDisposed) return;
 
     try {
+      _speechQueue.clear();
+      _enqueuedSentenceCount = 0;
+      _activeAssistantMessageId = null;
+      _responseCompleted = false;
+      _listeningSuspendedForSpeech = false;
+      _resetServerAudio(stopPlayback: true);
+
       if (_pauseReasons.isNotEmpty) {
         _listeningPaused = true;
         if (_state != VoiceCallState.paused) {
@@ -276,6 +306,17 @@ class VoiceCallService {
 
   String _accumulatedResponse = '';
   bool _isSpeaking = false;
+
+  /// Whether speech for the current reply is still playing, buffered or
+  /// queued, or (on the server pipeline) still being fetched.
+  bool get _hasPendingSpeech {
+    if (_serverPipelineActive) {
+      return _isSpeaking ||
+          _serverAudioBuffer.isNotEmpty ||
+          _pendingServerAudioFetches > 0;
+    }
+    return _isSpeaking || _speechQueue.isNotEmpty;
+  }
 
   void _handleSocketEvent(
     Map event,
@@ -284,18 +325,33 @@ class VoiceCallService {
     if (_isDisposed) return;
 
     final outerData = event['data'];
+    final messageId = event['message_id']?.toString();
 
     if (outerData is Map) {
       final eventType = outerData['type']?.toString();
       final innerData = outerData['data'];
 
       if (eventType == 'chat:completion' && innerData is Map) {
+        final bool doneFlag = innerData['done'] == true;
+        if (messageId != null && messageId.isNotEmpty) {
+          _handleAssistantMessageStart(messageId);
+        }
+
         // Handle full content replacement (used by some models/backends)
         if (innerData.containsKey('content')) {
           final content = innerData['content']?.toString() ?? '';
           if (content.isNotEmpty) {
             _accumulatedResponse = content;
             _responseController.add(content);
+          }
+          if (doneFlag) {
+            // Flush even when this event carries no text: earlier deltas may
+            // have left a held-back last segment that was never spoken.
+            _processSpeakableSegments(isFinalChunk: true);
+            _responseCompleted = true;
+            _maybeResumeListeningAfterSpeech();
+          } else if (content.isNotEmpty) {
+            _processSpeakableSegments(isFinalChunk: false);
           }
         }
 
@@ -313,61 +369,276 @@ class VoiceCallService {
               if (deltaContent.isNotEmpty) {
                 _accumulatedResponse += deltaContent;
                 _responseController.add(_accumulatedResponse);
+                _processSpeakableSegments(isFinalChunk: false);
               }
             }
 
             // Check for completion
-            if (finishReason == 'stop') {
-              if (_accumulatedResponse.isNotEmpty && !_isSpeaking) {
-                _speakResponse(_accumulatedResponse);
-                _accumulatedResponse = '';
-              } else if (_accumulatedResponse.isEmpty) {
-                // No response, restart listening unless paused
-                if (_pauseReasons.isEmpty) {
-                  _startListening();
-                } else if (_state != VoiceCallState.paused) {
-                  _updateState(VoiceCallState.paused);
-                }
-              }
+            if (finishReason == 'stop' || finishReason == 'length') {
+              _responseCompleted = true;
+              _processSpeakableSegments(isFinalChunk: true);
+              _maybeResumeListeningAfterSpeech();
             }
           }
         }
+
+        if (doneFlag && !_responseCompleted) {
+          _responseCompleted = true;
+          _processSpeakableSegments(isFinalChunk: true);
+          _maybeResumeListeningAfterSpeech();
+        }
       }
     }
   }
 
-  Future _speakResponse(String response) async {
-    if (_isDisposed || _isSpeaking) return;
+  /// Resets all per-reply speech state the first time [messageId] is seen;
+  /// further events carrying the same id are ignored here.
+  void _handleAssistantMessageStart(String messageId) {
+    if (_activeAssistantMessageId == messageId) {
+      return;
+    }
+    _activeAssistantMessageId = messageId;
+    _accumulatedResponse = '';
+    _responseController.add('');
+    _speechQueue.clear();
+    _enqueuedSentenceCount = 0;
+    _responseCompleted = false;
+    _resetServerAudio(stopPlayback: true);
+    if (_isSpeaking) {
+      _isSpeaking = false;
+      unawaited(_tts.stop());
+    }
+  }
 
-    try {
-      _isSpeaking = true;
+  /// Re-splits the accumulated reply into speakable segments and enqueues
+  /// those not yet handed to playback.
+  ///
+  /// While streaming ([isFinalChunk] false) the last segment is held back
+  /// because later deltas may still extend it; the final call releases it.
+  void _processSpeakableSegments({required bool isFinalChunk}) {
+    if (_isDisposed) return;
+    final cleanText = MarkdownToText.convert(_accumulatedResponse).trim();
+    if (cleanText.isEmpty) {
+      return;
+    }
 
-      // Stop listening before speaking
-      await _voiceInput.stopListening();
-      await _transcriptSubscription?.cancel();
-      await _intensitySubscription?.cancel();
+    final segments = _tts.splitTextForSpeech(cleanText);
+    if (segments.isEmpty) {
+      return;
+    }
 
-      _updateState(VoiceCallState.speaking);
+    var availableCount = segments.length;
+    if (!isFinalChunk) {
+      // `segments` is non-empty here, so this never drops below zero.
+      availableCount -= 1;
+    }
 
-      // Convert markdown to clean text for TTS
-      final cleanText = MarkdownToText.convert(response);
-      if (cleanText.isEmpty) {
-        // No speakable content, restart listening
-        _isSpeaking = false;
-        await _startListening();
-        return;
+    if (_enqueuedSentenceCount > availableCount) {
+      // A full-content replacement can shrink the segment list.
+      _enqueuedSentenceCount = availableCount;
+    }
+
+    if (availableCount > _enqueuedSentenceCount) {
+      final newChunks = segments.sublist(
+        _enqueuedSentenceCount,
+        availableCount,
+      );
+      _enqueuedSentenceCount = availableCount;
+      for (final chunk in newChunks) {
+        _enqueueSpeechChunk(chunk);
       }
+    }
-
-      await _tts.speak(cleanText);
-      // After speaking completes, _handleTtsComplete will restart listening
+  }
+
+  /// Hands one segment to the active engine: the server pipeline fetches it
+  /// in the background, the device pipeline appends it to [_speechQueue].
+  /// Blank segments and segments arriving while muted are dropped.
+  void _enqueueSpeechChunk(String chunk) {
+    if (_isDisposed) return;
+    final trimmed = chunk.trim();
+    if (trimmed.isEmpty) {
+      return;
+    }
+    if (_isMuted) {
+      return; // Skip playback while muted
+    }
+    if (_tts.prefersServerEngine) {
+      _serverPipelineActive = true;
+      final chunkId = _nextServerChunkId++;
+      _prefetchServerAudio(trimmed, chunkId);
+      return;
+    }
+    _speechQueue.add(trimmed);
+    if (!_isSpeaking) {
+      unawaited(_startNextSpeechChunk());
+    }
+  }
+
+  /// Speaks the next queued segment on the device engine, if idle.
+  Future<void> _startNextSpeechChunk() async {
+    if (_isDisposed) return;
+    if (_speechQueue.isEmpty || _isSpeaking || _isMuted) {
+      return;
+    }
+
+    final next = _speechQueue.removeFirst();
+    // Claim the speaking slot before the first await so a segment enqueued
+    // meanwhile cannot start a second, overlapping utterance.
+    _isSpeaking = true;
+    final session = _serverAudioSession;
+    try {
+      await _prepareForSpeechPlayback();
+      // `_resetServerAudio` bumps the session; if it ran while the
+      // microphone was being stopped, this segment is stale.
+      if (_isDisposed || session != _serverAudioSession) return;
+      _updateState(VoiceCallState.speaking);
+      await _tts.speak(next);
     } catch (e) {
       _isSpeaking = false;
       _updateState(VoiceCallState.error);
-      // Restart listening even if TTS fails
-      await _startListening();
+      unawaited(_startListening());
     }
   }
 
+  /// Starts fetching server audio for [chunk]; the clip is buffered under
+  /// [chunkId] and played once its turn comes.
+  void _prefetchServerAudio(String chunk, int chunkId) {
+    if (_isDisposed) {
+      return;
+    }
+    final session = _serverAudioSession;
+    _pendingServerAudioFetches++;
+    _tts
+        .synthesizeServerSpeechChunk(chunk)
+        .then((audioChunk) {
+          // A fetch that outlived a reset belongs to a dead session: it must
+          // not touch the counter or buffer that now serve the new session.
+          if (_isDisposed || session != _serverAudioSession) {
+            return;
+          }
+          if (_pendingServerAudioFetches > 0) _pendingServerAudioFetches--;
+          if (!_serverPipelineActive) {
+            return;
+          }
+          _serverAudioBuffer[chunkId] = audioChunk;
+          _maybeStartServerAudio();
+        })
+        .catchError((error, _) {
+          // Likewise, a stale failure must not tear down whatever the
+          // current session is playing.
+          if (_isDisposed || session != _serverAudioSession) {
+            return;
+          }
+          if (_pendingServerAudioFetches > 0) _pendingServerAudioFetches--;
+          _handleTtsError(error.toString());
+        });
+  }
+
+  /// Plays the clip whose id is next in line, provided it has been fetched
+  /// and nothing is currently speaking.
+  void _maybeStartServerAudio() {
+    if (_isDisposed || !_serverPipelineActive) {
+      return;
+    }
+    if (_isSpeaking || _isMuted) {
+      return;
+    }
+    final chunk = _serverAudioBuffer.remove(_nextServerPlaybackId);
+    if (chunk == null) {
+      return;
+    }
+    _nextServerPlaybackId++;
+    unawaited(_playServerAudioChunk(chunk));
+  }
+
+  /// Plays one fetched clip through [_serverAudioPlayer].
+  Future<void> _playServerAudioChunk(SpeechAudioChunk chunk) async {
+    // Claim the speaking slot before the first await so a clip that finishes
+    // downloading meanwhile cannot start on top of this one.
+    _isSpeaking = true;
+    final session = _serverAudioSession;
+    try {
+      await _prepareForSpeechPlayback();
+      // Dropped if `_resetServerAudio` ran while the microphone was stopping.
+      if (_isDisposed || session != _serverAudioSession) return;
+      _updateState(VoiceCallState.speaking);
+      await _serverAudioPlayer.play(
+        BytesSource(chunk.bytes, mimeType: chunk.mimeType),
+      );
+    } catch (e) {
+      _isSpeaking = false;
+      _handleTtsError(e.toString());
+    }
+  }
+
+  /// Player-complete callback: continues with the next buffered clip, or
+  /// lets listening resume once nothing is left to play.
+  void _handleServerAudioComplete() {
+    if (_isDisposed) {
+      return;
+    }
+    _isSpeaking = false;
+    if (_serverAudioBuffer.containsKey(_nextServerPlaybackId)) {
+      _maybeStartServerAudio();
+      return;
+    }
+    _maybeResumeListeningAfterSpeech();
+  }
+
+  /// Drops all buffered and pending server audio and starts a new session
+  /// so that in-flight work can recognize itself as stale.
+  void _resetServerAudio({bool stopPlayback = false}) {
+    _serverAudioBuffer.clear();
+    _pendingServerAudioFetches = 0;
+    _serverAudioSession++;
+    _nextServerChunkId = 0;
+    _nextServerPlaybackId = 0;
+    if (stopPlayback) {
+      unawaited(_serverAudioPlayer.stop());
+      _isSpeaking = false;
+    }
+    _serverPipelineActive = false;
+  }
+
+  /// Stops voice input and cancels its stream subscriptions before playback.
+  /// Returns immediately while [_listeningSuspendedForSpeech] is already set.
+  Future<void> _prepareForSpeechPlayback() async {
+    if (_listeningSuspendedForSpeech) {
+      return;
+    }
+    _listeningSuspendedForSpeech = true;
+    await _voiceInput.stopListening();
+    await _transcriptSubscription?.cancel();
+    _transcriptSubscription = null;
+    await _intensitySubscription?.cancel();
+    _intensitySubscription = null;
+  }
+
+  /// Restarts listening (or enters the paused state) once the reply is
+  /// complete and no speech is playing, queued, or being fetched.
+  void _maybeResumeListeningAfterSpeech() {
+    if (!_responseCompleted) {
+      return;
+    }
+    if (_hasPendingSpeech) {
+      return;
+    }
+
+    if (_pauseReasons.isNotEmpty) {
+      _listeningPaused = true;
+      if (_state != VoiceCallState.paused) {
+        _updateState(VoiceCallState.paused);
+      }
+      return;
+    }
+
+    if (_serverPipelineActive && _pendingServerAudioFetches > 0) {
+      return;
+    }
+
+    unawaited(_startListening());
+  }
+
   void _handleTtsStart() {
     if (_isDisposed) return;
     _updateState(VoiceCallState.speaking);
@@ -376,17 +647,19 @@ class VoiceCallService {
   void _handleTtsComplete() {
     if (_isDisposed) return;
     _isSpeaking = false;
-    // After assistant finishes speaking, resume only if not paused
-    if (_pauseReasons.isNotEmpty) {
-      _listeningPaused = true;
-      _updateState(VoiceCallState.paused);
+    if (_speechQueue.isNotEmpty) {
+      unawaited(_startNextSpeechChunk());
       return;
     }
-    _startListening();
+    _maybeResumeListeningAfterSpeech();
   }
 
   void _handleTtsError(String error) {
     if (_isDisposed) return;
+    _isSpeaking = false;
+    _speechQueue.clear();
+    _resetServerAudio(stopPlayback: true);
+    _listeningSuspendedForSpeech = false;
     _updateState(VoiceCallState.error);
     // Try to recover by restarting listening
     _startListening();
@@ -405,6 +678,7 @@ class VoiceCallService {
 
     await _voiceInput.stopListening();
     await _tts.stop();
+    await _serverAudioPlayer.stop();
 
     await BackgroundStreamingHandler.instance.stopBackgroundExecution(const [
       _voiceCallStreamId,
@@ -421,6 +695,13 @@ class VoiceCallService {
     _isMuted = false;
     _listeningPaused = false;
     _pauseReasons.clear();
+    _speechQueue.clear();
+    _enqueuedSentenceCount = 0;
+    _responseCompleted = false;
+    _listeningSuspendedForSpeech = false;
+    _activeAssistantMessageId = null;
+    _isSpeaking = false;
+    _resetServerAudio(stopPlayback: true);
     _updateState(VoiceCallState.disconnected);
   }
 
@@ -462,6 +743,11 @@ class VoiceCallService {
 
   Future cancelSpeaking() async {
     if (_isDisposed) return;
+    _speechQueue.clear();
+    _enqueuedSentenceCount = 0;
+    _responseCompleted = false;
+    _listeningSuspendedForSpeech = false;
+    _resetServerAudio(stopPlayback: true);
     await _tts.stop();
     _isSpeaking = false;
     _accumulatedResponse = '';
@@ -527,6 +813,11 @@ class VoiceCallService {
         _isSpeaking = false;
         _accumulatedResponse = '';
       }
+      _speechQueue.clear();
+      _enqueuedSentenceCount = 0;
+      _responseCompleted = false;
+      _listeningSuspendedForSpeech = false;
+      _resetServerAudio(stopPlayback: true);
       pauseListening(reason: VoiceCallPauseReason.mute);
     } else {
       resumeListening(reason: VoiceCallPauseReason.mute);
@@ -547,6 +838,7 @@ class VoiceCallService {
 
     _voiceInput.dispose();
     await _tts.dispose();
+    await _serverAudioPlayer.dispose();
 
     // Cancel notification
     await _notificationService.cancelNotification();