diff --git a/src/composables/nodePack/useWorkflowPacks.ts b/src/composables/nodePack/useWorkflowPacks.ts index 8096d1d49..296c4191d 100644 --- a/src/composables/nodePack/useWorkflowPacks.ts +++ b/src/composables/nodePack/useWorkflowPacks.ts @@ -7,7 +7,7 @@ import { app } from '@/scripts/app' import { useComfyRegistryStore } from '@/stores/comfyRegistryStore' import { useNodeDefStore } from '@/stores/nodeDefStore' import { useSystemStatsStore } from '@/stores/systemStatsStore' -import { SelectedVersion, UseNodePacksOptions } from '@/types/comfyManagerTypes' +import { UseNodePacksOptions } from '@/types/comfyManagerTypes' import type { components } from '@/types/comfyRegistryTypes' import { collectAllNodes } from '@/utils/graphTraversalUtil' @@ -66,8 +66,7 @@ export const useWorkflowPacks = (options: UseNodePacksOptions = {}) => { return { id: CORE_NODES_PACK_NAME, version: - systemStatsStore.systemStats?.system?.comfyui_version ?? - SelectedVersion.NIGHTLY + systemStatsStore.systemStats?.system?.comfyui_version ?? 'nightly' } } @@ -77,7 +76,7 @@ export const useWorkflowPacks = (options: UseNodePacksOptions = {}) => { if (pack) { return { id: pack.id, - version: pack.latest_version?.version ?? SelectedVersion.NIGHTLY + version: pack.latest_version?.version ?? 'nightly' } } diff --git a/src/composables/useManagerQueue.ts b/src/composables/useManagerQueue.ts index 792d9e5ab..a64bd3773 100644 --- a/src/composables/useManagerQueue.ts +++ b/src/composables/useManagerQueue.ts @@ -1,101 +1,114 @@ import { useEventListener, whenever } from '@vueuse/core' -import { computed, readonly, ref } from 'vue' +import { Ref, computed, ref } from 'vue' import { api } from '@/scripts/api' -import { ManagerWsQueueStatus } from '@/types/comfyManagerTypes' +import { useDialogService } from '@/services/dialogService' +import { components } from '@/types/generatedManagerTypes' -type QueuedTask = { - task: () => Promise - onComplete?: () => void -} +type ManagerTaskHistory = Record< + string, + components['schemas']['TaskHistoryItem'] +> +type ManagerTaskQueue = components['schemas']['TaskStateMessage'] +type ManagerWsTaskDoneMsg = components['schemas']['MessageTaskDone'] +type ManagerWsTaskStartedMsg = components['schemas']['MessageTaskStarted'] -const MANAGER_WS_MSG_TYPE = 'cm-queue-status' +const MANAGER_WS_TASK_DONE_NAME = 'cm-task-completed' +const MANAGER_WS_TASK_STARTED_NAME = 'cm-task-started' -export const useManagerQueue = () => { - const clientQueueItems = ref[]>([]) - const clientQueueLength = computed(() => clientQueueItems.value.length) - const onCompletedQueue = ref<((() => void) | undefined)[]>([]) - const onCompleteWaitingCount = ref(0) - const uncompletedCount = computed( - () => clientQueueLength.value + onCompleteWaitingCount.value +export const useManagerQueue = ( + taskHistory: Ref, + taskQueue: Ref, + installedPacks: Ref> +) => { + const { showManagerProgressDialog } = useDialogService() + + // Task queue state (read-only from server) + const maxHistoryItems = ref(64) + const isLoading = ref(false) + const isProcessing = ref(false) + + // Computed values + const currentQueueLength = computed( + () => + taskQueue.value.running_queue.length + + taskQueue.value.pending_queue.length ) - const serverQueueStatus = ref(ManagerWsQueueStatus.DONE) - const isServerIdle = computed( - () => serverQueueStatus.value === ManagerWsQueueStatus.DONE - ) + const updateProcessingState = () => { + isProcessing.value = currentQueueLength.value > 0 + } const allTasksDone = computed( - () => isServerIdle.value && clientQueueLength.value === 0 - ) - const nextTaskReady = computed( - () => isServerIdle.value && clientQueueLength.value > 0 + () => !isProcessing.value && currentQueueLength.value === 0 ) + const historyCount = computed(() => Object.keys(taskHistory.value).length) - const cleanupListener = useEventListener( + // WebSocket event listener for task done + const cleanupTaskDoneListener = useEventListener( api, - MANAGER_WS_MSG_TYPE, - (event: CustomEvent<{ status: ManagerWsQueueStatus }>) => { - if (event?.type === MANAGER_WS_MSG_TYPE && event.detail?.status) { - serverQueueStatus.value = event.detail.status + MANAGER_WS_TASK_DONE_NAME, + (event: CustomEvent) => { + if (event?.type === MANAGER_WS_TASK_DONE_NAME) { + const { state } = event.detail + taskQueue.value.running_queue = state.running_queue + taskQueue.value.pending_queue = state.pending_queue + taskHistory.value = state.history + if (state.installed_packs) { + console.log( + 'Updating installedPacks from WebSocket:', + Object.keys(state.installed_packs) + ) + installedPacks.value = state.installed_packs + } + updateProcessingState() } } ) - const startNextTask = () => { - const nextTask = clientQueueItems.value.shift() - if (!nextTask) return - - const { task, onComplete } = nextTask - if (onComplete) { - // Set the task's onComplete to be executed the next time the server is idle - onCompletedQueue.value.push(onComplete) - onCompleteWaitingCount.value++ - } - - task().catch((e) => { - const message = `Error enqueuing task for ComfyUI Manager: ${e}` - console.error(message) - }) - } - - const enqueueTask = (task: QueuedTask): void => { - clientQueueItems.value.push(task) - } - - const clearQueue = () => { - clientQueueItems.value = [] - onCompletedQueue.value = [] - onCompleteWaitingCount.value = 0 - } - - const cleanup = () => { - clearQueue() - cleanupListener() - } - - whenever(nextTaskReady, startNextTask) - whenever(isServerIdle, () => { - if (onCompletedQueue.value?.length) { - while ( - onCompleteWaitingCount.value > 0 && - onCompletedQueue.value.length > 0 - ) { - const onComplete = onCompletedQueue.value.shift() - onComplete?.() - onCompleteWaitingCount.value-- + // WebSocket event listener for task started + const cleanupTaskStartedListener = useEventListener( + api, + MANAGER_WS_TASK_STARTED_NAME, + (event: CustomEvent) => { + if (event?.type === MANAGER_WS_TASK_STARTED_NAME) { + const { state } = event.detail + taskQueue.value.running_queue = state.running_queue + taskQueue.value.pending_queue = state.pending_queue + taskHistory.value = state.history + if (state.installed_packs) { + console.log( + 'Updating installedPacks from WebSocket:', + Object.keys(state.installed_packs) + ) + installedPacks.value = state.installed_packs + } + updateProcessingState() } } - }) + ) + + whenever(currentQueueLength, () => showManagerProgressDialog()) + + const stopListening = () => { + cleanupTaskDoneListener() + cleanupTaskStartedListener() + } return { - allTasksDone, - statusMessage: readonly(serverQueueStatus), - queueLength: clientQueueLength, - uncompletedCount, + // Queue state (read-only from server) + taskHistory, + taskQueue, + maxHistoryItems, + isLoading, - enqueueTask, - clearQueue, - cleanup + // Computed state + allTasksDone, + isProcessing, + queueLength: currentQueueLength, + historyCount, + + // Actions + stopListening } } diff --git a/src/composables/useServerLogs.ts b/src/composables/useServerLogs.ts index a802c7b8f..27db713ff 100644 --- a/src/composables/useServerLogs.ts +++ b/src/composables/useServerLogs.ts @@ -1,24 +1,30 @@ import { useEventListener } from '@vueuse/core' -import { onUnmounted, ref } from 'vue' +import { ref } from 'vue' import { LogsWsMessage } from '@/schemas/apiSchema' import { api } from '@/scripts/api' +import { components } from '@/types/generatedManagerTypes' const LOGS_MESSAGE_TYPE = 'logs' +const MANAGER_WS_TASK_DONE_NAME = 'cm-task-completed' + +type ManagerWsTaskDoneMsg = components['schemas']['MessageTaskDone'] interface UseServerLogsOptions { + ui_id: string immediate?: boolean messageFilter?: (message: string) => boolean } -export const useServerLogs = (options: UseServerLogsOptions = {}) => { +export const useServerLogs = (options: UseServerLogsOptions) => { const { immediate = false, messageFilter = (msg: string) => Boolean(msg.trim()) } = options const logs = ref([]) - let stop: ReturnType | null = null + let stopLogs: ReturnType | null = null + let stopTaskDone: ReturnType | null = null const isValidLogEvent = (event: CustomEvent) => event?.type === LOGS_MESSAGE_TYPE && event.detail?.entries?.length > 0 @@ -28,33 +34,55 @@ export const useServerLogs = (options: UseServerLogsOptions = {}) => { const handleLogMessage = (event: CustomEvent) => { if (isValidLogEvent(event)) { - logs.value.push(...parseLogMessage(event)) + const messages = parseLogMessage(event) + if (messages.length > 0) { + logs.value.push(...messages) + } } } - const start = async () => { + const handleTaskDone = (event: CustomEvent) => { + if (event?.type === MANAGER_WS_TASK_DONE_NAME) { + const { state } = event.detail + // Check if our task is now in the history (completed) + if (state.history[options.ui_id]) { + void stopListening() + } + } + } + + const startListening = async () => { await api.subscribeLogs(true) - stop = useEventListener(api, LOGS_MESSAGE_TYPE, handleLogMessage) + stopLogs = useEventListener(api, LOGS_MESSAGE_TYPE, handleLogMessage) + stopTaskDone = useEventListener( + api, + MANAGER_WS_TASK_DONE_NAME, + handleTaskDone + ) } const stopListening = async () => { - stop?.() - stop = null + console.log('stopListening') + stopLogs?.() + stopTaskDone?.() + stopLogs = null + stopTaskDone = null await api.subscribeLogs(false) } if (immediate) { - void start() + void startListening() } - onUnmounted(async () => { + const cleanup = async () => { await stopListening() logs.value = [] - }) + } return { logs, - startListening: start, - stopListening + startListening, + stopListening, + cleanup } } diff --git a/src/stores/comfyManagerStore.ts b/src/stores/comfyManagerStore.ts index 0568711f7..a226b3813 100644 --- a/src/stores/comfyManagerStore.ts +++ b/src/stores/comfyManagerStore.ts @@ -1,21 +1,28 @@ import { whenever } from '@vueuse/core' import { defineStore } from 'pinia' +import { v4 as uuidv4 } from 'uuid' import { ref, watch } from 'vue' import { useI18n } from 'vue-i18n' import { useCachedRequest } from '@/composables/useCachedRequest' import { useManagerQueue } from '@/composables/useManagerQueue' import { useServerLogs } from '@/composables/useServerLogs' +import { api } from '@/scripts/api' import { useComfyManagerService } from '@/services/comfyManagerService' import { useDialogService } from '@/services/dialogService' -import { - InstallPackParams, - InstalledPacksResponse, - ManagerPackInfo, - ManagerPackInstalled, - TaskLog, - UpdateAllPacksParams -} from '@/types/comfyManagerTypes' +import { TaskLog } from '@/types/comfyManagerTypes' +import { components } from '@/types/generatedManagerTypes' + +type InstallPackParams = components['schemas']['InstallPackParams'] +type InstalledPacksResponse = components['schemas']['InstalledPacksResponse'] +type ManagerPackInfo = components['schemas']['ManagerPackInfo'] +type ManagerPackInstalled = components['schemas']['ManagerPackInstalled'] +type ManagerTaskHistory = Record< + string, + components['schemas']['TaskHistoryItem'] +> +type ManagerTaskQueue = components['schemas']['TaskStateMessage'] +type UpdateAllPacksParams = components['schemas']['UpdateAllPacksParams'] /** * Store for state of installed node packs @@ -31,14 +38,63 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { const installedPacksIds = ref>(new Set()) const isStale = ref(true) const taskLogs = ref([]) + const succeededTasksLogs = ref([]) + const failedTasksLogs = ref([]) - const { statusMessage, allTasksDone, enqueueTask, uncompletedCount } = - useManagerQueue() + const taskHistory = ref({}) + const succeededTasksIds = ref([]) + const failedTasksIds = ref([]) + const taskQueue = ref({ + history: {}, + running_queue: [], + pending_queue: [], + installed_packs: {} + }) + + const managerQueue = useManagerQueue(taskHistory, taskQueue, installedPacks) const setStale = () => { isStale.value = true } + const partitionTaskLogs = () => { + const successTaskLogs: TaskLog[] = [] + const failTaskLogs: TaskLog[] = [] + for (const log of taskLogs.value) { + if (failedTasksIds.value.includes(log.taskId)) { + failTaskLogs.push(log) + } else { + successTaskLogs.push(log) + } + } + succeededTasksLogs.value = successTaskLogs + failedTasksLogs.value = failTaskLogs + } + + const partitionTasks = () => { + const successTasksIds = [] + const failTasksIds = [] + for (const task of Object.values(taskHistory.value)) { + if (task.status?.status_str === 'success') { + successTasksIds.push(task.ui_id) + } else { + failTasksIds.push(task.ui_id) + } + } + succeededTasksIds.value = successTasksIds + failedTasksIds.value = failTasksIds + } + + whenever( + taskHistory, + () => { + partitionTasks() + partitionTaskLogs() + console.log('installed pack ids', installedPacksIds.value) + }, + { deep: true } + ) + const getPackId = (pack: ManagerPackInstalled) => pack.cnr_id || pack.aux_id const isInstalledPackId = (packName: string | undefined): boolean => @@ -97,11 +153,27 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { } const updateInstalledIds = (packs: ManagerPackInstalled[]) => { - installedPacksIds.value = packsToIdSet(packs) + const newIds = packsToIdSet(packs) + console.log('updateInstalledIds: creating set with:', Array.from(newIds)) + installedPacksIds.value = newIds + console.log( + 'updateInstalledIds: final installedPacksIds:', + Array.from(installedPacksIds.value) + ) } const onPacksChanged = () => { const packs = Object.values(installedPacks.value) + console.log( + 'onPacksChanged called with packs:', + packs.map((p) => ({ + key: Object.keys(installedPacks.value).find( + (k) => installedPacks.value[k] === p + ), + cnr_id: p.cnr_id, + aux_id: p.aux_id + })) + ) updateDisabledIds(packs) updateInstalledIds(packs) } @@ -115,23 +187,46 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { } whenever(isStale, refreshInstalledList, { immediate: true }) - whenever(uncompletedCount, () => showManagerProgressDialog()) - const withLogs = (task: () => Promise, taskName: string) => { - const { startListening, stopListening, logs } = useServerLogs() + const enqueueTaskWithLogs = async ( + task: (taskId: string) => Promise, + taskName: string + ) => { + const taskId = uuidv4() + const { startListening, logs } = useServerLogs({ + ui_id: taskId + }) - const loggedTask = async () => { - taskLogs.value.push({ taskName, logs: logs.value }) + try { + // Show progress dialog immediately when task is queued + showManagerProgressDialog() + managerQueue.isProcessing.value = true + + // Prepare logging hook + taskLogs.value.push({ taskName, taskId, logs: logs.value }) await startListening() - return task() - } - const onComplete = async () => { - await stopListening() - setStale() - } + // Queue the task to the server + await task(taskId) + } catch (error) { + // Reset processing state on error + managerQueue.isProcessing.value = false - return { task: loggedTask, onComplete } + // The server has authority over task history in general, but in rare + // case of client-side error, we add that to failed tasks from the client side + taskHistory.value[taskId] = { + ui_id: taskId, + client_id: api.clientId || 'unknown', + kind: 'error', + result: 'failed', + status: { + status_str: 'error', + completed: false, + messages: [error instanceof Error ? error.message : String(error)] + }, + timestamp: new Date().toISOString() + } + } } const installPack = useCachedRequest( @@ -152,39 +247,62 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { } } - const task = () => managerService.installPack(params, signal) - enqueueTask(withLogs(task, `${actionDescription} ${params.id}`)) + const task = (taskId: string) => + managerService.installPack(params, taskId, signal) + await enqueueTaskWithLogs(task, `${actionDescription} ${params.id}`) }, { maxSize: 1 } ) - const uninstallPack = (params: ManagerPackInfo, signal?: AbortSignal) => { + const uninstallPack = async ( + params: ManagerPackInfo, + signal?: AbortSignal + ) => { installPack.clear() installPack.cancel() - const task = () => managerService.uninstallPack(params, signal) - enqueueTask(withLogs(task, t('manager.uninstalling', { id: params.id }))) + const uninstallParams: components['schemas']['UninstallPackParams'] = { + node_name: params.id, + is_unknown: false + } + const task = (taskId: string) => + managerService.uninstallPack(uninstallParams, taskId, signal) + await enqueueTaskWithLogs( + task, + t('manager.uninstalling', { id: params.id }) + ) } const updatePack = useCachedRequest( async (params: ManagerPackInfo, signal?: AbortSignal) => { updateAllPacks.cancel() - const task = () => managerService.updatePack(params, signal) - enqueueTask(withLogs(task, t('g.updating', { id: params.id }))) + const updateParams: components['schemas']['UpdatePackParams'] = { + node_name: params.id, + node_ver: params.version + } + const task = (taskId: string) => + managerService.updatePack(updateParams, taskId, signal) + await enqueueTaskWithLogs(task, t('g.updating', { id: params.id })) }, { maxSize: 1 } ) const updateAllPacks = useCachedRequest( async (params: UpdateAllPacksParams, signal?: AbortSignal) => { - const task = () => managerService.updateAllPacks(params, signal) - enqueueTask(withLogs(task, t('manager.updatingAllPacks'))) + const task = (taskId: string) => + managerService.updateAllPacks(params, taskId, signal) + await enqueueTaskWithLogs(task, t('manager.updatingAllPacks')) }, { maxSize: 1 } ) - const disablePack = (params: ManagerPackInfo, signal?: AbortSignal) => { - const task = () => managerService.disablePack(params, signal) - enqueueTask(withLogs(task, t('g.disabling', { id: params.id }))) + const disablePack = async (params: ManagerPackInfo, signal?: AbortSignal) => { + const disableParams: components['schemas']['DisablePackParams'] = { + node_name: params.id, + is_unknown: false + } + const task = (taskId: string) => + managerService.disablePack(disableParams, taskId, signal) + await enqueueTaskWithLogs(task, t('g.disabling', { id: params.id })) } const getInstalledPackVersion = (packId: string) => { @@ -200,9 +318,6 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { // Manager state isLoading: managerService.isLoading, error: managerService.error, - statusMessage, - allTasksDone, - uncompletedCount, taskLogs, clearLogs, setStale, @@ -215,6 +330,15 @@ export const useComfyManagerStore = defineStore('comfyManager', () => { getInstalledPackVersion, refreshInstalledList, + // Task queue state and actions + taskHistory, + isProcessingTasks: managerQueue.isProcessing, + succeededTasksIds, + failedTasksIds, + succeededTasksLogs, + failedTasksLogs, + managerQueue, // Expose full queue composable for advanced usage + // Pack actions installPack, uninstallPack, @@ -234,6 +358,15 @@ export const useManagerProgressDialogStore = defineStore( 'managerProgressDialog', () => { const isExpanded = ref(false) + const activeTabIndex = ref(0) + + const setActiveTabIndex = (index: number) => { + activeTabIndex.value = index + } + + const getActiveTabIndex = () => { + return activeTabIndex.value + } const toggle = () => { isExpanded.value = !isExpanded.value @@ -250,7 +383,9 @@ export const useManagerProgressDialogStore = defineStore( isExpanded, toggle, collapse, - expand + expand, + setActiveTabIndex, + getActiveTabIndex } } )