diff --git a/run.py b/run.py index 9a2b7f19..4c36046d 100644 --- a/run.py +++ b/run.py @@ -21,7 +21,7 @@ if os.environ.get("DEBUG_TOOLKIT", "0") == "1": import argparse from toolkit.job import get_job from toolkit.accelerator import get_accelerator -from toolkit.print import print_acc +from toolkit.print import print_acc, setup_log_to_file accelerator = get_accelerator() @@ -67,7 +67,17 @@ def main(): default=None, help='Name to replace [name] tag in config file, useful for shared config file' ) + + parser.add_argument( + '-l', '--log', + type=str, + default=None, + help='Log file to write output to' + ) args = parser.parse_args() + + if args.log is not None: + setup_log_to_file(args.log) config_file_list = args.config_file_list if len(config_file_list) == 0: diff --git a/toolkit/print.py b/toolkit/print.py index 0ada4102..e0f6c23b 100644 --- a/toolkit/print.py +++ b/toolkit/print.py @@ -1,6 +1,31 @@ +import sys +import os from toolkit.accelerator import get_accelerator def print_acc(*args, **kwargs): if get_accelerator().is_local_main_process: print(*args, **kwargs) + + +class Logger: + def __init__(self, filename): + self.terminal = sys.stdout + self.log = open(filename, 'a') + + def write(self, message): + self.terminal.write(message) + self.log.write(message) + self.log.flush() # Make sure it's written immediately + + def flush(self): + self.terminal.flush() + self.log.flush() + + +def setup_log_to_file(filename): + if get_accelerator().is_local_main_process: + if not os.path.exists(os.path.dirname(filename)): + os.makedirs(os.path.dirname(filename)) + sys.stdout = Logger(filename) + sys.stderr = Logger(filename) diff --git a/ui/src/app/api/jobs/[jobID]/start/route.ts b/ui/src/app/api/jobs/[jobID]/start/route.ts index e8260713..ba1feaf9 100644 --- a/ui/src/app/api/jobs/[jobID]/start/route.ts +++ b/ui/src/app/api/jobs/[jobID]/start/route.ts @@ -32,7 +32,6 @@ export async function GET(request: NextRequest, { params }: { params: { jobID: s }); // setup the training - const trainingRoot = await getTrainingFolder(); const trainingFolder = path.join(trainingRoot, job.name); @@ -43,6 +42,29 @@ export async function GET(request: NextRequest, { params }: { params: { jobID: s // make the config file const configPath = path.join(trainingFolder, '.job_config.json'); + //log to path + const logPath = path.join(trainingFolder, 'log.txt'); + + try { + // if the log path exists, move it to a folder called logs and rename it {num}_log.txt, looking for the highest num + // if the log path does not exist, create it + if (fs.existsSync(logPath)) { + const logsFolder = path.join(trainingFolder, 'logs'); + if (!fs.existsSync(logsFolder)) { + fs.mkdirSync(logsFolder, { recursive: true }); + } + + let num = 0; + while (fs.existsSync(path.join(logsFolder, `${num}_log.txt`))) { + num++; + } + + fs.renameSync(logPath, path.join(logsFolder, `${num}_log.txt`)); + } + } catch (e) { + console.error('Error moving log file:', e); + } + // update the config dataset path const jobConfig = JSON.parse(job.job_config); jobConfig.config.process[0].sqlite_db_path = path.join(TOOLKIT_ROOT, 'aitk_db.db'); @@ -70,6 +92,7 @@ export async function GET(request: NextRequest, { params }: { params: { jobID: s if (!fs.existsSync(runFilePath)) { return NextResponse.json({ error: 'run.py not found' }, { status: 500 }); } + const additionalEnv: any = { AITK_JOB_ID: jobID, CUDA_VISIBLE_DEVICES: `${job.gpu_ids}`, @@ -81,55 +104,111 @@ export async function GET(request: NextRequest, { params }: { params: { jobID: s additionalEnv.HF_TOKEN = hfToken; } - let cmd = `${pythonPath} ${runFilePath} ${configPath}`; - for (const key in additionalEnv) { - if (os.platform() === 'win32') { - cmd = `set ${key}=${additionalEnv[key]} && ${cmd}`; + // Add the --log argument to the command + const args = [runFilePath, configPath, '--log', logPath]; + + try { + let subprocess; + + if (isWindows) { + // For Windows, use 'cmd.exe' to open a new command window + subprocess = spawn('cmd.exe', ['/c', 'start', 'cmd.exe', '/k', pythonPath, ...args], { + env: { + ...process.env, + ...additionalEnv, + }, + cwd: TOOLKIT_ROOT, + windowsHide: false, + }); } else { - cmd = `${key}=${additionalEnv[key]} ${cmd}`; + // For non-Windows platforms + subprocess = spawn(pythonPath, args, { + detached: true, + stdio: ['ignore', 'pipe', 'pipe'], // Changed from 'ignore' to capture output + env: { + ...process.env, + ...additionalEnv, + }, + cwd: TOOLKIT_ROOT, + }); } - } - console.log('Spawning command:', cmd); + // Start monitoring in the background without blocking the response + const monitorProcess = async () => { + const startTime = Date.now(); + let errorOutput = ''; + let stdoutput = ''; - // start job - if (isWindows) { - // For Windows, use 'cmd.exe' to open a new command window - const subprocess = spawn('cmd.exe', ['/c', 'start', 'cmd.exe', '/k', pythonPath, runFilePath, configPath], { - env: { - ...process.env, - ...additionalEnv, - }, - cwd: TOOLKIT_ROOT, - windowsHide: false, + if (subprocess.stderr) { + subprocess.stderr.on('data', data => { + errorOutput += data.toString(); + }); + subprocess.stdout.on('data', data => { + stdoutput += data.toString(); + // truncate to only get the last 500 characters + if (stdoutput.length > 500) { + stdoutput = stdoutput.substring(stdoutput.length - 500); + } + }); + } + + subprocess.on('exit', async code => { + const currentTime = Date.now(); + const duration = (currentTime - startTime) / 1000; + console.log(`Job ${jobID} exited with code ${code} after ${duration} seconds.`); + // wait for 5 seconds to give it time to stop itself. It id still has a status of running in the db, update it to stopped + await new Promise(resolve => setTimeout(resolve, 5000)); + const updatedJob = await prisma.job.findUnique({ + where: { id: jobID }, + }); + if (updatedJob?.status === 'running') { + let errorString = errorOutput; + if (errorString.trim() === '') { + errorString = stdoutput; + } + await prisma.job.update({ + where: { id: jobID }, + data: { + status: 'error', + info: `Error launching job: ${errorString.substring(0, 500)}`, + }, + }); + } + }); + + // Wait 30 seconds before releasing the process + await new Promise(resolve => setTimeout(resolve, 30000)); + // Detach the process for non-Windows systems + if (!isWindows && subprocess.unref) { + subprocess.unref(); + } + }; + + // Start the monitoring without awaiting it + monitorProcess().catch(err => { + console.error(`Error in process monitoring for job ${jobID}:`, err); }); - subprocess.unref(); - } else { - // For non-Windows platforms, use your original approach - const subprocess = spawn(pythonPath, [runFilePath, configPath], { - detached: true, - stdio: 'ignore', - env: { - ...process.env, - ...additionalEnv, + // Return the response immediately + return NextResponse.json(job); + } catch (error: any) { + // Handle any exceptions during process launch + console.error('Error launching process:', error); + + await prisma.job.update({ + where: { id: jobID }, + data: { + status: 'error', + info: `Error launching job: ${error?.message || 'Unknown error'}`, }, - cwd: TOOLKIT_ROOT, }); - subprocess.unref(); + return NextResponse.json( + { + error: 'Failed to launch job process', + details: error?.message || 'Unknown error', + }, + { status: 500 }, + ); } - // const subprocess = spawn(pythonPath, [runFilePath, configPath], { - // detached: true, - // stdio: 'ignore', - // env: { - // ...process.env, - // ...additionalEnv, - // }, - // cwd: TOOLKIT_ROOT, - // }); - - // subprocess.unref(); - - return NextResponse.json(job); } diff --git a/ui/src/components/JobActionBar.tsx b/ui/src/components/JobActionBar.tsx index 3aac3da0..4c4c910b 100644 --- a/ui/src/components/JobActionBar.tsx +++ b/ui/src/components/JobActionBar.tsx @@ -62,26 +62,34 @@ export default function JobActionBar({ job, onRefresh, afterDelete, className, h )} - {canDelete && ( - - )} + ); }