Commit 2eb64fbb authored by gcorradini

Merge branch 'gc/uploads_dask' into 'primarykeyv2'

Parallelize Uploads to S3 Where Necessary

See merge request !106
parents b8bb853b 94721c5b
Part of 2 merge requests: !106 Parallelize Uploads to S3 Where Necessary, !61 Additions: primary keys, speed ups (preprocessing and postprocessing), tests, scalene, reorg entire fireatlas
@@ -2,6 +2,8 @@ import json
 import argparse
 import os
 import glob
+from itertools import chain
+
 import s3fs
 from typing import Tuple
@@ -44,6 +46,13 @@ def validate_json(s):
         raise argparse.ArgumentTypeError("Not a valid JSON string")
 
+@delayed
+def concurrent_copy_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
+    copy_from_local_to_s3(local_filepath, fs)
+
+
+@delayed
 def job_fire_forward(eventual_results: Tuple[Delayed], region: Region, tst: TimeStep, ted: TimeStep):
     logger.info(f"Running code for {region[0]} from {tst} to {ted} with source {settings.FIRE_SOURCE}")
@@ -60,30 +69,24 @@ def job_fire_forward(eventual_results: Tuple[Delayed], region: Region, tst: TimeStep, ted: TimeStep):
     save_large_fires_nplist(allpixels, region, large_fires, tst)
     save_large_fires_layers(allfires.gdf, region, large_fires, tst)
-    data_dir = os.path.join(settings.LOCAL_PATH, settings.OUTPUT_DIR, region[0], str(tst[0]))
-    for filepath in glob.glob(os.path.join(data_dir, "Snapshot", "*", "*.fgb")):
-        copy_from_local_to_s3(filepath, fs)
-    for filepath in glob.glob(os.path.join(data_dir, "Largefire", "*", "*.fgb")):
-        copy_from_local_to_s3(filepath, fs)
 
+@delayed
 def job_preprocess_region_t(
-    eventual_result1: Delayed,
-    eventual_result2: Delayed,
+    eventual_results: Tuple[Delayed],
     region: Region, t: TimeStep):
     logger.info(f"Running preprocessing code for {region[0]} at {t=} with source {settings.FIRE_SOURCE}")
     filepath = preprocess_region_t(t, region=region)
     copy_from_local_to_s3(filepath, fs)
 
+@delayed
 def job_preprocess_region(region: Region):
     filepath = preprocess_region(region)
     copy_from_local_to_s3(filepath, fs)
 
+@delayed
 def job_data_update_checker():
     """"""
     try:
         # Download SUOMI-NPP
         update_VNP14IMGTDL()
@@ -91,10 +94,6 @@ def job_data_update_checker():
         update_VJ114IMGTDL()
     except Exception as exc:
         logger.exception(exc)
-    finally:
-        for filepath in glob.glob(f"{settings.LOCAL_PATH}/{settings.PREPROCESSED_DIR}/*/*.txt"):
-            copy_from_local_to_s3(filepath, fs)
 
 @timed
 def Run(region: Region, tst: TimeStep, ted: TimeStep):
@@ -114,14 +113,37 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):
     dask_client = Client(n_workers=MAX_WORKERS)
     logger.info(f"dask workers = {len(dask_client.cluster.workers)}")
 
-    data_input_results = delayed(job_data_update_checker)()
-    region_results = delayed(job_preprocess_region)(region)
+    # run the first two jobs in parallel
+    data_input_results = job_data_update_checker()
+    region_results = job_preprocess_region(region)
+
+    # block on the first two jobs, then upload the raw satellite files from `job_data_update_checker` in parallel
+    data_upload_results = [
+        concurrent_copy_from_local_to_s3([data_input_results, region_results], local_filepath)
+        for local_filepath in glob.glob(f"{settings.LOCAL_PATH}/{settings.PREPROCESSED_DIR}/*/*.txt")
+    ]
+
+    # block on the data uploads, then run all region-plus-t jobs in parallel
     region_and_t_results = [
-        delayed(job_preprocess_region_t)(data_input_results, region_results, region, t)
+        job_preprocess_region_t(data_upload_results, region, t)
         for t in list_of_timesteps
     ]
-    fire_forward_results = delayed(job_fire_forward)(region_and_t_results, region, tst, ted)
-    fire_forward_results.compute()
+
+    # block on region_and_t_results, then run the fire forward algorithm (which cannot be run in parallel)
+    fire_forward_results = job_fire_forward(region_and_t_results, region, tst, ted)
+
+    # block on the fire forward output and upload all Snapshot/Largefire outputs in parallel
+    data_dir = os.path.join(settings.LOCAL_PATH, settings.OUTPUT_DIR, region[0], str(tst[0]))
+    fgb_upload_results = [
+        concurrent_copy_from_local_to_s3([fire_forward_results,], local_filepath)
+        for local_filepath in list(chain(
+            glob.glob(os.path.join(data_dir, "Snapshot", "*", "*.fgb")),
+            glob.glob(os.path.join(data_dir, "Largefire", "*", "*.fgb"))
+        ))
+    ]
+
+    # gather everything into one node and execute the whole DAG
+    dag = delayed(lambda x: x)(fgb_upload_results)
+    dag.compute()
     dask_client.close()
...
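The scheduling trick in Run() deserves a note: each @delayed job takes an eventual_results argument that its body never reads. Dask traverses list arguments for dependencies, so passing upstream Delayed objects both draws edges in the task graph and blocks the downstream task until the upstream tasks finish; the final delayed(lambda x: x)(fgb_upload_results) gathers all leaf tasks into one node so a single .compute() executes the entire DAG. A minimal, self-contained sketch of the same pattern (the stage_a/stage_b names are illustrative, not part of this MR):

from dask import delayed
from dask.distributed import Client

@delayed
def stage_a(name):
    # independent upstream work; all stage_a calls can run in parallel
    return f"done-{name}"

@delayed
def stage_b(eventual_results, item):
    # eventual_results is never read; it exists only to force every
    # stage_a task to finish before any stage_b task starts
    return item * 2

if __name__ == "__main__":
    client = Client(n_workers=2)
    upstream = [stage_a(n) for n in ("download", "preprocess")]  # fan out
    downstream = [stage_b(upstream, i) for i in range(4)]        # fan out, gated on upstream
    dag = delayed(lambda x: x)(downstream)                       # gather leaves into one node
    print(dag.compute())                                         # [0, 2, 4, 6]
    client.close()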
FireRunByRegionAndT.py

import json
import argparse

import fireatlas
from fireatlas.utils import timed
from fireatlas import FireRunDaskCoordinator
from fireatlas import settings
from fireatlas import FireLog


def validate_json(s):
    try:
        return json.loads(s)
    except ValueError:
        raise argparse.ArgumentTypeError("Not a valid JSON string")


@timed
def Run(region: fireatlas.FireTypes.Region, tst: fireatlas.FireTypes.TimeStep):
    FireLog.logger.info(f"Running preprocessing code for {region[0]} at {tst=} with source {settings.FIRE_SOURCE}")
    # job_preprocess_region_t is @delayed, so build the task and execute it
    FireRunDaskCoordinator.job_preprocess_region_t([None,], region, tst).compute()


if __name__ == "__main__":
    """ The main code to run preprocessing for a region and time step. It writes to a dedicated directory on s3.

    Example:
    python3 FireRunByRegionAndT.py --regnm="WesternUS" --tst="[2023,6,1,\"AM\"]"
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--regnm", type=str)
    parser.add_argument("--tst", type=validate_json)
    args = parser.parse_args()
    Run([args.regnm, None], args.tst)
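The --tst and --ted flags in these scripts are plain JSON strings, so validate_json hands Run() an ordinary Python list. For example:

import json
print(json.loads('[2023,6,1,"AM"]'))  # -> [2023, 6, 1, 'AM']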
FireRunDataUpdateChecker.py

import json
import argparse

from fireatlas.utils import timed
from fireatlas import FireRunDaskCoordinator


def validate_json(s):
    try:
        return json.loads(s)
    except ValueError:
        raise argparse.ArgumentTypeError("Not a valid JSON string")


@timed
def Run():
    """ Download data from different satellite sensors at the
    start of every 2nd hour from 1am through 11pm: `0 1-23/2 * * *`

    The jobs are scheduled here: https://repo.ops.maap-project.org/eorland_gee/fireatlas_nrt/-/pipeline_schedules

    :return: None
    """
    # job_data_update_checker is @delayed, so build the task and execute it
    FireRunDaskCoordinator.job_data_update_checker().compute()


if __name__ == "__main__":
    """Downloads the NRT files at a regular interval

    Example:
    python3 FireRunDataUpdateChecker.py
    """
    Run()
FireRunFireForward.py

import json
import argparse

import fireatlas
from fireatlas.utils import timed
from fireatlas import FireRunDaskCoordinator


def validate_json(s):
    try:
        print(f"${s}$")
        return json.loads(s)
    except ValueError:
        raise argparse.ArgumentTypeError("Not a valid JSON string")


@timed
def Run(region: fireatlas.FireTypes.Region, tst: fireatlas.FireTypes.TimeStep, ted: fireatlas.FireTypes.TimeStep):
    # job_fire_forward is @delayed, so build the task and execute it
    FireRunDaskCoordinator.job_fire_forward([None,], region, tst, ted).compute()


if __name__ == "__main__":
    """ The main code to run time forwarding for a time period

    Example:
    python3 FireRunFireForward.py --regnm="CaliTestRun" --tst="[2023,6,1,\"AM\"]" --ted="[2023,9,1,\"AM\"]"
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--regnm", type=str)
    parser.add_argument("--tst", type=validate_json)
    parser.add_argument("--ted", type=validate_json)
    args = parser.parse_args()
    Run([args.regnm, None], args.tst, args.ted)
FireRunPreprocessRegion.py

import json
import argparse

import fireatlas
from fireatlas.utils import timed
from fireatlas import FireRunDaskCoordinator


def validate_json(s):
    try:
        return json.loads(s)
    except ValueError:
        raise argparse.ArgumentTypeError("Not a valid JSON string")


@timed
def Run(region: fireatlas.FireTypes.Region):
    # job_preprocess_region is @delayed, so build the task and execute it
    FireRunDaskCoordinator.job_preprocess_region(region).compute()


if __name__ == "__main__":
    """ The main code to run preprocessing for a region. It writes to a dedicated directory on s3.

    Example:
    python3 FireRunPreprocessRegion.py --regnm="WesternUS" --bbox="[-119.5, 36.8, -118.9, 37.7]"
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--regnm", type=str)
    parser.add_argument("--bbox", type=validate_json)
    args = parser.parse_args()
    Run([args.regnm, args.bbox])
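None of the files in this MR show copy_from_local_to_s3 itself. For orientation only, a minimal sketch of what such a helper could look like with s3fs; the bucket name and key layout here are assumptions, not the project's actual ones:

import os
import s3fs

def copy_from_local_to_s3(local_filepath: str, fs: s3fs.S3FileSystem,
                          bucket: str = "hypothetical-fireatlas-bucket") -> None:
    # upload the local file to s3://<bucket>/<basename>; real code would
    # preserve the directory layout expected by downstream readers
    fs.put(local_filepath, f"{bucket}/{os.path.basename(local_filepath)}")

# usage (assumes AWS credentials are available in the environment):
# fs = s3fs.S3FileSystem(anon=False)
# copy_from_local_to_s3("/tmp/output.fgb", fs)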