2025-08-16 17:36:02 +05:30
|
|
|
import 'dart:async';
|
|
|
|
|
import 'dart:convert';
|
2025-08-16 20:27:44 +05:30
|
|
|
import 'package:flutter/foundation.dart';
|
2025-08-16 17:36:02 +05:30
|
|
|
|
|
|
|
|
/// A single event read from a Server-Sent Events stream.
///
/// Instances are immutable; the constructor is `const` so that events with
/// compile-time-constant field values can be canonicalized.
class SSEEvent {
  /// Value of the `id` field associated with this event, if any.
  final String? id;

  /// Value of the `event` field (the event type), if any.
  final String? event;

  /// The event payload.
  final String data;

  /// Value of the `retry` field, if one was supplied as an integer.
  final int? retry;

  /// Creates an event. Only [data] is required.
  const SSEEvent({
    this.id,
    this.event,
    required this.data,
    this.retry,
  });
}
|
|
|
|
|
|
2025-08-16 20:27:44 +05:30
|
|
|
/// Incremental parser for Server-Sent Events with heartbeat monitoring.
///
/// Push decoded text into [feed]; every completed event (a run of field lines
/// terminated by a blank line) is published on [stream]. A one-shot watchdog
/// timer is re-armed each time data or a comment line arrives. When it
/// expires, a notification is published on [heartbeat] and the last seen
/// event id is published on [reconnectRequests].
class SSEParser {
  /// Creates a parser and arms the heartbeat watchdog.
  ///
  /// [heartbeatTimeout] defaults to 30 seconds when omitted.
  SSEParser({Duration? heartbeatTimeout})
      : _heartbeatTimeout = heartbeatTimeout ?? const Duration(seconds: 30) {
    _startHeartbeatTimer();
  }

  final _controller = StreamController<SSEEvent>.broadcast();

  /// Text received so far that is not yet terminated by a newline.
  String _buffer = '';

  // Fields of the event currently being assembled.
  String? _currentId;
  String? _currentEvent;
  String _currentData = '';
  int? _currentRetry;

  // Heartbeat and health monitoring.
  Timer? _heartbeatTimer;
  DateTime _lastDataReceived = DateTime.now();
  final Duration _heartbeatTimeout;
  bool _isClosed = false;

  // Recovery state.
  String? _lastEventId;
  bool _reconnectRequested = false;

  // Events for monitoring connection health.
  final _heartbeatController = StreamController<void>.broadcast();
  final _reconnectController = StreamController<String?>.broadcast();

  /// Parsed events, in the order they were completed.
  Stream<SSEEvent> get stream => _controller.stream;

  /// Fires each time the heartbeat watchdog expires.
  Stream<void> get heartbeat => _heartbeatController.stream;

  /// Fires together with [heartbeat]; carries [lastEventId] at that moment.
  Stream<String?> get reconnectRequests => _reconnectController.stream;

  /// The last event id seen, for use when reconnecting.
  String? get lastEventId => _lastEventId;

  /// Whether [close] has been called.
  bool get isClosed => _isClosed;

  /// Whether the watchdog has expired since the flag was last reset.
  bool get reconnectRequested => _reconnectRequested;

  /// Time elapsed since data (or a comment line) was last received.
  Duration get timeSinceLastData =>
      DateTime.now().difference(_lastDataReceived);

  /// Clears [reconnectRequested]; call once the reconnection was handled.
  void resetReconnectFlag() {
    _reconnectRequested = false;
  }

  /// Feeds a chunk of raw text to the parser.
  ///
  /// Does nothing once the parser is closed. Chunks may split lines and
  /// events at arbitrary positions.
  void feed(String chunk) {
    if (_isClosed) return;

    _lastDataReceived = DateTime.now();
    _buffer += chunk;
    _processBuffer();

    // Data arrived, so push the watchdog deadline out again.
    _resetHeartbeatTimer();
  }

  /// (Re)arms the one-shot watchdog, replacing any pending timer.
  void _startHeartbeatTimer() {
    _heartbeatTimer?.cancel();
    _heartbeatTimer = Timer(_heartbeatTimeout, _onHeartbeatTimeout);
  }

  /// Re-arms the watchdog unless the parser is closed.
  void _resetHeartbeatTimer() {
    if (!_isClosed) {
      _startHeartbeatTimer();
    }
  }

  /// Watchdog callback: reports the timeout and requests a reconnect.
  void _onHeartbeatTimeout() {
    debugPrint('SSEParser: Heartbeat timeout - no data received in ${_heartbeatTimeout.inSeconds}s');

    if (_isClosed) return;

    // Emit heartbeat timeout event.
    _heartbeatController.add(null);

    // Request reconnection with last event ID for recovery.
    _reconnectRequested = true;
    _reconnectController.add(_lastEventId);
  }

  /// Splits the buffer into complete lines and processes each of them.
  void _processBuffer() {
    try {
      // A trailing high surrogate means the next chunk must complete the
      // character; leave the buffer untouched until then.
      if (_buffer.isNotEmpty && _hasIncompleteUnicode(_buffer)) {
        return;
      }

      final lines = _buffer.split('\n');

      // The last element of split() is always the text after the final
      // newline: an unterminated partial line, or '' when the buffer ends
      // with a newline. Either way it is not a complete line and must stay
      // in the buffer. Processing that '' as a blank line would end the
      // current event after every newline-terminated chunk.
      _buffer = lines.removeLast();

      for (final line in lines) {
        _processLine(line);
      }
    } catch (e) {
      debugPrint('SSEParser: Error processing buffer: $e');
      // Reset buffer on parsing error to prevent cascading failures.
      _buffer = '';
    }
  }

  /// Whether [text] ends with a high surrogate lacking its low surrogate.
  bool _hasIncompleteUnicode(String text) {
    if (text.isEmpty) return false;

    final lastChar = text.codeUnitAt(text.length - 1);
    return lastChar >= 0xD800 && lastChar <= 0xDBFF;
  }

  /// Processes one complete line (without its terminating `\n`).
  void _processLine(String line) {
    // Handle carriage return if present (some servers use \r\n).
    final cleanLine = line.replaceAll('\r', '');

    // A blank line signals the end of the current event.
    if (cleanLine.trim().isEmpty) {
      if (_currentData.isNotEmpty) {
        _emitEvent();
      }
      _resetCurrentEvent();
      return;
    }

    // Comment line (starts with ':') - these serve as keep-alives.
    if (cleanLine.startsWith(':')) {
      _lastDataReceived = DateTime.now();
      _resetHeartbeatTimer();

      // Log processing indicators, but only in debug builds.
      if (kDebugMode) {
        if (cleanLine.contains('OPENROUTER')) {
          debugPrint('SSEParser: OpenRouter processing...');
        } else if (cleanLine.contains('PROCESSING')) {
          debugPrint('SSEParser: Server processing...');
        }
      }
      return;
    }

    // Split into field name and value at the first colon.
    final colonIndex = cleanLine.indexOf(':');
    String field;
    String value;

    if (colonIndex == -1) {
      // No colon: the whole line is the field name, with an empty value.
      field = cleanLine;
      value = '';
    } else {
      field = cleanLine.substring(0, colonIndex);
      value = cleanLine.substring(colonIndex + 1);
      // Remove a single leading space from the value if present.
      if (value.startsWith(' ')) {
        value = value.substring(1);
      }
    }

    switch (field) {
      case 'data':
        // Successive data lines are joined with a newline.
        if (_currentData.isNotEmpty) {
          _currentData += '\n';
        }
        _currentData += value;
        break;

      case 'event':
        _currentEvent = value;
        break;

      case 'id':
        _currentId = value;
        _lastEventId = value; // Track for reconnection.
        break;

      case 'retry':
        // Non-integer retry values are ignored.
        final retryValue = int.tryParse(value);
        if (retryValue != null) {
          _currentRetry = retryValue;
        }
        break;

      default:
        // Ignore unknown fields.
        break;
    }
  }

  /// Publishes the event assembled so far on [stream].
  void _emitEvent() {
    if (_isClosed) return;

    try {
      _controller.add(
        SSEEvent(
          id: _currentId,
          event: _currentEvent,
          data: _currentData,
          retry: _currentRetry,
        ),
      );

      // Track last event ID for potential reconnection.
      if (_currentId != null) {
        _lastEventId = _currentId;
      }
    } catch (e) {
      debugPrint('SSEParser: Error emitting event: $e');
      _controller.addError(e);
    }
  }

  /// Clears the per-event state; id and retry persist across events.
  void _resetCurrentEvent() {
    _currentEvent = null;
    _currentData = '';
  }

  /// Closes the parser and all of its streams. Safe to call more than once.
  ///
  /// An event that has not been terminated by a blank line is discarded
  /// rather than dispatched.
  void close() {
    if (_isClosed) return;
    _isClosed = true;

    // Cancel heartbeat timer.
    _heartbeatTimer?.cancel();
    _heartbeatTimer = null;

    // Close controllers.
    _controller.close();
    _heartbeatController.close();
    _reconnectController.close();
  }

  /// Parses SSE events from a stream of UTF-8 encoded bytes.
  ///
  /// [byteStream] is subscribed to immediately. Errors from it are forwarded
  /// to the returned stream without ending it. The parser is closed when
  /// [byteStream] is done, and the source subscription is cancelled when the
  /// returned stream loses its last listener.
  static Stream<SSEEvent> parseStream(
    Stream<List<int>> byteStream, {
    Duration? heartbeatTimeout,
  }) {
    final parser = SSEParser(heartbeatTimeout: heartbeatTimeout);

    // Convert bytes to text and feed to parser with error recovery.
    final subscription = byteStream.transform(utf8.decoder).listen(
      (chunk) {
        try {
          parser.feed(chunk);
        } catch (e) {
          debugPrint('SSEParser: Error feeding chunk: $e');
          // Don't propagate feed errors - just skip the problematic chunk.
        }
      },
      onDone: parser.close,
      onError: (Object error, StackTrace stackTrace) {
        debugPrint('SSEParser: Stream error: $error');
        parser._controller.addError(error, stackTrace);
      },
      cancelOnError: false, // Continue processing despite errors.
    );

    // Stop reading the source once nobody listens to the events any more.
    parser._controller.onCancel = () {
      subscription.cancel();
    };

    return parser.stream;
  }
}
|
|
|
|
|
|
2025-08-16 20:27:44 +05:30
|
|
|
/// A [StreamTransformer] that turns decoded text chunks into [SSEEvent]s.
///
/// Each call to [bind] creates its own [SSEParser], configured with
/// [heartbeatTimeout].
class SSETransformer extends StreamTransformerBase<String, SSEEvent> {
  /// Heartbeat timeout handed to the parser created by [bind].
  final Duration? heartbeatTimeout;

  const SSETransformer({this.heartbeatTimeout});

  /// Subscribes to [stream] right away and returns the parser's event stream.
  ///
  /// Source errors are forwarded to the returned stream, the parser is closed
  /// when [stream] is done, and the source subscription is cancelled when the
  /// returned stream loses its last listener.
  @override
  Stream<SSEEvent> bind(Stream<String> stream) {
    final parser = SSEParser(heartbeatTimeout: heartbeatTimeout);

    // A chunk that fails to parse is logged and skipped.
    void handleChunk(String chunk) {
      try {
        parser.feed(chunk);
      } catch (e) {
        debugPrint('SSETransformer: Error feeding chunk: $e');
      }
    }

    // Source errors are passed on; cancelOnError is false below, so the
    // subscription stays active afterwards.
    void handleError(Object error) {
      debugPrint('SSETransformer: Stream error: $error');
      parser._controller.addError(error);
    }

    final source = stream.listen(
      handleChunk,
      onError: handleError,
      onDone: () => parser.close(),
      cancelOnError: false,
    );

    // Stop reading the source once the event stream has no listeners left.
    parser._controller.onCancel = () {
      source.cancel();
    };

    return parser.stream;
  }
}
|
|
|
|
|
|
|
|
|
|
/// An [SSEEvent] carrying extra bookkeeping for resilient streaming.
class EnhancedSSEEvent extends SSEEvent {
  /// Time associated with this event; [EnhancedSSEEvent.fromSSEEvent] sets it
  /// to the moment of wrapping.
  final DateTime timestamp;

  /// Caller-assigned sequence number of this event.
  final int sequenceNumber;

  /// Caller-assigned session identifier, if any.
  final String? sessionId;

  /// Creates an enhanced event from its individual fields.
  EnhancedSSEEvent({
    required String data,
    String? id,
    String? event,
    int? retry,
    required this.timestamp,
    required this.sequenceNumber,
    this.sessionId,
  }) : super(data: data, id: id, event: event, retry: retry);

  /// Wraps [event], copying its fields and stamping it with [DateTime.now].
  factory EnhancedSSEEvent.fromSSEEvent(
    SSEEvent event, {
    required int sequenceNumber,
    String? sessionId,
  }) =>
      EnhancedSSEEvent(
        data: event.data,
        id: event.id,
        event: event.event,
        retry: event.retry,
        timestamp: DateTime.now(),
        sequenceNumber: sequenceNumber,
        sessionId: sessionId,
      );
}
|