Consolidated paginated output tracking

This commit is contained in:
Austin
2026-04-15 13:40:20 -07:00
parent ecb6fbe8fb
commit 3d87a28460
3 changed files with 102 additions and 179 deletions

View File

@@ -1,6 +1,6 @@
import { useAsyncState, whenever } from '@vueuse/core'
import { difference } from 'es-toolkit'
import { defineStore } from 'pinia'
import { defineStore, storeToRefs } from 'pinia'
import { computed, reactive, ref, shallowReactive } from 'vue'
import {
mapInputFileToAssetItem,
@@ -13,7 +13,7 @@ import { isCloud } from '@/platform/distribution/types'
import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes'
import { api } from '@/scripts/api'
import { TaskItemImpl } from './queueStore'
import { TaskItemImpl, useHistoryStore } from './queueStore'
import { useAssetDownloadStore } from './assetDownloadStore'
import { useModelToNodeStore } from './modelToNodeStore'
@@ -84,12 +84,16 @@ function mapHistoryToAssets(historyItems: JobListItem[]): AssetItem[] {
)
}
const BATCH_SIZE = 200
const MAX_HISTORY_ITEMS = 1000 // Maximum items to keep in memory
export const useAssetsStore = defineStore('assets', () => {
const assetDownloadStore = useAssetDownloadStore()
const modelToNodeStore = useModelToNodeStore()
const historyStore = useHistoryStore()
const { isLoadingMore, hasMoreHistory, historyError } =
storeToRefs(historyStore)
const historyAssets = computed(() =>
mapHistoryToAssets(historyStore.historyItems)
)
// Track assets currently being deleted (for loading overlay)
const deletingAssetIds = shallowReactive(new Set<string>())
@@ -106,15 +110,6 @@ export const useAssetsStore = defineStore('assets', () => {
return deletingAssetIds.has(assetId)
}
// Pagination state
const historyOffset = ref(0)
const hasMoreHistory = ref(true)
const isLoadingMore = ref(false)
const allHistoryItems = ref<AssetItem[]>([])
const loadedIds = shallowReactive(new Set<string>())
const fetchInputFiles = isCloud
? fetchInputFilesFromCloud
: fetchInputFilesFromAPI
@@ -132,120 +127,6 @@ export const useAssetsStore = defineStore('assets', () => {
}
})
/**
* Fetch history assets with pagination support
* @param loadMore - true for pagination (append), false for initial load (replace)
*/
/**
 * Fetch history assets with pagination support.
 * @param loadMore - true to append the next page, false to reset and reload
 * @returns the full, newest-first list of loaded history assets
 */
const fetchHistoryAssets = async (loadMore = false): Promise<AssetItem[]> => {
  if (!loadMore) {
    // Fresh load: discard all pagination state before fetching page one.
    historyOffset.value = 0
    hasMoreHistory.value = true
    allHistoryItems.value = []
    loadedIds.clear()
  }

  // Pull the next page from the server at the current offset.
  const page = await api.getHistory(BATCH_SIZE, {
    offset: historyOffset.value
  })
  const pageAssets = mapHistoryToAssets(page)

  if (loadMore) {
    const timestampOf = (item: AssetItem) =>
      new Date(item.created_at ?? 0).getTime()
    // Drop anything we already hold, then record the newcomers' ids.
    const fresh = pageAssets.filter((asset) => !loadedIds.has(asset.id))
    fresh.forEach((asset) => loadedIds.add(asset.id))
    // Merge and re-sort newest-first. Array#sort is stable (ES2019+), so on
    // timestamp ties existing items stay ahead of newly fetched ones —
    // identical ordering to inserting each new asset after its equals.
    allHistoryItems.value = [...allHistoryItems.value, ...fresh].sort(
      (a, b) => timestampOf(b) - timestampOf(a)
    )
  } else {
    // Initial load: the page replaces everything.
    allHistoryItems.value = pageAssets
    pageAssets.forEach((asset) => loadedIds.add(asset.id))
  }

  // Advance the cursor; a short page means the server is exhausted.
  historyOffset.value += BATCH_SIZE
  hasMoreHistory.value = page.length === BATCH_SIZE

  if (allHistoryItems.value.length > MAX_HISTORY_ITEMS) {
    // Evict the oldest overflow and forget its ids so those items can be
    // re-fetched later if the user scrolls far enough.
    const overflow = allHistoryItems.value.slice(MAX_HISTORY_ITEMS)
    allHistoryItems.value = allHistoryItems.value.slice(0, MAX_HISTORY_ITEMS)
    overflow.forEach((item) => loadedIds.delete(item.id))
  }

  return allHistoryItems.value
}
const historyAssets = ref<AssetItem[]>([])
const historyLoading = ref(false)
const historyError = ref<unknown>(null)
/**
* Initial load of history assets
*/
/**
 * Initial (non-paginated) load of history assets: resets pagination state
 * via fetchHistoryAssets(false) and publishes the result.
 */
const updateHistory = async () => {
  historyError.value = null
  historyLoading.value = true
  try {
    await fetchHistoryAssets(false)
    historyAssets.value = allHistoryItems.value
  } catch (error) {
    console.error('Error fetching history assets:', error)
    historyError.value = error
    // On failure keep whatever was already loaded; only normalize the ref
    // to a fresh empty array when nothing had been loaded yet.
    if (!historyAssets.value.length) {
      historyAssets.value = []
    }
  } finally {
    historyLoading.value = false
  }
}
/**
* Load more history items (infinite scroll)
*/
/**
 * Load the next page of history (infinite scroll). No-ops while a fetch is
 * already in flight or when the server has no more items.
 */
const loadMoreHistory = async () => {
  if (isLoadingMore.value || !hasMoreHistory.value) return

  historyError.value = null
  isLoadingMore.value = true
  try {
    await fetchHistoryAssets(true)
    historyAssets.value = allHistoryItems.value
  } catch (error) {
    console.error('Error loading more history:', error)
    historyError.value = error
    // Same error policy as updateHistory: preserve loaded data; only reset
    // the ref when it was already empty.
    if (!historyAssets.value.length) {
      historyAssets.value = []
    }
  } finally {
    isLoadingMore.value = false
  }
}
/**
* Map of asset hash filename to asset item for O(1) lookup
* Cloud assets use asset_hash for the hash-based filename
@@ -726,7 +607,7 @@ export const useAssetsStore = defineStore('assets', () => {
inputAssets,
historyAssets,
inputLoading,
historyLoading,
historyLoading: isLoadingMore,
inputError,
historyError,
hasMoreHistory,
@@ -739,8 +620,8 @@ export const useAssetsStore = defineStore('assets', () => {
// Actions
updateInputs,
updateHistory,
loadMoreHistory,
updateHistory: historyStore.updateHistory,
loadMoreHistory: historyStore.loadMoreHistory,
// Input mapping helpers
inputAssetsByFilename,

View File

@@ -1,5 +1,13 @@
import { defineStore } from 'pinia'
import { computed, ref, shallowRef, toRaw, toValue } from 'vue'
import {
computed,
ref,
shallowRef,
toRaw,
toValue,
watch,
watchEffect
} from 'vue'
import { extractWorkflow } from '@/platform/remote/comfyui/jobs/fetchJobs'
import type {
@@ -474,6 +482,76 @@ export class TaskItemImpl {
)
}
}
export const useHistoryStore = defineStore('history', () => {
  const BATCH_SIZE = 200
  const MAX_HISTORY_ITEMS = 1000 // Maximum items to keep in memory

  // Pagination cursor into the server-side history list.
  let offset = 0
  const hasMoreHistory = ref(true)
  const isLoadingMore = ref(false)
  const historyItems = ref<JobListItem[]>([])
  const historyError = ref<unknown>(null)
  // Ids already present in historyItems, for O(1) duplicate filtering.
  const loadedIds = new Set<string>()

  /**
   * Fetch the next page and merge it into `historyItems`, keeping the list
   * sorted newest-first and bounded by MAX_HISTORY_ITEMS.
   * @returns the merged, newest-first item list
   */
  const fetchHistory = async (): Promise<JobListItem[]> => {
    const page = await api.getHistory(BATCH_SIZE, { offset })
    const fresh = page.filter((item) => !loadedIds.has(item.id))
    historyItems.value.push(...fresh)
    // Newest first (descending create_time): consumers take the head of this
    // list (e.g. queue store's slice(0, maxHistoryItems)), and the trim below
    // must evict the OLDEST entries. Ascending order would invert both.
    historyItems.value.sort((a, b) => b.create_time - a.create_time)
    fresh.forEach((item) => loadedIds.add(item.id))
    offset += BATCH_SIZE
    // A short page means the server has no more items.
    hasMoreHistory.value = page.length === BATCH_SIZE
    if (historyItems.value.length > MAX_HISTORY_ITEMS) {
      // Drop the oldest overflow and forget its ids so those items can be
      // re-fetched later if needed.
      const removed = historyItems.value.slice(MAX_HISTORY_ITEMS)
      historyItems.value = historyItems.value.slice(0, MAX_HISTORY_ITEMS)
      removed.forEach((item) => loadedIds.delete(item.id))
    }
    return historyItems.value
  }

  /** Reset pagination state and reload from the first page. */
  const updateHistory = async () => {
    offset = 0
    hasMoreHistory.value = true
    historyItems.value = []
    loadedIds.clear()
    await loadMoreHistory()
  }

  /**
   * Load the next page (infinite scroll). Concurrent calls are coalesced:
   * a call made while a fetch is in flight awaits its completion instead of
   * issuing a second request. (Previously this wait branch was dead code,
   * shadowed by an early `isLoadingMore` return.)
   */
  const loadMoreHistory = async () => {
    if (isLoadingMore.value) {
      await new Promise((r) => watch(isLoadingMore, r, { once: true }))
      return
    }
    if (!hasMoreHistory.value) return
    isLoadingMore.value = true
    historyError.value = null
    try {
      await fetchHistory()
    } catch (err) {
      console.error('Error loading more history:', err)
      historyError.value = err
    } finally {
      isLoadingMore.value = false
    }
  }

  // Kick off the initial page load when the store is first instantiated.
  void loadMoreHistory()

  return {
    hasMoreHistory,
    historyError,
    historyItems,
    isLoadingMore,
    loadMoreHistory,
    updateHistory
  }
})
export const useQueueStore = defineStore('queue', () => {
// Use shallowRef because TaskItemImpl instances are immutable and arrays are
@@ -485,6 +563,15 @@ export const useQueueStore = defineStore('queue', () => {
const maxHistoryItems = ref(64)
const isLoading = ref(false)
const historyStore = useHistoryStore()
//TODO: Fix tests so this can be a computed
watchEffect(
() =>
(historyTasks.value = historyStore.historyItems
.slice(0, toValue(maxHistoryItems))
.map((job) => new TaskItemImpl(job)))
)
// Single-flight coalescing: at most one fetch in flight at a time.
// If update() is called while a fetch is running, the call is coalesced
// and a single re-fetch fires after the current one completes.
@@ -525,17 +612,15 @@ export const useQueueStore = defineStore('queue', () => {
dirty = false
isLoading.value = true
try {
const [queue, history] = await Promise.all([
const [queue] = await Promise.all([
api.getQueue(),
api.getHistory(maxHistoryItems.value)
historyStore.updateHistory()
])
// API returns pre-sorted data (sort_by=create_time&order=desc)
runningTasks.value = queue.Running.map((job) => new TaskItemImpl(job))
pendingTasks.value = queue.Pending.map((job) => new TaskItemImpl(job))
const currentHistory = toValue(historyTasks)
const appearedTasks = [...pendingTasks.value, ...runningTasks.value]
const executionStore = useExecutionStore()
appearedTasks.forEach((task) => {
@@ -557,36 +642,6 @@ export const useQueueStore = defineStore('queue', () => {
])
executionStore.reconcileInitializingJobs(activeJobIds)
}
// Sort by create_time descending and limit to maxItems
const sortedHistory = [...history]
.sort((a, b) => b.create_time - a.create_time)
.slice(0, toValue(maxHistoryItems))
// Reuse existing TaskItemImpl instances or create new
// Must recreate if outputs_count changed (e.g., API started returning it)
const existingByJobId = new Map(
currentHistory.map((impl) => [impl.jobId, impl])
)
const nextHistoryTasks = sortedHistory.map((job) => {
const existing = existingByJobId.get(job.id)
if (!existing) return new TaskItemImpl(job)
// Recreate if outputs_count changed to ensure lazy loading works
if (existing.outputsCount !== (job.outputs_count ?? undefined)) {
return new TaskItemImpl(job)
}
return existing
})
const isHistoryUnchanged =
nextHistoryTasks.length === currentHistory.length &&
nextHistoryTasks.every((task, index) => task === currentHistory[index])
if (!isHistoryUnchanged) {
historyTasks.value = nextHistoryTasks
}
hasFetchedHistorySnapshot.value = true
} finally {
isLoading.value = false
inFlight = false

View File

@@ -76,7 +76,6 @@ import { app } from '@/scripts/app'
import { setupAutoQueueHandler } from '@/services/autoQueueService'
import { useKeybindingService } from '@/platform/keybindings/keybindingService'
import { useAppMode } from '@/composables/useAppMode'
import { useAssetsStore } from '@/stores/assetsStore'
import { useCommandStore } from '@/stores/commandStore'
import { useExecutionStore } from '@/stores/executionStore'
import { useAuthStore } from '@/stores/authStore'
@@ -106,7 +105,6 @@ const settingStore = useSettingStore()
const executionStore = useExecutionStore()
const colorPaletteStore = useColorPaletteStore()
const queueStore = useQueueStore()
const assetsStore = useAssetsStore()
const versionCompatibilityStore = useVersionCompatibilityStore()
const graphCanvasContainerRef = ref<HTMLDivElement | null>(null)
const { isBuilderMode } = useAppMode()
@@ -226,25 +224,14 @@ void useBottomPanelStore().registerCoreBottomPanelTabs()
useQueuePolling()
const queuePendingTaskCountStore = useQueuePendingTaskCountStore()
const sidebarTabStore = useSidebarTabStore()
const onStatus = async (e: CustomEvent<StatusWsMessageStatus>) => {
  queuePendingTaskCountStore.update(e)
  await queueStore.update()
  // Refresh assets only while they are visible (assets sidebar open, or
  // linear mode); a closed sidebar re-fetches on mount in AssetsSidebarTab.vue.
  const assetsVisible =
    sidebarTabStore.activeSidebarTabId === 'assets' || linearMode.value
  if (assetsVisible) {
    await assetsStore.updateHistory()
  }
}
const onExecutionSuccess = async () => {
  await queueStore.update()
  // Same visibility gate as onStatus: skip the assets refresh when the
  // sidebar is closed — AssetsSidebarTab.vue refreshes on mount instead.
  const assetsVisible =
    sidebarTabStore.activeSidebarTabId === 'assets' || linearMode.value
  if (assetsVisible) {
    await assetsStore.updateHistory()
  }
}
const { onReconnecting, onReconnected } = useReconnectingNotification()