Commit 4f23a9f9 authored by ranchodeluxe

stupid dask again

parent 846a3962
1 merge request: !61 Additions: primary keys, speed ups (preprocessing and postprocessing), tests, scalene, reorg entire fireatlas
@@ -48,7 +48,12 @@ def validate_json(s):
 @delayed
-def concurrent_copy_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
+def concurrent_copy_inputs_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
     copy_from_local_to_s3(local_filepath, fs)
+
+
+@delayed
+def concurrent_copy_outputs_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
+    copy_from_local_to_s3(local_filepath, fs)
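
In both helpers the `eventual_results` argument is never read in the body; passing upstream Delayed objects through it is what makes each copy task wait for those tasks in the dask graph. A minimal sketch of that dependency-threading idea, using hypothetical task names (`make_file`, `upload_file`) rather than anything from this repo:

    from dask import delayed

    @delayed
    def make_file(path):
        # hypothetical upstream task that produces a local file
        with open(path, "w") as f:
            f.write("data")
        return path

    @delayed
    def upload_file(eventual_results, path):
        # `eventual_results` is never read; including it in the call only adds
        # graph edges, so this task runs after the upstream tasks finish
        print(f"uploading {path}")

    produced = make_file("/tmp/example.txt")
    upload = upload_file([produced], "/tmp/example.txt")
    upload.compute()  # executes make_file first, then upload_file
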
@@ -119,7 +124,7 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):
     # block on the first two jobs, then uploads raw satellite files from `job_data_update_checker` in parallel
     data_upload_results = [
-        concurrent_copy_from_local_to_s3([data_input_results, region_results], local_filepath)
+        concurrent_copy_inputs_from_local_to_s3([data_input_results, region_results], local_filepath)
         for local_filepath in glob.glob(f"{settings.LOCAL_PATH}/{settings.PREPROCESSED_DIR}/*/*.txt")
     ]
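
This list comprehension is the fan-out step: one delayed upload task per matching file, with every task carrying the same upstream dependencies. A hedged sketch of what it expands to, using illustrative paths and a stand-in task instead of the real helper:

    import glob
    from dask import delayed

    @delayed
    def copy_one(upstream_results, path):
        # illustrative stand-in for concurrent_copy_inputs_from_local_to_s3
        print(f"copy {path} -> s3")

    upstream = [delayed(lambda: "inputs done")(), delayed(lambda: "region ok")()]
    tasks = [copy_one(upstream, p) for p in glob.glob("/tmp/preprocessed/*/*.txt")]
    # nothing runs yet; `tasks` is only a list of graph nodes to be computed later
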
@@ -132,10 +137,10 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):
     # blocks on region_and_t_results and then runs fire forward algorithm (which cannot be run in parallel)
     fire_forward_results = job_fire_forward(region_and_t_results, region, tst, ted)
 
-    # block on fire forward output and upload all snapshots/largefire outputs in parallel
+    # take all fire forward output and upload all snapshots/largefire outputs in parallel
     data_dir = os.path.join(settings.LOCAL_PATH, settings.OUTPUT_DIR, region[0], str(tst[0]))
     fgb_upload_results = [
-        concurrent_copy_from_local_to_s3([fire_forward_results,], local_filepath)
+        concurrent_copy_outputs_from_local_to_s3([fire_forward_results,], local_filepath)
         for local_filepath in list(chain(
            glob.glob(os.path.join(data_dir, "Snapshot", "*", "*.fgb")),
            glob.glob(os.path.join(data_dir, "Largefire", "*", "*.fgb"))
@@ -144,6 +149,7 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):
     # block and execute dag
     dag = delayed(lambda x: x)(fgb_upload_results)
     dag.compute()
+    dask_client.close()
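
The last lines are the fan-in: wrapping the list of upload tasks in a no-op delayed identity gives the scheduler a single root node, so one `compute()` call runs every upload (in parallel where the scheduler allows) before the client is closed. A self-contained sketch of that pattern with placeholder task bodies, not the real upload code:

    from dask import delayed
    from dask.distributed import Client

    @delayed
    def upload(path):
        # placeholder for the real S3 upload
        return f"uploaded {path}"

    if __name__ == "__main__":
        client = Client(processes=False)  # small local cluster just for the sketch
        results = [upload(p) for p in ["a.fgb", "b.fgb", "c.fgb"]]
        dag = delayed(lambda x: x)(results)  # single node depending on all uploads
        print(dag.compute())                 # blocks until every upload has finished
        client.close()
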