feat: task based send flow
This commit is contained in:
70
lib/shared/services/tasks/outbound_task.dart
Normal file
70
lib/shared/services/tasks/outbound_task.dart
Normal file
@@ -0,0 +1,70 @@
|
||||
import 'package:freezed_annotation/freezed_annotation.dart';
|
||||
|
||||
part 'outbound_task.freezed.dart';
|
||||
part 'outbound_task.g.dart';
|
||||
|
||||
enum TaskStatus {
|
||||
queued,
|
||||
running,
|
||||
succeeded,
|
||||
failed,
|
||||
cancelled,
|
||||
}
|
||||
|
||||
@freezed
|
||||
abstract class OutboundTask with _$OutboundTask {
|
||||
const OutboundTask._();
|
||||
|
||||
const factory OutboundTask.sendTextMessage({
|
||||
required String id,
|
||||
String? conversationId,
|
||||
required String text,
|
||||
@Default(<String>[]) List<String> attachments,
|
||||
@Default(<String>[]) List<String> toolIds,
|
||||
@Default(TaskStatus.queued) TaskStatus status,
|
||||
@Default(0) int attempt,
|
||||
String? idempotencyKey,
|
||||
DateTime? enqueuedAt,
|
||||
DateTime? startedAt,
|
||||
DateTime? completedAt,
|
||||
String? error,
|
||||
}) = SendTextMessageTask;
|
||||
|
||||
const factory OutboundTask.uploadMedia({
|
||||
required String id,
|
||||
String? conversationId,
|
||||
required String filePath,
|
||||
required String fileName,
|
||||
int? fileSize,
|
||||
String? mimeType,
|
||||
String? checksum,
|
||||
@Default(TaskStatus.queued) TaskStatus status,
|
||||
@Default(0) int attempt,
|
||||
String? idempotencyKey,
|
||||
DateTime? enqueuedAt,
|
||||
DateTime? startedAt,
|
||||
DateTime? completedAt,
|
||||
String? error,
|
||||
}) = UploadMediaTask;
|
||||
|
||||
const factory OutboundTask.executeToolCall({
|
||||
required String id,
|
||||
String? conversationId,
|
||||
required String toolName,
|
||||
@Default(<String, dynamic>{}) Map<String, dynamic> arguments,
|
||||
@Default(TaskStatus.queued) TaskStatus status,
|
||||
@Default(0) int attempt,
|
||||
String? idempotencyKey,
|
||||
DateTime? enqueuedAt,
|
||||
DateTime? startedAt,
|
||||
DateTime? completedAt,
|
||||
String? error,
|
||||
}) = ExecuteToolCallTask;
|
||||
|
||||
factory OutboundTask.fromJson(Map<String, dynamic> json) =>
|
||||
_$OutboundTaskFromJson(json);
|
||||
|
||||
String get threadKey => (conversationId == null || conversationId!.isEmpty)
|
||||
? 'new'
|
||||
: conversationId!;
|
||||
}
|
||||
210
lib/shared/services/tasks/task_queue.dart
Normal file
210
lib/shared/services/tasks/task_queue.dart
Normal file
@@ -0,0 +1,210 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
import 'package:uuid/uuid.dart';
|
||||
|
||||
import '../../../core/providers/app_providers.dart';
|
||||
import 'outbound_task.dart';
|
||||
import 'task_worker.dart';
|
||||
|
||||
final taskQueueProvider =
|
||||
StateNotifierProvider<TaskQueueNotifier, List<OutboundTask>>((ref) {
|
||||
return TaskQueueNotifier(ref);
|
||||
});
|
||||
|
||||
class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
|
||||
TaskQueueNotifier(this._ref) : super(const []) {
|
||||
_load();
|
||||
}
|
||||
|
||||
static const _prefsKey = 'outbound_task_queue_v1';
|
||||
final Ref _ref;
|
||||
final _uuid = const Uuid();
|
||||
|
||||
bool _processing = false;
|
||||
final Set<String> _activeThreads = <String>{};
|
||||
int _maxParallel = 2; // bounded parallelism across conversations
|
||||
|
||||
Future<void> _load() async {
|
||||
try {
|
||||
final prefs = _ref.read(sharedPreferencesProvider);
|
||||
final jsonStr = prefs.getString(_prefsKey);
|
||||
if (jsonStr == null || jsonStr.isEmpty) return;
|
||||
final raw = (jsonDecode(jsonStr) as List).cast<Map<String, dynamic>>();
|
||||
final tasks = raw.map(OutboundTask.fromJson).toList();
|
||||
// Only restore non-completed tasks
|
||||
state = tasks
|
||||
.where((t) => t.status == TaskStatus.queued || t.status == TaskStatus.running)
|
||||
.map((t) => t.copyWith(
|
||||
status: TaskStatus.queued,
|
||||
startedAt: null,
|
||||
completedAt: null,
|
||||
))
|
||||
.toList();
|
||||
// Kick processing after load
|
||||
_process();
|
||||
} catch (e) {
|
||||
debugPrint('DEBUG: Failed to load task queue: $e');
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _save() async {
|
||||
try {
|
||||
final prefs = _ref.read(sharedPreferencesProvider);
|
||||
final raw = state.map((t) => t.toJson()).toList(growable: false);
|
||||
await prefs.setString(_prefsKey, jsonEncode(raw));
|
||||
} catch (e) {
|
||||
debugPrint('DEBUG: Failed to persist task queue: $e');
|
||||
}
|
||||
}
|
||||
|
||||
Future<String> enqueueSendText({
|
||||
required String? conversationId,
|
||||
required String text,
|
||||
List<String>? attachments,
|
||||
List<String>? toolIds,
|
||||
String? idempotencyKey,
|
||||
}) async {
|
||||
final id = _uuid.v4();
|
||||
final task = OutboundTask.sendTextMessage(
|
||||
id: id,
|
||||
conversationId: conversationId,
|
||||
text: text,
|
||||
attachments: attachments ?? const [],
|
||||
toolIds: toolIds ?? const [],
|
||||
idempotencyKey: idempotencyKey,
|
||||
enqueuedAt: DateTime.now(),
|
||||
);
|
||||
state = [...state, task];
|
||||
await _save();
|
||||
_process();
|
||||
return id;
|
||||
}
|
||||
|
||||
Future<void> cancel(String id) async {
|
||||
state = [
|
||||
for (final t in state)
|
||||
if (t.id == id)
|
||||
t.copyWith(
|
||||
status: TaskStatus.cancelled,
|
||||
completedAt: DateTime.now(),
|
||||
)
|
||||
else
|
||||
t,
|
||||
];
|
||||
await _save();
|
||||
}
|
||||
|
||||
Future<void> cancelByConversation(String conversationId) async {
|
||||
state = [
|
||||
for (final t in state)
|
||||
if ((t.conversationId ?? '') == conversationId &&
|
||||
(t.status == TaskStatus.queued || t.status == TaskStatus.running))
|
||||
t.copyWith(
|
||||
status: TaskStatus.cancelled,
|
||||
completedAt: DateTime.now(),
|
||||
)
|
||||
else
|
||||
t,
|
||||
];
|
||||
await _save();
|
||||
}
|
||||
|
||||
Future<void> retry(String id) async {
|
||||
state = [
|
||||
for (final t in state)
|
||||
if (t.id == id)
|
||||
t.copyWith(
|
||||
status: TaskStatus.queued,
|
||||
attempt: (t.attempt + 1),
|
||||
error: null,
|
||||
startedAt: null,
|
||||
completedAt: null,
|
||||
)
|
||||
else
|
||||
t,
|
||||
];
|
||||
await _save();
|
||||
_process();
|
||||
}
|
||||
|
||||
Future<void> _process() async {
|
||||
if (_processing) return;
|
||||
_processing = true;
|
||||
try {
|
||||
// Pump while there is capacity and queued tasks remain
|
||||
while (true) {
|
||||
// Filter runnable tasks
|
||||
final queued = state.where((t) => t.status == TaskStatus.queued).toList();
|
||||
if (queued.isEmpty) break;
|
||||
|
||||
// Respect parallelism and one-per-thread
|
||||
final availableCapacity = _maxParallel - _activeThreads.length;
|
||||
if (availableCapacity <= 0) break;
|
||||
|
||||
OutboundTask? next;
|
||||
for (final t in queued) {
|
||||
final thread = t.threadKey;
|
||||
if (!_activeThreads.contains(thread)) {
|
||||
next = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If no eligible task (all threads busy), exit loop
|
||||
if (next == null) break;
|
||||
|
||||
// Mark running and launch without awaiting (parallel across threads)
|
||||
final threadKey = next.threadKey;
|
||||
_activeThreads.add(threadKey);
|
||||
state = [
|
||||
for (final t in state)
|
||||
if (t.id == next.id)
|
||||
next.copyWith(status: TaskStatus.running, startedAt: DateTime.now())
|
||||
else
|
||||
t,
|
||||
];
|
||||
await _save();
|
||||
|
||||
// Launch worker
|
||||
unawaited(_run(next).whenComplete(() {
|
||||
_activeThreads.remove(threadKey);
|
||||
// After a task completes, try to schedule more
|
||||
_process();
|
||||
}));
|
||||
}
|
||||
} finally {
|
||||
_processing = false;
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _run(OutboundTask task) async {
|
||||
try {
|
||||
await TaskWorker(_ref).perform(task);
|
||||
state = [
|
||||
for (final t in state)
|
||||
if (t.id == task.id)
|
||||
t.copyWith(status: TaskStatus.succeeded, completedAt: DateTime.now())
|
||||
else
|
||||
t,
|
||||
];
|
||||
} catch (e, st) {
|
||||
debugPrint('Task failed (${task.runtimeType}): $e\n$st');
|
||||
state = [
|
||||
for (final t in state)
|
||||
if (t.id == task.id)
|
||||
t.copyWith(
|
||||
status: TaskStatus.failed,
|
||||
error: e.toString(),
|
||||
completedAt: DateTime.now(),
|
||||
)
|
||||
else
|
||||
t,
|
||||
];
|
||||
} finally {
|
||||
await _save();
|
||||
}
|
||||
}
|
||||
}
|
||||
120
lib/shared/services/tasks/task_worker.dart
Normal file
120
lib/shared/services/tasks/task_worker.dart
Normal file
@@ -0,0 +1,120 @@
|
||||
import 'dart:async';
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
|
||||
import '../../../core/providers/app_providers.dart';
|
||||
import '../../../core/services/attachment_upload_queue.dart';
|
||||
import '../../../core/utils/debug_logger.dart';
|
||||
import '../../../features/chat/providers/chat_providers.dart' as chat;
|
||||
import 'outbound_task.dart';
|
||||
|
||||
class TaskWorker {
|
||||
final Ref _ref;
|
||||
TaskWorker(this._ref);
|
||||
|
||||
Future<void> perform(OutboundTask task) async {
|
||||
await task.map<Future<void>>(
|
||||
sendTextMessage: _performSendText,
|
||||
uploadMedia: _performUploadMedia,
|
||||
executeToolCall: _performExecuteToolCall,
|
||||
);
|
||||
}
|
||||
|
||||
Future<void> _performSendText(SendTextMessageTask task) async {
|
||||
// Ensure uploads referenced in attachments are completed if they are local queued ids
|
||||
// For now, assume attachments are already uploaded (fileIds or data URLs) as UI uploads eagerly.
|
||||
// If needed, we could resolve queued uploads here by integrating with AttachmentUploadQueue.
|
||||
final isReviewer = _ref.read(reviewerModeProvider);
|
||||
if (!isReviewer) {
|
||||
final api = _ref.read(apiServiceProvider);
|
||||
if (api == null) {
|
||||
throw Exception('API not available');
|
||||
}
|
||||
}
|
||||
|
||||
// Set active conversation if provided; otherwise keep current
|
||||
try {
|
||||
// If a specific conversation id is provided and differs from current, load it
|
||||
final active = _ref.read(activeConversationProvider);
|
||||
if (task.conversationId != null &&
|
||||
task.conversationId!.isNotEmpty &&
|
||||
(active == null || active.id != task.conversationId)) {
|
||||
try {
|
||||
final api = _ref.read(apiServiceProvider);
|
||||
if (api != null) {
|
||||
final conv = await api.getConversation(task.conversationId!);
|
||||
_ref.read(activeConversationProvider.notifier).state = conv;
|
||||
}
|
||||
} catch (_) {
|
||||
// If loading fails, proceed; send flow can create a new conversation
|
||||
}
|
||||
}
|
||||
} catch (_) {}
|
||||
|
||||
// Delegate to existing unified send implementation
|
||||
await chat.sendMessageFromService(
|
||||
_ref,
|
||||
task.text,
|
||||
task.attachments.isEmpty ? null : task.attachments,
|
||||
task.toolIds.isEmpty ? null : task.toolIds,
|
||||
);
|
||||
}
|
||||
|
||||
Future<void> _performUploadMedia(UploadMediaTask task) async {
|
||||
final uploader = AttachmentUploadQueue();
|
||||
// Ensure queue initialized with API upload callback
|
||||
try {
|
||||
final api = _ref.read(apiServiceProvider);
|
||||
if (api != null) {
|
||||
await uploader.initialize(
|
||||
onUpload: (p, n) => api.uploadFile(p, n),
|
||||
);
|
||||
}
|
||||
} catch (_) {}
|
||||
|
||||
// Enqueue and then wait until the item reaches a terminal state for basic parity
|
||||
final id = await uploader.enqueue(
|
||||
filePath: task.filePath,
|
||||
fileName: task.fileName,
|
||||
fileSize: task.fileSize ?? 0,
|
||||
mimeType: task.mimeType,
|
||||
checksum: task.checksum,
|
||||
);
|
||||
|
||||
final completer = Completer<void>();
|
||||
late final StreamSubscription<List<QueuedAttachment>> sub;
|
||||
sub = uploader.queueStream.listen((items) {
|
||||
QueuedAttachment? entry;
|
||||
try {
|
||||
entry = items.firstWhere((e) => e.id == id);
|
||||
} catch (_) {
|
||||
entry = null;
|
||||
}
|
||||
if (entry == null) return;
|
||||
switch (entry.status) {
|
||||
case QueuedAttachmentStatus.completed:
|
||||
case QueuedAttachmentStatus.failed:
|
||||
case QueuedAttachmentStatus.cancelled:
|
||||
sub.cancel();
|
||||
completer.complete();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
// Fire a process tick
|
||||
unawaited(uploader.processQueue());
|
||||
await completer.future.timeout(const Duration(minutes: 2), onTimeout: () {
|
||||
try { sub.cancel(); } catch (_) {}
|
||||
DebugLogger.warning('UploadMediaTask timed out: ${task.fileName}');
|
||||
return;
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _performExecuteToolCall(ExecuteToolCallTask task) async {
|
||||
// Placeholder: In this client, native tool execution is orchestrated server-side.
|
||||
// We keep this task type for future local tools or MCP bridges.
|
||||
debugPrint('ExecuteToolCallTask stub: ${task.toolName}');
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user