diff --git a/algorithm_config.yaml b/algorithm_config.yaml index 59d38f2bab77beee5f10de52ddf2cd7d6cd60b0a..3afd3cc9003bb4fc81f2bf77d42ab17de2bc22f7 100644 --- a/algorithm_config.yaml +++ b/algorithm_config.yaml @@ -1,6 +1,6 @@ description: Estimates canopy height from InSAR coherence and LiDAR data algo_name: ich -version: 0.4.1 +version: 0.5 environment: ubuntu repository_url: https://repo.maap-project.org/bnarayanarao/insar_forest_height.git docker_url: mas.maap-project.org/root/maap-workspaces/base_images/python:v4.1.0 diff --git a/src/ich/__pycache__/algo.cpython-310.pyc b/src/ich/__pycache__/algo.cpython-310.pyc index 361785fd8a34583250cd10f5606ed605b7ecb91b..646093577efcb78c07a364cf7c0abfe40b23e6a8 100644 Binary files a/src/ich/__pycache__/algo.cpython-310.pyc and b/src/ich/__pycache__/algo.cpython-310.pyc differ diff --git a/src/ich/algo.py b/src/ich/algo.py index c3ac5f42c19df1a76389a6d1811417fe4652d7c7..b82061756e43ae9f14d4e6bcc4078f57fed5ebf4 100644 --- a/src/ich/algo.py +++ b/src/ich/algo.py @@ -27,6 +27,7 @@ import os from concurrent.futures import ThreadPoolExecutor from scipy.interpolate import PchipInterpolator import gc +import tracemalloc # Precompute XX and YY XX = np.linspace(0, np.pi, num=100, endpoint=True) @@ -184,6 +185,7 @@ def cal_(temp_cor, temp_gedi, htl, htg): result = np.array(result) tempdf = pd.DataFrame(data={'N': result[:, 0], 'S': result[:, 1], 'C': result[:, 2], 'r': result[:, 3], 'rmse': result[:, 4]}) + del result tempdf.dropna(subset=['rmse'], inplace=True) if nn>6: tempdf = tempdf[tempdf['N'] > 3].sort_values(by=['rmse'], ascending=True) @@ -193,7 +195,7 @@ def cal_(temp_cor, temp_gedi, htl, htg): else: sCoarse = np.round(tempdf.iloc[0]['S'], 2) cCoarse = np.round(tempdf.iloc[0]['C'], 2) - + del tempdf result = [] for S_param in np.arange(sCoarse - 0.1, sCoarse + 0.1, 0.02): for C_param in np.arange(cCoarse - 1, cCoarse + 1, 0.2): @@ -252,6 +254,8 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_): 
rmse_parm = 0 ht = np.zeros(cohBlock.shape) # print(lidarBlock.shape) + del lidarBlock, cohBlock,lidarArray, cohArray + gc.collect() return start_i, end_i, start_j, end_j, s_parm, c_parm, rmse_parm, ht, count # elif (np.isfinite(lidarBlock).any() and np.count_nonzero(~np.isnan(lidarBlock)) > 1) or max(lidarBlock.shape)>512: @@ -278,7 +282,7 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_): ht = arc_sinc(gama, parm[2]) * mask # ht = arc_sinc_fast(gama, parm[2]) * mask # print(lidarBlock.shape) - del lidarBlock, cohBlock + del lidarBlock, cohBlock,lidarArray, cohArray,parm,mask,gama gc.collect() return start_i, end_i, start_j, end_j, s_parm, c_parm, rmse_parm, ht, count else: @@ -292,6 +296,7 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_): print(f"Unable to find finite block at position ({i}, {j}).") return start_i, end_i, start_j, end_j, np.zeros_like(lidarBlock), np.zeros_like(lidarBlock), 0, np.zeros_like(lidarBlock), 0 +## blocks (windows) are processed in parallel batches def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg, batch_size=10): rows, cols = cohArray.shape s_parm = np.zeros((rows, cols), dtype=np.float32) @@ -300,36 +305,53 @@ def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg, batch_size=10): count = np.zeros((rows, cols), dtype=np.int16) ht_ = np.zeros((rows, cols), dtype=np.float32) - parm_ = [0, 0, 0, 0, 0] num_workers = os.cpu_count() - 1 - futures = [] + + # Calculate the total number of batches (for progress bar) + total_batches = ((rows // initial_ws) // batch_size) * ((cols // initial_ws) // batch_size) with ProcessPoolExecutor(max_workers=num_workers) as executor: # Loop through rows and columns in batches - for i in range(0, rows, initial_ws * batch_size): - for j in range(0, cols, initial_ws * batch_size): - # Collect blocks for the current batch - batch_futures = [] - for bi in range(batch_size): - for bj in range(batch_size): - block_i = i + bi * initial_ws - 
block_j = j + bj * initial_ws - if block_i < rows and block_j < cols: - batch_futures.append((block_i, block_j)) - - future = executor.submit(process_batch, batch_futures, cohArray, lidarArray, initial_ws, htl, htg, parm_) - - futures.append(future) - del future,batch_futures - - # Initialize the progress bar with the total number of futures - with tqdm(total=len(futures)) as pbar: - completed_jobs = 0 - for future in as_completed(futures): - # Process results from each batch - results = future.result() + with tqdm(total=total_batches) as pbar: # Set progress bar for the total number of batches + for i in range(0, rows, initial_ws * batch_size): + for j in range(0, cols, initial_ws * batch_size): + # Collect blocks for the current batch + batch_futures = [] + for bi in range(batch_size): + for bj in range(batch_size): + block_i = i + bi * initial_ws + block_j = j + bj * initial_ws + if block_i < rows and block_j < cols: + batch_futures.append((block_i, block_j)) + + # Submit the batch to the executor (parallel execution) + future = executor.submit(process_batch, batch_futures, cohArray, lidarArray, initial_ws, htl, htg, parm_) + futures.append(future) + + # Once we accumulate enough futures, process them in parallel + if len(futures) >= num_workers: + for completed_future in as_completed(futures): + results = completed_future.result() + for start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt in results: + s_parm[start_i:end_i, start_j:end_j] = s_p + c_parm[start_i:end_i, start_j:end_j] = c_p + rmse_parm[start_i:end_i, start_j:end_j] = r_p + ht_[start_i:end_i, start_j:end_j] = ht + count[start_i:end_i, start_j:end_j] = cnt + + # Update progress for each completed batch (not each block) + pbar.update(1) + + # Free the future's memory once processed + futures.remove(completed_future) + del completed_future + gc.collect() + + # Ensure any remaining futures are processed + for completed_future in as_completed(futures): + results = completed_future.result() for 
start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt in results: s_parm[start_i:end_i, start_j:end_j] = s_p c_parm[start_i:end_i, start_j:end_j] = c_p @@ -337,12 +359,11 @@ def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg, batch_size=10): ht_[start_i:end_i, start_j:end_j] = ht count[start_i:end_i, start_j:end_j] = cnt - completed_jobs += 1 + # Update progress for each completed batch pbar.update(1) - del results,future + + del completed_future gc.collect() - # Clear the futures list to prevent accumulation - futures.clear() return s_parm, c_parm, rmse_parm, ht_, count @@ -352,19 +373,14 @@ def process_batch(batch_futures, cohArray, lidarArray, initial_ws, htl, htg, par # Call process_block for each block in the batch start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt = process_block(block_i, block_j, cohArray, lidarArray, initial_ws, htl, htg, parm_) results.append((start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt)) + + del batch_futures, cohArray, lidarArray, initial_ws, htl, htg, parm_ # Invoke garbage collection gc.collect() return results - - - - - - - - +## blocks (windows) are processed in parallel # def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg): # rows, cols = cohArray.shape # c_parm = np.zeros((rows, cols))