From c4ca694d1dc0d340e58bf76782f134a1e277a7d0 Mon Sep 17 00:00:00 2001 From: Christian Byrne Date: Tue, 11 Mar 2025 06:47:29 -0700 Subject: [PATCH] Add composable to manage ComfyUI-Manager task queue (#2970) --- src/composables/useManagerQueue.ts | 93 ++++++ .../widgets/useManagerQueue.test.ts | 276 ++++++++++++++++++ 2 files changed, 369 insertions(+) create mode 100644 src/composables/useManagerQueue.ts create mode 100644 tests-ui/tests/composables/widgets/useManagerQueue.test.ts diff --git a/src/composables/useManagerQueue.ts b/src/composables/useManagerQueue.ts new file mode 100644 index 000000000..9e25c083f --- /dev/null +++ b/src/composables/useManagerQueue.ts @@ -0,0 +1,93 @@ +import { useEventListener, whenever } from '@vueuse/core' +import { computed, readonly, ref } from 'vue' + +import { api } from '@/scripts/api' + +type QueuedTask = { + task: () => Promise + onComplete?: () => void +} + +const MANAGER_WS_MSG_TYPE = 'cm-queue-status' + +enum ManagerWsQueueStatus { + DONE = 'done', + IN_PROGRESS = 'in_progress' +} + +export const useManagerQueue = () => { + const clientQueueItems = ref[]>([]) + const clientQueueLength = computed(() => clientQueueItems.value.length) + const nextOnCompleted = ref<(() => void) | undefined>() + + const serverQueueStatus = ref(ManagerWsQueueStatus.DONE) + const isServerIdle = computed( + () => serverQueueStatus.value === ManagerWsQueueStatus.DONE + ) + + const allTasksDone = computed( + () => isServerIdle.value && clientQueueLength.value === 0 + ) + const nextTaskReady = computed( + () => isServerIdle.value && clientQueueLength.value > 0 + ) + + const cleanupListener = useEventListener( + api, + MANAGER_WS_MSG_TYPE, + (event: CustomEvent<{ status: ManagerWsQueueStatus }>) => { + if (event?.type === MANAGER_WS_MSG_TYPE && event.detail?.status) { + serverQueueStatus.value = event.detail.status + } + } + ) + + const startNextTask = () => { + const nextTask = clientQueueItems.value.shift() + if (!nextTask) return + + const { task, onComplete } = nextTask + + task() + .then(() => { + // Set the task's onComplete to be executed the next time the server is idle + nextOnCompleted.value = onComplete + }) + .catch((e) => { + const message = `Error enqueuing task for ComfyUI Manager: ${e}` + console.error(message) + }) + } + + const enqueueTask = (task: QueuedTask): void => { + clientQueueItems.value.push(task) + } + + const clearQueue = () => { + nextOnCompleted.value = undefined + clientQueueItems.value = [] + } + + const cleanup = () => { + clearQueue() + cleanupListener() + } + + whenever(nextTaskReady, startNextTask) + whenever(isServerIdle, () => { + if (nextOnCompleted.value) { + nextOnCompleted.value() + nextOnCompleted.value = undefined + } + }) + + return { + allTasksDone, + statusMessage: readonly(serverQueueStatus), + queueLength: clientQueueLength, + + enqueueTask, + clearQueue, + cleanup + } +} diff --git a/tests-ui/tests/composables/widgets/useManagerQueue.test.ts b/tests-ui/tests/composables/widgets/useManagerQueue.test.ts new file mode 100644 index 000000000..541645882 --- /dev/null +++ b/tests-ui/tests/composables/widgets/useManagerQueue.test.ts @@ -0,0 +1,276 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { nextTick } from 'vue' + +import { useManagerQueue } from '@/composables/useManagerQueue' +import { api } from '@/scripts/api' + +vi.mock('@/scripts/api', () => ({ + api: { + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + dispatchEvent: vi.fn() + } +})) + +describe('useManagerQueue', () => { + const createMockTask = (result: any = 'result') => ({ + task: vi.fn().mockResolvedValue(result), + onComplete: vi.fn() + }) + + const createQueueWithMockTask = () => { + const queue = useManagerQueue() + const mockTask = createMockTask() + queue.enqueueTask(mockTask) + return { queue, mockTask } + } + + const getEventListenerCallback = () => + vi.mocked(api.addEventListener).mock.calls[0][1] + + const simulateServerStatus = async (status: 'done' | 'in_progress') => { + const event = new CustomEvent('cm-queue-status', { + detail: { status } + }) + getEventListenerCallback()!(event as any) + await nextTick() + } + + beforeEach(() => { + vi.clearAllMocks() + }) + + afterEach(() => { + vi.clearAllMocks() + }) + + describe('initialization', () => { + it('should initialize with empty queue and DONE status', () => { + const queue = useManagerQueue() + + expect(queue.queueLength.value).toBe(0) + expect(queue.statusMessage.value).toBe('done') + expect(queue.allTasksDone.value).toBe(true) + }) + }) + + describe('queue management', () => { + it('should add tasks to the queue', () => { + const queue = useManagerQueue() + const mockTask = createMockTask() + + queue.enqueueTask(mockTask) + + expect(queue.queueLength.value).toBe(1) + expect(queue.allTasksDone.value).toBe(false) + }) + + it('should clear the queue when clearQueue is called', () => { + const queue = useManagerQueue() + + // Add some tasks + queue.enqueueTask(createMockTask()) + queue.enqueueTask(createMockTask()) + + expect(queue.queueLength.value).toBe(2) + + // Clear the queue + queue.clearQueue() + + expect(queue.queueLength.value).toBe(0) + expect(queue.allTasksDone.value).toBe(true) + }) + }) + + describe('server status handling', () => { + it('should update server status when receiving websocket events', async () => { + const queue = useManagerQueue() + + await simulateServerStatus('in_progress') + + expect(queue.statusMessage.value).toBe('in_progress') + expect(queue.allTasksDone.value).toBe(false) + }) + + it('should handle invalid status values gracefully', async () => { + const queue = useManagerQueue() + + // Simulate an invalid status + const event = new CustomEvent('cm-queue-status', { + detail: null as any + }) + + getEventListenerCallback()!(event) + await nextTick() + + // Should maintain the default status + expect(queue.statusMessage.value).toBe('done') + }) + + it('should handle missing status property gracefully', async () => { + const queue = useManagerQueue() + + // Simulate a detail object without status property + const event = new CustomEvent('cm-queue-status', { + detail: { someOtherProperty: 'value' } as any + }) + + getEventListenerCallback()!(event) + await nextTick() + + // Should maintain the default status + expect(queue.statusMessage.value).toBe('done') + }) + }) + + describe('task execution', () => { + it('should start the next task when server is idle and queue has items', async () => { + const { queue, mockTask } = createQueueWithMockTask() + + await simulateServerStatus('done') + + // Task should have been started + expect(mockTask.task).toHaveBeenCalled() + expect(queue.queueLength.value).toBe(0) + }) + + it('should execute onComplete callback when task completes and server becomes idle', async () => { + const { mockTask } = createQueueWithMockTask() + + // Start the task + await simulateServerStatus('done') + expect(mockTask.task).toHaveBeenCalled() + + // Simulate task completion + await mockTask.task.mock.results[0].value + + // Simulate server cycle (in_progress -> done) + await simulateServerStatus('in_progress') + expect(mockTask.onComplete).not.toHaveBeenCalled() + + await simulateServerStatus('done') + expect(mockTask.onComplete).toHaveBeenCalled() + }) + + it('should handle tasks without onComplete callback', async () => { + const queue = useManagerQueue() + const mockTask = { task: vi.fn().mockResolvedValue('result') } + + queue.enqueueTask(mockTask) + + // Start the task + await simulateServerStatus('done') + expect(mockTask.task).toHaveBeenCalled() + + // Simulate task completion + await mockTask.task.mock.results[0].value + + // Simulate server cycle + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + + // Should not throw errors even without onComplete + expect(queue.allTasksDone.value).toBe(true) + }) + + it('should process multiple tasks in sequence', async () => { + const queue = useManagerQueue() + const mockTask1 = createMockTask('result1') + const mockTask2 = createMockTask('result2') + + // Add tasks to the queue + queue.enqueueTask(mockTask1) + queue.enqueueTask(mockTask2) + expect(queue.queueLength.value).toBe(2) + + // Process first task + await simulateServerStatus('done') + expect(mockTask1.task).toHaveBeenCalled() + expect(queue.queueLength.value).toBe(1) + + // Complete first task + await mockTask1.task.mock.results[0].value + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + expect(mockTask1.onComplete).toHaveBeenCalled() + + // Process second task + expect(mockTask2.task).toHaveBeenCalled() + expect(queue.queueLength.value).toBe(0) + + // Complete second task + await mockTask2.task.mock.results[0].value + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + expect(mockTask2.onComplete).toHaveBeenCalled() + + // Queue should be empty and all tasks done + expect(queue.queueLength.value).toBe(0) + expect(queue.allTasksDone.value).toBe(true) + }) + + it('should handle task that returns rejected promise', async () => { + const queue = useManagerQueue() + const mockTask = { + task: vi.fn().mockRejectedValue(new Error('Task failed')), + onComplete: vi.fn() + } + + queue.enqueueTask(mockTask) + + // Start the task + await simulateServerStatus('done') + expect(mockTask.task).toHaveBeenCalled() + + // Let the promise rejection happen + try { + await mockTask.task() + } catch (e) { + // Ignore the error + } + + // Simulate server cycle + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + + // onComplete should not be called for failed tasks + expect(mockTask.onComplete).not.toHaveBeenCalled() + }) + + it('should handle adding tasks while processing is in progress', async () => { + const queue = useManagerQueue() + const mockTask1 = createMockTask() + const mockTask2 = createMockTask() + + // Add first task and start processing + queue.enqueueTask(mockTask1) + await simulateServerStatus('done') + expect(mockTask1.task).toHaveBeenCalled() + + // Add second task while first is processing + queue.enqueueTask(mockTask2) + expect(queue.queueLength.value).toBe(1) + + // Complete first task + await mockTask1.task.mock.results[0].value + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + + // Second task should now be processed + expect(mockTask2.task).toHaveBeenCalled() + }) + + it('should handle server status changes without tasks in queue', async () => { + const queue = useManagerQueue() + + // Cycle server status without any tasks + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + + // Should not cause any errors + expect(queue.allTasksDone.value).toBe(true) + }) + }) +})