From 116a11bd5869581271bfb41a35b92cfcdfaf5a4c Mon Sep 17 00:00:00 2001 From: Christian Byrne Date: Thu, 13 Mar 2025 16:45:53 -0700 Subject: [PATCH] [Manager] Allow multiple queued callbacks in manager task queue hook (#3027) --- src/composables/useManagerQueue.ts | 36 +++++++----- src/stores/comfyManagerStore.ts | 36 ++++++------ .../widgets/useManagerQueue.test.ts | 57 ++++++++++++++++++- 3 files changed, 96 insertions(+), 33 deletions(-) diff --git a/src/composables/useManagerQueue.ts b/src/composables/useManagerQueue.ts index 9e25c083f..efc8bd6e3 100644 --- a/src/composables/useManagerQueue.ts +++ b/src/composables/useManagerQueue.ts @@ -18,7 +18,8 @@ enum ManagerWsQueueStatus { export const useManagerQueue = () => { const clientQueueItems = ref[]>([]) const clientQueueLength = computed(() => clientQueueItems.value.length) - const nextOnCompleted = ref<(() => void) | undefined>() + const onCompletedQueue = ref<((() => void) | undefined)[]>([]) + const onCompleteWaitingCount = ref(0) const serverQueueStatus = ref(ManagerWsQueueStatus.DONE) const isServerIdle = computed( @@ -47,16 +48,16 @@ export const useManagerQueue = () => { if (!nextTask) return const { task, onComplete } = nextTask + if (onComplete) { + // Set the task's onComplete to be executed the next time the server is idle + onCompletedQueue.value.push(onComplete) + onCompleteWaitingCount.value++ + } - task() - .then(() => { - // Set the task's onComplete to be executed the next time the server is idle - nextOnCompleted.value = onComplete - }) - .catch((e) => { - const message = `Error enqueuing task for ComfyUI Manager: ${e}` - console.error(message) - }) + task().catch((e) => { + const message = `Error enqueuing task for ComfyUI Manager: ${e}` + console.error(message) + }) } const enqueueTask = (task: QueuedTask): void => { @@ -64,8 +65,9 @@ export const useManagerQueue = () => { } const clearQueue = () => { - nextOnCompleted.value = undefined clientQueueItems.value = [] + onCompletedQueue.value = [] + onCompleteWaitingCount.value = 0 } const cleanup = () => { @@ -75,9 +77,15 @@ export const useManagerQueue = () => { whenever(nextTaskReady, startNextTask) whenever(isServerIdle, () => { - if (nextOnCompleted.value) { - nextOnCompleted.value() - nextOnCompleted.value = undefined + if (onCompletedQueue.value?.length) { + while ( + onCompleteWaitingCount.value > 0 && + onCompletedQueue.value.length > 0 + ) { + const onComplete = onCompletedQueue.value.shift() + onComplete?.() + onCompleteWaitingCount.value-- + } } }) diff --git a/src/stores/comfyManagerStore.ts b/src/stores/comfyManagerStore.ts index 3c39fab3d..e2c18e706 100644 --- a/src/stores/comfyManagerStore.ts +++ b/src/stores/comfyManagerStore.ts @@ -20,8 +20,9 @@ import { export const useComfyManagerStore = defineStore('comfyManager', () => { const managerService = useComfyManagerService() const installedPacks = ref({}) - const enabledPacks = ref>(new Set()) - const disabledPacks = ref>(new Set()) + const enabledPacksIds = ref>(new Set()) + const disabledPacksIds = ref>(new Set()) + const installedPacksIds = ref>(new Set()) const isStale = ref(true) const { statusMessage, allTasksDone, enqueueTask } = useManagerQueue() @@ -30,14 +31,6 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { isStale.value = true } - const isPackInstalled = (packName: string | undefined): boolean => { - if (!packName) return false - return !!installedPacks.value[packName] || disabledPacks.value.has(packName) - } - - const isPackEnabled = (packName: string | undefined): boolean => - !!packName && enabledPacks.value.has(packName) - /** * A pack is disabled if there is a disabled entry and no corresponding enabled entry * @example @@ -52,9 +45,17 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { * } * isDisabled("packname") // true */ - const isPackDisabled = (pack: ManagerPackInstalled) => + const isDisabledPack = (pack: ManagerPackInstalled) => pack.enabled === false && pack.cnr_id && !installedPacks.value[pack.cnr_id] + const isInstalledPackId = (packName: string | undefined): boolean => + !!packName && installedPacksIds.value.has(packName) + + const isEnabledPackId = (packName: string | undefined): boolean => + !!packName && + isInstalledPackId(packName) && + enabledPacksIds.value.has(packName) + const packsToIdSet = (packs: ManagerPackInstalled[]) => packs.reduce((acc, pack) => { const id = pack.cnr_id || pack.aux_id @@ -64,15 +65,16 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { watchEffect(() => { const packs = Object.values(installedPacks.value) - const [disabled, enabled] = partition(packs, isPackDisabled) - enabledPacks.value = packsToIdSet(enabled) - disabledPacks.value = packsToIdSet(disabled) + const [disabled, enabled] = partition(packs, isDisabledPack) + enabledPacksIds.value = packsToIdSet(enabled) + disabledPacksIds.value = packsToIdSet(disabled) + installedPacksIds.value = packsToIdSet(packs) }) const refreshInstalledList = async () => { - isStale.value = false const packs = await managerService.listInstalledPacks() if (packs) installedPacks.value = packs + isStale.value = false } whenever(isStale, refreshInstalledList, { immediate: true }) @@ -136,8 +138,8 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { // Installed packs state installedPacks, - isPackInstalled, - isPackEnabled, + isPackInstalled: isInstalledPackId, + isPackEnabled: isEnabledPackId, // Pack actions installPack, diff --git a/tests-ui/tests/composables/widgets/useManagerQueue.test.ts b/tests-ui/tests/composables/widgets/useManagerQueue.test.ts index 541645882..30af37f30 100644 --- a/tests-ui/tests/composables/widgets/useManagerQueue.test.ts +++ b/tests-ui/tests/composables/widgets/useManagerQueue.test.ts @@ -233,8 +233,61 @@ describe('useManagerQueue', () => { await simulateServerStatus('in_progress') await simulateServerStatus('done') - // onComplete should not be called for failed tasks - expect(mockTask.onComplete).not.toHaveBeenCalled() + // onComplete should still be called for failed tasks + expect(mockTask.onComplete).toHaveBeenCalled() + }) + + it('should handle multiple multiple tasks enqueued at once while server busy', async () => { + const queue = useManagerQueue() + const mockTask1 = createMockTask() + const mockTask2 = createMockTask() + const mockTask3 = createMockTask() + + // Three tasks enqueued at once + await simulateServerStatus('in_progress') + await Promise.all([ + queue.enqueueTask(mockTask1), + queue.enqueueTask(mockTask2), + queue.enqueueTask(mockTask3) + ]) + + // Task 1 + await simulateServerStatus('done') + expect(mockTask1.task).toHaveBeenCalled() + + // Verify state of onComplete callbacks + expect(mockTask1.onComplete).toHaveBeenCalled() + expect(mockTask2.onComplete).not.toHaveBeenCalled() + expect(mockTask3.onComplete).not.toHaveBeenCalled() + + // Verify state of queue + expect(queue.queueLength.value).toBe(2) + expect(queue.allTasksDone.value).toBe(false) + + // Task 2 + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + expect(mockTask2.task).toHaveBeenCalled() + + // Verify state of onComplete callbacks + expect(mockTask2.onComplete).toHaveBeenCalled() + expect(mockTask3.onComplete).not.toHaveBeenCalled() + + // Verify state of queue + expect(queue.queueLength.value).toBe(1) + expect(queue.allTasksDone.value).toBe(false) + + // Task 3 + await simulateServerStatus('in_progress') + await simulateServerStatus('done') + + // Verify state of onComplete callbacks + expect(mockTask3.task).toHaveBeenCalled() + expect(mockTask3.onComplete).toHaveBeenCalled() + + // Verify state of queue + expect(queue.queueLength.value).toBe(0) + expect(queue.allTasksDone.value).toBe(true) }) it('should handle adding tasks while processing is in progress', async () => {