mirror of
https://github.com/ostris/ai-toolkit.git
synced 2026-05-01 03:31:35 +00:00
Fix an issue where the job would hang in the UI if it failed to start
This commit is contained in:
12
run.py
12
run.py
@@ -21,7 +21,7 @@ if os.environ.get("DEBUG_TOOLKIT", "0") == "1":
|
|||||||
import argparse
|
import argparse
|
||||||
from toolkit.job import get_job
|
from toolkit.job import get_job
|
||||||
from toolkit.accelerator import get_accelerator
|
from toolkit.accelerator import get_accelerator
|
||||||
from toolkit.print import print_acc
|
from toolkit.print import print_acc, setup_log_to_file
|
||||||
|
|
||||||
accelerator = get_accelerator()
|
accelerator = get_accelerator()
|
||||||
|
|
||||||
@@ -67,7 +67,17 @@ def main():
|
|||||||
default=None,
|
default=None,
|
||||||
help='Name to replace [name] tag in config file, useful for shared config file'
|
help='Name to replace [name] tag in config file, useful for shared config file'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'-l', '--log',
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help='Log file to write output to'
|
||||||
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.log is not None:
|
||||||
|
setup_log_to_file(args.log)
|
||||||
|
|
||||||
config_file_list = args.config_file_list
|
config_file_list = args.config_file_list
|
||||||
if len(config_file_list) == 0:
|
if len(config_file_list) == 0:
|
||||||
|
|||||||
@@ -1,6 +1,31 @@
|
|||||||
|
import sys
|
||||||
|
import os
|
||||||
from toolkit.accelerator import get_accelerator
|
from toolkit.accelerator import get_accelerator
|
||||||
|
|
||||||
|
|
||||||
def print_acc(*args, **kwargs):
    """Print only from the local main accelerator process.

    Drop-in replacement for :func:`print` in multi-process (accelerate)
    runs so the same message is not emitted once per worker.
    """
    if not get_accelerator().is_local_main_process:
        return
    print(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class Logger:
    """File-backed tee for a standard stream.

    Mirrors everything written to it to both the stream that was
    ``sys.stdout`` at construction time and an append-mode log file,
    so it can be assigned to ``sys.stdout`` / ``sys.stderr`` to
    persist console output.
    """

    def __init__(self, filename):
        # Capture whatever stdout currently is so console output
        # still reaches the terminal after redirection.
        self.terminal = sys.stdout
        # Append mode: restarts extend the log instead of clobbering it.
        self.log = open(filename, 'a')

    def write(self, message):
        """Write *message* to both destinations, flushing the file."""
        for stream in (self.terminal, self.log):
            stream.write(message)
        # Flush so the file mirrors output in real time (e.g. for a
        # UI tailing the log while the job runs).
        self.log.flush()

    def flush(self):
        """Flush both the terminal stream and the log file."""
        for stream in (self.terminal, self.log):
            stream.flush()
|
||||||
|
|
||||||
|
|
||||||
|
def setup_log_to_file(filename):
    """Redirect stdout and stderr of the local main process to *filename*.

    Only the local main accelerator process is redirected so that
    multi-process runs do not interleave writes into the same file.

    :param filename: path of the log file; parent directories are
        created if missing.
    """
    if not get_accelerator().is_local_main_process:
        return
    log_dir = os.path.dirname(filename)
    # dirname is '' for a bare filename; os.makedirs('') would raise.
    # exist_ok avoids a crash if another process created it first.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    # Use ONE Logger for both streams. Building a second Logger after
    # sys.stdout has been replaced would capture the first Logger as
    # its "terminal", writing every stderr message to the file twice.
    logger = Logger(filename)
    sys.stdout = logger
    sys.stderr = logger
|
||||||
|
|||||||
@@ -32,7 +32,6 @@ export async function GET(request: NextRequest, { params }: { params: { jobID: s
|
|||||||
});
|
});
|
||||||
|
|
||||||
// setup the training
|
// setup the training
|
||||||
|
|
||||||
const trainingRoot = await getTrainingFolder();
|
const trainingRoot = await getTrainingFolder();
|
||||||
|
|
||||||
const trainingFolder = path.join(trainingRoot, job.name);
|
const trainingFolder = path.join(trainingRoot, job.name);
|
||||||
@@ -43,6 +42,29 @@ export async function GET(request: NextRequest, { params }: { params: { jobID: s
|
|||||||
// make the config file
|
// make the config file
|
||||||
const configPath = path.join(trainingFolder, '.job_config.json');
|
const configPath = path.join(trainingFolder, '.job_config.json');
|
||||||
|
|
||||||
|
//log to path
|
||||||
|
const logPath = path.join(trainingFolder, 'log.txt');
|
||||||
|
|
||||||
|
try {
|
||||||
|
// if the log path exists, move it to a folder called logs and rename it {num}_log.txt, looking for the highest num
|
||||||
|
// if the log path does not exist, create it
|
||||||
|
if (fs.existsSync(logPath)) {
|
||||||
|
const logsFolder = path.join(trainingFolder, 'logs');
|
||||||
|
if (!fs.existsSync(logsFolder)) {
|
||||||
|
fs.mkdirSync(logsFolder, { recursive: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
let num = 0;
|
||||||
|
while (fs.existsSync(path.join(logsFolder, `${num}_log.txt`))) {
|
||||||
|
num++;
|
||||||
|
}
|
||||||
|
|
||||||
|
fs.renameSync(logPath, path.join(logsFolder, `${num}_log.txt`));
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
console.error('Error moving log file:', e);
|
||||||
|
}
|
||||||
|
|
||||||
// update the config dataset path
|
// update the config dataset path
|
||||||
const jobConfig = JSON.parse(job.job_config);
|
const jobConfig = JSON.parse(job.job_config);
|
||||||
jobConfig.config.process[0].sqlite_db_path = path.join(TOOLKIT_ROOT, 'aitk_db.db');
|
jobConfig.config.process[0].sqlite_db_path = path.join(TOOLKIT_ROOT, 'aitk_db.db');
|
||||||
@@ -70,6 +92,7 @@ export async function GET(request: NextRequest, { params }: { params: { jobID: s
|
|||||||
if (!fs.existsSync(runFilePath)) {
|
if (!fs.existsSync(runFilePath)) {
|
||||||
return NextResponse.json({ error: 'run.py not found' }, { status: 500 });
|
return NextResponse.json({ error: 'run.py not found' }, { status: 500 });
|
||||||
}
|
}
|
||||||
|
|
||||||
const additionalEnv: any = {
|
const additionalEnv: any = {
|
||||||
AITK_JOB_ID: jobID,
|
AITK_JOB_ID: jobID,
|
||||||
CUDA_VISIBLE_DEVICES: `${job.gpu_ids}`,
|
CUDA_VISIBLE_DEVICES: `${job.gpu_ids}`,
|
||||||
@@ -81,55 +104,111 @@ export async function GET(request: NextRequest, { params }: { params: { jobID: s
|
|||||||
additionalEnv.HF_TOKEN = hfToken;
|
additionalEnv.HF_TOKEN = hfToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
let cmd = `${pythonPath} ${runFilePath} ${configPath}`;
|
// Add the --log argument to the command
|
||||||
for (const key in additionalEnv) {
|
const args = [runFilePath, configPath, '--log', logPath];
|
||||||
if (os.platform() === 'win32') {
|
|
||||||
cmd = `set ${key}=${additionalEnv[key]} && ${cmd}`;
|
try {
|
||||||
|
let subprocess;
|
||||||
|
|
||||||
|
if (isWindows) {
|
||||||
|
// For Windows, use 'cmd.exe' to open a new command window
|
||||||
|
subprocess = spawn('cmd.exe', ['/c', 'start', 'cmd.exe', '/k', pythonPath, ...args], {
|
||||||
|
env: {
|
||||||
|
...process.env,
|
||||||
|
...additionalEnv,
|
||||||
|
},
|
||||||
|
cwd: TOOLKIT_ROOT,
|
||||||
|
windowsHide: false,
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
cmd = `${key}=${additionalEnv[key]} ${cmd}`;
|
// For non-Windows platforms
|
||||||
|
subprocess = spawn(pythonPath, args, {
|
||||||
|
detached: true,
|
||||||
|
stdio: ['ignore', 'pipe', 'pipe'], // Changed from 'ignore' to capture output
|
||||||
|
env: {
|
||||||
|
...process.env,
|
||||||
|
...additionalEnv,
|
||||||
|
},
|
||||||
|
cwd: TOOLKIT_ROOT,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
console.log('Spawning command:', cmd);
|
// Start monitoring in the background without blocking the response
|
||||||
|
const monitorProcess = async () => {
|
||||||
|
const startTime = Date.now();
|
||||||
|
let errorOutput = '';
|
||||||
|
let stdoutput = '';
|
||||||
|
|
||||||
// start job
|
if (subprocess.stderr) {
|
||||||
if (isWindows) {
|
subprocess.stderr.on('data', data => {
|
||||||
// For Windows, use 'cmd.exe' to open a new command window
|
errorOutput += data.toString();
|
||||||
const subprocess = spawn('cmd.exe', ['/c', 'start', 'cmd.exe', '/k', pythonPath, runFilePath, configPath], {
|
});
|
||||||
env: {
|
subprocess.stdout.on('data', data => {
|
||||||
...process.env,
|
stdoutput += data.toString();
|
||||||
...additionalEnv,
|
// truncate to only get the last 500 characters
|
||||||
},
|
if (stdoutput.length > 500) {
|
||||||
cwd: TOOLKIT_ROOT,
|
stdoutput = stdoutput.substring(stdoutput.length - 500);
|
||||||
windowsHide: false,
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
subprocess.on('exit', async code => {
|
||||||
|
const currentTime = Date.now();
|
||||||
|
const duration = (currentTime - startTime) / 1000;
|
||||||
|
console.log(`Job ${jobID} exited with code ${code} after ${duration} seconds.`);
|
||||||
|
// wait for 5 seconds to give it time to stop itself. It id still has a status of running in the db, update it to stopped
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 5000));
|
||||||
|
const updatedJob = await prisma.job.findUnique({
|
||||||
|
where: { id: jobID },
|
||||||
|
});
|
||||||
|
if (updatedJob?.status === 'running') {
|
||||||
|
let errorString = errorOutput;
|
||||||
|
if (errorString.trim() === '') {
|
||||||
|
errorString = stdoutput;
|
||||||
|
}
|
||||||
|
await prisma.job.update({
|
||||||
|
where: { id: jobID },
|
||||||
|
data: {
|
||||||
|
status: 'error',
|
||||||
|
info: `Error launching job: ${errorString.substring(0, 500)}`,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait 30 seconds before releasing the process
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 30000));
|
||||||
|
// Detach the process for non-Windows systems
|
||||||
|
if (!isWindows && subprocess.unref) {
|
||||||
|
subprocess.unref();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start the monitoring without awaiting it
|
||||||
|
monitorProcess().catch(err => {
|
||||||
|
console.error(`Error in process monitoring for job ${jobID}:`, err);
|
||||||
});
|
});
|
||||||
|
|
||||||
subprocess.unref();
|
// Return the response immediately
|
||||||
} else {
|
return NextResponse.json(job);
|
||||||
// For non-Windows platforms, use your original approach
|
} catch (error: any) {
|
||||||
const subprocess = spawn(pythonPath, [runFilePath, configPath], {
|
// Handle any exceptions during process launch
|
||||||
detached: true,
|
console.error('Error launching process:', error);
|
||||||
stdio: 'ignore',
|
|
||||||
env: {
|
await prisma.job.update({
|
||||||
...process.env,
|
where: { id: jobID },
|
||||||
...additionalEnv,
|
data: {
|
||||||
|
status: 'error',
|
||||||
|
info: `Error launching job: ${error?.message || 'Unknown error'}`,
|
||||||
},
|
},
|
||||||
cwd: TOOLKIT_ROOT,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
subprocess.unref();
|
return NextResponse.json(
|
||||||
|
{
|
||||||
|
error: 'Failed to launch job process',
|
||||||
|
details: error?.message || 'Unknown error',
|
||||||
|
},
|
||||||
|
{ status: 500 },
|
||||||
|
);
|
||||||
}
|
}
|
||||||
// const subprocess = spawn(pythonPath, [runFilePath, configPath], {
|
|
||||||
// detached: true,
|
|
||||||
// stdio: 'ignore',
|
|
||||||
// env: {
|
|
||||||
// ...process.env,
|
|
||||||
// ...additionalEnv,
|
|
||||||
// },
|
|
||||||
// cwd: TOOLKIT_ROOT,
|
|
||||||
// });
|
|
||||||
|
|
||||||
// subprocess.unref();
|
|
||||||
|
|
||||||
return NextResponse.json(job);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,26 +62,34 @@ export default function JobActionBar({ job, onRefresh, afterDelete, className, h
|
|||||||
<Pen />
|
<Pen />
|
||||||
</Link>
|
</Link>
|
||||||
)}
|
)}
|
||||||
{canDelete && (
|
<Button
|
||||||
<Button
|
onClick={() => {
|
||||||
onClick={() => {
|
let message = `Are you sure you want to delete the job "${job.name}"? This will also permanently remove it from your disk.`;
|
||||||
if (!canDelete) return;
|
if (job.status === 'running') {
|
||||||
openConfirm({
|
message += ' WARNING: The job is currently running. You should stop it first if you can.';
|
||||||
title: 'Delete Job',
|
}
|
||||||
message: `Are you sure you want to delete the job "${job.name}"? This will also permanently remove it from your disk.`,
|
openConfirm({
|
||||||
type: 'warning',
|
title: 'Delete Job',
|
||||||
confirmText: 'Delete',
|
message: message,
|
||||||
onConfirm: async () => {
|
type: 'warning',
|
||||||
await deleteJob(job.id);
|
confirmText: 'Delete',
|
||||||
if (afterDelete) afterDelete();
|
onConfirm: async () => {
|
||||||
},
|
if (job.status === 'running') {
|
||||||
});
|
try {
|
||||||
}}
|
await stopJob(job.id);
|
||||||
className={`ml-2 opacity-100`}
|
} catch (e) {
|
||||||
>
|
console.error('Error stopping job before deleting:', e);
|
||||||
<Trash2 />
|
}
|
||||||
</Button>
|
}
|
||||||
)}
|
await deleteJob(job.id);
|
||||||
|
if (afterDelete) afterDelete();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}}
|
||||||
|
className={`ml-2 opacity-100`}
|
||||||
|
>
|
||||||
|
<Trash2 />
|
||||||
|
</Button>
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user