Skip to content
Snippets Groups Projects
Commit c0e8dadf authored by Ian's avatar Ian
Browse files

refactor: improve job resubmission logic and logging in JobManager

refactor: update _handle_interrupt to prompt resubmission

fix: correct return type annotation from 'none' to 'None'

refactor: simplify job monitoring and resubmission flow

refactor: update monitor() to return Optional[bool] for status

fix: update KeyboardInterrupt handler to print message and return None

feat: add interactive job monitoring with resubmission and exit

fix: handle three-state monitor result to avoid infinite prompt loop

refactor: clarify job monitoring loop

refactor: simplify prompt logic with resume_after_timeout boolean

refactor: split prompt_for_next_step

refactor: update job monitor to return string literal statuses

refactor: clean up JobManager by removing redo_enabled

fix: update job monitoring logic to use status strings
parent 03d09242
No related branches found
No related tags found
No related merge requests found
...@@ -22,13 +22,10 @@ class JobManager: ...@@ -22,13 +22,10 @@ class JobManager:
config: RunConfig, config: RunConfig,
job_kwargs_list: List[Dict], job_kwargs_list: List[Dict],
check_interval: int = 120, check_interval: int = 120,
redo_enabled: bool = True,
): ):
self.config = config self.config = config
self.job_kwargs_list = job_kwargs_list self.job_kwargs_list = job_kwargs_list
self.check_interval = check_interval self.check_interval = check_interval
self.redo_enabled = redo_enabled
self.original_job_kwargs = job_kwargs_list # for potential resubmit
self.ledger = JobLedger() self.ledger = JobLedger()
self.start_time = datetime.datetime.now() self.start_time = datetime.datetime.now()
...@@ -84,9 +81,15 @@ class JobManager: ...@@ -84,9 +81,15 @@ class JobManager:
new_state = job.get_status() new_state = job.get_status()
self.ledger.update_status(job.job_id, new_state) self.ledger.update_status(job.job_id, new_state)
def monitor(self) -> bool: def monitor(self) -> str:
"""Monitor jobs and return True if all completed successfully""" """
"""Monitor job progress with live updates and handle interrupts""" Monitor job progress with live updates and handle interrupts
Returns:
'interrupted' if user interrupt and jobs still pending,
'succeeded' if all jobs succeeded,
'partial' if all jobs finished but some failed.
"""
# scheduling parameters # scheduling parameters
INNER_BATCH = 10 INNER_BATCH = 10
INNER_DELAY = 5 INNER_DELAY = 5
...@@ -122,40 +125,55 @@ class JobManager: ...@@ -122,40 +125,55 @@ class JobManager:
cycle_count = 0 cycle_count = 0
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nJob monitoring interrupted") print("\nJob monitoring interrupted by user")
self._handle_interrupt() if self.ledger.all_final():
if self.ledger.all_succeeded():
# Return whether all jobs completed successfully return "succeeded"
all_succeeded = self.ledger.all_succeeded() else:
if not all_succeeded and self.redo_enabled and self.prompt_for_redo(): return "partial"
self.resubmit_jobs() return "interrupted"
self.monitor()
if self.ledger.all_succeeded():
return all_succeeded return "succeeded"
return "partial"
def resubmit_jobs(self) -> None:
def resubmit_unsuccessful_jobs(self) -> None:
"""Resubmit unsuccessful jobs to retry within this session""" """Resubmit unsuccessful jobs to retry within this session"""
# Get IDs of jobs that finished unsuccessfully logging.info("Resubmitting unsuccessful jobs...")
bad_jobs = [
unsuccessful_jobs = [
job job
for job in self.ledger.get_finished_jobs() for job in self.ledger.get_finished_jobs()
if job not in self.ledger.get_jobs_in_state("Succeeded") if job not in self.ledger.get_jobs_in_state("Succeeded")
] ]
if unsuccessful_jobs:
self.resubmit_jobs(unsuccessful_jobs)
def resubmit_jobs(self, to_resubmit: List[Job]) -> None:
# Create new Job objects for failed jobs using original kwargs # Create new Job objects for failed jobs using original kwargs
new_jobs = [Job(job.kwargs) for job in bad_jobs] new_jobs = [Job(job.kwargs) for job in to_resubmit]
logging.info(f"{len(new_jobs)} jobs queued for resubmission.")
# Clear ledger tracking for failed jobs # Clear ledger tracking for failed jobs
# Note: no cancellation necessary since they are already finished cancelled = 0
for job in bad_jobs: for job in to_resubmit:
# Cancel jobs that are not in final states
if job in self.ledger.get_pending_jobs():
job.cancel()
cancelled += 1
self.ledger.remove_job(job.job_id) self.ledger.remove_job(job.job_id)
if cancelled > 0:
logging.info(f"Cancelling {cancelled} jobs in non-final states.")
# Submit new jobs # Submit new jobs
logging.info(f"Resubmitting {len(new_jobs)} failed jobs...") resubmitted = 0
for job in new_jobs: for job in new_jobs:
try: try:
job.submit() job.submit()
resubmitted += 1
# Append resubmitted job ID to file # Append resubmitted job ID to file
with open(self.output_dir / "job_ids.txt", "a") as f: with open(self.output_dir / "job_ids.txt", "a") as f:
f.write(f"Resubmitted: {job.job_id}\n") f.write(f"Resubmitted: {job.job_id}\n")
...@@ -163,41 +181,7 @@ class JobManager: ...@@ -163,41 +181,7 @@ class JobManager:
except Exception as e: except Exception as e:
logging.error(f"Error resubmitting job: {e}") logging.error(f"Error resubmitting job: {e}")
continue continue
logging.info(f"Resubmitted {resubmitted} jobs.")
def _handle_interrupt(self) -> None:
"""Handle keyboard interrupt and cleanup"""
print(
"Press Ctrl+C again to confirm cancellation,"
"or wait to continue..."
)
try:
time.sleep(3)
logging.info("Resuming monitoring...")
self.monitor()
except KeyboardInterrupt:
if self.redo_enabled and self.prompt_for_redo():
logging.info("Resubmitting failed jobs...")
self.resubmit_jobs()
logging.info("Resuming monitoring...")
self.monitor()
else:
self.exit_gracefully()
def prompt_for_redo(self) -> bool:
"""Prompt user for interactive resubmission of failed jobs"""
redo_answer = input(
"\nResubmit failed jobs? [y/N] "
"This will resubmit all finished jobs that do not have 'Succeeded'"
" status and continue monitoring.\n"
).strip()
if redo_answer == "y":
redo = True
elif redo_answer == "N":
redo = False
else:
print("Invalid input. Please enter 'y' or 'N'.")
redo = self.prompt_for_redo()
return redo
def exit_gracefully(self) -> None: def exit_gracefully(self) -> None:
"""Cancel pending jobs, print a report, and exit""" """Cancel pending jobs, print a report, and exit"""
...@@ -212,7 +196,7 @@ class JobManager: ...@@ -212,7 +196,7 @@ class JobManager:
logging.error(f"Error cancelling job: {e}") logging.error(f"Error cancelling job: {e}")
logging.info("All pending jobs cancelled.") logging.info("All pending jobs cancelled.")
self.report() self.report()
exit(0) return self.ledger.all_succeeded()
def report(self) -> Dict[str, int]: def report(self) -> Dict[str, int]:
"""Logging.Info final job statistics report""" """Logging.Info final job statistics report"""
......
...@@ -34,6 +34,7 @@ import os ...@@ -34,6 +34,7 @@ import os
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import Dict, List from typing import Dict, List
import time
import click import click
from maap.maap import MAAP from maap.maap import MAAP
...@@ -219,15 +220,78 @@ def main( ...@@ -219,15 +220,78 @@ def main(
run_config, run_config,
job_kwargs_list, job_kwargs_list,
check_interval=run_config.check_interval, check_interval=run_config.check_interval,
redo_enabled=not no_redo,
) )
job_manager.submit(output_dir) job_manager.submit(output_dir)
# Handle monitoring and potential resubmissions # Handle monitoring and potential resubmissions
job_manager.monitor() def prompt_after_interrupt() -> str:
"""Prompt user after monitoring is interrupted by Ctrl-C."""
timeout = 10
if no_redo:
print(f"Monitoring suspended. Waiting {timeout} seconds to resume or press Ctrl-C to exit.")
try:
time.sleep(timeout)
return "resume"
except KeyboardInterrupt:
return "exit"
else:
print(f"\nMonitoring suspended. Press 'r' to resubmit failed jobs, "
f"press Ctrl-C to exit, or wait {timeout} seconds to resume.")
try:
answer = input().strip().lower()
if answer == "r":
return "resubmit"
time.sleep(timeout)
return "resume"
except KeyboardInterrupt:
return "exit"
def prompt_after_run() -> str:
"""Prompt user after jobs finish with some failures."""
if no_redo:
return "exit"
else:
answer = input(
"\nAll jobs finished with some failures. "
"Press 'r' to retry failures or any other key to exit: "
).strip().lower()
if answer == "r":
return "resubmit"
return "exit"
# Main monitoring loop
while True:
run_status = job_manager.monitor()
# monitoring exited because user intiated an interrupt via Ctrl-C
# and some jobs are still pending
if run_status == "interrupted":
next_step = prompt_after_interrupt()
if next_step == "resubmit":
job_manager.resubmit_unsuccessful_jobs()
continue
elif next_step == "exit":
break
else:
continue
# monitoring exited because all jobs were successful
elif run_status == "succeeded":
break
# monitoring exited because all jobs finished,
# but some were not successful
elif run_status == "partial":
next_step = prompt_after_run()
if next_step == "resubmit":
job_manager.resubmit_unsuccessful_jobs()
continue
else:
break
job_manager.exit_gracefully() job_manager.exit_gracefully()
end_time = datetime.datetime.now()
end_time = datetime.datetime.now()
logging.info(f"Model run completed at {end_time}.") logging.info(f"Model run completed at {end_time}.")
logging.info("****************************************" logging.info("****************************************"
"****************************************") "****************************************")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment