[Manager] Allow multiple queued callbacks in manager task queue hook (#3027)

This commit is contained in:
Christian Byrne
2025-03-13 16:45:53 -07:00
committed by GitHub
parent 58be9ad4c3
commit 116a11bd58
3 changed files with 96 additions and 33 deletions

View File

@@ -18,7 +18,8 @@ enum ManagerWsQueueStatus {
export const useManagerQueue = () => { export const useManagerQueue = () => {
const clientQueueItems = ref<QueuedTask<unknown>[]>([]) const clientQueueItems = ref<QueuedTask<unknown>[]>([])
const clientQueueLength = computed(() => clientQueueItems.value.length) const clientQueueLength = computed(() => clientQueueItems.value.length)
const nextOnCompleted = ref<(() => void) | undefined>() const onCompletedQueue = ref<((() => void) | undefined)[]>([])
const onCompleteWaitingCount = ref(0)
const serverQueueStatus = ref<ManagerWsQueueStatus>(ManagerWsQueueStatus.DONE) const serverQueueStatus = ref<ManagerWsQueueStatus>(ManagerWsQueueStatus.DONE)
const isServerIdle = computed( const isServerIdle = computed(
@@ -47,16 +48,16 @@ export const useManagerQueue = () => {
if (!nextTask) return if (!nextTask) return
const { task, onComplete } = nextTask const { task, onComplete } = nextTask
if (onComplete) {
// Set the task's onComplete to be executed the next time the server is idle
onCompletedQueue.value.push(onComplete)
onCompleteWaitingCount.value++
}
task() task().catch((e) => {
.then(() => { const message = `Error enqueuing task for ComfyUI Manager: ${e}`
// Set the task's onComplete to be executed the next time the server is idle console.error(message)
nextOnCompleted.value = onComplete })
})
.catch((e) => {
const message = `Error enqueuing task for ComfyUI Manager: ${e}`
console.error(message)
})
} }
const enqueueTask = <T>(task: QueuedTask<T>): void => { const enqueueTask = <T>(task: QueuedTask<T>): void => {
@@ -64,8 +65,9 @@ export const useManagerQueue = () => {
} }
const clearQueue = () => { const clearQueue = () => {
nextOnCompleted.value = undefined
clientQueueItems.value = [] clientQueueItems.value = []
onCompletedQueue.value = []
onCompleteWaitingCount.value = 0
} }
const cleanup = () => { const cleanup = () => {
@@ -75,9 +77,15 @@ export const useManagerQueue = () => {
whenever(nextTaskReady, startNextTask) whenever(nextTaskReady, startNextTask)
whenever(isServerIdle, () => { whenever(isServerIdle, () => {
if (nextOnCompleted.value) { if (onCompletedQueue.value?.length) {
nextOnCompleted.value() while (
nextOnCompleted.value = undefined onCompleteWaitingCount.value > 0 &&
onCompletedQueue.value.length > 0
) {
const onComplete = onCompletedQueue.value.shift()
onComplete?.()
onCompleteWaitingCount.value--
}
} }
}) })

View File

@@ -20,8 +20,9 @@ import {
export const useComfyManagerStore = defineStore('comfyManager', () => { export const useComfyManagerStore = defineStore('comfyManager', () => {
const managerService = useComfyManagerService() const managerService = useComfyManagerService()
const installedPacks = ref<InstalledPacksResponse>({}) const installedPacks = ref<InstalledPacksResponse>({})
const enabledPacks = ref<Set<string>>(new Set()) const enabledPacksIds = ref<Set<string>>(new Set())
const disabledPacks = ref<Set<string>>(new Set()) const disabledPacksIds = ref<Set<string>>(new Set())
const installedPacksIds = ref<Set<string>>(new Set())
const isStale = ref(true) const isStale = ref(true)
const { statusMessage, allTasksDone, enqueueTask } = useManagerQueue() const { statusMessage, allTasksDone, enqueueTask } = useManagerQueue()
@@ -30,14 +31,6 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
isStale.value = true isStale.value = true
} }
const isPackInstalled = (packName: string | undefined): boolean => {
if (!packName) return false
return !!installedPacks.value[packName] || disabledPacks.value.has(packName)
}
const isPackEnabled = (packName: string | undefined): boolean =>
!!packName && enabledPacks.value.has(packName)
/** /**
* A pack is disabled if there is a disabled entry and no corresponding enabled entry * A pack is disabled if there is a disabled entry and no corresponding enabled entry
* @example * @example
@@ -52,9 +45,17 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
* } * }
* isDisabled("packname") // true * isDisabled("packname") // true
*/ */
const isPackDisabled = (pack: ManagerPackInstalled) => const isDisabledPack = (pack: ManagerPackInstalled) =>
pack.enabled === false && pack.cnr_id && !installedPacks.value[pack.cnr_id] pack.enabled === false && pack.cnr_id && !installedPacks.value[pack.cnr_id]
const isInstalledPackId = (packName: string | undefined): boolean =>
!!packName && installedPacksIds.value.has(packName)
const isEnabledPackId = (packName: string | undefined): boolean =>
!!packName &&
isInstalledPackId(packName) &&
enabledPacksIds.value.has(packName)
const packsToIdSet = (packs: ManagerPackInstalled[]) => const packsToIdSet = (packs: ManagerPackInstalled[]) =>
packs.reduce((acc, pack) => { packs.reduce((acc, pack) => {
const id = pack.cnr_id || pack.aux_id const id = pack.cnr_id || pack.aux_id
@@ -64,15 +65,16 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
watchEffect(() => { watchEffect(() => {
const packs = Object.values(installedPacks.value) const packs = Object.values(installedPacks.value)
const [disabled, enabled] = partition(packs, isPackDisabled) const [disabled, enabled] = partition(packs, isDisabledPack)
enabledPacks.value = packsToIdSet(enabled) enabledPacksIds.value = packsToIdSet(enabled)
disabledPacks.value = packsToIdSet(disabled) disabledPacksIds.value = packsToIdSet(disabled)
installedPacksIds.value = packsToIdSet(packs)
}) })
const refreshInstalledList = async () => { const refreshInstalledList = async () => {
isStale.value = false
const packs = await managerService.listInstalledPacks() const packs = await managerService.listInstalledPacks()
if (packs) installedPacks.value = packs if (packs) installedPacks.value = packs
isStale.value = false
} }
whenever(isStale, refreshInstalledList, { immediate: true }) whenever(isStale, refreshInstalledList, { immediate: true })
@@ -136,8 +138,8 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
// Installed packs state // Installed packs state
installedPacks, installedPacks,
isPackInstalled, isPackInstalled: isInstalledPackId,
isPackEnabled, isPackEnabled: isEnabledPackId,
// Pack actions // Pack actions
installPack, installPack,

View File

@@ -233,8 +233,61 @@ describe('useManagerQueue', () => {
await simulateServerStatus('in_progress') await simulateServerStatus('in_progress')
await simulateServerStatus('done') await simulateServerStatus('done')
// onComplete should not be called for failed tasks // onComplete should still be called for failed tasks
expect(mockTask.onComplete).not.toHaveBeenCalled() expect(mockTask.onComplete).toHaveBeenCalled()
})
it('should handle multiple multiple tasks enqueued at once while server busy', async () => {
const queue = useManagerQueue()
const mockTask1 = createMockTask()
const mockTask2 = createMockTask()
const mockTask3 = createMockTask()
// Three tasks enqueued at once
await simulateServerStatus('in_progress')
await Promise.all([
queue.enqueueTask(mockTask1),
queue.enqueueTask(mockTask2),
queue.enqueueTask(mockTask3)
])
// Task 1
await simulateServerStatus('done')
expect(mockTask1.task).toHaveBeenCalled()
// Verify state of onComplete callbacks
expect(mockTask1.onComplete).toHaveBeenCalled()
expect(mockTask2.onComplete).not.toHaveBeenCalled()
expect(mockTask3.onComplete).not.toHaveBeenCalled()
// Verify state of queue
expect(queue.queueLength.value).toBe(2)
expect(queue.allTasksDone.value).toBe(false)
// Task 2
await simulateServerStatus('in_progress')
await simulateServerStatus('done')
expect(mockTask2.task).toHaveBeenCalled()
// Verify state of onComplete callbacks
expect(mockTask2.onComplete).toHaveBeenCalled()
expect(mockTask3.onComplete).not.toHaveBeenCalled()
// Verify state of queue
expect(queue.queueLength.value).toBe(1)
expect(queue.allTasksDone.value).toBe(false)
// Task 3
await simulateServerStatus('in_progress')
await simulateServerStatus('done')
// Verify state of onComplete callbacks
expect(mockTask3.task).toHaveBeenCalled()
expect(mockTask3.onComplete).toHaveBeenCalled()
// Verify state of queue
expect(queue.queueLength.value).toBe(0)
expect(queue.allTasksDone.value).toBe(true)
}) })
it('should handle adding tasks while processing is in progress', async () => { it('should handle adding tasks while processing is in progress', async () => {