chore: update markdown dependency and refactor streaming handling

- Added `markdown` dependency version `^7.2.1` in `pubspec.yaml`.
- Updated `pubspec.lock` to reflect the direct dependency change.
- Refactored `streaming_helper.dart` to utilize `StreamingResponseController` for better stream management.
- Enhanced `ChatMessagesNotifier` to handle message streams with improved formatting and error handling.
- Updated `StreamingMarkdownWidget` to streamline markdown rendering and support new configurations.
This commit is contained in:
cogwheel0
2025-09-30 20:49:02 +05:30
parent d3b64716b9
commit 7debb7a055
10 changed files with 451 additions and 301 deletions

View File

@@ -0,0 +1,25 @@
import 'package:flutter/widgets.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../../../core/models/chat_message.dart';
typedef AssistantResponseBuilder =
Widget Function(BuildContext context, AssistantResponseContext response);
class AssistantResponseContext {
const AssistantResponseContext({
required this.message,
required this.markdown,
required this.isStreaming,
required this.buildDefault,
});
final ChatMessage message;
final String markdown;
final bool isStreaming;
final WidgetBuilder buildDefault;
}
final assistantResponseBuilderProvider = Provider<AssistantResponseBuilder?>(
(_) => null,
);

View File

@@ -13,8 +13,10 @@ import '../../../core/models/conversation.dart';
import '../../../core/models/socket_event.dart';
import '../../../core/providers/app_providers.dart';
import '../../../core/services/streaming_helper.dart';
import '../../../core/services/streaming_response_controller.dart';
import '../../../core/utils/debug_logger.dart';
import '../../../core/utils/inactivity_watchdog.dart';
import '../../../core/utils/markdown_stream_formatter.dart';
import '../../../core/utils/tool_calls_parser.dart';
import '../../../shared/services/tasks/task_queue.dart';
import '../../tools/providers/tools_providers.dart';
@@ -76,7 +78,7 @@ class ComposerHasFocus extends _$ComposerHasFocus {
// Chat messages notifier class
class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
StreamSubscription? _messageStream;
StreamingResponseController? _messageStream;
ProviderSubscription? _conversationListener;
final List<StreamSubscription> _subscriptions = [];
final List<VoidCallback> _socketSubscriptions = [];
@@ -85,6 +87,9 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
InactivityWatchdog? _typingWatchdog;
DateTime? _lastStreamingActivity;
MarkdownStreamFormatter? _markdownFormatter;
String? _activeStreamingMessageId;
bool _initialized = false;
@override
@@ -170,13 +175,13 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
return activeConversation?.messages ?? const [];
}
void _addSubscription(StreamSubscription subscription) {
_subscriptions.add(subscription);
}
void _cancelMessageStream() {
_messageStream?.cancel();
final controller = _messageStream;
_messageStream = null;
if (controller != null && controller.isActive) {
unawaited(controller.cancel());
}
_clearStreamingFormatter();
cancelSocketSubscriptions();
}
@@ -185,6 +190,47 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
_typingWatchdog = null;
}
void _clearStreamingFormatter() {
_markdownFormatter = null;
_activeStreamingMessageId = null;
}
void _ensureFormatterForMessage(ChatMessage message) {
if (_markdownFormatter != null && _activeStreamingMessageId == message.id) {
return;
}
final formatter = MarkdownStreamFormatter();
final seed = _stripStreamingPlaceholders(message.content);
if (seed.isNotEmpty) {
formatter.seed(seed);
}
_markdownFormatter = formatter;
_activeStreamingMessageId = message.id;
}
String _stripStreamingPlaceholders(String content) {
var result = content;
const ti = '[TYPING_INDICATOR]';
const searchBanner = '🔍 Searching the web...';
if (result.startsWith(ti)) {
result = result.substring(ti.length);
}
if (result.startsWith(searchBanner)) {
result = result.substring(searchBanner.length);
}
return result;
}
String _finalizeFormatter(String messageId, String fallback) {
if (_markdownFormatter != null && _activeStreamingMessageId == messageId) {
final output = _markdownFormatter!.finalize();
_clearStreamingFormatter();
return output;
}
return fallback;
}
void _scheduleTypingGuard({Duration? timeout}) {
// Default timeout tuned to balance long tool gaps and UX
final effectiveTimeout = timeout ?? const Duration(seconds: 25);
@@ -378,12 +424,9 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
}
}
void setMessageStream(StreamSubscription stream) {
void setMessageStream(StreamingResponseController controller) {
_cancelMessageStream();
_messageStream = stream;
// Add to tracked subscriptions for comprehensive cleanup
_addSubscription(stream);
_messageStream = controller;
}
void setSocketSubscriptions(
@@ -438,22 +481,9 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
final lastMessage = state.last;
if (lastMessage.role != 'assistant') return;
// Ensure we never keep the typing placeholder in persisted content
String sanitized(String s) {
const ti = '[TYPING_INDICATOR]';
const searchBanner = '🔍 Searching the web...';
if (s.startsWith(ti)) {
s = s.substring(ti.length);
}
if (s.startsWith(searchBanner)) {
s = s.substring(searchBanner.length);
}
return s;
}
state = [
...state.sublist(0, state.length - 1),
lastMessage.copyWith(content: sanitized(content)),
lastMessage.copyWith(content: _stripStreamingPlaceholders(content)),
];
_touchStreamingActivity();
}
@@ -565,21 +595,13 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
return;
}
// Strip a leading typing indicator if present, then append delta
const ti = '[TYPING_INDICATOR]';
const searchBanner = '🔍 Searching the web...';
String current = lastMessage.content;
if (current.startsWith(ti)) {
current = current.substring(ti.length);
}
if (current.startsWith(searchBanner)) {
current = current.substring(searchBanner.length);
}
final newContent = current.isEmpty ? content : current + content;
_ensureFormatterForMessage(lastMessage);
final formatter = _markdownFormatter!;
final preview = formatter.ingest(content);
state = [
...state.sublist(0, state.length - 1),
lastMessage.copyWith(content: newContent),
lastMessage.copyWith(content: preview),
];
_touchStreamingActivity();
}
@@ -594,16 +616,10 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
return;
}
// Remove typing indicator if present in the replacement
String sanitized = content;
const ti = '[TYPING_INDICATOR]';
const searchBanner = '🔍 Searching the web...';
if (sanitized.startsWith(ti)) {
sanitized = sanitized.substring(ti.length);
}
if (sanitized.startsWith(searchBanner)) {
sanitized = sanitized.substring(searchBanner.length);
}
_ensureFormatterForMessage(lastMessage);
final formatter = _markdownFormatter!;
final sanitized = formatter.replace(_stripStreamingPlaceholders(content));
state = [
...state.sublist(0, state.length - 1),
lastMessage.copyWith(content: sanitized),
@@ -617,21 +633,14 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
final lastMessage = state.last;
if (lastMessage.role != 'assistant' || !lastMessage.isStreaming) return;
// Also strip any leftover typing indicator before finalizing
const ti = '[TYPING_INDICATOR]';
const searchBanner = '🔍 Searching the web...';
String cleaned = lastMessage.content;
if (cleaned.startsWith(ti)) {
cleaned = cleaned.substring(ti.length);
}
if (cleaned.startsWith(searchBanner)) {
cleaned = cleaned.substring(searchBanner.length);
}
final finalized = _finalizeFormatter(lastMessage.id, lastMessage.content);
final cleaned = _stripStreamingPlaceholders(finalized);
state = [
...state.sublist(0, state.length - 1),
lastMessage.copyWith(isStreaming: false, content: cleaned),
];
_messageStream = null;
_cancelTypingGuard();
// Trigger a refresh of the conversations list so UI like the Chats Drawer
@@ -1407,7 +1416,7 @@ Future<void> regenerateMessage(
getMessages: () => ref.read(chatMessagesProvider),
);
ref.read(chatMessagesProvider.notifier)
..setMessageStream(activeStream.streamSubscription)
..setMessageStream(activeStream.controller)
..setSocketSubscriptions(
activeStream.socketSubscriptions,
onDispose: activeStream.disposeWatchdog,
@@ -1969,7 +1978,7 @@ Future<void> _sendMessageInternal(
);
ref.read(chatMessagesProvider.notifier)
..setMessageStream(activeStream.streamSubscription)
..setMessageStream(activeStream.controller)
..setSocketSubscriptions(
activeStream.socketSubscriptions,
onDispose: activeStream.disposeWatchdog,

View File

@@ -21,6 +21,7 @@ import 'package:url_launcher/url_launcher_string.dart';
import '../providers/chat_providers.dart' show sendMessage;
import '../../../core/utils/debug_logger.dart';
import 'sources/openwebui_sources.dart';
import '../providers/assistant_response_builder_provider.dart';
class AssistantMessageWidget extends ConsumerStatefulWidget {
final dynamic message;
@@ -746,10 +747,23 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
// Process images in the remaining text
final processedContent = _processContentForImages(cleaned);
return StreamingMarkdownWidget(
staticContent: processedContent,
Widget buildDefault(BuildContext context) => StreamingMarkdownWidget(
content: processedContent,
isStreaming: widget.isStreaming,
);
final responseBuilder = ref.watch(assistantResponseBuilderProvider);
if (responseBuilder != null) {
final contextData = AssistantResponseContext(
message: widget.message,
markdown: processedContent,
isStreaming: widget.isStreaming,
buildDefault: buildDefault,
);
return responseBuilder(context, contextData);
}
return buildDefault(context);
}
String _processContentForImages(String content) {