Skip to content
Snippets Groups Projects
Commit f7781b2e authored by ranchodeluxe's avatar ranchodeluxe
Browse files

two dags?

parent a82d8732
No related branches found
No related tags found
2 merge requests!106Parallelize Uploads to S3 Where Necessary,!61Additions: primary keys, speed ups (preprocessing and postprocessing), tests, scalene, reorg entire fireatlas
......@@ -127,8 +127,11 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):
# blocks on region_and_t_results and then runs fire forward algorithm (which cannot be run in parallel)
fire_forward_results = delayed(job_fire_forward)(region_and_t_results, region, tst, ted)
# block and execute dag
fire_forward_results.compute()
# blocks on fire forward algorithm run and then uploads all snapshots/largefire outputs in parallel
# for some reason we can't run this step as part of the above DAG?
# uploads all snapshots/largefire outputs in parallel
data_dir = os.path.join(settings.LOCAL_PATH, settings.OUTPUT_DIR, region[0], str(tst[0]))
fgb_upload_results = [
delayed(concurrent_copy_from_local_to_s3)([fire_forward_results,], local_filepath)
......@@ -137,10 +140,10 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):
glob.glob(os.path.join(data_dir, "Largefire", "*", "*.fgb"))
))
]
# take the DAG and run it
# block and execute dag
dag = delayed(lambda x: x)(fgb_upload_results)
dag.compute()
dask_client.close()
......
......@@ -18,7 +18,7 @@ def validate_json(s):
@timed
def Run(region: fireatlas.FireType.Region, tst: fireatlas.FireType.TimeStep):
FireLog.logger.info(f"Running preprocessing code for {region[0]} at {tst=} with source {settings.FIRE_SOURCE}")
FireRunDaskCoordinator.job_preprocess_region_t(None, None, region, tst)
FireRunDaskCoordinator.job_preprocess_region_t([None,], region, tst)
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment