mirror of
https://github.com/ostris/ai-toolkit.git
synced 2026-04-30 03:01:28 +00:00
Added queing system to the UI
This commit is contained in:
71
ui/cron/actions/processQueue.ts
Normal file
71
ui/cron/actions/processQueue.ts
Normal file
@@ -0,0 +1,71 @@
|
||||
import prisma from '../prisma';
|
||||
|
||||
import { Job, Queue } from '@prisma/client';
|
||||
import startJob from './startJob';
|
||||
|
||||
export default async function processQueue() {
|
||||
const queues: Queue[] = await prisma.queue.findMany({
|
||||
orderBy: {
|
||||
id: 'asc',
|
||||
},
|
||||
});
|
||||
|
||||
for (const queue of queues) {
|
||||
if (!queue.is_running) {
|
||||
// stop any running jobs first
|
||||
const runningJobs: Job[] = await prisma.job.findMany({
|
||||
where: {
|
||||
status: 'running',
|
||||
gpu_ids: queue.gpu_ids,
|
||||
},
|
||||
});
|
||||
|
||||
for (const job of runningJobs) {
|
||||
console.log(`Stopping job ${job.id} on GPU(s) ${job.gpu_ids}`);
|
||||
await prisma.job.update({
|
||||
where: { id: job.id },
|
||||
data: {
|
||||
return_to_queue: true,
|
||||
info: 'Stopping job...',
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
if (queue.is_running) {
|
||||
// first see if one is already running, status of running or stopping
|
||||
const runningJob: Job | null = await prisma.job.findFirst({
|
||||
where: {
|
||||
status: { in: ['running', 'stopping'] },
|
||||
gpu_ids: queue.gpu_ids,
|
||||
},
|
||||
});
|
||||
|
||||
if (runningJob) {
|
||||
// already running, nothing to do
|
||||
continue; // skip to next queue
|
||||
} else {
|
||||
// find the next job in the queue
|
||||
const nextJob: Job | null = await prisma.job.findFirst({
|
||||
where: {
|
||||
status: 'queued',
|
||||
gpu_ids: queue.gpu_ids,
|
||||
},
|
||||
orderBy: {
|
||||
queue_position: 'asc',
|
||||
},
|
||||
});
|
||||
if (nextJob) {
|
||||
console.log(`Starting job ${nextJob.id} on GPU(s) ${nextJob.gpu_ids}`);
|
||||
await startJob(nextJob.id);
|
||||
} else {
|
||||
// no more jobs, stop the queue
|
||||
console.log(`No more jobs in queue for GPU(s) ${queue.gpu_ids}, stopping queue`);
|
||||
await prisma.queue.update({
|
||||
where: { id: queue.id },
|
||||
data: { is_running: false },
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
179
ui/cron/actions/startJob.ts
Normal file
179
ui/cron/actions/startJob.ts
Normal file
@@ -0,0 +1,179 @@
|
||||
import prisma from '../prisma';
|
||||
import { Job } from '@prisma/client';
|
||||
import { spawn } from 'child_process';
|
||||
import path from 'path';
|
||||
import fs from 'fs';
|
||||
import { TOOLKIT_ROOT, getTrainingFolder, getHFToken } from '../paths';
|
||||
const isWindows = process.platform === 'win32';
|
||||
|
||||
const startAndWatchJob = (job: Job) => {
|
||||
// starts and watches the job asynchronously
|
||||
return new Promise<void>(async (resolve, reject) => {
|
||||
const jobID = job.id;
|
||||
|
||||
// setup the training
|
||||
const trainingRoot = await getTrainingFolder();
|
||||
|
||||
const trainingFolder = path.join(trainingRoot, job.name);
|
||||
if (!fs.existsSync(trainingFolder)) {
|
||||
fs.mkdirSync(trainingFolder, { recursive: true });
|
||||
}
|
||||
|
||||
// make the config file
|
||||
const configPath = path.join(trainingFolder, '.job_config.json');
|
||||
|
||||
//log to path
|
||||
const logPath = path.join(trainingFolder, 'log.txt');
|
||||
|
||||
try {
|
||||
// if the log path exists, move it to a folder called logs and rename it {num}_log.txt, looking for the highest num
|
||||
// if the log path does not exist, create it
|
||||
if (fs.existsSync(logPath)) {
|
||||
const logsFolder = path.join(trainingFolder, 'logs');
|
||||
if (!fs.existsSync(logsFolder)) {
|
||||
fs.mkdirSync(logsFolder, { recursive: true });
|
||||
}
|
||||
|
||||
let num = 0;
|
||||
while (fs.existsSync(path.join(logsFolder, `${num}_log.txt`))) {
|
||||
num++;
|
||||
}
|
||||
|
||||
fs.renameSync(logPath, path.join(logsFolder, `${num}_log.txt`));
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Error moving log file:', e);
|
||||
}
|
||||
|
||||
// update the config dataset path
|
||||
const jobConfig = JSON.parse(job.job_config);
|
||||
jobConfig.config.process[0].sqlite_db_path = path.join(TOOLKIT_ROOT, 'aitk_db.db');
|
||||
|
||||
// write the config file
|
||||
fs.writeFileSync(configPath, JSON.stringify(jobConfig, null, 2));
|
||||
|
||||
let pythonPath = 'python';
|
||||
// use .venv or venv if it exists
|
||||
if (fs.existsSync(path.join(TOOLKIT_ROOT, '.venv'))) {
|
||||
if (isWindows) {
|
||||
pythonPath = path.join(TOOLKIT_ROOT, '.venv', 'Scripts', 'python.exe');
|
||||
} else {
|
||||
pythonPath = path.join(TOOLKIT_ROOT, '.venv', 'bin', 'python');
|
||||
}
|
||||
} else if (fs.existsSync(path.join(TOOLKIT_ROOT, 'venv'))) {
|
||||
if (isWindows) {
|
||||
pythonPath = path.join(TOOLKIT_ROOT, 'venv', 'Scripts', 'python.exe');
|
||||
} else {
|
||||
pythonPath = path.join(TOOLKIT_ROOT, 'venv', 'bin', 'python');
|
||||
}
|
||||
}
|
||||
|
||||
const runFilePath = path.join(TOOLKIT_ROOT, 'run.py');
|
||||
if (!fs.existsSync(runFilePath)) {
|
||||
console.error(`run.py not found at path: ${runFilePath}`);
|
||||
await prisma.job.update({
|
||||
where: { id: jobID },
|
||||
data: {
|
||||
status: 'error',
|
||||
info: `Error launching job: run.py not found`,
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const additionalEnv: any = {
|
||||
AITK_JOB_ID: jobID,
|
||||
CUDA_DEVICE_ORDER: 'PCI_BUS_ID',
|
||||
CUDA_VISIBLE_DEVICES: `${job.gpu_ids}`,
|
||||
IS_AI_TOOLKIT_UI: '1',
|
||||
};
|
||||
|
||||
// HF_TOKEN
|
||||
const hfToken = await getHFToken();
|
||||
if (hfToken && hfToken.trim() !== '') {
|
||||
additionalEnv.HF_TOKEN = hfToken;
|
||||
}
|
||||
|
||||
// Add the --log argument to the command
|
||||
const args = [runFilePath, configPath, '--log', logPath];
|
||||
|
||||
try {
|
||||
let subprocess;
|
||||
|
||||
if (isWindows) {
|
||||
// Spawn Python directly on Windows so the process can survive parent exit
|
||||
subprocess = spawn(pythonPath, args, {
|
||||
env: {
|
||||
...process.env,
|
||||
...additionalEnv,
|
||||
},
|
||||
cwd: TOOLKIT_ROOT,
|
||||
detached: true,
|
||||
windowsHide: true,
|
||||
stdio: 'ignore', // don't tie stdio to parent
|
||||
});
|
||||
} else {
|
||||
// For non-Windows platforms, fully detach and ignore stdio so it survives daemon-like
|
||||
subprocess = spawn(pythonPath, args, {
|
||||
detached: true,
|
||||
stdio: 'ignore',
|
||||
env: {
|
||||
...process.env,
|
||||
...additionalEnv,
|
||||
},
|
||||
cwd: TOOLKIT_ROOT,
|
||||
});
|
||||
}
|
||||
|
||||
// Important: let the child run independently of this Node process.
|
||||
if (subprocess.unref) {
|
||||
subprocess.unref();
|
||||
}
|
||||
|
||||
// Optionally write a pid file for future management (stop/inspect) without keeping streams open
|
||||
try {
|
||||
fs.writeFileSync(path.join(trainingFolder, 'pid.txt'), String(subprocess.pid ?? ''), { flag: 'w' });
|
||||
} catch (e) {
|
||||
console.error('Error writing pid file:', e);
|
||||
}
|
||||
|
||||
// (No stdout/stderr listeners — logging should go to --log handled by your Python)
|
||||
// (No monitoring loop — the whole point is to let it live past this worker)
|
||||
} catch (error: any) {
|
||||
// Handle any exceptions during process launch
|
||||
console.error('Error launching process:', error);
|
||||
|
||||
await prisma.job.update({
|
||||
where: { id: jobID },
|
||||
data: {
|
||||
status: 'error',
|
||||
info: `Error launching job: ${error?.message || 'Unknown error'}`,
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
// Resolve the promise immediately after starting the process
|
||||
resolve();
|
||||
});
|
||||
};
|
||||
|
||||
export default async function startJob(jobID: string) {
|
||||
const job: Job | null = await prisma.job.findUnique({
|
||||
where: { id: jobID },
|
||||
});
|
||||
if (!job) {
|
||||
console.error(`Job with ID ${jobID} not found`);
|
||||
return;
|
||||
}
|
||||
// update job status to 'running', this will run sync so we don't start multiple jobs.
|
||||
await prisma.job.update({
|
||||
where: { id: jobID },
|
||||
data: {
|
||||
status: 'running',
|
||||
stop: false,
|
||||
info: 'Starting job...',
|
||||
},
|
||||
});
|
||||
// start and watch the job asynchronously so the cron can continue
|
||||
startAndWatchJob(job);
|
||||
}
|
||||
37
ui/cron/paths.ts
Normal file
37
ui/cron/paths.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import path from 'path';
|
||||
import prisma from './prisma';
|
||||
|
||||
export const TOOLKIT_ROOT = path.resolve('@', '..', '..');
|
||||
export const defaultTrainFolder = path.join(TOOLKIT_ROOT, 'output');
|
||||
export const defaultDatasetsFolder = path.join(TOOLKIT_ROOT, 'datasets');
|
||||
export const defaultDataRoot = path.join(TOOLKIT_ROOT, 'data');
|
||||
|
||||
console.log('TOOLKIT_ROOT:', TOOLKIT_ROOT);
|
||||
|
||||
export const getTrainingFolder = async () => {
|
||||
const key = 'TRAINING_FOLDER';
|
||||
let row = await prisma.settings.findFirst({
|
||||
where: {
|
||||
key: key,
|
||||
},
|
||||
});
|
||||
let trainingRoot = defaultTrainFolder;
|
||||
if (row?.value && row.value !== '') {
|
||||
trainingRoot = row.value;
|
||||
}
|
||||
return trainingRoot as string;
|
||||
};
|
||||
|
||||
export const getHFToken = async () => {
|
||||
const key = 'HF_TOKEN';
|
||||
let row = await prisma.settings.findFirst({
|
||||
where: {
|
||||
key: key,
|
||||
},
|
||||
});
|
||||
let token = '';
|
||||
if (row?.value && row.value !== '') {
|
||||
token = row.value;
|
||||
}
|
||||
return token;
|
||||
};
|
||||
4
ui/cron/prisma.ts
Normal file
4
ui/cron/prisma.ts
Normal file
@@ -0,0 +1,4 @@
|
||||
import { PrismaClient } from '@prisma/client';
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
export default prisma;
|
||||
@@ -1,3 +1,4 @@
|
||||
import processQueue from './actions/processQueue';
|
||||
class CronWorker {
|
||||
interval: number;
|
||||
is_running: boolean;
|
||||
@@ -23,7 +24,9 @@ class CronWorker {
|
||||
this.is_running = false;
|
||||
}
|
||||
|
||||
async loop() {}
|
||||
async loop() {
|
||||
await processQueue();
|
||||
}
|
||||
}
|
||||
|
||||
// it automatically starts the loop
|
||||
|
||||
Reference in New Issue
Block a user