Commit 666ee769 authored by ranchodeluxe

parallelize uploads

parent b8bb853b
2 merge requests: !106 Parallelize Uploads to S3 Where Necessary, !61 Additions: primary keys, speed ups (preprocessing and postprocessing), tests, scalene, reorg entire fireatlas
@@ -2,6 +2,8 @@ import json
 import argparse
 import os
 import glob
+from itertools import chain
+import s3fs
 from typing import Tuple
@@ -44,6 +46,11 @@ def validate_json(s):
         raise argparse.ArgumentTypeError("Not a valid JSON string")
 
 
+def concurrent_copy_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
+    copy_from_local_to_s3(local_filepath, fs)
+
+
 def job_fire_forward(eventual_results: Tuple[Delayed], region: Region, tst: TimeStep, ted: TimeStep):
     logger.info(f"Running code for {region[0]} from {tst} to {ted} with source {settings.FIRE_SOURCE}")
@@ -60,18 +67,10 @@ def job_fire_forward(eventual_results: Tuple[Delayed], region: Region, tst: TimeStep, ted: TimeStep):
     save_large_fires_nplist(allpixels, region, large_fires, tst)
     save_large_fires_layers(allfires.gdf, region, large_fires, tst)
 
-    data_dir = os.path.join(settings.LOCAL_PATH, settings.OUTPUT_DIR, region[0], str(tst[0]))
-    for filepath in glob.glob(os.path.join(data_dir, "Snapshot", "*", "*.fgb")):
-        copy_from_local_to_s3(filepath, fs)
-    for filepath in glob.glob(os.path.join(data_dir, "Largefire", "*", "*.fgb")):
-        copy_from_local_to_s3(filepath, fs)
 
 def job_preprocess_region_t(
-    eventual_result1: Delayed,
-    eventual_result2: Delayed, region: Region, t: TimeStep):
+    eventual_results: Tuple[Delayed],
+    region: Region, t: TimeStep):
     logger.info(f"Running preprocessing code for {region[0]} at {t=} with source {settings.FIRE_SOURCE}")
     filepath = preprocess_region_t(t, region=region)
     copy_from_local_to_s3(filepath, fs)
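copy_from_local_to_s3 itself is defined elsewhere in the repo and is not part of this diff; a rough sketch of what an s3fs-based helper of that shape can look like (the bucket name and key layout below are placeholders for illustration, not the repo's real destination):

import os
import s3fs

def copy_from_local_to_s3(local_filepath: str, fs: s3fs.S3FileSystem) -> None:
    # upload one local file to an S3 key mirroring its local path;
    # "my-example-bucket" is an assumed placeholder
    key = os.path.relpath(local_filepath, start="/")
    fs.put(local_filepath, f"my-example-bucket/{key}")

fs = s3fs.S3FileSystem(anon=False)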
@@ -91,10 +90,6 @@ def job_data_update_checker():
         update_VJ114IMGTDL()
     except Exception as exc:
         logger.exception(exc)
-    finally:
-        for filepath in glob.glob(f"{settings.LOCAL_PATH}/{settings.PREPROCESSED_DIR}/*/*.txt"):
-            copy_from_local_to_s3(filepath, fs)
 
 
 @timed
 def Run(region: Region, tst: TimeStep, ted: TimeStep):
@@ -114,14 +109,37 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):
     dask_client = Client(n_workers=MAX_WORKERS)
     logger.info(f"dask workers = {len(dask_client.cluster.workers)}")
 
     # run the first two jobs in parallel
     data_input_results = delayed(job_data_update_checker)()
     region_results = delayed(job_preprocess_region)(region)
 
+    # block on the first two jobs, then uploads raw satellite files from `job_data_update_checker` in parallel
+    data_upload_results = [
+        delayed(concurrent_copy_from_local_to_s3)([data_input_results, region_results], local_filepath)
+        for local_filepath in glob.glob(f"{settings.LOCAL_PATH}/{settings.PREPROCESSED_DIR}/*/*.txt")
+    ]
+
+    # blocks on data upload results, then runs all region-plus-t in parallel
     region_and_t_results = [
-        delayed(job_preprocess_region_t)(data_input_results, region_results, region, t)
+        delayed(job_preprocess_region_t)(data_upload_results, region, t)
         for t in list_of_timesteps
     ]
 
     # blocks on region_and_t_results and then runs fire forward algorithm (which cannot be run in parallel)
     fire_forward_results = delayed(job_fire_forward)(region_and_t_results, region, tst, ted)
     fire_forward_results.compute()
 
+    # blocks on fire forward algorithm run and then uploads all snapshots/largefire outputs in parallel
+    data_dir = os.path.join(settings.LOCAL_PATH, settings.OUTPUT_DIR, region[0], str(tst[0]))
+    fgb_upload_results = [
+        delayed(concurrent_copy_from_local_to_s3)([fire_forward_results,], local_filepath)
+        for local_filepath in list(chain(
+            glob.glob(os.path.join(data_dir, "Snapshot", "*", "*.fgb")),
+            glob.glob(os.path.join(data_dir, "Largefire", "*", "*.fgb"))
+        ))
+    ]
+
+    # take the DAG and run it
+    fgb_upload_results.compute()
 
     dask_client.close()
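Taken together, Run() now builds one staged graph: two independent jobs, a fan-out of raw-file uploads gated on both, per-timestep preprocessing gated on those uploads, a single serial fire-forward task, and finally a fan-out of .fgb uploads gated on that task. A self-contained sketch of the same staging pattern (fake_stage and the stage labels are hypothetical; note that a plain Python list of Delayed objects is normally driven with dask.compute(*tasks), while a single Delayed exposes .compute() directly):

from itertools import chain

import dask
from dask import delayed
from dask.distributed import Client

def fake_stage(label, upstream=None):
    # `upstream` is only a dependency token, as in Run() above
    return label

if __name__ == "__main__":
    client = Client(n_workers=2)

    # stage 1: two independent jobs run in parallel
    inputs = delayed(fake_stage)("data-update")
    region = delayed(fake_stage)("preprocess-region")

    # stage 2: fan out uploads, each gated on both stage-1 tasks
    uploads = [delayed(fake_stage)(f"upload-{i}", [inputs, region]) for i in range(3)]

    # stage 3: per-timestep preprocessing, each gated on all uploads
    per_t = [delayed(fake_stage)(f"t-{t}", uploads) for t in range(4)]

    # stage 4: the serial step, gated on every per-timestep task
    fire_forward = delayed(fake_stage)("fire-forward", per_t)

    # stage 5: fan out the final uploads, gated on the serial step;
    # a list of Delayed objects is computed with dask.compute(*...)
    final_uploads = [
        delayed(fake_stage)(f"fgb-{p}", [fire_forward])
        for p in chain(["snapshot-a", "snapshot-b"], ["largefire-a"])
    ]
    print(dask.compute(*final_uploads))

    client.close()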