feat(tts): Add server-side speech synthesis and playback pipeline

This commit is contained in:
cogwheel0
2025-11-10 02:43:31 +05:30
parent 62c9243e34
commit b05d9f84a5
3 changed files with 335 additions and 37 deletions

View File

@@ -2,6 +2,7 @@
<uses-permission android:name="android.permission.INTERNET"/>
<uses-permission android:name="android.permission.RECORD_AUDIO"/>
<uses-permission android:name="android.permission.MODIFY_AUDIO_SETTINGS"/>
<uses-permission android:name="android.permission.CAMERA"/>
<!-- No broad media/storage permissions; use Android Photo Picker (API 33+) and SAF/share intents -->

View File

@@ -11,6 +11,13 @@ import '../../../core/services/settings_service.dart';
typedef _SpeechChunk = ({Uint8List bytes, String mimeType});
class SpeechAudioChunk {
const SpeechAudioChunk({required this.bytes, required this.mimeType});
final Uint8List bytes;
final String mimeType;
}
/// Lightweight wrapper around FlutterTts to centralize configuration
class TextToSpeechService {
final FlutterTts _tts = FlutterTts();
@@ -45,6 +52,7 @@ class TextToSpeechService {
bool get isAvailable => _available;
bool get deviceEngineAvailable => _deviceEngineAvailable;
bool get serverEngineAvailable => _api != null;
bool get prefersServerEngine => _shouldUseServer();
TextToSpeechService({ApiService? api}) : _api = api {
// Wire minimal player events to callbacks
@@ -277,6 +285,29 @@ class TextToSpeechService {
_onSentenceIndex?.call(0);
}
Future<SpeechAudioChunk> synthesizeServerSpeechChunk(String text) async {
if (text.trim().isEmpty) {
throw ArgumentError('Cannot synthesize empty text');
}
if (_api == null) {
throw StateError('Server text-to-speech is unavailable');
}
if (!_initialized) {
await initialize(
deviceVoice: _preferredVoice,
serverVoice: _serverPreferredVoice,
engine: _engine,
);
}
final voice = await _resolveServerVoice();
final chunk = await _api.generateSpeech(
text: text,
voice: voice,
speed: _speechRate,
);
return SpeechAudioChunk(bytes: chunk.bytes, mimeType: chunk.mimeType);
}
Future<void> pause() async {
if (!_initialized) return;
try {
@@ -572,6 +603,15 @@ class TextToSpeechService {
}
}
Future<void> preloadServerDefaults() async {
if (_api == null) {
return;
}
try {
await _getServerDefaultVoice();
} catch (_) {}
}
// ===== Server chunked playback =====
Future<void> _startServerChunkedPlayback(String text) async {

View File

@@ -1,5 +1,7 @@
import 'dart:async';
import 'dart:collection';
import 'package:audioplayers/audioplayers.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:wakelock_plus/wakelock_plus.dart';
@@ -49,6 +51,18 @@ class VoiceCallService {
final Set<VoiceCallPauseReason> _pauseReasons = <VoiceCallPauseReason>{};
SocketEventSubscription? _socketSubscription;
Timer? _keepAliveTimer;
final ListQueue<String> _speechQueue = ListQueue<String>();
int _enqueuedSentenceCount = 0;
String? _activeAssistantMessageId;
bool _responseCompleted = false;
bool _listeningSuspendedForSpeech = false;
final Map<int, SpeechAudioChunk> _serverAudioBuffer = {};
final AudioPlayer _serverAudioPlayer = AudioPlayer();
int _serverAudioSession = 0;
int _pendingServerAudioFetches = 0;
bool _serverPipelineActive = false;
int _nextServerChunkId = 0;
int _nextServerPlaybackId = 0;
final StreamController<VoiceCallState> _stateController =
StreamController<VoiceCallState>.broadcast();
@@ -75,6 +89,12 @@ class VoiceCallService {
// sentence/word callbacks are not required for call UI, but harmless
);
_serverAudioPlayer.onPlayerComplete.listen((_) {
_handleServerAudioComplete();
});
unawaited(_tts.preloadServerDefaults());
// Set up notification action handler
_notificationService.onActionPressed = _handleNotificationAction;
}
@@ -197,6 +217,13 @@ class VoiceCallService {
if (_isDisposed) return;
try {
_speechQueue.clear();
_enqueuedSentenceCount = 0;
_activeAssistantMessageId = null;
_responseCompleted = false;
_listeningSuspendedForSpeech = false;
_resetServerAudio(stopPlayback: true);
if (_pauseReasons.isNotEmpty) {
_listeningPaused = true;
if (_state != VoiceCallState.paused) {
@@ -276,6 +303,14 @@ class VoiceCallService {
String _accumulatedResponse = '';
bool _isSpeaking = false;
bool get _hasPendingSpeech {
if (_serverPipelineActive) {
return _isSpeaking ||
_serverAudioBuffer.isNotEmpty ||
_pendingServerAudioFetches > 0;
}
return _isSpeaking || _speechQueue.isNotEmpty;
}
void _handleSocketEvent(
Map<String, dynamic> event,
@@ -284,18 +319,32 @@ class VoiceCallService {
if (_isDisposed) return;
final outerData = event['data'];
final messageId = event['message_id']?.toString();
if (outerData is Map<String, dynamic>) {
final eventType = outerData['type']?.toString();
final innerData = outerData['data'];
if (eventType == 'chat:completion' && innerData is Map<String, dynamic>) {
final bool doneFlag = innerData['done'] == true;
if (messageId != null && messageId.isNotEmpty) {
_handleAssistantMessageStart(messageId);
}
// Handle full content replacement (used by some models/backends)
if (innerData.containsKey('content')) {
final content = innerData['content']?.toString() ?? '';
if (content.isNotEmpty) {
_accumulatedResponse = content;
_responseController.add(content);
_processSpeakableSegments(isFinalChunk: doneFlag);
if (doneFlag) {
_responseCompleted = true;
_maybeResumeListeningAfterSpeech();
}
} else if (doneFlag) {
_responseCompleted = true;
_maybeResumeListeningAfterSpeech();
}
}
@@ -313,61 +362,248 @@ class VoiceCallService {
if (deltaContent.isNotEmpty) {
_accumulatedResponse += deltaContent;
_responseController.add(_accumulatedResponse);
_processSpeakableSegments(isFinalChunk: false);
}
}
// Check for completion
if (finishReason == 'stop') {
if (_accumulatedResponse.isNotEmpty && !_isSpeaking) {
_speakResponse(_accumulatedResponse);
_accumulatedResponse = '';
} else if (_accumulatedResponse.isEmpty) {
// No response, restart listening unless paused
if (_pauseReasons.isEmpty) {
_startListening();
} else if (_state != VoiceCallState.paused) {
_updateState(VoiceCallState.paused);
}
}
if (finishReason == 'stop' || finishReason == 'length') {
_responseCompleted = true;
_processSpeakableSegments(isFinalChunk: true);
_maybeResumeListeningAfterSpeech();
}
}
}
if (doneFlag && !_responseCompleted) {
_responseCompleted = true;
_processSpeakableSegments(isFinalChunk: true);
_maybeResumeListeningAfterSpeech();
}
}
}
}
Future<void> _speakResponse(String response) async {
if (_isDisposed || _isSpeaking) return;
void _handleAssistantMessageStart(String messageId) {
if (_activeAssistantMessageId == messageId) {
return;
}
_activeAssistantMessageId = messageId;
_accumulatedResponse = '';
_responseController.add('');
_speechQueue.clear();
_enqueuedSentenceCount = 0;
_responseCompleted = false;
_resetServerAudio(stopPlayback: true);
if (_isSpeaking) {
_isSpeaking = false;
unawaited(_tts.stop());
}
}
try {
_isSpeaking = true;
void _processSpeakableSegments({required bool isFinalChunk}) {
if (_isDisposed) return;
final cleanText = MarkdownToText.convert(_accumulatedResponse).trim();
if (cleanText.isEmpty) {
return;
}
// Stop listening before speaking
await _voiceInput.stopListening();
await _transcriptSubscription?.cancel();
await _intensitySubscription?.cancel();
final segments = _tts.splitTextForSpeech(cleanText);
if (segments.isEmpty) {
return;
}
_updateState(VoiceCallState.speaking);
var availableCount = segments.length;
if (!isFinalChunk && availableCount > 0) {
availableCount -= 1;
}
if (availableCount < 0) {
availableCount = 0;
}
// Convert markdown to clean text for TTS
final cleanText = MarkdownToText.convert(response);
if (cleanText.isEmpty) {
// No speakable content, restart listening
_isSpeaking = false;
await _startListening();
return;
if (_enqueuedSentenceCount > availableCount) {
_enqueuedSentenceCount = availableCount;
}
if (availableCount > _enqueuedSentenceCount) {
final newChunks = segments.sublist(
_enqueuedSentenceCount,
availableCount,
);
_enqueuedSentenceCount = availableCount;
for (final chunk in newChunks) {
_enqueueSpeechChunk(chunk);
}
}
await _tts.speak(cleanText);
// After speaking completes, _handleTtsComplete will restart listening
if (isFinalChunk && _enqueuedSentenceCount < segments.length) {
_enqueuedSentenceCount = segments.length;
_enqueueSpeechChunk(segments.last);
}
}
void _enqueueSpeechChunk(String chunk) {
if (_isDisposed) return;
final trimmed = chunk.trim();
if (trimmed.isEmpty) {
return;
}
if (_isMuted) {
return; // Skip playback while muted
}
if (_tts.prefersServerEngine) {
_serverPipelineActive = true;
final chunkId = _nextServerChunkId++;
_prefetchServerAudio(trimmed, chunkId);
return;
}
_speechQueue.add(trimmed);
if (!_isSpeaking) {
unawaited(_startNextSpeechChunk());
}
}
Future<void> _startNextSpeechChunk() async {
if (_isDisposed) return;
if (_speechQueue.isEmpty || _isSpeaking || _isMuted) {
return;
}
final next = _speechQueue.removeFirst();
try {
await _prepareForSpeechPlayback();
_isSpeaking = true;
_updateState(VoiceCallState.speaking);
await _tts.speak(next);
} catch (e) {
_isSpeaking = false;
_updateState(VoiceCallState.error);
// Restart listening even if TTS fails
await _startListening();
unawaited(_startListening());
}
}
void _prefetchServerAudio(String chunk, int chunkId) {
if (_isDisposed) {
return;
}
final session = _serverAudioSession;
_pendingServerAudioFetches++;
_tts
.synthesizeServerSpeechChunk(chunk)
.then((audioChunk) {
_pendingServerAudioFetches--;
if (_pendingServerAudioFetches < 0) {
_pendingServerAudioFetches = 0;
}
if (_isDisposed ||
!_serverPipelineActive ||
session != _serverAudioSession) {
return;
}
_serverAudioBuffer[chunkId] = audioChunk;
_maybeStartServerAudio();
})
.catchError((error, _) {
_pendingServerAudioFetches--;
if (_pendingServerAudioFetches < 0) {
_pendingServerAudioFetches = 0;
}
if (_isDisposed) {
return;
}
_handleTtsError(error.toString());
});
}
void _maybeStartServerAudio() {
if (_isDisposed || !_serverPipelineActive) {
return;
}
if (_isSpeaking || _isMuted) {
return;
}
final chunk = _serverAudioBuffer.remove(_nextServerPlaybackId);
if (chunk == null) {
return;
}
_nextServerPlaybackId++;
_playServerAudioChunk(chunk);
}
Future<void> _playServerAudioChunk(SpeechAudioChunk chunk) async {
try {
await _prepareForSpeechPlayback();
_isSpeaking = true;
_updateState(VoiceCallState.speaking);
await _serverAudioPlayer.play(
BytesSource(chunk.bytes, mimeType: chunk.mimeType),
);
} catch (e) {
_isSpeaking = false;
_handleTtsError(e.toString());
}
}
void _handleServerAudioComplete() {
if (_isDisposed) {
return;
}
_isSpeaking = false;
if (_serverAudioBuffer.containsKey(_nextServerPlaybackId)) {
_maybeStartServerAudio();
return;
}
_maybeResumeListeningAfterSpeech();
}
void _resetServerAudio({bool stopPlayback = false}) {
_serverAudioBuffer.clear();
_pendingServerAudioFetches = 0;
_serverAudioSession++;
_nextServerChunkId = 0;
_nextServerPlaybackId = 0;
if (stopPlayback) {
unawaited(_serverAudioPlayer.stop());
_isSpeaking = false;
}
_serverPipelineActive = false;
}
Future<void> _prepareForSpeechPlayback() async {
if (_listeningSuspendedForSpeech) {
return;
}
_listeningSuspendedForSpeech = true;
await _voiceInput.stopListening();
await _transcriptSubscription?.cancel();
_transcriptSubscription = null;
await _intensitySubscription?.cancel();
_intensitySubscription = null;
}
void _maybeResumeListeningAfterSpeech() {
if (!_responseCompleted) {
return;
}
if (_hasPendingSpeech) {
return;
}
if (_pauseReasons.isNotEmpty) {
_listeningPaused = true;
if (_state != VoiceCallState.paused) {
_updateState(VoiceCallState.paused);
}
return;
}
if (_serverPipelineActive && _pendingServerAudioFetches > 0) {
return;
}
unawaited(_startListening());
}
void _handleTtsStart() {
if (_isDisposed) return;
_updateState(VoiceCallState.speaking);
@@ -376,17 +612,19 @@ class VoiceCallService {
void _handleTtsComplete() {
if (_isDisposed) return;
_isSpeaking = false;
// After assistant finishes speaking, resume only if not paused
if (_pauseReasons.isNotEmpty) {
_listeningPaused = true;
_updateState(VoiceCallState.paused);
if (_speechQueue.isNotEmpty) {
unawaited(_startNextSpeechChunk());
return;
}
_startListening();
_maybeResumeListeningAfterSpeech();
}
void _handleTtsError(String error) {
if (_isDisposed) return;
_isSpeaking = false;
_speechQueue.clear();
_resetServerAudio(stopPlayback: true);
_listeningSuspendedForSpeech = false;
_updateState(VoiceCallState.error);
// Try to recover by restarting listening
_startListening();
@@ -405,6 +643,7 @@ class VoiceCallService {
await _voiceInput.stopListening();
await _tts.stop();
await _serverAudioPlayer.stop();
await BackgroundStreamingHandler.instance.stopBackgroundExecution(const [
_voiceCallStreamId,
@@ -421,6 +660,13 @@ class VoiceCallService {
_isMuted = false;
_listeningPaused = false;
_pauseReasons.clear();
_speechQueue.clear();
_enqueuedSentenceCount = 0;
_responseCompleted = false;
_listeningSuspendedForSpeech = false;
_activeAssistantMessageId = null;
_isSpeaking = false;
_resetServerAudio(stopPlayback: true);
_updateState(VoiceCallState.disconnected);
}
@@ -462,6 +708,11 @@ class VoiceCallService {
Future<void> cancelSpeaking() async {
if (_isDisposed) return;
_speechQueue.clear();
_enqueuedSentenceCount = 0;
_responseCompleted = false;
_listeningSuspendedForSpeech = false;
_resetServerAudio(stopPlayback: true);
await _tts.stop();
_isSpeaking = false;
_accumulatedResponse = '';
@@ -527,6 +778,11 @@ class VoiceCallService {
_isSpeaking = false;
_accumulatedResponse = '';
}
_speechQueue.clear();
_enqueuedSentenceCount = 0;
_responseCompleted = false;
_listeningSuspendedForSpeech = false;
_resetServerAudio(stopPlayback: true);
pauseListening(reason: VoiceCallPauseReason.mute);
} else {
resumeListening(reason: VoiceCallPauseReason.mute);
@@ -547,6 +803,7 @@ class VoiceCallService {
_voiceInput.dispose();
await _tts.dispose();
await _serverAudioPlayer.dispose();
// Cancel notification
await _notificationService.cancelNotification();