diff --git a/maap_utils/JobManager.py b/maap_utils/JobManager.py index 7d529fa240c791882ece121e0d01042fe988a4d0..319499b4587a847d99a139ed79b0df966f3d4ea0 100644 --- a/maap_utils/JobManager.py +++ b/maap_utils/JobManager.py @@ -22,13 +22,10 @@ class JobManager: config: RunConfig, job_kwargs_list: List[Dict], check_interval: int = 120, - redo_enabled: bool = True, ): self.config = config self.job_kwargs_list = job_kwargs_list self.check_interval = check_interval - self.redo_enabled = redo_enabled - self.original_job_kwargs = job_kwargs_list # for potential resubmit self.ledger = JobLedger() self.start_time = datetime.datetime.now() @@ -84,9 +81,15 @@ class JobManager: new_state = job.get_status() self.ledger.update_status(job.job_id, new_state) - def monitor(self) -> bool: - """Monitor jobs and return True if all completed successfully""" - """Monitor job progress with live updates and handle interrupts""" + def monitor(self) -> str: + """ + Monitor job progress with live updates and handle interrupts + + Returns: + 'interrupted' if user interrupt and jobs still pending, + 'succeeded' if all jobs succeeded, + 'partial' if all jobs finished but some failed. + """ # scheduling parameters INNER_BATCH = 10 INNER_DELAY = 5 @@ -122,40 +125,55 @@ class JobManager: cycle_count = 0 except KeyboardInterrupt: - print("\nJob monitoring interrupted") - self._handle_interrupt() - - # Return whether all jobs completed successfully - all_succeeded = self.ledger.all_succeeded() - if not all_succeeded and self.redo_enabled and self.prompt_for_redo(): - self.resubmit_jobs() - self.monitor() - - return all_succeeded - - def resubmit_jobs(self) -> None: + print("\nJob monitoring interrupted by user") + if self.ledger.all_final(): + if self.ledger.all_succeeded(): + return "succeeded" + else: + return "partial" + return "interrupted" + + if self.ledger.all_succeeded(): + return "succeeded" + return "partial" + + def resubmit_unsuccessful_jobs(self) -> None: """Resubmit unsuccessful jobs to retry within this session""" - # Get IDs of jobs that finished unsuccessfully - bad_jobs = [ + logging.info("Resubmitting unsuccessful jobs...") + + unsuccessful_jobs = [ job for job in self.ledger.get_finished_jobs() if job not in self.ledger.get_jobs_in_state("Succeeded") ] + if unsuccessful_jobs: + self.resubmit_jobs(unsuccessful_jobs) + + def resubmit_jobs(self, to_resubmit: List[Job]) -> None: # Create new Job objects for failed jobs using original kwargs - new_jobs = [Job(job.kwargs) for job in bad_jobs] + new_jobs = [Job(job.kwargs) for job in to_resubmit] + logging.info(f"{len(new_jobs)} jobs queued for resubmission.") # Clear ledger tracking for failed jobs - # Note: no cancellation necessary since they are already finished - for job in bad_jobs: + cancelled = 0 + for job in to_resubmit: + # Cancel jobs that are not in final states + if job in self.ledger.get_pending_jobs(): + job.cancel() + cancelled += 1 self.ledger.remove_job(job.job_id) + if cancelled > 0: + logging.info(f"Cancelling {cancelled} jobs in non-final states.") + # Submit new jobs - logging.info(f"Resubmitting {len(new_jobs)} failed jobs...") + resubmitted = 0 for job in new_jobs: try: job.submit() + resubmitted += 1 # Append resubmitted job ID to file with open(self.output_dir / "job_ids.txt", "a") as f: f.write(f"Resubmitted: {job.job_id}\n") @@ -163,41 +181,7 @@ class JobManager: except Exception as e: logging.error(f"Error resubmitting job: {e}") continue - - def _handle_interrupt(self) -> None: - """Handle keyboard interrupt and cleanup""" - print( - "Press Ctrl+C again to confirm cancellation," - "or wait to continue..." - ) - try: - time.sleep(3) - logging.info("Resuming monitoring...") - self.monitor() - except KeyboardInterrupt: - if self.redo_enabled and self.prompt_for_redo(): - logging.info("Resubmitting failed jobs...") - self.resubmit_jobs() - logging.info("Resuming monitoring...") - self.monitor() - else: - self.exit_gracefully() - - def prompt_for_redo(self) -> bool: - """Prompt user for interactive resubmission of failed jobs""" - redo_answer = input( - "\nResubmit failed jobs? [y/N] " - "This will resubmit all finished jobs that do not have 'Succeeded'" - " status and continue monitoring.\n" - ).strip() - if redo_answer == "y": - redo = True - elif redo_answer == "N": - redo = False - else: - print("Invalid input. Please enter 'y' or 'N'.") - redo = self.prompt_for_redo() - return redo + logging.info(f"Resubmitted {resubmitted} jobs.") def exit_gracefully(self) -> None: """Cancel pending jobs, print a report, and exit""" @@ -212,7 +196,7 @@ class JobManager: logging.error(f"Error cancelling job: {e}") logging.info("All pending jobs cancelled.") self.report() - exit(0) + return self.ledger.all_succeeded() def report(self) -> Dict[str, int]: """Logging.Info final job statistics report""" diff --git a/run_on_maap.py b/run_on_maap.py index 8962fde22bf075b9298ccbc06e99aae2d199e997..c2014acb5ddc574886a62370f9732c24bfa0c03c 100644 --- a/run_on_maap.py +++ b/run_on_maap.py @@ -34,6 +34,7 @@ import os import shutil from pathlib import Path from typing import Dict, List +import time import click from maap.maap import MAAP @@ -219,15 +220,78 @@ def main( run_config, job_kwargs_list, check_interval=run_config.check_interval, - redo_enabled=not no_redo, ) job_manager.submit(output_dir) # Handle monitoring and potential resubmissions - job_manager.monitor() + def prompt_after_interrupt() -> str: + """Prompt user after monitoring is interrupted by Ctrl-C.""" + timeout = 10 + if no_redo: + print(f"Monitoring suspended. Waiting {timeout} seconds to resume or press Ctrl-C to exit.") + try: + time.sleep(timeout) + return "resume" + except KeyboardInterrupt: + return "exit" + else: + print(f"\nMonitoring suspended. Press 'r' to resubmit failed jobs, " + f"press Ctrl-C to exit, or wait {timeout} seconds to resume.") + try: + answer = input().strip().lower() + if answer == "r": + return "resubmit" + time.sleep(timeout) + return "resume" + except KeyboardInterrupt: + return "exit" + + def prompt_after_run() -> str: + """Prompt user after jobs finish with some failures.""" + if no_redo: + return "exit" + else: + answer = input( + "\nAll jobs finished with some failures. " + "Press 'r' to retry failures or any other key to exit: " + ).strip().lower() + if answer == "r": + return "resubmit" + return "exit" + + # Main monitoring loop + while True: + run_status = job_manager.monitor() + + # monitoring exited because user intiated an interrupt via Ctrl-C + # and some jobs are still pending + if run_status == "interrupted": + next_step = prompt_after_interrupt() + if next_step == "resubmit": + job_manager.resubmit_unsuccessful_jobs() + continue + elif next_step == "exit": + break + else: + continue + + # monitoring exited because all jobs were successful + elif run_status == "succeeded": + break + + # monitoring exited because all jobs finished, + # but some were not successful + elif run_status == "partial": + next_step = prompt_after_run() + if next_step == "resubmit": + job_manager.resubmit_unsuccessful_jobs() + continue + else: + break + job_manager.exit_gracefully() - end_time = datetime.datetime.now() + end_time = datetime.datetime.now() logging.info(f"Model run completed at {end_time}.") logging.info("****************************************" "****************************************")