Commit 94721c5b authored by ranchodeluxe

try this

parent 23b4f0d8
Part of 2 merge requests: !106 (Parallelize Uploads to S3 Where Necessary) and !61 (Additions: primary keys, speed ups (preprocessing and postprocessing), tests, scalene, reorg entire fireatlas)
fireatlas/FireRunDaskCoordinator.py:

@@ -47,14 +47,12 @@ def validate_json(s):
-def concurrent_copy_inputs_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
-    copy_from_local_to_s3(local_filepath, fs)
-
-
-# dask is dumb and I cannot reuse this with delayed DAG for some reason
-def concurrent_copy_outputs_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
+@delayed
+def concurrent_copy_from_local_to_s3(eventual_results: Tuple[Delayed], local_filepath: str):
     copy_from_local_to_s3(local_filepath, fs)
 
 
+@delayed
 def job_fire_forward(eventual_results: Tuple[Delayed], region: Region, tst: TimeStep, ted: TimeStep):
     logger.info(f"Running code for {region[0]} from {tst} to {ted} with source {settings.FIRE_SOURCE}")
@@ -72,6 +70,7 @@ def job_fire_forward(eventual_results: Tuple[Delayed], region: Region, tst: TimeStep, ted: TimeStep):
     save_large_fires_layers(allfires.gdf, region, large_fires, tst)
 
 
+@delayed
 def job_preprocess_region_t(
         eventual_results: Tuple[Delayed],
         region: Region, t: TimeStep):
@@ -80,13 +79,14 @@ def job_preprocess_region_t(
     copy_from_local_to_s3(filepath, fs)
 
 
+@delayed
 def job_preprocess_region(region: Region):
     filepath = preprocess_region(region)
     copy_from_local_to_s3(filepath, fs)
 
 
+@delayed
 def job_data_update_checker():
     """"""
     try:
         # Download SUOMI-NPP
         update_VNP14IMGTDL()
@@ -114,28 +114,28 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep):
     logger.info(f"dask workers = {len(dask_client.cluster.workers)}")
 
     # run the first two jobs in parallel
-    data_input_results = delayed(job_data_update_checker)()
-    region_results = delayed(job_preprocess_region)(region)
+    data_input_results = job_data_update_checker()
+    region_results = job_preprocess_region(region)
 
     # block on the first two jobs, then upload raw satellite files from `job_data_update_checker` in parallel
     data_upload_results = [
-        delayed(concurrent_copy_inputs_from_local_to_s3)([data_input_results, region_results], local_filepath)
+        concurrent_copy_from_local_to_s3([data_input_results, region_results], local_filepath)
         for local_filepath in glob.glob(f"{settings.LOCAL_PATH}/{settings.PREPROCESSED_DIR}/*/*.txt")
     ]
 
     # block on data upload results, then run all region-plus-t jobs in parallel
     region_and_t_results = [
-        delayed(job_preprocess_region_t)(data_upload_results, region, t)
+        job_preprocess_region_t(data_upload_results, region, t)
        for t in list_of_timesteps
     ]
 
     # block on region_and_t_results, then run the fire forward algorithm (which cannot be run in parallel)
-    fire_forward_results = delayed(job_fire_forward)(region_and_t_results, region, tst, ted)
+    fire_forward_results = job_fire_forward(region_and_t_results, region, tst, ted)
 
     # block on fire forward output and upload all snapshot/largefire outputs in parallel
     data_dir = os.path.join(settings.LOCAL_PATH, settings.OUTPUT_DIR, region[0], str(tst[0]))
     fgb_upload_results = [
-        delayed(concurrent_copy_outputs_from_local_to_s3)([fire_forward_results,], local_filepath)
+        concurrent_copy_from_local_to_s3([fire_forward_results,], local_filepath)
         for local_filepath in list(chain(
             glob.glob(os.path.join(data_dir, "Snapshot", "*", "*.fgb")),
             glob.glob(os.path.join(data_dir, "Largefire", "*", "*.fgb"))
...
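The eventual_results parameters threaded through these jobs are what give the DAG its ordering: a task that receives an upstream Delayed object as an argument cannot start until that upstream task finishes. A stripped-down sketch of the same trick, with toy job names and the default dask scheduler:

from dask import compute, delayed

@delayed
def download(_deps):
    # _deps is unused in the body; it exists only to encode ordering in the graph
    return "downloaded"

@delayed
def preprocess(_deps, t):
    return f"preprocessed {t}"

first = download(None)
# nothing has run yet; these calls only build graph nodes
fanout = [preprocess([first], t) for t in ("AM", "PM")]
# compute() blocks on `first`, then runs both preprocess tasks in parallel
print(compute(*fanout))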
FireRunByRegionAndT.py:

import json
import argparse
import fireatlas
from fireatlas.utils import timed
from fireatlas import FireRunDaskCoordinator
from fireatlas import settings
from fireatlas import FireLog


def validate_json(s):
    try:
        return json.loads(s)
    except ValueError:
        raise argparse.ArgumentTypeError("Not a valid JSON string")


@timed
def Run(region: fireatlas.FireTypes.Region, tst: fireatlas.FireTypes.TimeStep):
    FireLog.logger.info(f"Running preprocessing code for {region[0]} at {tst=} with source {settings.FIRE_SOURCE}")
    FireRunDaskCoordinator.job_preprocess_region_t([None,], region, tst)


if __name__ == "__main__":
    """ The main code to run preprocessing for a region and time period. It writes to a dedicated directory on s3.

    Example:
    python3 FireRunByRegionAndT.py --regnm="WesternUS" --tst="[2023,6,1,\"AM\"]"
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--regnm", type=str)
    parser.add_argument("--tst", type=validate_json)
    args = parser.parse_args()
    Run([args.regnm, None], args.tst)
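For reference, the --tst value round-trips through validate_json as plain JSON, so the TimeStep arrives in Run() as a list (a small illustration, not repo code):

import json

# what argparse hands to Run() for the example invocation above
tst = json.loads('[2023,6,1,"AM"]')
print(tst)  # -> [2023, 6, 1, 'AM']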
FireRunDataUpdateChecker.py:

import json
import argparse
from fireatlas.utils import timed
from fireatlas import FireRunDaskCoordinator


def validate_json(s):
    try:
        return json.loads(s)
    except ValueError:
        raise argparse.ArgumentTypeError("Not a valid JSON string")


@timed
def Run():
    """ download data from different satellite sensors at the
    start of every 2nd hour from 1am through 11pm: `0 1-23/2 * * *`

    the jobs are being scheduled here: https://repo.ops.maap-project.org/eorland_gee/fireatlas_nrt/-/pipeline_schedules

    :return: None
    """
    FireRunDaskCoordinator.job_data_update_checker()


if __name__ == "__main__":
    """Downloads the NRT files at a regular interval

    Example:
    python3 FireRunDataUpdateChecker.py
    """
    Run()
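To sanity-check the `0 1-23/2 * * *` expression in the docstring, one option is the third-party croniter package (not used by this repo; shown only as a quick check):

from datetime import datetime
from croniter import croniter

it = croniter("0 1-23/2 * * *", datetime(2023, 6, 1, 0, 0))
print([it.get_next(datetime).strftime("%H:%M") for _ in range(4)])
# -> ['01:00', '03:00', '05:00', '07:00']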
FireRunFireForward.py:

import json
import argparse
import fireatlas
from fireatlas.utils import timed
from fireatlas import FireRunDaskCoordinator


def validate_json(s):
    try:
        print(f"${s}$")
        return json.loads(s)
    except ValueError:
        raise argparse.ArgumentTypeError("Not a valid JSON string")


@timed
def Run(region: fireatlas.FireTypes.Region, tst: fireatlas.FireTypes.TimeStep, ted: fireatlas.FireTypes.TimeStep):
    FireRunDaskCoordinator.job_fire_forward([None,], region, tst, ted)


if __name__ == "__main__":
    """ The main code to run time forwarding for a time period

    Example:
    python3 FireRunFireForward.py --regnm="CaliTestRun" --tst="[2023,6,1,\"AM\"]" --ted="[2023,9,1,\"AM\"]"
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--regnm", type=str)
    parser.add_argument("--tst", type=validate_json)
    parser.add_argument("--ted", type=validate_json)
    args = parser.parse_args()
    Run([args.regnm, None], args.tst, args.ted)
FireRunPreprocessRegion.py:

import json
import argparse
import fireatlas
from fireatlas.utils import timed
from fireatlas import FireRunDaskCoordinator


def validate_json(s):
    try:
        return json.loads(s)
    except ValueError:
        raise argparse.ArgumentTypeError("Not a valid JSON string")


@timed
def Run(region: fireatlas.FireTypes.Region):
    FireRunDaskCoordinator.job_preprocess_region(region)


if __name__ == "__main__":
    """ The main code to run preprocessing for a region. It writes to a dedicated directory on s3.

    Example:
    python3 FireRunPreprocessRegion.py --regnm="WesternUS" --bbox="[-119.5, 36.8, -118.9, 37.7]"
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--regnm", type=str)
    parser.add_argument("--bbox", type=validate_json)
    args = parser.parse_args()
    Run([args.regnm, args.bbox])
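One detail shared by all four runners: Region is a two-element [name, bbox] list, and only FireRunPreprocessRegion.py supplies a real bbox. An illustration using the example values above (not repo code):

# the Region value each runner builds from its CLI args
region_with_bbox = ["WesternUS", [-119.5, 36.8, -118.9, 37.7]]  # FireRunPreprocessRegion.py
region_name_only = ["WesternUS", None]  # FireRunByRegionAndT.py / FireRunFireForward.py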