From 175b728dd9c17c18f81a0a9b59f3d74c95354bd5 Mon Sep 17 00:00:00 2001 From: Christian Byrne Date: Sat, 21 Jun 2025 16:33:52 -0700 Subject: [PATCH] [Manager] Filter task queue and history by client id (#4241) --- src/composables/useManagerQueue.ts | 89 ++-- .../widgets/useManagerQueue.test.ts | 434 ++++++++++++++++-- 2 files changed, 459 insertions(+), 64 deletions(-) diff --git a/src/composables/useManagerQueue.ts b/src/composables/useManagerQueue.ts index a64bd3773..7759c250d 100644 --- a/src/composables/useManagerQueue.ts +++ b/src/composables/useManagerQueue.ts @@ -1,7 +1,8 @@ import { useEventListener, whenever } from '@vueuse/core' +import { pickBy } from 'lodash' import { Ref, computed, ref } from 'vue' -import { api } from '@/scripts/api' +import { app } from '@/scripts/app' import { useDialogService } from '@/services/dialogService' import { components } from '@/types/generatedManagerTypes' @@ -12,6 +13,9 @@ type ManagerTaskHistory = Record< type ManagerTaskQueue = components['schemas']['TaskStateMessage'] type ManagerWsTaskDoneMsg = components['schemas']['MessageTaskDone'] type ManagerWsTaskStartedMsg = components['schemas']['MessageTaskStarted'] +type QueueTaskItem = components['schemas']['QueueTaskItem'] +type HistoryTaskItem = components['schemas']['TaskHistoryItem'] +type Task = QueueTaskItem | HistoryTaskItem const MANAGER_WS_TASK_DONE_NAME = 'cm-task-completed' const MANAGER_WS_TASK_STARTED_NAME = 'cm-task-started' @@ -35,55 +39,82 @@ export const useManagerQueue = ( taskQueue.value.pending_queue.length ) - const updateProcessingState = () => { + /** + * Update the processing state based on the current queue length. + * If the queue is empty, or all tasks in the queue are associated + * with different clients, then this client is not processing any tasks. + */ + const updateProcessingState = (): void => { isProcessing.value = currentQueueLength.value > 0 } - const allTasksDone = computed( - () => !isProcessing.value && currentQueueLength.value === 0 - ) + const allTasksDone = computed(() => currentQueueLength.value === 0) const historyCount = computed(() => Object.keys(taskHistory.value).length) + /** + * Check if a task is associated with this client. + * Task can be from running queue, pending queue, or history. + * @param task - The task to check + * @returns True if the task belongs to this client + */ + const isTaskFromThisClient = (task: Task): boolean => + task.client_id === app.api.clientId + + /** + * Filter queue tasks by client id. + * Ensures that only tasks associated with this client are processed and + * added to client state. + * @param tasks - Array of queue tasks to filter + * @returns Filtered array containing only tasks from this client + */ + const filterQueueByClientId = (tasks: QueueTaskItem[]): QueueTaskItem[] => + tasks.filter(isTaskFromThisClient) + + /** + * Filter history tasks by client id using lodash pickBy for optimal performance. + * Returns a new object containing only tasks associated with this client. + * @param history - The history object to filter + * @returns Filtered history object containing only tasks from this client + */ + const filterHistoryByClientId = (history: ManagerTaskHistory) => + pickBy(history, isTaskFromThisClient) + + /** + * Update task queue and history state with filtered data from server. + * Ensures only tasks from this client are stored in local state. + * @param state - The task state message from the server + */ + const updateTaskState = (state: ManagerTaskQueue) => { + taskQueue.value.running_queue = filterQueueByClientId(state.running_queue) + taskQueue.value.pending_queue = filterQueueByClientId(state.pending_queue) + taskHistory.value = filterHistoryByClientId(state.history) + + if (state.installed_packs) { + installedPacks.value = state.installed_packs + } + updateProcessingState() + } + // WebSocket event listener for task done const cleanupTaskDoneListener = useEventListener( - api, + app.api, MANAGER_WS_TASK_DONE_NAME, (event: CustomEvent) => { if (event?.type === MANAGER_WS_TASK_DONE_NAME) { const { state } = event.detail - taskQueue.value.running_queue = state.running_queue - taskQueue.value.pending_queue = state.pending_queue - taskHistory.value = state.history - if (state.installed_packs) { - console.log( - 'Updating installedPacks from WebSocket:', - Object.keys(state.installed_packs) - ) - installedPacks.value = state.installed_packs - } - updateProcessingState() + updateTaskState(state) } } ) // WebSocket event listener for task started const cleanupTaskStartedListener = useEventListener( - api, + app.api, MANAGER_WS_TASK_STARTED_NAME, (event: CustomEvent) => { if (event?.type === MANAGER_WS_TASK_STARTED_NAME) { const { state } = event.detail - taskQueue.value.running_queue = state.running_queue - taskQueue.value.pending_queue = state.pending_queue - taskHistory.value = state.history - if (state.installed_packs) { - console.log( - 'Updating installedPacks from WebSocket:', - Object.keys(state.installed_packs) - ) - installedPacks.value = state.installed_packs - } - updateProcessingState() + updateTaskState(state) } } ) diff --git a/tests-ui/tests/composables/widgets/useManagerQueue.test.ts b/tests-ui/tests/composables/widgets/useManagerQueue.test.ts index 81d8a35f3..f134b0db8 100644 --- a/tests-ui/tests/composables/widgets/useManagerQueue.test.ts +++ b/tests-ui/tests/composables/widgets/useManagerQueue.test.ts @@ -2,13 +2,48 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { nextTick, ref } from 'vue' import { useManagerQueue } from '@/composables/useManagerQueue' -import { api } from '@/scripts/api' +import { app } from '@/scripts/app' -vi.mock('@/scripts/api', () => ({ - api: { - addEventListener: vi.fn(), - removeEventListener: vi.fn(), - dispatchEvent: vi.fn() +// Mock VueUse's useEventListener +const mockEventListeners = new Map() +const mockWheneverCallback = vi.fn() + +vi.mock('@vueuse/core', async () => { + const actual = await vi.importActual('@vueuse/core') + return { + ...actual, + useEventListener: vi.fn((target, event, handler) => { + if (!mockEventListeners.has(event)) { + mockEventListeners.set(event, []) + } + mockEventListeners.get(event).push(handler) + + // Mock the addEventListener behavior + if (target && target.addEventListener) { + target.addEventListener(event, handler) + } + + // Return cleanup function + return () => { + if (target && target.removeEventListener) { + target.removeEventListener(event, handler) + } + } + }), + whenever: vi.fn((_source, cb) => { + mockWheneverCallback.mockImplementation(cb) + }) + } +}) + +vi.mock('@/scripts/app', () => ({ + app: { + api: { + clientId: 'test-client-id', + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + dispatchEvent: vi.fn() + } } })) @@ -24,9 +59,10 @@ vi.mock('@/services/comfyManagerService', () => ({ })) })) +const mockShowManagerProgressDialog = vi.fn() vi.mock('@/services/dialogService', () => ({ useDialogService: vi.fn(() => ({ - showManagerProgressDialog: vi.fn() + showManagerProgressDialog: mockShowManagerProgressDialog })) })) @@ -35,8 +71,61 @@ describe('useManagerQueue', () => { let taskQueue: any let installedPacks: any + // Helper functions + const createMockTask = ( + id: string, + clientId = 'test-client-id', + additional = {} + ) => ({ + id, + client_id: clientId, + ...additional + }) + + const createMockHistoryItem = ( + clientId = 'test-client-id', + result = 'success', + additional = {} + ) => ({ + client_id: clientId, + result, + ...additional + }) + + const createMockState = (overrides = {}) => ({ + running_queue: [], + pending_queue: [], + history: {}, + installed_packs: {}, + ...overrides + }) + + const triggerWebSocketEvent = (eventType: string, state: any) => { + const mockEventListener = app.api.addEventListener as any + const eventCall = mockEventListener.mock.calls.find( + (call: any) => call[0] === eventType + ) + + if (eventCall) { + const handler = eventCall[1] + handler({ + type: eventType, + detail: { state } + }) + } + } + + const getEventHandler = (eventType: string) => { + const mockEventListener = app.api.addEventListener as any + const eventCall = mockEventListener.mock.calls.find( + (call: any) => call[0] === eventType + ) + return eventCall ? eventCall[1] : null + } + beforeEach(() => { vi.clearAllMocks() + mockEventListeners.clear() taskHistory = ref({}) taskQueue = ref({ history: {}, @@ -49,6 +138,7 @@ describe('useManagerQueue', () => { afterEach(() => { vi.clearAllMocks() + mockEventListeners.clear() }) describe('initialization', () => { @@ -62,7 +152,7 @@ describe('useManagerQueue', () => { it('should set up event listeners on creation', () => { useManagerQueue(taskHistory, taskQueue, installedPacks) - expect(api.addEventListener).toHaveBeenCalled() + expect(app.api.addEventListener).toHaveBeenCalled() }) }) @@ -75,14 +165,23 @@ describe('useManagerQueue', () => { expect(queue.allTasksDone.value).toBe(true) // Add tasks to queue - taskQueue.value.running_queue = [{ id: 'task1' } as any] - taskQueue.value.pending_queue = [{ id: 'task2' } as any] + taskQueue.value.running_queue = [createMockTask('task1')] + taskQueue.value.pending_queue = [createMockTask('task2')] // Force reactivity update await nextTick() expect(queue.queueLength.value).toBe(2) }) + + it('should trigger progress dialog when queue length changes', async () => { + useManagerQueue(taskHistory, taskQueue, installedPacks) + + // Trigger the whenever callback + mockWheneverCallback() + + expect(mockShowManagerProgressDialog).toHaveBeenCalled() + }) }) describe('task state management', () => { @@ -90,8 +189,8 @@ describe('useManagerQueue', () => { const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) // Add running tasks - taskQueue.value.running_queue = [{ id: 'task1' } as any] - taskQueue.value.pending_queue = [{ id: 'task2' } as any] + taskQueue.value.running_queue = [createMockTask('task1')] + taskQueue.value.pending_queue = [createMockTask('task2')] await nextTick() @@ -110,33 +209,47 @@ describe('useManagerQueue', () => { expect(queue.queueLength.value).toBe(0) expect(queue.allTasksDone.value).toBe(true) }) + + it('should handle large queue sizes', async () => { + const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) + + // Create large queues + taskQueue.value.running_queue = Array(50) + .fill(null) + .map((_, i) => createMockTask(`running-${i}`)) + taskQueue.value.pending_queue = Array(100) + .fill(null) + .map((_, i) => createMockTask(`pending-${i}`)) + + await nextTick() + + expect(queue.queueLength.value).toBe(150) + expect(queue.allTasksDone.value).toBe(false) + }) }) describe('queue data management', () => { it('should provide access to task queue state', async () => { - taskQueue.value.running_queue = [{ id: 'task1' } as any] - taskQueue.value.pending_queue = [ - { id: 'task2' }, - { id: 'task3' } - ] as any[] + const runningTasks = [createMockTask('task1')] + const pendingTasks = [createMockTask('task2'), createMockTask('task3')] + + taskQueue.value.running_queue = runningTasks + taskQueue.value.pending_queue = pendingTasks const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) await nextTick() - expect(queue.taskQueue.value.running_queue).toEqual([{ id: 'task1' }]) - expect(queue.taskQueue.value.pending_queue).toEqual([ - { id: 'task2' }, - { id: 'task3' } - ]) + expect(queue.taskQueue.value.running_queue).toEqual(runningTasks) + expect(queue.taskQueue.value.pending_queue).toEqual(pendingTasks) expect(queue.queueLength.value).toBe(3) }) it('should provide access to task history', async () => { const mockHistory = { - task1: { result: 'success', timestamp: '2023-01-01' }, - task2: { result: 'error', timestamp: '2023-01-02' } + task1: createMockHistoryItem(), + task2: createMockHistoryItem('test-client-id', 'error') } - taskHistory.value = mockHistory as any + taskHistory.value = mockHistory const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) await nextTick() @@ -163,16 +276,13 @@ describe('useManagerQueue', () => { const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) taskHistory.value = { - task1: { result: 'success' } as any, - task2: { result: 'error' } as any + task1: createMockHistoryItem(), + task2: createMockHistoryItem('test-client-id', 'error') } await nextTick() - expect(queue.taskHistory.value).toEqual({ - task1: { result: 'success' }, - task2: { result: 'error' } - }) + expect(queue.taskHistory.value).toEqual(taskHistory.value) expect(queue.historyCount.value).toBe(2) }) @@ -200,7 +310,7 @@ describe('useManagerQueue', () => { expect(queue.allTasksDone.value).toBe(true) // Add pending tasks - taskQueue.value.pending_queue = [{ id: 'task1' } as any] + taskQueue.value.pending_queue = [createMockTask('task1')] await nextTick() @@ -220,10 +330,10 @@ describe('useManagerQueue', () => { expect(queue.queueLength.value).toBe(0) - taskQueue.value.running_queue = [{ id: 'task1' } as any] + taskQueue.value.running_queue = [createMockTask('task1')] taskQueue.value.pending_queue = [ - { id: 'task2' } as any, - { id: 'task3' } as any + createMockTask('task2'), + createMockTask('task3') ] await nextTick() @@ -231,4 +341,258 @@ describe('useManagerQueue', () => { expect(queue.queueLength.value).toBe(3) }) }) + + describe('client filtering functionality', () => { + it('should filter tasks by client ID in WebSocket events', async () => { + const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) + + const mockState = createMockState({ + running_queue: [ + createMockTask('task1'), + createMockTask('task2', 'other-client-id') + ], + pending_queue: [createMockTask('task3')] + }) + + triggerWebSocketEvent('cm-task-completed', mockState) + await nextTick() + + // Should only include tasks from this client + expect(taskQueue.value.running_queue).toEqual([createMockTask('task1')]) + expect(taskQueue.value.pending_queue).toEqual([createMockTask('task3')]) + expect(queue.queueLength.value).toBe(2) + }) + + it('should filter history by client ID in WebSocket events', async () => { + const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) + + const mockState = createMockState({ + history: { + task1: createMockHistoryItem(), + task2: createMockHistoryItem('other-client-id'), + task3: createMockHistoryItem() + } + }) + + triggerWebSocketEvent('cm-task-completed', mockState) + await nextTick() + + // Should only include history items from this client + expect(Object.keys(taskHistory.value)).toHaveLength(2) + expect(taskHistory.value).toHaveProperty('task1') + expect(taskHistory.value).toHaveProperty('task3') + expect(taskHistory.value).not.toHaveProperty('task2') + expect(queue.historyCount.value).toBe(2) + }) + + it('should handle all tasks being from other clients', async () => { + const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) + + const mockState = createMockState({ + running_queue: [ + createMockTask('task1', 'other-client-1'), + createMockTask('task2', 'other-client-2') + ], + pending_queue: [createMockTask('task3', 'other-client-1')], + history: { + task4: createMockHistoryItem('other-client-1'), + task5: createMockHistoryItem('other-client-2') + } + }) + + triggerWebSocketEvent('cm-task-completed', mockState) + await nextTick() + + // Should have no tasks or history + expect(taskQueue.value.running_queue).toEqual([]) + expect(taskQueue.value.pending_queue).toEqual([]) + expect(taskHistory.value).toEqual({}) + expect(queue.queueLength.value).toBe(0) + expect(queue.historyCount.value).toBe(0) + }) + }) + + describe('WebSocket event handling', () => { + it('should handle task done events', async () => { + useManagerQueue(taskHistory, taskQueue, installedPacks) + + const mockState = createMockState({ + running_queue: [createMockTask('task1')], + history: { + task1: createMockHistoryItem() + }, + installed_packs: { pack1: { version: '1.0' } } + }) + + triggerWebSocketEvent('cm-task-completed', mockState) + await nextTick() + + expect(taskQueue.value.running_queue).toEqual([createMockTask('task1')]) + expect(taskQueue.value.pending_queue).toEqual([]) + expect(taskHistory.value).toEqual({ + task1: createMockHistoryItem() + }) + expect(installedPacks.value).toEqual({ pack1: { version: '1.0' } }) + }) + + it('should handle task started events', async () => { + useManagerQueue(taskHistory, taskQueue, installedPacks) + + const mockState = createMockState({ + running_queue: [createMockTask('task1')], + pending_queue: [createMockTask('task2')], + installed_packs: { pack1: { version: '1.0' } } + }) + + triggerWebSocketEvent('cm-task-started', mockState) + await nextTick() + + expect(taskQueue.value.running_queue).toEqual([createMockTask('task1')]) + expect(taskQueue.value.pending_queue).toEqual([createMockTask('task2')]) + expect(installedPacks.value).toEqual({ pack1: { version: '1.0' } }) + }) + + it('should filter out tasks from other clients in WebSocket events', async () => { + useManagerQueue(taskHistory, taskQueue, installedPacks) + + const mockState = createMockState({ + running_queue: [ + createMockTask('task1'), + createMockTask('task2', 'other-client-id') + ], + pending_queue: [createMockTask('task3', 'other-client-id')], + history: { + task1: createMockHistoryItem(), + task2: createMockHistoryItem('other-client-id') + } + }) + + triggerWebSocketEvent('cm-task-completed', mockState) + await nextTick() + + // Should only include tasks from this client + expect(taskQueue.value.running_queue).toEqual([createMockTask('task1')]) + expect(taskQueue.value.pending_queue).toEqual([]) + expect(taskHistory.value).toEqual({ + task1: createMockHistoryItem() + }) + }) + + it('should ignore events with wrong type', async () => { + useManagerQueue(taskHistory, taskQueue, installedPacks) + + const handler = getEventHandler('cm-task-completed') + + // Send event with wrong type + handler({ + type: 'wrong-event-type', + detail: { + state: createMockState({ running_queue: [createMockTask('task1')] }) + } + }) + await nextTick() + + // Should not update state + expect(taskQueue.value.running_queue).toEqual([]) + }) + }) + + describe('cleanup functionality', () => { + it('should clean up event listeners on stopListening', () => { + const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) + const mockRemoveEventListener = app.api.removeEventListener as any + + queue.stopListening() + + expect(mockRemoveEventListener).toHaveBeenCalledTimes(2) + + // Check that both event types were called with the correct event names + const calls = mockRemoveEventListener.mock.calls + const eventTypes = calls.map((call: any) => call[0]) + expect(eventTypes).toContain('cm-task-completed') + expect(eventTypes).toContain('cm-task-started') + + // Check that functions were passed as second parameter + calls.forEach((call: any) => { + expect(typeof call[1]).toBe('function') + }) + }) + + it('should handle multiple stopListening calls gracefully', () => { + const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) + const mockRemoveEventListener = app.api.removeEventListener as any + + queue.stopListening() + queue.stopListening() + + // Should still only be called twice (once per event type) + expect(mockRemoveEventListener).toHaveBeenCalledTimes(4) + }) + }) + + describe('edge cases', () => { + it('should handle undefined installed_packs in state update', async () => { + useManagerQueue(taskHistory, taskQueue, installedPacks) + + const mockState = createMockState({ + running_queue: [createMockTask('task1')], + installed_packs: undefined + }) + + triggerWebSocketEvent('cm-task-completed', mockState) + await nextTick() + + // Should not update installedPacks when undefined + expect(installedPacks.value).toEqual({}) + }) + + it('should handle rapid successive events', async () => { + const queue = useManagerQueue(taskHistory, taskQueue, installedPacks) + + // Send multiple events rapidly + for (let i = 0; i < 10; i++) { + triggerWebSocketEvent( + 'cm-task-completed', + createMockState({ + running_queue: [createMockTask(`task${i}`)], + history: { [`task${i}`]: createMockHistoryItem() } + }) + ) + } + + await nextTick() + + // Should have the last state + expect(taskQueue.value.running_queue).toEqual([createMockTask('task9')]) + expect(queue.queueLength.value).toBe(1) + }) + + it('should maintain consistency when mixing event types', async () => { + useManagerQueue(taskHistory, taskQueue, installedPacks) + + // Send alternating event types + triggerWebSocketEvent( + 'cm-task-started', + createMockState({ + running_queue: [createMockTask('task1')], + pending_queue: [createMockTask('task2')] + }) + ) + + triggerWebSocketEvent( + 'cm-task-completed', + createMockState({ + running_queue: [], + pending_queue: [createMockTask('task2')], + history: { task1: createMockHistoryItem() } + }) + ) + + await nextTick() + + expect(taskQueue.value.running_queue).toEqual([]) + expect(taskQueue.value.pending_queue).toEqual([createMockTask('task2')]) + expect(taskHistory.value).toHaveProperty('task1') + }) + }) })