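// File: iiEsaywebUIapp/lib/shared/services/tasks/task_queue.dart
// (Web file-view header: 325 lines, 8.4 KiB, Dart. The timestamp lines of the
// form "2025-09-01 23:41:22 +05:30" scattered through this listing come from
// the history view and are not part of the Dart source.)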
import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:uuid/uuid.dart';
import '../../../core/providers/app_providers.dart';
import 'outbound_task.dart';
import 'task_worker.dart';
/// Provider for the [TaskQueueNotifier]; its state is the current list of
/// [OutboundTask]s held by the queue.
final taskQueueProvider =
    NotifierProvider<TaskQueueNotifier, List<OutboundTask>>(
      TaskQueueNotifier.new,
    );
// 2025-09-01 23:41:22 +05:30
// 2025-09-21 22:31:44 +05:30
/// Holds the list of outbound tasks and drives their execution.
///
/// Tasks that are queued or running are persisted as JSON under [_prefsKey]
/// and restored (as queued) shortly after the first [build]. Up to
/// [_maxParallel] tasks run concurrently, and never more than one task per
/// `threadKey` at a time.
class TaskQueueNotifier extends Notifier<List<OutboundTask>> {
  /// Preferences key under which the pending queue is stored as a JSON list.
  static const _prefsKey = 'outbound_task_queue_v1';

  /// Bounded parallelism across conversations (distinct thread keys).
  static const _maxParallel = 2;

  final _uuid = const Uuid();

  /// Thread keys that currently have a task reserved or running.
  final Set<String> _activeThreads = <String>{};

  /// Whether the one-time restore has already been scheduled by [build].
  bool _bootstrapScheduled = false;

  /// Whether [_load] has finished reading the persisted queue.
  ///
  /// Until this is true [_save] must not write, otherwise a task enqueued
  /// before the restore ran would overwrite the tasks persisted by the
  /// previous session.
  bool _restored = false;

  /// Re-entrancy guard for [_process].
  bool _processing = false;

  @override
  List<OutboundTask> build() {
    if (!_bootstrapScheduled) {
      _bootstrapScheduled = true;
      // Deferred so that the restore can assign `state` after build returns.
      Future.microtask(_load);
    }
    return const [];
  }

  /// Whether [t] is still pending, i.e. queued or running.
  static bool _isPending(OutboundTask t) =>
      t.status == TaskStatus.queued || t.status == TaskStatus.running;

  /// Returns a copy of [t] marked as cancelled at the current time.
  static OutboundTask _asCancelled(OutboundTask t) =>
      t.copyWith(status: TaskStatus.cancelled, completedAt: DateTime.now());

  /// Returns the current state with [update] applied to every task for which
  /// [matches] is true; all other tasks are kept as they are.
  List<OutboundTask> _mapWhere(
    bool Function(OutboundTask) matches,
    OutboundTask Function(OutboundTask) update,
  ) => [
    for (final t in state)
      if (matches(t)) update(t) else t,
  ];

  /// Restores pending tasks from preferences and kicks off processing.
  ///
  /// Restored tasks are reset to queued. Tasks that are already in `state`
  /// (enqueued before this ran) are kept and win over a persisted entry with
  /// the same id, so nothing enqueued early is dropped or duplicated.
  Future<void> _load() async {
    try {
      final prefs = ref.read(sharedPreferencesProvider);
      final jsonStr = prefs.getString(_prefsKey);
      if (jsonStr != null && jsonStr.isNotEmpty) {
        final raw = (jsonDecode(jsonStr) as List).cast<Map<String, dynamic>>();
        final liveIds = {for (final t in state) t.id};
        // Only restore non-completed tasks that are not already in memory.
        final restored = [
          for (final t in raw.map(OutboundTask.fromJson))
            if (_isPending(t) && !liveIds.contains(t.id))
              t.copyWith(
                status: TaskStatus.queued,
                startedAt: null,
                completedAt: null,
              ),
        ];
        if (restored.isNotEmpty) {
          state = [...restored, ...state];
        }
      }
    } catch (e) {
      debugPrint('DEBUG: Failed to load task queue: $e');
    } finally {
      // From here on the persisted snapshot has been consumed (or found to be
      // unreadable), so writes are allowed again.
      _restored = true;
    }
    // Kick processing after load; a no-op when nothing is queued.
    unawaited(_process());
  }

  /// Prunes finished tasks from `state` and persists the pending ones.
  ///
  /// Only queued and running tasks are retained, both in memory and on disk.
  /// Failures are logged and otherwise ignored (best effort).
  Future<void> _save() async {
    try {
      final prefs = ref.read(sharedPreferencesProvider);
      final retained = [
        for (final task in state)
          if (_isPending(task)) task,
      ];
      if (retained.length != state.length) {
        // Remove completed entries from state to keep the in-memory queue lean.
        state = retained;
      }
      // Do not overwrite the persisted queue before it has been restored.
      if (!_restored) return;
      final raw = [for (final t in retained) t.toJson()];
      await prefs.setString(_prefsKey, jsonEncode(raw));
    } catch (e) {
      debugPrint('DEBUG: Failed to persist task queue: $e');
    }
  }

  /// Appends [task] to the queue, persists it and triggers processing.
  Future<void> _enqueue(OutboundTask task) async {
    state = [...state, task];
    await _save();
    unawaited(_process());
  }

  /// Queues a text message for [conversationId] and returns the new task id.
  ///
  /// [attachments] and [toolIds] default to empty lists when omitted.
  Future<String> enqueueSendText({
    required String? conversationId,
    required String text,
    List<String>? attachments,
    List<String>? toolIds,
    String? idempotencyKey,
  }) async {
    final id = _uuid.v4();
    await _enqueue(
      OutboundTask.sendTextMessage(
        id: id,
        conversationId: conversationId,
        text: text,
        attachments: attachments ?? const [],
        toolIds: toolIds ?? const [],
        idempotencyKey: idempotencyKey,
        enqueuedAt: DateTime.now(),
      ),
    );
    return id;
  }

  /// Marks the task with the given [id] as cancelled and persists the queue.
  ///
  /// This only updates queue state; nothing here interrupts a worker that is
  /// already performing the task.
  Future<void> cancel(String id) async {
    state = _mapWhere((t) => t.id == id, _asCancelled);
    await _save();
  }

  /// Cancels every queued or running task belonging to [conversationId].
  ///
  /// Tasks without a conversation id are matched against the empty string.
  Future<void> cancelByConversation(String conversationId) async {
    state = _mapWhere(
      (t) => (t.maybeConversationId ?? '') == conversationId && _isPending(t),
      _asCancelled,
    );
    await _save();
  }

  /// Re-queues the task with the given [id], bumping its attempt counter and
  /// clearing its error and timestamps.
  ///
  /// Note: [_save] prunes tasks that are neither queued nor running, so this
  /// only has an effect while the task is still present in `state`.
  Future<void> retry(String id) async {
    state = _mapWhere(
      (t) => t.id == id,
      (t) => t.copyWith(
        status: TaskStatus.queued,
        attempt: (t.attempt + 1),
        error: null,
        startedAt: null,
        completedAt: null,
      ),
    );
    await _save();
    unawaited(_process());
  }

  /// Queues an image generation for [prompt] and returns the new task id.
  Future<String> enqueueGenerateImage({
    required String? conversationId,
    required String prompt,
    String? idempotencyKey,
  }) async {
    final id = _uuid.v4();
    await _enqueue(
      OutboundTask.generateImage(
        id: id,
        conversationId: conversationId,
        prompt: prompt,
        idempotencyKey: idempotencyKey,
        enqueuedAt: DateTime.now(),
      ),
    );
    return id;
  }

  /// Queues a call to the tool [toolName] with [arguments] and returns the new
  /// task id.
  Future<String> enqueueExecuteToolCall({
    required String? conversationId,
    required String toolName,
    Map<String, dynamic> arguments = const <String, dynamic>{},
    String? idempotencyKey,
  }) async {
    final id = _uuid.v4();
    await _enqueue(
      OutboundTask.executeToolCall(
        id: id,
        conversationId: conversationId,
        toolName: toolName,
        arguments: arguments,
        idempotencyKey: idempotencyKey,
        enqueuedAt: DateTime.now(),
      ),
    );
    return id;
  }

  /// Returns the first queued task whose thread is idle, or `null` when the
  /// parallelism limit is reached or no queued task is eligible.
  OutboundTask? _pickNext() {
    if (_activeThreads.length >= _maxParallel) return null;
    for (final t in state) {
      if (t.status == TaskStatus.queued &&
          !_activeThreads.contains(t.threadKey)) {
        return t;
      }
    }
    return null;
  }

  /// Launches queued tasks while capacity remains.
  ///
  /// Only one pump runs at a time; calls made while it is active return
  /// immediately, and the active loop re-reads `state` on each iteration.
  Future<void> _process() async {
    if (_processing) return;
    _processing = true;
    try {
      // Pump while there is capacity and eligible queued tasks remain.
      while (true) {
        final task = _pickNext();
        if (task == null) break;

        // Reserve the thread and mark the task as running before persisting.
        final threadKey = task.threadKey;
        _activeThreads.add(threadKey);
        final running = task.copyWith(
          status: TaskStatus.running,
          startedAt: DateTime.now(),
        );
        state = _mapWhere((t) => t.id == task.id, (_) => running);
        await _save();

        // The task may have been cancelled (and pruned) or re-queued while the
        // save was in flight; in that case release the thread and do not
        // start a worker for it.
        final stillRunning = state.any(
          (t) => t.id == task.id && t.status == TaskStatus.running,
        );
        if (!stillRunning) {
          _activeThreads.remove(threadKey);
          continue;
        }

        // Launch without awaiting so other threads can run in parallel.
        unawaited(
          _run(task).whenComplete(() {
            _activeThreads.remove(threadKey);
            // After a task completes, try to schedule more.
            unawaited(_process());
          }),
        );
      }
    } finally {
      _processing = false;
    }
  }

  /// Performs [task] via [TaskWorker] and records the outcome in `state`.
  ///
  /// The task is marked succeeded on normal completion and failed (with the
  /// error text) when the worker throws; the queue is persisted either way.
  Future<void> _run(OutboundTask task) async {
    bool isThisTask(OutboundTask t) => t.id == task.id;
    try {
      await TaskWorker(ref).perform(task);
      state = _mapWhere(
        isThisTask,
        (t) => t.copyWith(
          status: TaskStatus.succeeded,
          completedAt: DateTime.now(),
        ),
      );
    } catch (e, st) {
      debugPrint('Task failed (${task.runtimeType}): $e\n$st');
      state = _mapWhere(
        isThisTask,
        (t) => t.copyWith(
          status: TaskStatus.failed,
          error: e.toString(),
          completedAt: DateTime.now(),
        ),
      );
    } finally {
      await _save();
    }
  }

  /// Queues an upload of the file at [filePath] and returns the new task id.
  Future<String> enqueueUploadMedia({
    required String? conversationId,
    required String filePath,
    required String fileName,
    int? fileSize,
    String? mimeType,
    String? checksum,
  }) async {
    final id = _uuid.v4();
    await _enqueue(
      OutboundTask.uploadMedia(
        id: id,
        conversationId: conversationId,
        filePath: filePath,
        fileName: fileName,
        fileSize: fileSize,
        mimeType: mimeType,
        checksum: checksum,
        enqueuedAt: DateTime.now(),
      ),
    );
    return id;
  }

  // Removed: enqueueSaveConversation — mobile app no longer persists chats to server.

  /// Queues an image-to-data-URL conversion for the file at [filePath] and
  /// returns the new task id.
  Future<String> enqueueImageToDataUrl({
    required String? conversationId,
    required String filePath,
    required String fileName,
    String? idempotencyKey,
  }) async {
    final id = _uuid.v4();
    await _enqueue(
      OutboundTask.imageToDataUrl(
        id: id,
        conversationId: conversationId,
        filePath: filePath,
        fileName: fileName,
        idempotencyKey: idempotencyKey,
        enqueuedAt: DateTime.now(),
      ),
    );
    return id;
  }
}