mirror of
https://github.com/Comfy-Org/ComfyUI_frontend.git
synced 2026-05-25 15:15:47 +00:00
fix: converge asset tag cache and server on partial-failure (#11695)
## Summary `assetsStore.updateAssetTags` issues `removeAssetTags` and `addAssetTags` as two separate network calls. When the remove succeeds server-side but the subsequent add rejects, the cache rolls back to the original tags while the server has already dropped the removed tags — cache and backend diverge until the next refetch. This adds a compensating action: if remove succeeded and add then fails, attempt to re-add the just-removed tags so the server returns to its prior state. If the compensating add also fails, invalidate the category cache so the next access refetches fresh state. - Fixes #11694 - Fixes [FE-473](https://linear.app/comfyorg/issue/FE-473/fix-converge-asset-tag-cache-and-server-on-partial-failure) ## Changes - `src/stores/assetsStore.ts`: track which tags were already removed server-side; on add-failure, re-add them; if compensation fails, invalidate the resolved category cache via `resolveCategory` + `invalidateCategory`. - `src/stores/assetsStore.test.ts`: extend the cloud asset-service mock with `addAssetTags` / `removeAssetTags` and add a `updateAssetTags partial-failure compensation` describe block: - re-adds removed tags when add fails so cache and server converge - invalidates the category cache when compensation also fails - does not attempt compensation when only the add was attempted (no remove ran) ## Test plan - [x] `pnpm exec vitest run src/stores/assetsStore.test.ts` — 42 passed (39 prior + 3 new) - [x] `pnpm typecheck` - [x] `pnpm lint` - [x] `pnpm knip` ┆Issue is synchronized with this [Notion page](https://www.notion.so/PR-11695-fix-converge-asset-tag-cache-and-server-on-partial-failure-34f6d73d365081149900f95b6ee4bfa9) by [Unito](https://www.unito.io)
This commit is contained in:
@@ -1148,13 +1148,20 @@ describe('assetsStore - Model Assets Cache (Cloud)', () => {
|
||||
'featured'
|
||||
])
|
||||
})
|
||||
})
|
||||
|
||||
it('rolls back the cache when removeAssetTags succeeds but addAssetTags rejects', async () => {
|
||||
// Documents the known recovery gap on partial-failure during a
|
||||
// "change category" mutation: remove succeeds server-side, add fails,
|
||||
// and the cache is restored to the original tags. The server now has
|
||||
// the old category tag removed, so the cache and backend diverge until
|
||||
// the next refetch — surface that gap here rather than papering over it.
|
||||
describe('updateAssetTags partial-failure compensation', () => {
|
||||
let consoleSpy: ReturnType<typeof vi.spyOn>
|
||||
|
||||
beforeEach(() => {
|
||||
consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
consoleSpy.mockRestore()
|
||||
})
|
||||
|
||||
it('re-adds removed tags when add fails so cache and server converge', async () => {
|
||||
const store = useAssetsStore()
|
||||
const asset = createMockAsset('tags-partial-fail', ['models', 'loras'])
|
||||
|
||||
@@ -1167,10 +1174,12 @@ describe('assetsStore - Model Assets Cache (Cloud)', () => {
|
||||
removed: ['loras'],
|
||||
total_tags: ['models']
|
||||
})
|
||||
vi.mocked(assetService.addAssetTags).mockRejectedValueOnce(
|
||||
new Error('500 add failed')
|
||||
)
|
||||
const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
|
||||
vi.mocked(assetService.addAssetTags)
|
||||
.mockRejectedValueOnce(new Error('500 add failed'))
|
||||
.mockResolvedValueOnce({
|
||||
added: ['loras'],
|
||||
total_tags: ['models', 'loras']
|
||||
})
|
||||
|
||||
await store.updateAssetTags(
|
||||
asset,
|
||||
@@ -1178,20 +1187,161 @@ describe('assetsStore - Model Assets Cache (Cloud)', () => {
|
||||
'LoraLoader'
|
||||
)
|
||||
|
||||
expect(vi.mocked(assetService.removeAssetTags)).toHaveBeenCalledWith(
|
||||
'tags-partial-fail',
|
||||
['loras']
|
||||
)
|
||||
expect(vi.mocked(assetService.addAssetTags)).toHaveBeenCalledWith(
|
||||
expect(vi.mocked(assetService.addAssetTags)).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
'tags-partial-fail',
|
||||
['checkpoints']
|
||||
)
|
||||
// Cache restored to original tags even though the server has already
|
||||
// removed 'loras'. This codifies a known divergence — fix the recovery
|
||||
// semantics in updateAssetTags to address it (e.g. invalidate the
|
||||
// category cache, or reconcile against the last confirmed total_tags).
|
||||
expect(vi.mocked(assetService.addAssetTags)).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
'tags-partial-fail',
|
||||
['loras']
|
||||
)
|
||||
expect(store.getAssets('LoraLoader')[0].tags).toEqual(['models', 'loras'])
|
||||
consoleSpy.mockRestore()
|
||||
})
|
||||
|
||||
it('invalidates the category cache when compensation also fails', async () => {
|
||||
const store = useAssetsStore()
|
||||
const asset = createMockAsset('tags-compensation-fail', [
|
||||
'models',
|
||||
'loras'
|
||||
])
|
||||
|
||||
vi.mocked(assetService.getAssetsForNodeType).mockResolvedValueOnce([
|
||||
asset
|
||||
])
|
||||
await store.updateModelsForNodeType('LoraLoader')
|
||||
|
||||
vi.mocked(assetService.removeAssetTags).mockResolvedValueOnce({
|
||||
removed: ['loras'],
|
||||
total_tags: ['models']
|
||||
})
|
||||
vi.mocked(assetService.addAssetTags)
|
||||
.mockRejectedValueOnce(new Error('500 add failed'))
|
||||
.mockRejectedValueOnce(new Error('503 compensation failed'))
|
||||
|
||||
await store.updateAssetTags(
|
||||
asset,
|
||||
['models', 'checkpoints'],
|
||||
'LoraLoader'
|
||||
)
|
||||
|
||||
expect(store.hasCategory('loras')).toBe(false)
|
||||
expect(vi.mocked(assetService.addAssetTags)).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('invalidates overlapping tag caches that also contain the asset when cacheKey is provided', async () => {
|
||||
const store = useAssetsStore()
|
||||
const asset = createMockAsset('tags-overlap-fail', ['models', 'loras'])
|
||||
|
||||
vi.mocked(assetService.getAssetsForNodeType).mockResolvedValueOnce([
|
||||
asset
|
||||
])
|
||||
await store.updateModelsForNodeType('LoraLoader')
|
||||
vi.mocked(assetService.getAssetsByTag).mockResolvedValueOnce([asset])
|
||||
await store.updateModelsForTag('models')
|
||||
|
||||
expect(store.hasCategory('loras')).toBe(true)
|
||||
expect(store.hasCategory('tag:models')).toBe(true)
|
||||
|
||||
vi.mocked(assetService.removeAssetTags).mockResolvedValueOnce({
|
||||
removed: ['loras'],
|
||||
total_tags: ['models']
|
||||
})
|
||||
vi.mocked(assetService.addAssetTags)
|
||||
.mockRejectedValueOnce(new Error('500 add failed'))
|
||||
.mockRejectedValueOnce(new Error('503 compensation failed'))
|
||||
|
||||
await store.updateAssetTags(
|
||||
asset,
|
||||
['models', 'checkpoints'],
|
||||
'LoraLoader'
|
||||
)
|
||||
|
||||
expect(store.hasCategory('loras')).toBe(false)
|
||||
expect(store.hasCategory('tag:models')).toBe(false)
|
||||
})
|
||||
|
||||
it('does not attempt compensation when only the add was attempted', async () => {
|
||||
const store = useAssetsStore()
|
||||
const asset = createMockAsset('tags-add-only-fail', ['models'])
|
||||
|
||||
vi.mocked(assetService.getAssetsForNodeType).mockResolvedValueOnce([
|
||||
asset
|
||||
])
|
||||
await store.updateModelsForNodeType('CheckpointLoaderSimple')
|
||||
|
||||
vi.mocked(assetService.addAssetTags).mockRejectedValueOnce(
|
||||
new Error('500 add failed')
|
||||
)
|
||||
|
||||
await store.updateAssetTags(
|
||||
asset,
|
||||
['models', 'featured'],
|
||||
'CheckpointLoaderSimple'
|
||||
)
|
||||
|
||||
expect(vi.mocked(assetService.addAssetTags)).toHaveBeenCalledTimes(1)
|
||||
expect(vi.mocked(assetService.removeAssetTags)).not.toHaveBeenCalled()
|
||||
expect(store.getAssets('CheckpointLoaderSimple')[0].tags).toEqual([
|
||||
'models'
|
||||
])
|
||||
})
|
||||
|
||||
it('treats removeAssetTags resolution as success even when removed is empty', async () => {
|
||||
const store = useAssetsStore()
|
||||
const asset = createMockAsset('tags-empty-removed', ['models', 'loras'])
|
||||
|
||||
vi.mocked(assetService.getAssetsForNodeType).mockResolvedValueOnce([
|
||||
asset
|
||||
])
|
||||
await store.updateModelsForNodeType('LoraLoader')
|
||||
|
||||
vi.mocked(assetService.removeAssetTags).mockResolvedValueOnce({
|
||||
removed: [],
|
||||
not_present: ['loras'],
|
||||
total_tags: ['models']
|
||||
})
|
||||
vi.mocked(assetService.addAssetTags).mockRejectedValueOnce(
|
||||
new Error('500 add failed')
|
||||
)
|
||||
|
||||
await store.updateAssetTags(
|
||||
asset,
|
||||
['models', 'checkpoints'],
|
||||
'LoraLoader'
|
||||
)
|
||||
|
||||
expect(vi.mocked(assetService.addAssetTags)).toHaveBeenCalledTimes(1)
|
||||
expect(store.getAssets('LoraLoader')[0].tags).toEqual(['models', 'loras'])
|
||||
})
|
||||
|
||||
it('invalidates every category containing the asset when no cacheKey is provided', async () => {
|
||||
const store = useAssetsStore()
|
||||
const asset = createMockAsset('tags-no-cachekey', ['models', 'loras'])
|
||||
|
||||
vi.mocked(assetService.getAssetsForNodeType).mockResolvedValueOnce([
|
||||
asset
|
||||
])
|
||||
await store.updateModelsForNodeType('LoraLoader')
|
||||
vi.mocked(assetService.getAssetsByTag).mockResolvedValueOnce([asset])
|
||||
await store.updateModelsForTag('models')
|
||||
|
||||
expect(store.hasCategory('loras')).toBe(true)
|
||||
expect(store.hasCategory('tag:models')).toBe(true)
|
||||
|
||||
vi.mocked(assetService.removeAssetTags).mockResolvedValueOnce({
|
||||
removed: ['loras'],
|
||||
total_tags: ['models']
|
||||
})
|
||||
vi.mocked(assetService.addAssetTags)
|
||||
.mockRejectedValueOnce(new Error('500 add failed'))
|
||||
.mockRejectedValueOnce(new Error('503 compensation failed'))
|
||||
|
||||
await store.updateAssetTags(asset, ['models', 'checkpoints'])
|
||||
|
||||
expect(store.hasCategory('loras')).toBe(false)
|
||||
expect(store.hasCategory('tag:models')).toBe(false)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -6,7 +6,10 @@ import {
|
||||
mapInputFileToAssetItem,
|
||||
mapTaskOutputToAssetItem
|
||||
} from '@/platform/assets/composables/media/assetMappers'
|
||||
import type { AssetItem } from '@/platform/assets/schemas/assetSchema'
|
||||
import type {
|
||||
AssetItem,
|
||||
TagsOperationResult
|
||||
} from '@/platform/assets/schemas/assetSchema'
|
||||
import { assetService } from '@/platform/assets/services/assetService'
|
||||
import type { PaginationOptions } from '@/platform/assets/services/assetService'
|
||||
import { isCloud } from '@/platform/distribution/types'
|
||||
@@ -610,11 +613,16 @@ export const useAssetsStore = defineStore('assets', () => {
|
||||
|
||||
updateAssetInCache(asset.id, { tags: newTags }, cacheKey)
|
||||
|
||||
let removedTagsOnServer: string[] = []
|
||||
try {
|
||||
const removeResult =
|
||||
tagsToRemove.length > 0
|
||||
? await assetService.removeAssetTags(asset.id, tagsToRemove)
|
||||
: undefined
|
||||
let removeResult: TagsOperationResult | undefined
|
||||
if (tagsToRemove.length > 0) {
|
||||
removeResult = await assetService.removeAssetTags(
|
||||
asset.id,
|
||||
tagsToRemove
|
||||
)
|
||||
removedTagsOnServer = removeResult.removed ?? tagsToRemove
|
||||
}
|
||||
|
||||
const addResult =
|
||||
tagsToAdd.length > 0
|
||||
@@ -628,6 +636,33 @@ export const useAssetsStore = defineStore('assets', () => {
|
||||
} catch (error) {
|
||||
console.error('Failed to update asset tags:', error)
|
||||
updateAssetInCache(asset.id, { tags: originalTags }, cacheKey)
|
||||
|
||||
if (removedTagsOnServer.length > 0) {
|
||||
try {
|
||||
await assetService.addAssetTags(asset.id, removedTagsOnServer)
|
||||
} catch (compensationError) {
|
||||
console.error(
|
||||
'Failed to restore tags after partial failure; invalidating cache to force refetch:',
|
||||
compensationError
|
||||
)
|
||||
const categoriesToInvalidate = new Set<string>()
|
||||
const resolved = cacheKey ? resolveCategory(cacheKey) : undefined
|
||||
if (resolved) {
|
||||
categoriesToInvalidate.add(resolved)
|
||||
}
|
||||
for (const [
|
||||
category,
|
||||
state
|
||||
] of modelStateByCategory.value.entries()) {
|
||||
if (state.assets?.has(asset.id)) {
|
||||
categoriesToInvalidate.add(category)
|
||||
}
|
||||
}
|
||||
for (const category of categoriesToInvalidate) {
|
||||
invalidateCategory(category)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user