From b05d9f84a564383056f9e8b70237702442aaede1 Mon Sep 17 00:00:00 2001
From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com>
Date: Mon, 10 Nov 2025 02:43:31 +0530
Subject: [PATCH] feat(tts): Add server-side speech synthesis and playback
pipeline
---
android/app/src/main/AndroidManifest.xml | 1 +
.../chat/services/text_to_speech_service.dart | 40 +++
.../chat/services/voice_call_service.dart | 331 ++++++++++++++++--
3 files changed, 335 insertions(+), 37 deletions(-)
diff --git a/android/app/src/main/AndroidManifest.xml b/android/app/src/main/AndroidManifest.xml
index c3b95a5..38a833e 100644
--- a/android/app/src/main/AndroidManifest.xml
+++ b/android/app/src/main/AndroidManifest.xml
@@ -2,6 +2,7 @@
+
diff --git a/lib/features/chat/services/text_to_speech_service.dart b/lib/features/chat/services/text_to_speech_service.dart
index 7c8f4b4..0fe7240 100644
--- a/lib/features/chat/services/text_to_speech_service.dart
+++ b/lib/features/chat/services/text_to_speech_service.dart
@@ -11,6 +11,13 @@ import '../../../core/services/settings_service.dart';
typedef _SpeechChunk = ({Uint8List bytes, String mimeType});
+class SpeechAudioChunk {
+ const SpeechAudioChunk({required this.bytes, required this.mimeType});
+
+ final Uint8List bytes;
+ final String mimeType;
+}
+
/// Lightweight wrapper around FlutterTts to centralize configuration
class TextToSpeechService {
final FlutterTts _tts = FlutterTts();
@@ -45,6 +52,7 @@ class TextToSpeechService {
bool get isAvailable => _available;
bool get deviceEngineAvailable => _deviceEngineAvailable;
bool get serverEngineAvailable => _api != null;
+ bool get prefersServerEngine => _shouldUseServer();
TextToSpeechService({ApiService? api}) : _api = api {
// Wire minimal player events to callbacks
@@ -277,6 +285,29 @@ class TextToSpeechService {
_onSentenceIndex?.call(0);
}
+ Future synthesizeServerSpeechChunk(String text) async {
+ if (text.trim().isEmpty) {
+ throw ArgumentError('Cannot synthesize empty text');
+ }
+ if (_api == null) {
+ throw StateError('Server text-to-speech is unavailable');
+ }
+ if (!_initialized) {
+ await initialize(
+ deviceVoice: _preferredVoice,
+ serverVoice: _serverPreferredVoice,
+ engine: _engine,
+ );
+ }
+ final voice = await _resolveServerVoice();
+ final chunk = await _api.generateSpeech(
+ text: text,
+ voice: voice,
+ speed: _speechRate,
+ );
+ return SpeechAudioChunk(bytes: chunk.bytes, mimeType: chunk.mimeType);
+ }
+
Future pause() async {
if (!_initialized) return;
try {
@@ -572,6 +603,15 @@ class TextToSpeechService {
}
}
+ Future preloadServerDefaults() async {
+ if (_api == null) {
+ return;
+ }
+ try {
+ await _getServerDefaultVoice();
+ } catch (_) {}
+ }
+
// ===== Server chunked playback =====
Future _startServerChunkedPlayback(String text) async {
diff --git a/lib/features/chat/services/voice_call_service.dart b/lib/features/chat/services/voice_call_service.dart
index 1fb4123..fa32b90 100644
--- a/lib/features/chat/services/voice_call_service.dart
+++ b/lib/features/chat/services/voice_call_service.dart
@@ -1,5 +1,7 @@
import 'dart:async';
+import 'dart:collection';
+import 'package:audioplayers/audioplayers.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:wakelock_plus/wakelock_plus.dart';
@@ -49,6 +51,18 @@ class VoiceCallService {
final Set _pauseReasons = {};
SocketEventSubscription? _socketSubscription;
Timer? _keepAliveTimer;
+ final ListQueue _speechQueue = ListQueue();
+ int _enqueuedSentenceCount = 0;
+ String? _activeAssistantMessageId;
+ bool _responseCompleted = false;
+ bool _listeningSuspendedForSpeech = false;
+ final Map _serverAudioBuffer = {};
+ final AudioPlayer _serverAudioPlayer = AudioPlayer();
+ int _serverAudioSession = 0;
+ int _pendingServerAudioFetches = 0;
+ bool _serverPipelineActive = false;
+ int _nextServerChunkId = 0;
+ int _nextServerPlaybackId = 0;
final StreamController _stateController =
StreamController.broadcast();
@@ -75,6 +89,12 @@ class VoiceCallService {
// sentence/word callbacks are not required for call UI, but harmless
);
+ _serverAudioPlayer.onPlayerComplete.listen((_) {
+ _handleServerAudioComplete();
+ });
+
+ unawaited(_tts.preloadServerDefaults());
+
// Set up notification action handler
_notificationService.onActionPressed = _handleNotificationAction;
}
@@ -197,6 +217,13 @@ class VoiceCallService {
if (_isDisposed) return;
try {
+ _speechQueue.clear();
+ _enqueuedSentenceCount = 0;
+ _activeAssistantMessageId = null;
+ _responseCompleted = false;
+ _listeningSuspendedForSpeech = false;
+ _resetServerAudio(stopPlayback: true);
+
if (_pauseReasons.isNotEmpty) {
_listeningPaused = true;
if (_state != VoiceCallState.paused) {
@@ -276,6 +303,14 @@ class VoiceCallService {
String _accumulatedResponse = '';
bool _isSpeaking = false;
+ bool get _hasPendingSpeech {
+ if (_serverPipelineActive) {
+ return _isSpeaking ||
+ _serverAudioBuffer.isNotEmpty ||
+ _pendingServerAudioFetches > 0;
+ }
+ return _isSpeaking || _speechQueue.isNotEmpty;
+ }
void _handleSocketEvent(
Map event,
@@ -284,18 +319,32 @@ class VoiceCallService {
if (_isDisposed) return;
final outerData = event['data'];
+ final messageId = event['message_id']?.toString();
if (outerData is Map) {
final eventType = outerData['type']?.toString();
final innerData = outerData['data'];
if (eventType == 'chat:completion' && innerData is Map) {
+ final bool doneFlag = innerData['done'] == true;
+ if (messageId != null && messageId.isNotEmpty) {
+ _handleAssistantMessageStart(messageId);
+ }
+
// Handle full content replacement (used by some models/backends)
if (innerData.containsKey('content')) {
final content = innerData['content']?.toString() ?? '';
if (content.isNotEmpty) {
_accumulatedResponse = content;
_responseController.add(content);
+ _processSpeakableSegments(isFinalChunk: doneFlag);
+ if (doneFlag) {
+ _responseCompleted = true;
+ _maybeResumeListeningAfterSpeech();
+ }
+ } else if (doneFlag) {
+ _responseCompleted = true;
+ _maybeResumeListeningAfterSpeech();
}
}
@@ -313,61 +362,248 @@ class VoiceCallService {
if (deltaContent.isNotEmpty) {
_accumulatedResponse += deltaContent;
_responseController.add(_accumulatedResponse);
+ _processSpeakableSegments(isFinalChunk: false);
}
}
// Check for completion
- if (finishReason == 'stop') {
- if (_accumulatedResponse.isNotEmpty && !_isSpeaking) {
- _speakResponse(_accumulatedResponse);
- _accumulatedResponse = '';
- } else if (_accumulatedResponse.isEmpty) {
- // No response, restart listening unless paused
- if (_pauseReasons.isEmpty) {
- _startListening();
- } else if (_state != VoiceCallState.paused) {
- _updateState(VoiceCallState.paused);
- }
- }
+ if (finishReason == 'stop' || finishReason == 'length') {
+ _responseCompleted = true;
+ _processSpeakableSegments(isFinalChunk: true);
+ _maybeResumeListeningAfterSpeech();
}
}
}
+
+ if (doneFlag && !_responseCompleted) {
+ _responseCompleted = true;
+ _processSpeakableSegments(isFinalChunk: true);
+ _maybeResumeListeningAfterSpeech();
+ }
}
}
}
- Future _speakResponse(String response) async {
- if (_isDisposed || _isSpeaking) return;
+ void _handleAssistantMessageStart(String messageId) {
+ if (_activeAssistantMessageId == messageId) {
+ return;
+ }
+ _activeAssistantMessageId = messageId;
+ _accumulatedResponse = '';
+ _responseController.add('');
+ _speechQueue.clear();
+ _enqueuedSentenceCount = 0;
+ _responseCompleted = false;
+ _resetServerAudio(stopPlayback: true);
+ if (_isSpeaking) {
+ _isSpeaking = false;
+ unawaited(_tts.stop());
+ }
+ }
- try {
- _isSpeaking = true;
+ void _processSpeakableSegments({required bool isFinalChunk}) {
+ if (_isDisposed) return;
+ final cleanText = MarkdownToText.convert(_accumulatedResponse).trim();
+ if (cleanText.isEmpty) {
+ return;
+ }
- // Stop listening before speaking
- await _voiceInput.stopListening();
- await _transcriptSubscription?.cancel();
- await _intensitySubscription?.cancel();
+ final segments = _tts.splitTextForSpeech(cleanText);
+ if (segments.isEmpty) {
+ return;
+ }
- _updateState(VoiceCallState.speaking);
+ var availableCount = segments.length;
+ if (!isFinalChunk && availableCount > 0) {
+ availableCount -= 1;
+ }
+ if (availableCount < 0) {
+ availableCount = 0;
+ }
- // Convert markdown to clean text for TTS
- final cleanText = MarkdownToText.convert(response);
- if (cleanText.isEmpty) {
- // No speakable content, restart listening
- _isSpeaking = false;
- await _startListening();
- return;
+ if (_enqueuedSentenceCount > availableCount) {
+ _enqueuedSentenceCount = availableCount;
+ }
+
+ if (availableCount > _enqueuedSentenceCount) {
+ final newChunks = segments.sublist(
+ _enqueuedSentenceCount,
+ availableCount,
+ );
+ _enqueuedSentenceCount = availableCount;
+ for (final chunk in newChunks) {
+ _enqueueSpeechChunk(chunk);
}
+ }
- await _tts.speak(cleanText);
- // After speaking completes, _handleTtsComplete will restart listening
+ if (isFinalChunk && _enqueuedSentenceCount < segments.length) {
+ _enqueuedSentenceCount = segments.length;
+ _enqueueSpeechChunk(segments.last);
+ }
+ }
+
+ void _enqueueSpeechChunk(String chunk) {
+ if (_isDisposed) return;
+ final trimmed = chunk.trim();
+ if (trimmed.isEmpty) {
+ return;
+ }
+ if (_isMuted) {
+ return; // Skip playback while muted
+ }
+ if (_tts.prefersServerEngine) {
+ _serverPipelineActive = true;
+ final chunkId = _nextServerChunkId++;
+ _prefetchServerAudio(trimmed, chunkId);
+ return;
+ }
+ _speechQueue.add(trimmed);
+ if (!_isSpeaking) {
+ unawaited(_startNextSpeechChunk());
+ }
+ }
+
+ Future _startNextSpeechChunk() async {
+ if (_isDisposed) return;
+ if (_speechQueue.isEmpty || _isSpeaking || _isMuted) {
+ return;
+ }
+
+ final next = _speechQueue.removeFirst();
+ try {
+ await _prepareForSpeechPlayback();
+ _isSpeaking = true;
+ _updateState(VoiceCallState.speaking);
+ await _tts.speak(next);
} catch (e) {
_isSpeaking = false;
_updateState(VoiceCallState.error);
- // Restart listening even if TTS fails
- await _startListening();
+ unawaited(_startListening());
}
}
+ void _prefetchServerAudio(String chunk, int chunkId) {
+ if (_isDisposed) {
+ return;
+ }
+ final session = _serverAudioSession;
+ _pendingServerAudioFetches++;
+ _tts
+ .synthesizeServerSpeechChunk(chunk)
+ .then((audioChunk) {
+ _pendingServerAudioFetches--;
+ if (_pendingServerAudioFetches < 0) {
+ _pendingServerAudioFetches = 0;
+ }
+ if (_isDisposed ||
+ !_serverPipelineActive ||
+ session != _serverAudioSession) {
+ return;
+ }
+ _serverAudioBuffer[chunkId] = audioChunk;
+ _maybeStartServerAudio();
+ })
+ .catchError((error, _) {
+ _pendingServerAudioFetches--;
+ if (_pendingServerAudioFetches < 0) {
+ _pendingServerAudioFetches = 0;
+ }
+ if (_isDisposed) {
+ return;
+ }
+ _handleTtsError(error.toString());
+ });
+ }
+
+ void _maybeStartServerAudio() {
+ if (_isDisposed || !_serverPipelineActive) {
+ return;
+ }
+ if (_isSpeaking || _isMuted) {
+ return;
+ }
+ final chunk = _serverAudioBuffer.remove(_nextServerPlaybackId);
+ if (chunk == null) {
+ return;
+ }
+ _nextServerPlaybackId++;
+ _playServerAudioChunk(chunk);
+ }
+
+ Future _playServerAudioChunk(SpeechAudioChunk chunk) async {
+ try {
+ await _prepareForSpeechPlayback();
+ _isSpeaking = true;
+ _updateState(VoiceCallState.speaking);
+ await _serverAudioPlayer.play(
+ BytesSource(chunk.bytes, mimeType: chunk.mimeType),
+ );
+ } catch (e) {
+ _isSpeaking = false;
+ _handleTtsError(e.toString());
+ }
+ }
+
+ void _handleServerAudioComplete() {
+ if (_isDisposed) {
+ return;
+ }
+ _isSpeaking = false;
+ if (_serverAudioBuffer.containsKey(_nextServerPlaybackId)) {
+ _maybeStartServerAudio();
+ return;
+ }
+ _maybeResumeListeningAfterSpeech();
+ }
+
+ void _resetServerAudio({bool stopPlayback = false}) {
+ _serverAudioBuffer.clear();
+ _pendingServerAudioFetches = 0;
+ _serverAudioSession++;
+ _nextServerChunkId = 0;
+ _nextServerPlaybackId = 0;
+ if (stopPlayback) {
+ unawaited(_serverAudioPlayer.stop());
+ _isSpeaking = false;
+ }
+ _serverPipelineActive = false;
+ }
+
+ Future _prepareForSpeechPlayback() async {
+ if (_listeningSuspendedForSpeech) {
+ return;
+ }
+ _listeningSuspendedForSpeech = true;
+ await _voiceInput.stopListening();
+ await _transcriptSubscription?.cancel();
+ _transcriptSubscription = null;
+ await _intensitySubscription?.cancel();
+ _intensitySubscription = null;
+ }
+
+ void _maybeResumeListeningAfterSpeech() {
+ if (!_responseCompleted) {
+ return;
+ }
+ if (_hasPendingSpeech) {
+ return;
+ }
+
+ if (_pauseReasons.isNotEmpty) {
+ _listeningPaused = true;
+ if (_state != VoiceCallState.paused) {
+ _updateState(VoiceCallState.paused);
+ }
+ return;
+ }
+
+ if (_serverPipelineActive && _pendingServerAudioFetches > 0) {
+ return;
+ }
+
+ unawaited(_startListening());
+ }
+
void _handleTtsStart() {
if (_isDisposed) return;
_updateState(VoiceCallState.speaking);
@@ -376,17 +612,19 @@ class VoiceCallService {
void _handleTtsComplete() {
if (_isDisposed) return;
_isSpeaking = false;
- // After assistant finishes speaking, resume only if not paused
- if (_pauseReasons.isNotEmpty) {
- _listeningPaused = true;
- _updateState(VoiceCallState.paused);
+ if (_speechQueue.isNotEmpty) {
+ unawaited(_startNextSpeechChunk());
return;
}
- _startListening();
+ _maybeResumeListeningAfterSpeech();
}
void _handleTtsError(String error) {
if (_isDisposed) return;
+ _isSpeaking = false;
+ _speechQueue.clear();
+ _resetServerAudio(stopPlayback: true);
+ _listeningSuspendedForSpeech = false;
_updateState(VoiceCallState.error);
// Try to recover by restarting listening
_startListening();
@@ -405,6 +643,7 @@ class VoiceCallService {
await _voiceInput.stopListening();
await _tts.stop();
+ await _serverAudioPlayer.stop();
await BackgroundStreamingHandler.instance.stopBackgroundExecution(const [
_voiceCallStreamId,
@@ -421,6 +660,13 @@ class VoiceCallService {
_isMuted = false;
_listeningPaused = false;
_pauseReasons.clear();
+ _speechQueue.clear();
+ _enqueuedSentenceCount = 0;
+ _responseCompleted = false;
+ _listeningSuspendedForSpeech = false;
+ _activeAssistantMessageId = null;
+ _isSpeaking = false;
+ _resetServerAudio(stopPlayback: true);
_updateState(VoiceCallState.disconnected);
}
@@ -462,6 +708,11 @@ class VoiceCallService {
Future cancelSpeaking() async {
if (_isDisposed) return;
+ _speechQueue.clear();
+ _enqueuedSentenceCount = 0;
+ _responseCompleted = false;
+ _listeningSuspendedForSpeech = false;
+ _resetServerAudio(stopPlayback: true);
await _tts.stop();
_isSpeaking = false;
_accumulatedResponse = '';
@@ -527,6 +778,11 @@ class VoiceCallService {
_isSpeaking = false;
_accumulatedResponse = '';
}
+ _speechQueue.clear();
+ _enqueuedSentenceCount = 0;
+ _responseCompleted = false;
+ _listeningSuspendedForSpeech = false;
+ _resetServerAudio(stopPlayback: true);
pauseListening(reason: VoiceCallPauseReason.mute);
} else {
resumeListening(reason: VoiceCallPauseReason.mute);
@@ -547,6 +803,7 @@ class VoiceCallService {
_voiceInput.dispose();
await _tts.dispose();
+ await _serverAudioPlayer.dispose();
// Cancel notification
await _notificationService.cancelNotification();