diff --git a/src/ich/__pycache__/algo.cpython-310.pyc b/src/ich/__pycache__/algo.cpython-310.pyc
index 0535df8f8f579b7ed10090f39bb9b9ff9fdff371..2c7ddafe22fdcf135f9ba2c7fc53c1a3e4ecae6b 100644
Binary files a/src/ich/__pycache__/algo.cpython-310.pyc and b/src/ich/__pycache__/algo.cpython-310.pyc differ
diff --git a/src/ich/algo.py b/src/ich/algo.py
index bca2d9b46f538d9624adb92bdbec20018a1e0236..8f4a64068fd8a7c3688f96638f7d7a89211aaa8a 100644
--- a/src/ich/algo.py
+++ b/src/ich/algo.py
@@ -26,6 +26,7 @@ from concurrent.futures import ProcessPoolExecutor, as_completed
 import os
 from concurrent.futures import ThreadPoolExecutor
 from scipy.interpolate import PchipInterpolator
+import gc
 
 # Precompute XX and YY
 XX = np.linspace(0, np.pi, num=100, endpoint=True)
@@ -275,6 +276,7 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_):
         ht = arc_sinc(gama, parm[2]) * mask
         # ht = arc_sinc_fast(gama, parm[2]) * mask
         # print(lidarBlock.shape)
+        del lidarBlock, cohBlock
         return start_i, end_i, start_j, end_j, s_parm, c_parm, rmse_parm, ht, count
     else:
         S += 2
@@ -287,37 +289,106 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_):
     print(f"Unable to find finite block at position ({i}, {j}).")
     return start_i, end_i, start_j, end_j, np.zeros_like(lidarBlock), np.zeros_like(lidarBlock), 0, np.zeros_like(lidarBlock), 0
 
-def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg):
+def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg, batch_size=10):
     rows, cols = cohArray.shape
-    c_parm = np.zeros((rows, cols))
-    s_parm = np.zeros((rows, cols))
-    rmse_parm = np.zeros((rows, cols))
-    count = np.zeros((rows, cols))
-    ht_ = np.zeros((rows, cols))
+    s_parm = np.zeros((rows, cols), dtype=np.float32)
+    c_parm = np.zeros((rows, cols), dtype=np.float32)
+    rmse_parm = np.zeros((rows, cols), dtype=np.float32)
+    count = np.zeros((rows, cols), dtype=np.int16)
+    ht_ = np.zeros((rows, cols), dtype=np.float32)
 
     parm_ = [0, 0, 0, 0, 0]
 
-    num_workers = os.cpu_count()-1
+    num_workers = os.cpu_count() - 1
 
     futures = []
     with ProcessPoolExecutor(max_workers=num_workers) as executor:
-        for i in range(0, rows, initial_ws):
-            for j in range(0, cols, initial_ws):
-                futures.append(executor.submit(process_block, i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_))
+        # Loop through rows and columns in batches
+        for i in range(0, rows, initial_ws * batch_size):
+            for j in range(0, cols, initial_ws * batch_size):
+                # Collect blocks for the current batch
+                batch_futures = []
+                for bi in range(batch_size):
+                    for bj in range(batch_size):
+                        block_i = i + bi * initial_ws
+                        block_j = j + bj * initial_ws
+                        if block_i < rows and block_j < cols:
+                            batch_futures.append((block_i, block_j))
+
+                # Submit the batch as a single task
+                futures.append(executor.submit(process_batch, batch_futures, cohArray, lidarArray, initial_ws, htl, htg, parm_))
 
         # Initialize the progress bar with the total number of futures
         with tqdm(total=len(futures)) as pbar:
             completed_jobs = 0
             for future in as_completed(futures):
-                start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt = future.result()
-                s_parm[start_i:end_i, start_j:end_j] = s_p
-                c_parm[start_i:end_i, start_j:end_j] = c_p
-                rmse_parm[start_i:end_i, start_j:end_j] = r_p
-                ht_[start_i:end_i, start_j:end_j] = ht
-                count[start_i:end_i, start_j:end_j] = cnt
+                # Process results from each batch
+                results = future.result()
+                for start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt in results:
+                    s_parm[start_i:end_i, start_j:end_j] = s_p
+                    c_parm[start_i:end_i, start_j:end_j] = c_p
+                    rmse_parm[start_i:end_i, start_j:end_j] = r_p
+                    ht_[start_i:end_i, start_j:end_j] = ht
+                    count[start_i:end_i, start_j:end_j] = cnt
 
                 completed_jobs += 1
-                if completed_jobs % 100 == 0:  # Update every 100 jobs
-                    pbar.update(100)
+                pbar.update(1)
+                del results, future
+                gc.collect()
 
     return s_parm, c_parm, rmse_parm, ht_, count
+
+def process_batch(batch_futures, cohArray, lidarArray, initial_ws, htl, htg, parm_):
+    results = []
+    for block_i, block_j in batch_futures:
+        # Call process_block for each block in the batch
+        start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt = process_block(block_i, block_j, cohArray, lidarArray, initial_ws, htl, htg, parm_)
+        results.append((start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt))
+        # Invoke garbage collection
+        # gc.collect()
+    return results
+
+
+
+
+
+
+
+
+
+
+# def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg):
+#     rows, cols = cohArray.shape
+#     c_parm = np.zeros((rows, cols))
+#     s_parm = np.zeros((rows, cols))
+#     rmse_parm = np.zeros((rows, cols))
+#     count = np.zeros((rows, cols))
+#     ht_ = np.zeros((rows, cols))
+
+#     parm_ = [0, 0, 0, 0, 0]
+
+#     num_workers = os.cpu_count()-1
+
+#     futures = []
+#     with ProcessPoolExecutor(max_workers=num_workers) as executor:
+#         for i in range(0, rows, initial_ws):
+#             for j in range(0, cols, initial_ws):
+#                 futures.append(executor.submit(process_block, i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_))
+
+#         # Initialize the progress bar with the total number of futures
+#         with tqdm(total=len(futures)) as pbar:
+#             completed_jobs = 0
+#             for future in as_completed(futures):
+#                 start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt = future.result()
+#                 s_parm[start_i:end_i, start_j:end_j] = s_p
+#                 c_parm[start_i:end_i, start_j:end_j] = c_p
+#                 rmse_parm[start_i:end_i, start_j:end_j] = r_p
+#                 ht_[start_i:end_i, start_j:end_j] = ht
+#                 count[start_i:end_i, start_j:end_j] = cnt
+
+#                 completed_jobs += 1
+#                 if completed_jobs % 100 == 0:  # Update every 100 jobs
+#                     pbar.update(100)
+
+#     return s_parm, c_parm, rmse_parm, ht_, count
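
Note for reviewers: below is a small, self-contained sketch of the batching pattern this patch adopts in dynamicWindow/process_batch, not code from the repository. Instead of submitting one future per window, block origins are grouped into batch_size x batch_size tiles and each tile is handed to the pool as a single task, so per-future pickling and scheduling overhead is paid once per batch rather than once per block. The names toy_block, toy_batch, run, and the 8x8 array are hypothetical stand-ins for process_block, process_batch, dynamicWindow, and the coherence/lidar rasters.

# Standalone sketch of the batched-submission pattern (illustrative names only).
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed

def toy_block(i, j, ws, rows, cols):
    # Stand-in for process_block: return block bounds and a dummy fill value.
    return i, min(i + ws, rows), j, min(j + ws, cols), float(i + j)

def toy_batch(blocks, ws, rows, cols):
    # Stand-in for process_batch: one worker task processes a whole list of
    # blocks and returns all of their results in a single round trip.
    return [toy_block(i, j, ws, rows, cols) for i, j in blocks]

def run(rows=8, cols=8, ws=2, batch_size=2):
    out = np.zeros((rows, cols), dtype=np.float32)
    futures = []
    with ProcessPoolExecutor(max_workers=2) as ex:
        step = ws * batch_size
        for i in range(0, rows, step):
            for j in range(0, cols, step):
                # Collect up to batch_size x batch_size block origins for this tile.
                blocks = [(i + bi * ws, j + bj * ws)
                          for bi in range(batch_size)
                          for bj in range(batch_size)
                          if i + bi * ws < rows and j + bj * ws < cols]
                futures.append(ex.submit(toy_batch, blocks, ws, rows, cols))
        for fut in as_completed(futures):
            # Each future yields a list of per-block results, mirroring process_batch.
            for si, ei, sj, ej, val in fut.result():
                out[si:ei, sj:ej] = val
    return out

if __name__ == "__main__":
    print(run())

With this structure each future carries roughly batch_size**2 block results, which is the same trade-off the patch makes: fewer, larger futures, and one pbar.update(1) per completed batch instead of one per block.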