Skip to content
Snippets Groups Projects
combine_largefire.py 10.1 KiB
Newer Older
ranchodeluxe's avatar
ranchodeluxe committed
import argparse
gsfc_landslides's avatar
gsfc_landslides committed
import pandas as pd
import geopandas as gpd
ranchodeluxe's avatar
ranchodeluxe committed
import s3fs
ranchodeluxe's avatar
ranchodeluxe committed
import boto3
import re
gsfc_landslides's avatar
gsfc_landslides committed
import time
ranchodeluxe's avatar
ranchodeluxe committed
from pathlib import Path
gsfc_landslides's avatar
gsfc_landslides committed
from FireConsts import diroutdata
ranchodeluxe's avatar
ranchodeluxe committed
from dask.distributed import Client
from FireLog import logger
ranchodeluxe's avatar
ranchodeluxe committed
LAYERS = ["nfplist", "newfirepix", "fireline", "perimeter"]
ranchodeluxe's avatar
ranchodeluxe committed
MAX_WORKERS = 14
TARGET_MAAP_INPUT_BUCKET_NAME = "CONUS_NRT_DPS" #"WesternUS_REDO"
LOCAL_DIR_OUTPUT_PREFIX_PATH = f"/tmp/{TARGET_MAAP_INPUT_BUCKET_NAME}/LargeFire_Outputs"
ranchodeluxe's avatar
ranchodeluxe committed

ranchodeluxe's avatar
ranchodeluxe committed

ranchodeluxe's avatar
ranchodeluxe committed
def mkdir_dash_p(parent_output_path):
    """named after linux bash `mkdir -p`

    Create every missing parent directory of `parent_output_path`;
    directories that already exist are silently left alone.

    Examples:
        input: `mkdir_dash_p('/tmp/foo/bar/doobie/README.txt')`
        output: all nested parent directories of the file are created "/tmp/foo/bar/doobie"

    :param parent_output_path: path to a file whose parent dirs should exist
    :return: None
    """
    Path(parent_output_path).parent.mkdir(parents=True, exist_ok=True)


ranchodeluxe's avatar
ranchodeluxe committed
def copy_from_maap_to_veda_s3(from_maap_s3_path):
    """from MAAP to VEDA s3

    Copy a merged LargeFire flatgeobuf out of the MAAP bucket into the
    VEDA staging bucket. Paths that aren't LargeFire outputs, or that
    don't match the expected merged-filename pattern, are logged and skipped.

    :param from_maap_s3_path: s3 MAAP path
    :return: None
    """
    s3_client = boto3.client("s3")

    if "LargeFire" not in from_maap_s3_path:
        logger.error(f"[ NO S3 COPY EXPORTED ]: for file {from_maap_s3_path}")
        return

    fname_regex = r"^s3://maap.*?(/LargeFire_Outputs/)merged/(?P<fname>lf_fireline_archive.fgb|lf_fireline_nrt.fgb|lf_perimeter_archive.fgb|lf_perimeter_nrt.fgb|lf_newfirepix_archive.fgb|lf_newfirepix_nrt.fgb|lf_nfplist_archive.fgb|lf_nfplist_nrt.fgb)$"
    match = re.match(fname_regex, from_maap_s3_path)
    if match is None:
        logger.error(f"[ NO REGEX MATCH FOUND ]: for file {from_maap_s3_path}")
        return

    # with a match on e.g. a perimeter URL, `destination_dict` resembles:
    # {'fname': 'lf_perimeter.fgb'}
    destination_dict = match.groupdict()

    s3_client.copy_object(
        CopySource=from_maap_s3_path.replace("s3://", ""),  # full bucket path
        Bucket="veda-data-store-staging",  # Destination bucket
        Key=f"EIS/FEDSoutput/LFArchive/{destination_dict['fname']}",
    )
ranchodeluxe's avatar
ranchodeluxe committed

ranchodeluxe's avatar
ranchodeluxe committed
def merge_lf_years(
    parent_years_folder_input_path,
    maap_output_folder_path,
    layers=LAYERS,
):
    """using `glob` and `concat` merge all large fire layers across years and write back to MAAP s3

    :param parent_years_folder_input_path: local dir path to the folder that houses all
        per-year outputs written by `write_lf_layers_by_year`
    :param maap_output_folder_path: s3 MAAP folder path the merged fgb files are written to
    :param layers: a list of layer strings
    :return: None
    """
    # these flags are assigned in the __main__ block; default to False so this
    # function also works when the module is imported instead of run as a script
    is_nrt_run = globals().get("IS_NRT_RUN", False)
    is_production_run = globals().get("IS_PRODUCTION_RUN", False)

    # loop-invariant: resolve the parent folder once, not per layer
    folder = Path(parent_years_folder_input_path)
    logger.info(f"[ PARENT ]: years folder path: {folder}")

    for layer in layers:
        flatgeobufs_by_layer_and_year = list(folder.glob(f"*/lf_{layer}.fgb"))
        logger.info(f"[ CHILD ]: fgb(s) by year: {flatgeobufs_by_layer_and_year}")

        gpd_by_year = [gpd.read_file(fgb) for fgb in flatgeobufs_by_layer_and_year]
        logger.info(f"[ GPD ]: frames by year: {gpd_by_year}")

        gdf = pd.concat(gpd_by_year).pipe(gpd.GeoDataFrame)

        # NRT runs get the `_nrt` suffix, archive (all-years) runs get `_archive`
        suffix = "nrt" if is_nrt_run else "archive"
        maap_s3_layer_path = f"{maap_output_folder_path}/lf_{layer}_{suffix}.fgb"

        gdf.to_file(
            maap_s3_layer_path,
            driver="FlatGeobuf",
        )

        # only the PRODUCTION job is allowed to export into the VEDA staging bucket
        if is_production_run:
            copy_from_maap_to_veda_s3(maap_s3_layer_path)
ranchodeluxe's avatar
ranchodeluxe committed
def load_lf(lf_id, file_path, layer="nfplist", drop_duplicate_geometries=False):
    """find the large fire pickled file by id

    :param lf_id: large fire id (tagged onto every row of the returned frame as "ID")
    :param file_path: s3 MAAP path to inputs
    :param layer: str, one of LAYERS
    :param drop_duplicate_geometries: bool, when True keep only the earliest row
        (sorted by "t") per geometry; has no effect for the "nfplist" layer
    :return: geopandas.GeoDataFrame, or None when the file/layer cannot be read
    """
    try:
        logger.info(f"[ READ FILE ]: {file_path}/{layer}")
        gdf = gpd.read_file(file_path, layer=layer)
    except Exception as e:
        # best-effort: a single unreadable file shouldn't kill the whole year run;
        # callers pass the result through `pd.concat`, which drops None entries
        logger.exception(e)
        return None

    gdf["ID"] = lf_id

    # idiomatic truthiness check (was the anti-idiom `drop_duplicate_geometries == True`)
    if drop_duplicate_geometries and layer != "nfplist":
        gdf.sort_values("t", ascending=True, inplace=True)
        gdf = gdf.loc[gdf["geometry"].drop_duplicates(keep="first").index]

    return gdf

ranchodeluxe's avatar
ranchodeluxe committed

ranchodeluxe's avatar
ranchodeluxe committed
def write_lf_layers_by_year(
    year, s3_maap_input_path, LOCAL_DIR_OUTPUT_PREFIX_PATH, layers=LAYERS
):
    """ for each layer write out the most recent lf layer

    :param year: int
    :param s3_maap_input_path: the s3 MAAP path listing this year's Largefire outputs
    :param LOCAL_DIR_OUTPUT_PREFIX_PATH: the local dir output prefix
    :param layers: a list of layer strings
    :return: None
    """
    s3 = s3fs.S3FileSystem(anon=False)

    # load in NRT Largefire data; lexicographic sort puts the latest
    # timestep file per fire id last
    lf_files = sorted(s3.ls(s3_maap_input_path))

    # each largefire id has a file for each timestep which has entire evolution up to that point.
    # the latest one has the most up-to-date info for that fire: since `lf_files` is sorted,
    # later files overwrite earlier ones and the last file per id wins.
    # NOTE: ids are compared exactly here — the previous substring test (`lf_id in file`)
    # could mismatch ids that are prefixes of one another.
    largefire_dict = {}
    for file in lf_files:
        lf_id = file.split("Largefire/")[1].split("_")[0]
        largefire_dict[lf_id] = "s3://" + file

    # NOTE: there's another opportunity to speed up code here for each year process
    # if we spin up a couple more dask workers per year and scatter each layer reads/writes
    for layer in layers:
        all_lf_per_layer = pd.concat(
            [
                load_lf(lf_id, file_path, layer=layer)
                for lf_id, file_path in largefire_dict.items()
            ],
            ignore_index=True,
        )

        layer_output_fgb_path = f"{LOCAL_DIR_OUTPUT_PREFIX_PATH}/{year}/lf_{layer}.fgb"

        # create all parent directories for local output (if they don't exist already)
        mkdir_dash_p(layer_output_fgb_path)

        all_lf_per_layer.to_file(
            layer_output_fgb_path,
            driver="FlatGeobuf",
        )

ranchodeluxe's avatar
ranchodeluxe committed

def main(years_range, in_parallel=False):
    """combine each year's large fire outputs, then merge all years together

    :param years_range: a list of year integers
    :param in_parallel: bool, when True fan the per-year work out across dask workers
    :return: None
    """
    merged_output_path = (
        f"{diroutdata}{TARGET_MAAP_INPUT_BUCKET_NAME}/LargeFire_Outputs/merged"
    )

    if not in_parallel:
        for year in years_range:
            s3_maap_input_path = f"{diroutdata}{TARGET_MAAP_INPUT_BUCKET_NAME}/{year}/Largefire/"
            write_lf_layers_by_year(year, s3_maap_input_path, LOCAL_DIR_OUTPUT_PREFIX_PATH)

        merge_lf_years(
            LOCAL_DIR_OUTPUT_PREFIX_PATH,
            merged_output_path,
        )
        return

    #############################################
    # in parallel via dask
    #############################################
    # this assumes we are running on our largest worker instance where we have 16 CPU,
    # so default to one worker per year but cap the pool at MAX_WORKERS.
    # BUGFIX: the old `len(years_range) % MAX_WORKERS` (modulo) spun up ZERO workers
    # whenever the year count was an exact multiple of MAX_WORKERS — `min` is the
    # cap the comment always intended.
    dask_client = Client(n_workers=min(len(years_range), MAX_WORKERS))
    logger.info(f"workers = {len(dask_client.cluster.workers)}")
    tstart = time.time()

    # set up work items
    futures = [
        dask_client.submit(
            write_lf_layers_by_year,
            year,
            f"{diroutdata}{TARGET_MAAP_INPUT_BUCKET_NAME}/{year}/Largefire/",
            LOCAL_DIR_OUTPUT_PREFIX_PATH,
        )
        for year in years_range
    ]
    # join children and wait to finish
    dask_client.gather(futures)
    logger.info(
        f"workers after dask_client.gather = {len(dask_client.cluster.workers)}"
    )
    tend = time.time()
    logger.info(f'"write_lf_layers_by_year" in parallel: {(tend - tstart) / 60} minutes')

    # recycle the workers before the (single-process) merge step
    dask_client.restart()
    merge_lf_years(
        LOCAL_DIR_OUTPUT_PREFIX_PATH,
        merged_output_path,
    )
ranchodeluxe's avatar
ranchodeluxe committed

gsfc_landslides's avatar
gsfc_landslides committed

if __name__ == "__main__":
    """
    Examples:
        # run a single year
        python3 combine_largefire.py -s 2023 -e 2023

        # run multiple years in parallel
        python3 combine_largefire.py -s 2018 -e 2022 -p

        # run multiple years in parallel in PRODUCTION mode, by default this is the LF archive (not NRT)
        # PRODUCTION mode means outputs get `aws s3 cp` to s3://veda-data-store-staging
        python3 combine_largefire.py -s 2018 -e 2022 -p -x

        # run NRT current year in PRODUCTION mode
        # PRODUCTION mode means outputs get `aws s3 cp` to s3://veda-data-store-staging
        python3 combine_largefire.py -s 2023 -e 2023 -x --nrt
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-s", "--start-year", required=True, type=int, help="start year int"
    )
    parser.add_argument(
        "-e", "--end-year", required=True, type=int, help="end year int"
    )
    parser.add_argument(
        "-p",
        "--parallel",
        action="store_true",
        help="turn on dask processing years in parallel",
    )
    parser.add_argument(
        "-x",
        "--production-run",
        action="store_true",
        help="creates a flag/trap for us to know this is running as the PRODUCTION job to avoid overwrite VEDA s3 data",
    )
    parser.add_argument(
        "-n",
        "--nrt",
        action="store_true",
        help="creates a flag/trap to know if this is LF archive (all years) or LF NRT (current year)",
    )
    args = parser.parse_args()

    # set module-level flag/trap to protect VEDA s3 copy
    # (a `global` statement at module scope is a no-op; plain assignment suffices)
    IS_PRODUCTION_RUN = args.production_run

    # set module-level flag/trap for NRT
    IS_NRT_RUN = args.nrt

    # validate `years_range` construction: range must be non-empty and inclusive
    # of both endpoints (an end year before the start year yields an empty range)
    years_range = list(range(args.start_year, args.end_year + 1))
    if not years_range or years_range[0] != args.start_year or years_range[-1] != args.end_year:
        raise ValueError(
            f"[ ERROR ]: the range='{years_range}' doesn't start or end with inputs='{args.start_year}-{args.end_year}'"
        )

    start = time.time()
    logger.info(f"Running algo with year range: '{years_range}'")
    main(years_range, in_parallel=args.parallel)
    total = time.time() - start
    logger.info(f"Total runtime is {str(total / 60)} minutes")