Commit 26b9fd4c authored by ranchodeluxe

dask is stupid

parent f7781b2e
2 merge requests: !106 Parallelize Uploads to S3 Where Necessary, !61 Additions: primary keys, speed ups (preprocessing and postprocessing), tests, scalene, reorg entire fireatlas
@@ -47,7 +47,11 @@ def validate_json(s):

-def concurrent_copy_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
+def concurrent_copy_inputs_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
     copy_from_local_to_s3(local_filepath, fs)

+# dask is dumb and I cannot reuse this with delayed DAG for some reason
+def concurrent_copy_outputs_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
+    copy_from_local_to_s3(local_filepath, fs)
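The split leaves two helpers with the same body, one per upload stage; the `eventual_results` tuple is never used inside them and appears to exist only so dask schedules the copy after the upstream tasks finish. A minimal, self-contained sketch of that dependency-forcing pattern (all names below are placeholders, nothing is taken from the repo):

# Sketch of the "unused upstream argument" pattern: `upstream` is ignored in the
# body, but passing delayed objects through it makes dask run the upload only
# after those tasks complete.
import dask
from dask import delayed


def fake_preprocess() -> str:
    return "preprocessed"


def fake_upload(upstream, local_filepath: str) -> str:
    # `upstream` only encodes the DAG edge; the body never touches it
    return f"uploaded {local_filepath}"


preprocess_task = delayed(fake_preprocess)()
upload_tasks = [
    delayed(fake_upload)([preprocess_task], path) for path in ["a.txt", "b.txt"]
]
print(dask.compute(*upload_tasks))  # preprocess runs once, then both uploads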
@@ -115,7 +119,7 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):

     # block on the first two jobs, then uploads raw satellite files from `job_data_update_checker` in parallel
     data_upload_results = [
-        delayed(concurrent_copy_from_local_to_s3)([data_input_results, region_results], local_filepath)
+        delayed(concurrent_copy_inputs_from_local_to_s3)([data_input_results, region_results], local_filepath)
         for local_filepath in glob.glob(f"{settings.LOCAL_PATH}/{settings.PREPROCESSED_DIR}/*/*.txt")
     ]
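Note that the comprehension's `glob.glob(...)` runs while the DAG is still being assembled, not when it is computed, so only files already on disk at build time get an upload task. A tiny sketch of that fan-out, with placeholder paths and upstream tasks:

# Fan-out sketch with placeholder names: glob.glob evaluates eagerly at
# graph-construction time; the uploads themselves run later under compute().
import glob
from dask import delayed


def upload(upstream, path: str) -> str:
    return f"uploaded {path}"


upstream_tasks = []  # stand-ins for data_input_results / region_results
data_upload_results = [
    delayed(upload)(upstream_tasks, local_filepath)
    for local_filepath in glob.glob("/tmp/preprocessed/*/*.txt")  # runs now
]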
@@ -127,14 +131,12 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):

     # blocks on region_and_t_results and then runs fire forward algorithm (which cannot be run in parallel)
     fire_forward_results = delayed(job_fire_forward)(region_and_t_results, region, tst, ted)

     # block and execute dag
     fire_forward_results.compute()

     # for some reason we can't run this step as part of the above DAG?
     # uploads all snapshots/largefire outputs in parallel
     data_dir = os.path.join(settings.LOCAL_PATH, settings.OUTPUT_DIR, region[0], str(tst[0]))
     fgb_upload_results = [
-        delayed(concurrent_copy_from_local_to_s3)([fire_forward_results,], local_filepath)
+        delayed(concurrent_copy_outputs_from_local_to_s3)([fire_forward_results,], local_filepath)
         for local_filepath in list(chain(
             glob.glob(os.path.join(data_dir, "Snapshot", "*", "*.fgb")),
             glob.glob(os.path.join(data_dir, "Largefire", "*", "*.fgb"))
@@ -143,7 +145,6 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):

     # block and execute dag
     dag = delayed(lambda x: x)(fgb_upload_results)
     dag.compute()
     dask_client.close()
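The identity lambda at the end is a fan-in barrier: wrapping the whole list of delayed uploads in a single node means one `.compute()` call walks every upload task before the client is closed. A minimal sketch of that pattern (placeholder upload function, not the repo's):

# Fan-in sketch: gather a list of delayed tasks behind one identity node so a
# single compute() blocks until every task has finished.
from dask import delayed


def upload_one(path: str) -> str:
    return f"uploaded {path}"


fgb_upload_results = [delayed(upload_one)(p) for p in ["snap.fgb", "large.fgb"]]

dag = delayed(lambda x: x)(fgb_upload_results)  # one node depending on all uploads
print(dag.compute())  # ['uploaded snap.fgb', 'uploaded large.fgb']

Passing the list straight to `dask.compute(*fgb_upload_results)` would block on the same tasks without the identity wrapper; the wrapper just keeps everything behind one handle.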