[manager] Update composables and state management

Updated manager queue composable, server logs composable, workflow packs composable, and manager store to support the new manager API structure and state management patterns.
This commit is contained in:
bymyself
2025-06-13 13:56:27 -07:00
committed by Jin Yi
parent b5b7945d8b
commit d04a02e3c7
4 changed files with 308 additions and 133 deletions

View File

@@ -7,7 +7,7 @@ import { app } from '@/scripts/app'
import { useComfyRegistryStore } from '@/stores/comfyRegistryStore'
import { useNodeDefStore } from '@/stores/nodeDefStore'
import { useSystemStatsStore } from '@/stores/systemStatsStore'
import { SelectedVersion, UseNodePacksOptions } from '@/types/comfyManagerTypes'
import { UseNodePacksOptions } from '@/types/comfyManagerTypes'
import type { components } from '@/types/comfyRegistryTypes'
type WorkflowPack = {
@@ -65,8 +65,7 @@ export const useWorkflowPacks = (options: UseNodePacksOptions = {}) => {
return {
id: CORE_NODES_PACK_NAME,
version:
systemStatsStore.systemStats?.system?.comfyui_version ??
SelectedVersion.NIGHTLY
systemStatsStore.systemStats?.system?.comfyui_version ?? 'nightly'
}
}
@@ -76,7 +75,7 @@ export const useWorkflowPacks = (options: UseNodePacksOptions = {}) => {
if (pack) {
return {
id: pack.id,
version: pack.latest_version?.version ?? SelectedVersion.NIGHTLY
version: pack.latest_version?.version ?? 'nightly'
}
}

View File

@@ -1,101 +1,114 @@
import { useEventListener, whenever } from '@vueuse/core'
import { computed, readonly, ref } from 'vue'
import { Ref, computed, ref } from 'vue'
import { api } from '@/scripts/api'
import { ManagerWsQueueStatus } from '@/types/comfyManagerTypes'
import { useDialogService } from '@/services/dialogService'
import { components } from '@/types/generatedManagerTypes'
type QueuedTask<T> = {
task: () => Promise<T>
onComplete?: () => void
}
type ManagerTaskHistory = Record<
string,
components['schemas']['TaskHistoryItem']
>
type ManagerTaskQueue = components['schemas']['TaskStateMessage']
type ManagerWsTaskDoneMsg = components['schemas']['MessageTaskDone']
type ManagerWsTaskStartedMsg = components['schemas']['MessageTaskStarted']
const MANAGER_WS_MSG_TYPE = 'cm-queue-status'
const MANAGER_WS_TASK_DONE_NAME = 'cm-task-completed'
const MANAGER_WS_TASK_STARTED_NAME = 'cm-task-started'
export const useManagerQueue = () => {
const clientQueueItems = ref<QueuedTask<unknown>[]>([])
const clientQueueLength = computed(() => clientQueueItems.value.length)
const onCompletedQueue = ref<((() => void) | undefined)[]>([])
const onCompleteWaitingCount = ref(0)
const uncompletedCount = computed(
() => clientQueueLength.value + onCompleteWaitingCount.value
export const useManagerQueue = (
taskHistory: Ref<ManagerTaskHistory>,
taskQueue: Ref<ManagerTaskQueue>,
installedPacks: Ref<Record<string, any>>
) => {
const { showManagerProgressDialog } = useDialogService()
// Task queue state (read-only from server)
const maxHistoryItems = ref(64)
const isLoading = ref(false)
const isProcessing = ref(false)
// Computed values
const currentQueueLength = computed(
() =>
taskQueue.value.running_queue.length +
taskQueue.value.pending_queue.length
)
const serverQueueStatus = ref<ManagerWsQueueStatus>(ManagerWsQueueStatus.DONE)
const isServerIdle = computed(
() => serverQueueStatus.value === ManagerWsQueueStatus.DONE
)
const updateProcessingState = () => {
isProcessing.value = currentQueueLength.value > 0
}
const allTasksDone = computed(
() => isServerIdle.value && clientQueueLength.value === 0
)
const nextTaskReady = computed(
() => isServerIdle.value && clientQueueLength.value > 0
() => !isProcessing.value && currentQueueLength.value === 0
)
const historyCount = computed(() => Object.keys(taskHistory.value).length)
const cleanupListener = useEventListener(
// WebSocket event listener for task done
const cleanupTaskDoneListener = useEventListener(
api,
MANAGER_WS_MSG_TYPE,
(event: CustomEvent<{ status: ManagerWsQueueStatus }>) => {
if (event?.type === MANAGER_WS_MSG_TYPE && event.detail?.status) {
serverQueueStatus.value = event.detail.status
MANAGER_WS_TASK_DONE_NAME,
(event: CustomEvent<ManagerWsTaskDoneMsg>) => {
if (event?.type === MANAGER_WS_TASK_DONE_NAME) {
const { state } = event.detail
taskQueue.value.running_queue = state.running_queue
taskQueue.value.pending_queue = state.pending_queue
taskHistory.value = state.history
if (state.installed_packs) {
console.log(
'Updating installedPacks from WebSocket:',
Object.keys(state.installed_packs)
)
installedPacks.value = state.installed_packs
}
updateProcessingState()
}
}
)
const startNextTask = () => {
const nextTask = clientQueueItems.value.shift()
if (!nextTask) return
const { task, onComplete } = nextTask
if (onComplete) {
// Set the task's onComplete to be executed the next time the server is idle
onCompletedQueue.value.push(onComplete)
onCompleteWaitingCount.value++
}
task().catch((e) => {
const message = `Error enqueuing task for ComfyUI Manager: ${e}`
console.error(message)
})
}
const enqueueTask = <T>(task: QueuedTask<T>): void => {
clientQueueItems.value.push(task)
}
const clearQueue = () => {
clientQueueItems.value = []
onCompletedQueue.value = []
onCompleteWaitingCount.value = 0
}
const cleanup = () => {
clearQueue()
cleanupListener()
}
whenever(nextTaskReady, startNextTask)
whenever(isServerIdle, () => {
if (onCompletedQueue.value?.length) {
while (
onCompleteWaitingCount.value > 0 &&
onCompletedQueue.value.length > 0
) {
const onComplete = onCompletedQueue.value.shift()
onComplete?.()
onCompleteWaitingCount.value--
// WebSocket event listener for task started
const cleanupTaskStartedListener = useEventListener(
api,
MANAGER_WS_TASK_STARTED_NAME,
(event: CustomEvent<ManagerWsTaskStartedMsg>) => {
if (event?.type === MANAGER_WS_TASK_STARTED_NAME) {
const { state } = event.detail
taskQueue.value.running_queue = state.running_queue
taskQueue.value.pending_queue = state.pending_queue
taskHistory.value = state.history
if (state.installed_packs) {
console.log(
'Updating installedPacks from WebSocket:',
Object.keys(state.installed_packs)
)
installedPacks.value = state.installed_packs
}
updateProcessingState()
}
}
})
)
whenever(currentQueueLength, () => showManagerProgressDialog())
const stopListening = () => {
cleanupTaskDoneListener()
cleanupTaskStartedListener()
}
return {
allTasksDone,
statusMessage: readonly(serverQueueStatus),
queueLength: clientQueueLength,
uncompletedCount,
// Queue state (read-only from server)
taskHistory,
taskQueue,
maxHistoryItems,
isLoading,
enqueueTask,
clearQueue,
cleanup
// Computed state
allTasksDone,
isProcessing,
queueLength: currentQueueLength,
historyCount,
// Actions
stopListening
}
}

View File

@@ -1,24 +1,30 @@
import { useEventListener } from '@vueuse/core'
import { onUnmounted, ref } from 'vue'
import { ref } from 'vue'
import { LogsWsMessage } from '@/schemas/apiSchema'
import { api } from '@/scripts/api'
import { components } from '@/types/generatedManagerTypes'
const LOGS_MESSAGE_TYPE = 'logs'
const MANAGER_WS_TASK_DONE_NAME = 'cm-task-completed'
type ManagerWsTaskDoneMsg = components['schemas']['MessageTaskDone']
interface UseServerLogsOptions {
ui_id: string
immediate?: boolean
messageFilter?: (message: string) => boolean
}
export const useServerLogs = (options: UseServerLogsOptions = {}) => {
export const useServerLogs = (options: UseServerLogsOptions) => {
const {
immediate = false,
messageFilter = (msg: string) => Boolean(msg.trim())
} = options
const logs = ref<string[]>([])
let stop: ReturnType<typeof useEventListener> | null = null
let stopLogs: ReturnType<typeof useEventListener> | null = null
let stopTaskDone: ReturnType<typeof useEventListener> | null = null
const isValidLogEvent = (event: CustomEvent<LogsWsMessage>) =>
event?.type === LOGS_MESSAGE_TYPE && event.detail?.entries?.length > 0
@@ -28,33 +34,55 @@ export const useServerLogs = (options: UseServerLogsOptions = {}) => {
const handleLogMessage = (event: CustomEvent<LogsWsMessage>) => {
if (isValidLogEvent(event)) {
logs.value.push(...parseLogMessage(event))
const messages = parseLogMessage(event)
if (messages.length > 0) {
logs.value.push(...messages)
}
}
}
const start = async () => {
const handleTaskDone = (event: CustomEvent<ManagerWsTaskDoneMsg>) => {
if (event?.type === MANAGER_WS_TASK_DONE_NAME) {
const { state } = event.detail
// Check if our task is now in the history (completed)
if (state.history[options.ui_id]) {
void stopListening()
}
}
}
const startListening = async () => {
await api.subscribeLogs(true)
stop = useEventListener(api, LOGS_MESSAGE_TYPE, handleLogMessage)
stopLogs = useEventListener(api, LOGS_MESSAGE_TYPE, handleLogMessage)
stopTaskDone = useEventListener(
api,
MANAGER_WS_TASK_DONE_NAME,
handleTaskDone
)
}
const stopListening = async () => {
stop?.()
stop = null
console.log('stopListening')
stopLogs?.()
stopTaskDone?.()
stopLogs = null
stopTaskDone = null
await api.subscribeLogs(false)
}
if (immediate) {
void start()
void startListening()
}
onUnmounted(async () => {
const cleanup = async () => {
await stopListening()
logs.value = []
})
}
return {
logs,
startListening: start,
stopListening
startListening,
stopListening,
cleanup
}
}