refactor: text streaming

This commit is contained in:
cogwheel0
2025-09-13 10:16:58 +05:30
parent d903e795d9
commit 7e6009d2cc
16 changed files with 719 additions and 348 deletions

View File

@@ -565,8 +565,7 @@ class ApiService {
}
// Fallback to chat.messages (list format) if history is missing or empty
if (((messagesList?.isEmpty ?? true)) &&
chatObject['messages'] != null) {
if (((messagesList?.isEmpty ?? true)) && chatObject['messages'] != null) {
messagesList = chatObject['messages'] as List;
debugPrint(
'DEBUG: Found ${messagesList.length} messages in chat.messages (fallback)',
@@ -1229,7 +1228,8 @@ class ApiService {
Future<void> updateUserSettings(Map<String, dynamic> settings) async {
debugPrint('DEBUG: Updating user settings');
await _dio.post('/api/v1/users/user/settings', data: settings);
// Align with web client update route
await _dio.post('/api/v1/users/user/settings/update', data: settings);
}
// Suggestions
@@ -1384,8 +1384,35 @@ class ApiService {
// Files
Future<String> getFileContent(String fileId) async {
debugPrint('DEBUG: Fetching file content: $fileId');
final response = await _dio.get('/api/v1/files/$fileId/content');
return response.data as String;
// The Open-WebUI endpoint returns the raw file bytes with appropriate
// Content-Type headers, not JSON. We must read bytes and base64-encode
// them for consistent handling across platforms/widgets.
final response = await _dio.get(
'/api/v1/files/$fileId/content',
options: Options(responseType: ResponseType.bytes),
);
// Try to determine the mime type from response headers; fallback to text/plain
final contentType =
response.headers.value(HttpHeaders.contentTypeHeader) ?? '';
String mimeType = 'text/plain';
if (contentType.isNotEmpty) {
// Strip charset if present
mimeType = contentType.split(';').first.trim();
}
final bytes = response.data is List<int>
? (response.data as List<int>)
: (response.data as Uint8List).toList();
final base64Data = base64Encode(bytes);
// For images, return a data URL so UI can render directly; otherwise return raw base64
if (mimeType.startsWith('image/')) {
return 'data:$mimeType;base64,$base64Data';
}
return base64Data;
}
Future<Map<String, dynamic>> getFileInfo(String fileId) async {
@@ -2633,7 +2660,8 @@ class ApiService {
final streamController = StreamController<String>();
// Generate unique IDs
final messageId = (responseMessageId != null && responseMessageId.isNotEmpty)
final messageId =
(responseMessageId != null && responseMessageId.isNotEmpty)
? responseMessageId
: const Uuid().v4();
final sessionId =
@@ -2770,20 +2798,23 @@ class ApiService {
debugPrint(
'DEBUG: Has background_tasks (pre-bg): ${data.containsKey('background_tasks')}',
);
debugPrint('DEBUG: Has session_id (pre-bg): ${data.containsKey('session_id')}');
debugPrint('DEBUG: background_tasks value (pre-bg): ${data['background_tasks']}');
debugPrint(
'DEBUG: Has session_id (pre-bg): ${data.containsKey('session_id')}',
);
debugPrint(
'DEBUG: background_tasks value (pre-bg): ${data['background_tasks']}',
);
debugPrint('DEBUG: session_id value (pre-bg): ${data['session_id']}');
debugPrint('DEBUG: id value (pre-bg): ${data['id']}');
// Decide whether to use background task flow.
// Only enable background task mode when we actually need socket/dynamic-channel
// behavior (e.g., provider-native tools or explicit background tasks with a session).
// Always use task-based background flow for unified pipeline.
// When a dynamic channel (session_id) is not provided, this method falls
// back to polling and streams deltas to the UI.
// Always use background task flow (matches web client) to ensure
// server maintains correct history with pre-seeded assistant id.
final bool useBackgroundTasks = true;
// Use background task mode when tools, web_search, image_generation are enabled,
// or when an explicit dynamic socket session binding is requested.
final bool useBackgroundTasks =
(toolIds != null && toolIds.isNotEmpty) ||
enableWebSearch ||
enableImageGeneration ||
(sessionIdOverride != null && sessionIdOverride.isNotEmpty);
// Use background flow only when required; otherwise prefer SSE even with chat_id.
// SSE must not include session_id/id to avoid server falling back to task mode.
@@ -2804,8 +2835,12 @@ class ApiService {
debugPrint('DEBUG: Background flow payload keys: ${data.keys.toList()}');
debugPrint('DEBUG: Using session_id: $sessionId');
debugPrint('DEBUG: Using message id: $messageId');
debugPrint('DEBUG: Has tool_ids: ${data.containsKey('tool_ids')} -> ${data['tool_ids']}');
debugPrint('DEBUG: Has background_tasks: ${data.containsKey('background_tasks')}');
debugPrint(
'DEBUG: Has tool_ids: ${data.containsKey('tool_ids')} -> ${data['tool_ids']}',
);
debugPrint(
'DEBUG: Has background_tasks: ${data.containsKey('background_tasks')}',
);
debugPrint('DEBUG: Initiating background tools flow (task-based)');
debugPrint('DEBUG: Posting to /api/chat/completions (no SSE)');
@@ -2838,6 +2873,113 @@ class ApiService {
if (!streamController.isClosed) streamController.close();
}
}();
} else {
// SSE streaming path for low-latency pure completions (no background tasks)
() async {
try {
// Request SSE stream; avoid session_id/id which break streaming
final resp = await _dio.post(
'/api/chat/completions',
data: data,
options: Options(
responseType: ResponseType.stream,
headers: {'Accept': 'text/event-stream'},
),
);
// Parse SSE lines and forward deltas to the controller
final body = resp.data;
// Dio returns ResponseBody for stream responseType
final stream = (body is ResponseBody) ? body.stream : null;
if (stream == null) {
// Fallback: if server responded JSON, emit once
try {
final dataStr = resp.data?.toString() ?? '';
if (dataStr.isNotEmpty && !streamController.isClosed) {
streamController.add(dataStr);
}
} catch (_) {}
if (!streamController.isClosed) streamController.close();
return;
}
String buffer = '';
late final StreamSubscription<List<int>> sub;
sub = stream.listen(
(chunk) {
try {
buffer += utf8.decode(chunk, allowMalformed: true);
// Process complete lines; keep remainder in buffer
final parts = buffer.split('\n');
buffer = parts.isNotEmpty ? parts.removeLast() : '';
for (final raw in parts) {
final line = raw.trim();
if (line.isEmpty) continue;
if (line == 'data: [DONE]') {
try {
if (!streamController.isClosed) streamController.close();
} catch (_) {}
sub.cancel();
return;
}
if (line.startsWith('data:')) {
final payloadStr = line.substring(5).trim();
if (payloadStr.isEmpty) continue;
try {
final Map<String, dynamic> j = jsonDecode(payloadStr);
final choices = j['choices'];
if (choices is List && choices.isNotEmpty) {
final choice = choices.first;
final delta = choice is Map ? choice['delta'] : null;
if (delta is Map) {
final content = delta['content']?.toString() ?? '';
if (content.isNotEmpty &&
!streamController.isClosed) {
streamController.add(content);
}
} else {
final message = (choice is Map)
? (choice['message']?['content']?.toString() ??
'')
: '';
if (message.isNotEmpty &&
!streamController.isClosed) {
streamController.add(message);
}
}
} else if (j['content'] is String) {
final content = j['content'] as String;
if (content.isNotEmpty && !streamController.isClosed) {
streamController.add(content);
}
}
} catch (_) {
// Non-JSON payload; forward as-is
if (!streamController.isClosed) {
streamController.add(payloadStr);
}
}
}
}
} catch (_) {}
},
onDone: () {
try {
if (!streamController.isClosed) streamController.close();
} catch (_) {}
},
onError: (_) {
try {
if (!streamController.isClosed) streamController.close();
} catch (_) {}
},
cancelOnError: true,
);
} catch (e) {
debugPrint('DEBUG: SSE streaming failed: $e');
if (!streamController.isClosed) streamController.close();
}
}();
}
return (