fix: default model edge cases

This commit is contained in:
cogwheel0
2025-08-17 17:43:19 +05:30
parent cf449fb796
commit 7b598d7c04
14 changed files with 90 additions and 429 deletions

View File

@@ -2530,7 +2530,7 @@ class ApiService {
debugPrint('Persistent: Attempting to recover stream $streamId');
// Restart the streaming request
_streamSSE(data, streamController, messageId);
};
}
// Declare variables that need to be accessible in catch block
String? persistentStreamId;
@@ -2929,348 +2929,9 @@ class ApiService {
}
}
// Enhanced SSE parser that matches OpenWebUI's EventSourceParserStream approach
void _streamChatCompletionEnhanced(
Map<String, dynamic> data,
StreamController<String> streamController,
String messageId,
) async {
try {
debugPrint('DEBUG: Making enhanced SSE request to /api/chat/completions');
final response = await _dio.post(
'/api/chat/completions',
data: data,
options: Options(
responseType: ResponseType.stream,
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
receiveTimeout: null,
),
);
debugPrint('DEBUG: Enhanced SSE response received, status: ${response.statusCode}');
if (response.statusCode != 200) {
throw Exception('HTTP ${response.statusCode}: Failed to start streaming');
}
// Transform raw stream through SSE parser (like OpenWebUI's pipeline)
final rawStream = response.data.stream as Stream<List<int>>;
final textStream = StreamController<String>();
// Convert bytes to text manually (like TextDecoderStream)
rawStream.listen(
(chunk) {
try {
final text = utf8.decode(chunk);
textStream.add(text);
} catch (e) {
debugPrint('DEBUG: Enhanced SSE decode error: $e');
}
},
onDone: () => textStream.close(),
onError: (error) => textStream.addError(error),
);
// Apply SSE parsing (like EventSourceParserStream)
textStream.stream
.transform(_createEventSourceTransformer()) // Text → ParsedEvent
.listen(
(event) => _handleSSEEvent(event, streamController),
onDone: () {
debugPrint('DEBUG: Enhanced SSE stream completed');
streamController.close();
},
onError: (error) {
debugPrint('DEBUG: Enhanced SSE stream error: $error');
streamController.addError(error);
},
);
} catch (e) {
debugPrint('DEBUG: Enhanced SSE streaming error: $e');
streamController.addError(e);
}
}
// Create a stream transformer that parses SSE events (like EventSourceParserStream)
StreamTransformer<String, Map<String, String>> _createEventSourceTransformer() {
String buffer = '';
return StreamTransformer<String, Map<String, String>>.fromHandlers(
handleData: (chunk, sink) {
buffer += chunk;
final lines = buffer.split('\n');
buffer = lines.removeLast(); // Keep incomplete line
String? eventType;
String? data;
String? id;
for (final line in lines) {
final trimmed = line.trim();
if (trimmed.isEmpty) {
// Empty line indicates end of event - emit it
if (data != null) {
sink.add({
'type': eventType ?? 'message',
'data': data,
if (id != null) 'id': id,
});
}
// Reset for next event
eventType = null;
data = null;
id = null;
} else if (trimmed.startsWith('data: ')) {
final eventData = trimmed.substring(6);
data = data == null ? eventData : '$data\n$eventData';
} else if (trimmed.startsWith('event: ')) {
eventType = trimmed.substring(7);
} else if (trimmed.startsWith('id: ')) {
id = trimmed.substring(4);
}
// Ignore retry: and other fields
}
},
handleDone: (sink) {
// Handle any remaining data
if (buffer.trim().isNotEmpty) {
sink.add({
'type': 'message',
'data': buffer.trim(),
});
}
sink.close();
},
);
}
// Handle individual SSE events (like OpenWebUI's event handler)
void _handleSSEEvent(Map<String, String> event, StreamController<String> streamController) {
final data = event['data'];
if (data == null) return;
debugPrint('DEBUG: Enhanced SSE event: ${event['type']}, data: $data');
if (data == '[DONE]') {
debugPrint('DEBUG: Enhanced SSE stream finished with [DONE]');
streamController.close();
return;
}
try {
final json = jsonDecode(data) as Map<String, dynamic>;
// Handle errors (like OpenWebUI)
if (json.containsKey('error')) {
final error = json['error'];
debugPrint('DEBUG: Enhanced SSE error: $error');
streamController.addError('Server error: $error');
return;
}
// Handle content streaming (like OpenWebUI's choices processing)
if (json.containsKey('choices')) {
final choices = json['choices'] as List?;
if (choices != null && choices.isNotEmpty) {
final choice = choices[0] as Map<String, dynamic>;
if (choice.containsKey('delta')) {
final delta = choice['delta'] as Map<String, dynamic>;
// Extract content (like OpenWebUI's delta.content)
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('DEBUG: Enhanced SSE content chunk: "$content"');
streamController.add(content);
}
}
// Handle tool calls if present
if (delta.containsKey('tool_calls')) {
final toolCalls = delta['tool_calls'] as List?;
if (toolCalls != null && toolCalls.isNotEmpty) {
debugPrint('DEBUG: Enhanced SSE tool calls: $toolCalls');
// Could emit special events for tool calls if needed
}
}
}
// Handle finish reason
if (choice.containsKey('finish_reason')) {
final finishReason = choice['finish_reason'];
if (finishReason != null) {
debugPrint('DEBUG: Enhanced SSE finished with reason: $finishReason');
streamController.close();
return;
}
}
}
}
// Handle other event types (sources, usage, etc.) like OpenWebUI
if (json.containsKey('sources')) {
debugPrint('DEBUG: Enhanced SSE sources: ${json['sources']}');
// Could emit sources events if needed
}
if (json.containsKey('usage')) {
debugPrint('DEBUG: Enhanced SSE usage: ${json['usage']}');
// Could emit usage events if needed
}
} catch (e) {
debugPrint('DEBUG: Enhanced SSE JSON parse error: $e');
// Continue processing - don't fail the entire stream
}
}
// Original working SSE streaming implementation
void _streamChatCompletionOriginal(
Map<String, dynamic> data,
StreamController<String> streamController,
String messageId,
) async {
try {
debugPrint('DEBUG: Making SSE request to /api/chat/completions');
final response = await _dio.post(
'/api/chat/completions',
data: data,
options: Options(
responseType: ResponseType.stream,
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
receiveTimeout: null,
),
);
debugPrint('DEBUG: SSE response received, status: ${response.statusCode}');
debugPrint('DEBUG: SSE response headers: ${response.headers}');
debugPrint('DEBUG: SSE response content-type: ${response.headers.value('content-type')}');
if (response.statusCode != 200) {
throw Exception('HTTP ${response.statusCode}: Failed to start streaming');
}
// Process the SSE stream exactly like OpenWebUI frontend
final stream = response.data.stream as Stream<List<int>>;
String buffer = '';
debugPrint('DEBUG: Starting to process SSE stream chunks');
await for (final chunk in stream) {
debugPrint('DEBUG: Received SSE chunk of size: ${chunk.length}');
try {
// Decode chunk to string
final chunkStr = utf8.decode(chunk);
buffer += chunkStr;
// Process complete lines (SSE format)
final lines = buffer.split('\n');
buffer = lines.removeLast(); // Keep incomplete line in buffer
for (final line in lines) {
final trimmedLine = line.trim();
if (trimmedLine.isEmpty) continue;
debugPrint('DEBUG: SSE line: $trimmedLine');
if (trimmedLine.startsWith('data: ')) {
final jsonStr = trimmedLine.substring(6); // Remove "data: "
if (jsonStr == '[DONE]') {
debugPrint('DEBUG: SSE stream finished with [DONE]');
streamController.close();
return;
}
try {
final json = jsonDecode(jsonStr) as Map<String, dynamic>;
debugPrint('DEBUG: SSE JSON: $json');
// Process exactly like OpenWebUI
if (json.containsKey('choices')) {
final choices = json['choices'] as List?;
if (choices != null && choices.isNotEmpty) {
final choice = choices[0] as Map<String, dynamic>;
if (choice.containsKey('delta')) {
final delta = choice['delta'] as Map<String, dynamic>;
// Handle content streaming (word by word)
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('DEBUG: Adding content chunk: "$content"');
streamController.add(content);
}
}
// Handle function calls
if (delta.containsKey('tool_calls')) {
final toolCalls = delta['tool_calls'] as List?;
if (toolCalls != null && toolCalls.isNotEmpty) {
debugPrint('DEBUG: Tool calls received: $toolCalls');
// Handle tool calls if needed
}
}
}
// Handle finish reason
if (choice.containsKey('finish_reason')) {
final finishReason = choice['finish_reason'];
if (finishReason != null) {
debugPrint('DEBUG: Stream finished with reason: $finishReason');
streamController.close();
return;
}
}
}
} else if (json.containsKey('error')) {
// Handle server errors
final error = json['error'];
debugPrint('DEBUG: SSE error: $error');
streamController.addError('Server error: $error');
return;
} else {
debugPrint('DEBUG: Unknown SSE JSON format: $json');
}
} catch (e) {
debugPrint('DEBUG: Error parsing SSE JSON "$jsonStr": $e');
// Continue processing other lines
}
} else if (trimmedLine.startsWith('event: ') ||
trimmedLine.startsWith('id: ') ||
trimmedLine.startsWith('retry: ')) {
// Handle other SSE fields (ignore for now)
debugPrint('DEBUG: SSE metadata: $trimmedLine');
} else {
debugPrint('DEBUG: Unknown SSE line format: $trimmedLine');
}
}
} catch (e) {
debugPrint('DEBUG: Error processing SSE chunk: $e');
// Continue processing
}
}
// Stream ended without [DONE] marker
debugPrint('DEBUG: SSE stream ended unexpectedly');
streamController.close();
} catch (e) {
debugPrint('DEBUG: SSE streaming error: $e');
streamController.addError(e);
}
}
// Initialize Socket.IO connection
Future<void> _initializeSocket() async {