[manager] Update composables and state management

Updated manager queue composable, server logs composable, workflow packs composable, and manager store to support the new manager API structure and state management patterns.
This commit is contained in:
bymyself
2025-06-13 13:56:27 -07:00
committed by Jin Yi
parent 3db88b7172
commit 42a7deae2b
4 changed files with 308 additions and 133 deletions

View File

@@ -7,7 +7,7 @@ import { app } from '@/scripts/app'
import { useComfyRegistryStore } from '@/stores/comfyRegistryStore'
import { useNodeDefStore } from '@/stores/nodeDefStore'
import { useSystemStatsStore } from '@/stores/systemStatsStore'
import { SelectedVersion, UseNodePacksOptions } from '@/types/comfyManagerTypes'
import { UseNodePacksOptions } from '@/types/comfyManagerTypes'
import type { components } from '@/types/comfyRegistryTypes'
import { collectAllNodes } from '@/utils/graphTraversalUtil'
@@ -66,8 +66,7 @@ export const useWorkflowPacks = (options: UseNodePacksOptions = {}) => {
return {
id: CORE_NODES_PACK_NAME,
version:
systemStatsStore.systemStats?.system?.comfyui_version ??
SelectedVersion.NIGHTLY
systemStatsStore.systemStats?.system?.comfyui_version ?? 'nightly'
}
}
@@ -77,7 +76,7 @@ export const useWorkflowPacks = (options: UseNodePacksOptions = {}) => {
if (pack) {
return {
id: pack.id,
version: pack.latest_version?.version ?? SelectedVersion.NIGHTLY
version: pack.latest_version?.version ?? 'nightly'
}
}

View File

@@ -1,101 +1,114 @@
import { useEventListener, whenever } from '@vueuse/core'
import { computed, readonly, ref } from 'vue'
import { Ref, computed, ref } from 'vue'
import { api } from '@/scripts/api'
import { ManagerWsQueueStatus } from '@/types/comfyManagerTypes'
import { useDialogService } from '@/services/dialogService'
import { components } from '@/types/generatedManagerTypes'
type QueuedTask<T> = {
task: () => Promise<T>
onComplete?: () => void
}
type ManagerTaskHistory = Record<
string,
components['schemas']['TaskHistoryItem']
>
type ManagerTaskQueue = components['schemas']['TaskStateMessage']
type ManagerWsTaskDoneMsg = components['schemas']['MessageTaskDone']
type ManagerWsTaskStartedMsg = components['schemas']['MessageTaskStarted']
const MANAGER_WS_MSG_TYPE = 'cm-queue-status'
const MANAGER_WS_TASK_DONE_NAME = 'cm-task-completed'
const MANAGER_WS_TASK_STARTED_NAME = 'cm-task-started'
export const useManagerQueue = () => {
const clientQueueItems = ref<QueuedTask<unknown>[]>([])
const clientQueueLength = computed(() => clientQueueItems.value.length)
const onCompletedQueue = ref<((() => void) | undefined)[]>([])
const onCompleteWaitingCount = ref(0)
const uncompletedCount = computed(
() => clientQueueLength.value + onCompleteWaitingCount.value
export const useManagerQueue = (
taskHistory: Ref<ManagerTaskHistory>,
taskQueue: Ref<ManagerTaskQueue>,
installedPacks: Ref<Record<string, any>>
) => {
const { showManagerProgressDialog } = useDialogService()
// Task queue state (read-only from server)
const maxHistoryItems = ref(64)
const isLoading = ref(false)
const isProcessing = ref(false)
// Computed values
const currentQueueLength = computed(
() =>
taskQueue.value.running_queue.length +
taskQueue.value.pending_queue.length
)
const serverQueueStatus = ref<ManagerWsQueueStatus>(ManagerWsQueueStatus.DONE)
const isServerIdle = computed(
() => serverQueueStatus.value === ManagerWsQueueStatus.DONE
)
const updateProcessingState = () => {
isProcessing.value = currentQueueLength.value > 0
}
const allTasksDone = computed(
() => isServerIdle.value && clientQueueLength.value === 0
)
const nextTaskReady = computed(
() => isServerIdle.value && clientQueueLength.value > 0
() => !isProcessing.value && currentQueueLength.value === 0
)
const historyCount = computed(() => Object.keys(taskHistory.value).length)
const cleanupListener = useEventListener(
// WebSocket event listener for task done
const cleanupTaskDoneListener = useEventListener(
api,
MANAGER_WS_MSG_TYPE,
(event: CustomEvent<{ status: ManagerWsQueueStatus }>) => {
if (event?.type === MANAGER_WS_MSG_TYPE && event.detail?.status) {
serverQueueStatus.value = event.detail.status
MANAGER_WS_TASK_DONE_NAME,
(event: CustomEvent<ManagerWsTaskDoneMsg>) => {
if (event?.type === MANAGER_WS_TASK_DONE_NAME) {
const { state } = event.detail
taskQueue.value.running_queue = state.running_queue
taskQueue.value.pending_queue = state.pending_queue
taskHistory.value = state.history
if (state.installed_packs) {
console.log(
'Updating installedPacks from WebSocket:',
Object.keys(state.installed_packs)
)
installedPacks.value = state.installed_packs
}
updateProcessingState()
}
}
)
const startNextTask = () => {
const nextTask = clientQueueItems.value.shift()
if (!nextTask) return
const { task, onComplete } = nextTask
if (onComplete) {
// Set the task's onComplete to be executed the next time the server is idle
onCompletedQueue.value.push(onComplete)
onCompleteWaitingCount.value++
}
task().catch((e) => {
const message = `Error enqueuing task for ComfyUI Manager: ${e}`
console.error(message)
})
}
const enqueueTask = <T>(task: QueuedTask<T>): void => {
clientQueueItems.value.push(task)
}
const clearQueue = () => {
clientQueueItems.value = []
onCompletedQueue.value = []
onCompleteWaitingCount.value = 0
}
const cleanup = () => {
clearQueue()
cleanupListener()
}
whenever(nextTaskReady, startNextTask)
whenever(isServerIdle, () => {
if (onCompletedQueue.value?.length) {
while (
onCompleteWaitingCount.value > 0 &&
onCompletedQueue.value.length > 0
) {
const onComplete = onCompletedQueue.value.shift()
onComplete?.()
onCompleteWaitingCount.value--
// WebSocket event listener for task started
const cleanupTaskStartedListener = useEventListener(
api,
MANAGER_WS_TASK_STARTED_NAME,
(event: CustomEvent<ManagerWsTaskStartedMsg>) => {
if (event?.type === MANAGER_WS_TASK_STARTED_NAME) {
const { state } = event.detail
taskQueue.value.running_queue = state.running_queue
taskQueue.value.pending_queue = state.pending_queue
taskHistory.value = state.history
if (state.installed_packs) {
console.log(
'Updating installedPacks from WebSocket:',
Object.keys(state.installed_packs)
)
installedPacks.value = state.installed_packs
}
updateProcessingState()
}
}
})
)
whenever(currentQueueLength, () => showManagerProgressDialog())
const stopListening = () => {
cleanupTaskDoneListener()
cleanupTaskStartedListener()
}
return {
allTasksDone,
statusMessage: readonly(serverQueueStatus),
queueLength: clientQueueLength,
uncompletedCount,
// Queue state (read-only from server)
taskHistory,
taskQueue,
maxHistoryItems,
isLoading,
enqueueTask,
clearQueue,
cleanup
// Computed state
allTasksDone,
isProcessing,
queueLength: currentQueueLength,
historyCount,
// Actions
stopListening
}
}

View File

@@ -1,24 +1,30 @@
import { useEventListener } from '@vueuse/core'
import { onUnmounted, ref } from 'vue'
import { ref } from 'vue'
import { LogsWsMessage } from '@/schemas/apiSchema'
import { api } from '@/scripts/api'
import { components } from '@/types/generatedManagerTypes'
const LOGS_MESSAGE_TYPE = 'logs'
const MANAGER_WS_TASK_DONE_NAME = 'cm-task-completed'
type ManagerWsTaskDoneMsg = components['schemas']['MessageTaskDone']
interface UseServerLogsOptions {
ui_id: string
immediate?: boolean
messageFilter?: (message: string) => boolean
}
export const useServerLogs = (options: UseServerLogsOptions = {}) => {
export const useServerLogs = (options: UseServerLogsOptions) => {
const {
immediate = false,
messageFilter = (msg: string) => Boolean(msg.trim())
} = options
const logs = ref<string[]>([])
let stop: ReturnType<typeof useEventListener> | null = null
let stopLogs: ReturnType<typeof useEventListener> | null = null
let stopTaskDone: ReturnType<typeof useEventListener> | null = null
const isValidLogEvent = (event: CustomEvent<LogsWsMessage>) =>
event?.type === LOGS_MESSAGE_TYPE && event.detail?.entries?.length > 0
@@ -28,33 +34,55 @@ export const useServerLogs = (options: UseServerLogsOptions = {}) => {
const handleLogMessage = (event: CustomEvent<LogsWsMessage>) => {
if (isValidLogEvent(event)) {
logs.value.push(...parseLogMessage(event))
const messages = parseLogMessage(event)
if (messages.length > 0) {
logs.value.push(...messages)
}
}
}
const start = async () => {
const handleTaskDone = (event: CustomEvent<ManagerWsTaskDoneMsg>) => {
if (event?.type === MANAGER_WS_TASK_DONE_NAME) {
const { state } = event.detail
// Check if our task is now in the history (completed)
if (state.history[options.ui_id]) {
void stopListening()
}
}
}
const startListening = async () => {
await api.subscribeLogs(true)
stop = useEventListener(api, LOGS_MESSAGE_TYPE, handleLogMessage)
stopLogs = useEventListener(api, LOGS_MESSAGE_TYPE, handleLogMessage)
stopTaskDone = useEventListener(
api,
MANAGER_WS_TASK_DONE_NAME,
handleTaskDone
)
}
const stopListening = async () => {
stop?.()
stop = null
console.log('stopListening')
stopLogs?.()
stopTaskDone?.()
stopLogs = null
stopTaskDone = null
await api.subscribeLogs(false)
}
if (immediate) {
void start()
void startListening()
}
onUnmounted(async () => {
const cleanup = async () => {
await stopListening()
logs.value = []
})
}
return {
logs,
startListening: start,
stopListening
startListening,
stopListening,
cleanup
}
}

View File

@@ -1,21 +1,28 @@
import { whenever } from '@vueuse/core'
import { defineStore } from 'pinia'
import { v4 as uuidv4 } from 'uuid'
import { ref, watch } from 'vue'
import { useI18n } from 'vue-i18n'
import { useCachedRequest } from '@/composables/useCachedRequest'
import { useManagerQueue } from '@/composables/useManagerQueue'
import { useServerLogs } from '@/composables/useServerLogs'
import { api } from '@/scripts/api'
import { useComfyManagerService } from '@/services/comfyManagerService'
import { useDialogService } from '@/services/dialogService'
import {
InstallPackParams,
InstalledPacksResponse,
ManagerPackInfo,
ManagerPackInstalled,
TaskLog,
UpdateAllPacksParams
} from '@/types/comfyManagerTypes'
import { TaskLog } from '@/types/comfyManagerTypes'
import { components } from '@/types/generatedManagerTypes'
type InstallPackParams = components['schemas']['InstallPackParams']
type InstalledPacksResponse = components['schemas']['InstalledPacksResponse']
type ManagerPackInfo = components['schemas']['ManagerPackInfo']
type ManagerPackInstalled = components['schemas']['ManagerPackInstalled']
type ManagerTaskHistory = Record<
string,
components['schemas']['TaskHistoryItem']
>
type ManagerTaskQueue = components['schemas']['TaskStateMessage']
type UpdateAllPacksParams = components['schemas']['UpdateAllPacksParams']
/**
* Store for state of installed node packs
@@ -31,14 +38,63 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
const installedPacksIds = ref<Set<string>>(new Set())
const isStale = ref(true)
const taskLogs = ref<TaskLog[]>([])
const succeededTasksLogs = ref<TaskLog[]>([])
const failedTasksLogs = ref<TaskLog[]>([])
const { statusMessage, allTasksDone, enqueueTask, uncompletedCount } =
useManagerQueue()
const taskHistory = ref<ManagerTaskHistory>({})
const succeededTasksIds = ref<string[]>([])
const failedTasksIds = ref<string[]>([])
const taskQueue = ref<ManagerTaskQueue>({
history: {},
running_queue: [],
pending_queue: [],
installed_packs: {}
})
const managerQueue = useManagerQueue(taskHistory, taskQueue, installedPacks)
const setStale = () => {
isStale.value = true
}
const partitionTaskLogs = () => {
const successTaskLogs: TaskLog[] = []
const failTaskLogs: TaskLog[] = []
for (const log of taskLogs.value) {
if (failedTasksIds.value.includes(log.taskId)) {
failTaskLogs.push(log)
} else {
successTaskLogs.push(log)
}
}
succeededTasksLogs.value = successTaskLogs
failedTasksLogs.value = failTaskLogs
}
const partitionTasks = () => {
const successTasksIds = []
const failTasksIds = []
for (const task of Object.values(taskHistory.value)) {
if (task.status?.status_str === 'success') {
successTasksIds.push(task.ui_id)
} else {
failTasksIds.push(task.ui_id)
}
}
succeededTasksIds.value = successTasksIds
failedTasksIds.value = failTasksIds
}
whenever(
taskHistory,
() => {
partitionTasks()
partitionTaskLogs()
console.log('installed pack ids', installedPacksIds.value)
},
{ deep: true }
)
const getPackId = (pack: ManagerPackInstalled) => pack.cnr_id || pack.aux_id
const isInstalledPackId = (packName: string | undefined): boolean =>
@@ -97,11 +153,27 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
}
const updateInstalledIds = (packs: ManagerPackInstalled[]) => {
installedPacksIds.value = packsToIdSet(packs)
const newIds = packsToIdSet(packs)
console.log('updateInstalledIds: creating set with:', Array.from(newIds))
installedPacksIds.value = newIds
console.log(
'updateInstalledIds: final installedPacksIds:',
Array.from(installedPacksIds.value)
)
}
const onPacksChanged = () => {
const packs = Object.values(installedPacks.value)
console.log(
'onPacksChanged called with packs:',
packs.map((p) => ({
key: Object.keys(installedPacks.value).find(
(k) => installedPacks.value[k] === p
),
cnr_id: p.cnr_id,
aux_id: p.aux_id
}))
)
updateDisabledIds(packs)
updateInstalledIds(packs)
}
@@ -115,23 +187,46 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
}
whenever(isStale, refreshInstalledList, { immediate: true })
whenever(uncompletedCount, () => showManagerProgressDialog())
const withLogs = (task: () => Promise<null>, taskName: string) => {
const { startListening, stopListening, logs } = useServerLogs()
const enqueueTaskWithLogs = async (
task: (taskId: string) => Promise<null>,
taskName: string
) => {
const taskId = uuidv4()
const { startListening, logs } = useServerLogs({
ui_id: taskId
})
const loggedTask = async () => {
taskLogs.value.push({ taskName, logs: logs.value })
try {
// Show progress dialog immediately when task is queued
showManagerProgressDialog()
managerQueue.isProcessing.value = true
// Prepare logging hook
taskLogs.value.push({ taskName, taskId, logs: logs.value })
await startListening()
return task()
}
const onComplete = async () => {
await stopListening()
setStale()
}
// Queue the task to the server
await task(taskId)
} catch (error) {
// Reset processing state on error
managerQueue.isProcessing.value = false
return { task: loggedTask, onComplete }
// The server has authority over task history in general, but in rare
// case of client-side error, we add that to failed tasks from the client side
taskHistory.value[taskId] = {
ui_id: taskId,
client_id: api.clientId || 'unknown',
kind: 'error',
result: 'failed',
status: {
status_str: 'error',
completed: false,
messages: [error instanceof Error ? error.message : String(error)]
},
timestamp: new Date().toISOString()
}
}
}
const installPack = useCachedRequest<InstallPackParams, void>(
@@ -152,39 +247,62 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
}
}
const task = () => managerService.installPack(params, signal)
enqueueTask(withLogs(task, `${actionDescription} ${params.id}`))
const task = (taskId: string) =>
managerService.installPack(params, taskId, signal)
await enqueueTaskWithLogs(task, `${actionDescription} ${params.id}`)
},
{ maxSize: 1 }
)
const uninstallPack = (params: ManagerPackInfo, signal?: AbortSignal) => {
const uninstallPack = async (
params: ManagerPackInfo,
signal?: AbortSignal
) => {
installPack.clear()
installPack.cancel()
const task = () => managerService.uninstallPack(params, signal)
enqueueTask(withLogs(task, t('manager.uninstalling', { id: params.id })))
const uninstallParams: components['schemas']['UninstallPackParams'] = {
node_name: params.id,
is_unknown: false
}
const task = (taskId: string) =>
managerService.uninstallPack(uninstallParams, taskId, signal)
await enqueueTaskWithLogs(
task,
t('manager.uninstalling', { id: params.id })
)
}
const updatePack = useCachedRequest<ManagerPackInfo, void>(
async (params: ManagerPackInfo, signal?: AbortSignal) => {
updateAllPacks.cancel()
const task = () => managerService.updatePack(params, signal)
enqueueTask(withLogs(task, t('g.updating', { id: params.id })))
const updateParams: components['schemas']['UpdatePackParams'] = {
node_name: params.id,
node_ver: params.version
}
const task = (taskId: string) =>
managerService.updatePack(updateParams, taskId, signal)
await enqueueTaskWithLogs(task, t('g.updating', { id: params.id }))
},
{ maxSize: 1 }
)
const updateAllPacks = useCachedRequest<UpdateAllPacksParams, void>(
async (params: UpdateAllPacksParams, signal?: AbortSignal) => {
const task = () => managerService.updateAllPacks(params, signal)
enqueueTask(withLogs(task, t('manager.updatingAllPacks')))
const task = (taskId: string) =>
managerService.updateAllPacks(params, taskId, signal)
await enqueueTaskWithLogs(task, t('manager.updatingAllPacks'))
},
{ maxSize: 1 }
)
const disablePack = (params: ManagerPackInfo, signal?: AbortSignal) => {
const task = () => managerService.disablePack(params, signal)
enqueueTask(withLogs(task, t('g.disabling', { id: params.id })))
const disablePack = async (params: ManagerPackInfo, signal?: AbortSignal) => {
const disableParams: components['schemas']['DisablePackParams'] = {
node_name: params.id,
is_unknown: false
}
const task = (taskId: string) =>
managerService.disablePack(disableParams, taskId, signal)
await enqueueTaskWithLogs(task, t('g.disabling', { id: params.id }))
}
const getInstalledPackVersion = (packId: string) => {
@@ -200,9 +318,6 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
// Manager state
isLoading: managerService.isLoading,
error: managerService.error,
statusMessage,
allTasksDone,
uncompletedCount,
taskLogs,
clearLogs,
setStale,
@@ -215,6 +330,15 @@ export const useComfyManagerStore = defineStore('comfyManager', () => {
getInstalledPackVersion,
refreshInstalledList,
// Task queue state and actions
taskHistory,
isProcessingTasks: managerQueue.isProcessing,
succeededTasksIds,
failedTasksIds,
succeededTasksLogs,
failedTasksLogs,
managerQueue, // Expose full queue composable for advanced usage
// Pack actions
installPack,
uninstallPack,
@@ -234,6 +358,15 @@ export const useManagerProgressDialogStore = defineStore(
'managerProgressDialog',
() => {
const isExpanded = ref(false)
const activeTabIndex = ref(0)
const setActiveTabIndex = (index: number) => {
activeTabIndex.value = index
}
const getActiveTabIndex = () => {
return activeTabIndex.value
}
const toggle = () => {
isExpanded.value = !isExpanded.value
@@ -250,7 +383,9 @@ export const useManagerProgressDialogStore = defineStore(
isExpanded,
toggle,
collapse,
expand
expand,
setActiveTabIndex,
getActiveTabIndex
}
}
)