feat(chat): Add worker manager to streaming helper for image processing
This commit is contained in:
@@ -17,6 +17,7 @@ import '../utils/debug_logger.dart';
|
||||
import '../utils/openwebui_source_parser.dart';
|
||||
import 'streaming_response_controller.dart';
|
||||
import 'api_service.dart';
|
||||
import 'worker_manager.dart';
|
||||
|
||||
// Keep local verbosity toggle for socket logs
|
||||
const bool kSocketVerboseLogging = false;
|
||||
@@ -43,6 +44,74 @@ final _imageFilePattern = RegExp(
|
||||
caseSensitive: false,
|
||||
);
|
||||
|
||||
List<Map<String, dynamic>> _collectImageReferencesWorker(String content) {
|
||||
final collected = <Map<String, dynamic>>[];
|
||||
if (content.isEmpty) {
|
||||
return collected;
|
||||
}
|
||||
|
||||
if (content.contains('<details') && content.contains('</details>')) {
|
||||
final parsed = ToolCallsParser.parse(content);
|
||||
if (parsed != null) {
|
||||
for (final entry in parsed.toolCalls) {
|
||||
if (entry.files != null && entry.files!.isNotEmpty) {
|
||||
collected.addAll(_extractFilesFromResult(entry.files));
|
||||
}
|
||||
if (entry.result != null) {
|
||||
collected.addAll(_extractFilesFromResult(entry.result));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (collected.isNotEmpty) {
|
||||
return collected;
|
||||
}
|
||||
|
||||
final base64Matches = _base64ImagePattern.allMatches(content);
|
||||
for (final match in base64Matches) {
|
||||
final url = match.group(0);
|
||||
if (url != null && url.isNotEmpty) {
|
||||
collected.add({'type': 'image', 'url': url});
|
||||
}
|
||||
}
|
||||
|
||||
final urlMatches = _urlImagePattern.allMatches(content);
|
||||
for (final match in urlMatches) {
|
||||
final url = match.group(0);
|
||||
if (url != null && url.isNotEmpty) {
|
||||
collected.add({'type': 'image', 'url': url});
|
||||
}
|
||||
}
|
||||
|
||||
final jsonMatches = _jsonImagePattern.allMatches(content);
|
||||
for (final match in jsonMatches) {
|
||||
final url = _jsonUrlExtractPattern
|
||||
.firstMatch(match.group(0) ?? '')
|
||||
?.group(1);
|
||||
if (url != null && url.isNotEmpty) {
|
||||
collected.add({'type': 'image', 'url': url});
|
||||
}
|
||||
}
|
||||
|
||||
final partialMatches = _partialResultsPattern.allMatches(content);
|
||||
for (final match in partialMatches) {
|
||||
final attrValue = match.group(2);
|
||||
if (attrValue == null) continue;
|
||||
try {
|
||||
final decoded = json.decode(attrValue);
|
||||
collected.addAll(_extractFilesFromResult(decoded));
|
||||
} catch (_) {
|
||||
if (attrValue.startsWith('data:image/') ||
|
||||
_imageFilePattern.hasMatch(attrValue)) {
|
||||
collected.add({'type': 'image', 'url': attrValue});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return collected;
|
||||
}
|
||||
|
||||
class ActiveSocketStream {
|
||||
ActiveSocketStream({
|
||||
required this.controller,
|
||||
@@ -70,6 +139,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
required String? activeConversationId,
|
||||
required ApiService api,
|
||||
required SocketService? socketService,
|
||||
required WorkerManager workerManager,
|
||||
RegisterConversationDeltaListener? registerDeltaListener,
|
||||
// Message update callbacks
|
||||
required void Function(String) appendToLastMessage,
|
||||
@@ -228,88 +298,44 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
||||
final content = msgs.last.content;
|
||||
if (content.isEmpty) return;
|
||||
|
||||
final collected = <Map<String, dynamic>>[];
|
||||
|
||||
// Quick check: only parse tool calls if complete details blocks exist
|
||||
if (content.contains('<details') && content.contains('</details>')) {
|
||||
final parsed = ToolCallsParser.parse(content);
|
||||
if (parsed != null) {
|
||||
for (final entry in parsed.toolCalls) {
|
||||
if (entry.files != null && entry.files!.isNotEmpty) {
|
||||
collected.addAll(_extractFilesFromResult(entry.files));
|
||||
}
|
||||
if (entry.result != null) {
|
||||
collected.addAll(_extractFilesFromResult(entry.result));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (collected.isEmpty) {
|
||||
// Use pre-compiled patterns for better performance
|
||||
final base64Matches = _base64ImagePattern.allMatches(content);
|
||||
for (final match in base64Matches) {
|
||||
final url = match.group(0);
|
||||
if (url != null && url.isNotEmpty) {
|
||||
collected.add({'type': 'image', 'url': url});
|
||||
}
|
||||
}
|
||||
|
||||
final urlMatches = _urlImagePattern.allMatches(content);
|
||||
for (final match in urlMatches) {
|
||||
final url = match.group(0);
|
||||
if (url != null && url.isNotEmpty) {
|
||||
collected.add({'type': 'image', 'url': url});
|
||||
}
|
||||
}
|
||||
|
||||
final jsonMatches = _jsonImagePattern.allMatches(content);
|
||||
for (final match in jsonMatches) {
|
||||
final url = _jsonUrlExtractPattern
|
||||
.firstMatch(match.group(0) ?? '')
|
||||
?.group(1);
|
||||
if (url != null && url.isNotEmpty) {
|
||||
collected.add({'type': 'image', 'url': url});
|
||||
}
|
||||
}
|
||||
|
||||
final partialMatches = _partialResultsPattern.allMatches(content);
|
||||
for (final match in partialMatches) {
|
||||
final attrValue = match.group(2);
|
||||
if (attrValue != null) {
|
||||
try {
|
||||
final decoded = json.decode(attrValue);
|
||||
collected.addAll(_extractFilesFromResult(decoded));
|
||||
} catch (_) {
|
||||
if (attrValue.startsWith('data:image/') ||
|
||||
_imageFilePattern.hasMatch(attrValue)) {
|
||||
collected.add({'type': 'image', 'url': attrValue});
|
||||
final targetMessageId = msgs.last.id;
|
||||
unawaited(
|
||||
workerManager
|
||||
.schedule<String, List<Map<String, dynamic>>>(
|
||||
_collectImageReferencesWorker,
|
||||
content,
|
||||
debugLabel: 'stream_collect_images',
|
||||
)
|
||||
.then((collected) {
|
||||
if (collected.isEmpty) return;
|
||||
final currentMessages = getMessages();
|
||||
if (currentMessages.isEmpty) return;
|
||||
final last = currentMessages.last;
|
||||
if (last.id != targetMessageId || last.role != 'assistant') {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (collected.isEmpty) return;
|
||||
final existing = last.files ?? <Map<String, dynamic>>[];
|
||||
final seen = <String>{
|
||||
for (final f in existing)
|
||||
if (f['url'] is String) (f['url'] as String) else '',
|
||||
}..removeWhere((e) => e.isEmpty);
|
||||
|
||||
final existing = msgs.last.files ?? <Map<String, dynamic>>[];
|
||||
final seen = <String>{
|
||||
for (final f in existing)
|
||||
if (f['url'] is String) (f['url'] as String) else '',
|
||||
}..removeWhere((e) => e.isEmpty);
|
||||
final merged = <Map<String, dynamic>>[...existing];
|
||||
for (final f in collected) {
|
||||
final url = f['url'] as String?;
|
||||
if (url != null && url.isNotEmpty && !seen.contains(url)) {
|
||||
merged.add({'type': 'image', 'url': url});
|
||||
seen.add(url);
|
||||
}
|
||||
}
|
||||
|
||||
final merged = <Map<String, dynamic>>[...existing];
|
||||
for (final f in collected) {
|
||||
final url = f['url'] as String?;
|
||||
if (url != null && url.isNotEmpty && !seen.contains(url)) {
|
||||
merged.add({'type': 'image', 'url': url});
|
||||
seen.add(url);
|
||||
}
|
||||
}
|
||||
|
||||
if (merged.length != existing.length) {
|
||||
updateLastMessageWith((m) => m.copyWith(files: merged));
|
||||
}
|
||||
if (merged.length != existing.length) {
|
||||
updateLastMessageWith((m) => m.copyWith(files: merged));
|
||||
}
|
||||
})
|
||||
.catchError((_) {}),
|
||||
);
|
||||
} catch (_) {}
|
||||
}
|
||||
|
||||
|
||||
200
lib/core/services/worker_manager.dart
Normal file
200
lib/core/services/worker_manager.dart
Normal file
@@ -0,0 +1,200 @@
|
||||
import 'dart:async';
|
||||
import 'dart:collection';
|
||||
import 'dart:math' as math;
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:riverpod_annotation/riverpod_annotation.dart';
|
||||
|
||||
import '../utils/debug_logger.dart';
|
||||
|
||||
part 'worker_manager.g.dart';
|
||||
|
||||
/// Signature of a task that can be executed by [WorkerManager].
|
||||
typedef WorkerTask<Q, R> = ComputeCallback<Q, R>;
|
||||
|
||||
/// Coordinates CPU intensive work off the UI isolate with lightweight pooling.
|
||||
///
|
||||
/// The manager throttles concurrent isolate usage to avoid overwhelming the
|
||||
/// platform while still enabling parallel work. On web the callback executes
|
||||
/// synchronously because secondary isolates are not supported.
|
||||
class WorkerManager {
|
||||
WorkerManager({int maxConcurrentTasks = _defaultMaxConcurrentTasks})
|
||||
: _maxConcurrentTasks = math.max(1, maxConcurrentTasks) {
|
||||
DebugLogger.log(
|
||||
'initialized',
|
||||
scope: 'worker',
|
||||
data: {'max': _maxConcurrentTasks},
|
||||
);
|
||||
}
|
||||
|
||||
static const int _defaultMaxConcurrentTasks = 2;
|
||||
|
||||
final int _maxConcurrentTasks;
|
||||
final Queue<_EnqueuedJob> _pendingJobs = Queue<_EnqueuedJob>();
|
||||
bool _disposed = false;
|
||||
int _activeJobs = 0;
|
||||
int _jobCounter = 0;
|
||||
|
||||
/// Schedule [callback] with [message] to run on a worker isolate.
|
||||
///
|
||||
/// The [callback] must be a top-level or static function, mirroring the
|
||||
/// constraints of `compute`. Errors from the task are propagated to the
|
||||
/// returned [Future].
|
||||
Future<R> schedule<Q, R>(
|
||||
WorkerTask<Q, R> callback,
|
||||
Q message, {
|
||||
String? debugLabel,
|
||||
}) {
|
||||
if (_disposed) {
|
||||
return Future.error(StateError('WorkerManager has been disposed'));
|
||||
}
|
||||
|
||||
final jobId = ++_jobCounter;
|
||||
final completer = Completer<R>();
|
||||
final job = _EnqueuedJob(
|
||||
id: jobId,
|
||||
debugLabel: debugLabel,
|
||||
run: () {
|
||||
if (kIsWeb) {
|
||||
return Future<R>.sync(() => callback(message));
|
||||
}
|
||||
return compute(callback, message);
|
||||
},
|
||||
onComplete: (value) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.complete(value as R);
|
||||
}
|
||||
},
|
||||
onError: (error, stackTrace) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.completeError(error, stackTrace);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
_pendingJobs.add(job);
|
||||
|
||||
DebugLogger.log(
|
||||
'queued',
|
||||
scope: 'worker',
|
||||
data: {
|
||||
'id': jobId,
|
||||
if (debugLabel != null) 'label': debugLabel,
|
||||
'pending': _pendingJobs.length,
|
||||
'active': _activeJobs,
|
||||
},
|
||||
);
|
||||
|
||||
_processQueue();
|
||||
|
||||
return completer.future;
|
||||
}
|
||||
|
||||
/// Dispose the manager and reject all pending work.
|
||||
void dispose() {
|
||||
if (_disposed) {
|
||||
return;
|
||||
}
|
||||
_disposed = true;
|
||||
|
||||
while (_pendingJobs.isNotEmpty) {
|
||||
final job = _pendingJobs.removeFirst();
|
||||
job.cancel(
|
||||
StateError('WorkerManager disposed before job ${job.id} started'),
|
||||
);
|
||||
}
|
||||
|
||||
DebugLogger.log('disposed', scope: 'worker', data: {'active': _activeJobs});
|
||||
}
|
||||
|
||||
void _processQueue() {
|
||||
if (_disposed) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (_activeJobs < _maxConcurrentTasks && _pendingJobs.isNotEmpty) {
|
||||
final job = _pendingJobs.removeFirst();
|
||||
_startJob(job);
|
||||
}
|
||||
}
|
||||
|
||||
void _startJob(_EnqueuedJob job) {
|
||||
_activeJobs++;
|
||||
|
||||
DebugLogger.log(
|
||||
'started',
|
||||
scope: 'worker',
|
||||
data: {
|
||||
'id': job.id,
|
||||
if (job.debugLabel != null) 'label': job.debugLabel,
|
||||
'active': _activeJobs,
|
||||
},
|
||||
);
|
||||
|
||||
unawaited(_runJob(job));
|
||||
}
|
||||
|
||||
Future<void> _runJob(_EnqueuedJob job) async {
|
||||
try {
|
||||
final result = await job.run();
|
||||
job.onComplete(result);
|
||||
|
||||
DebugLogger.log(
|
||||
'completed',
|
||||
scope: 'worker',
|
||||
data: {
|
||||
'id': job.id,
|
||||
if (job.debugLabel != null) 'label': job.debugLabel,
|
||||
'pending': _pendingJobs.length,
|
||||
},
|
||||
);
|
||||
} catch (error, stackTrace) {
|
||||
job.onError(error, stackTrace);
|
||||
|
||||
DebugLogger.error(
|
||||
'failed',
|
||||
scope: 'worker',
|
||||
error: error,
|
||||
stackTrace: stackTrace,
|
||||
data: {
|
||||
'id': job.id,
|
||||
if (job.debugLabel != null) 'label': job.debugLabel,
|
||||
},
|
||||
);
|
||||
} finally {
|
||||
_activeJobs = math.max(0, _activeJobs - 1);
|
||||
_processQueue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Keep a single [WorkerManager] alive across the app.
|
||||
@Riverpod(keepAlive: true)
|
||||
// ignore: functional_ref
|
||||
WorkerManager workerManager(Ref ref) {
|
||||
final concurrency = kIsWeb ? 1 : WorkerManager._defaultMaxConcurrentTasks;
|
||||
final manager = WorkerManager(maxConcurrentTasks: concurrency);
|
||||
ref.onDispose(manager.dispose);
|
||||
return manager;
|
||||
}
|
||||
|
||||
class _EnqueuedJob {
|
||||
_EnqueuedJob({
|
||||
required this.id,
|
||||
required this.run,
|
||||
required this.onComplete,
|
||||
required this.onError,
|
||||
this.debugLabel,
|
||||
});
|
||||
|
||||
final int id;
|
||||
final FutureOr<dynamic> Function() run;
|
||||
final void Function(dynamic value) onComplete;
|
||||
final void Function(Object error, StackTrace stackTrace) onError;
|
||||
final String? debugLabel;
|
||||
final DateTime queuedAt = DateTime.now();
|
||||
|
||||
void cancel(Object error) {
|
||||
onError(error, StackTrace.current);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user