From f9c19b341b81afc89c6d7083c5a7cb74837ae22d Mon Sep 17 00:00:00 2001
From: Narayana Rao Bhogapurapu <narayanarao.bhogapurapu@gmail.com>
Date: Sat, 12 Oct 2024 21:31:12 -0700
Subject: [PATCH] Process dynamic-window blocks in batches to reduce task overhead and memory use
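
Submit dynamic-window blocks to the process pool in batches instead of
one task per block:

- dynamicWindow() gains a batch_size parameter (default 10); blocks are
  grouped into batch_size x batch_size tiles and each group is handed to
  the new process_batch() helper, which runs process_block() over the
  block coordinates of one batch and returns the collected results.
- The output arrays are allocated as float32 (int16 for count) to lower
  memory use.
- The progress bar advances once per completed batch, and each batch's
  results are dropped and gc.collect() is invoked before the next batch.
- process_block() deletes its local block copies before returning.

For reference, a minimal usage sketch; the array sizes and the htl/htg
values are illustrative only, and the import path assumes the package is
importable as ich:

    import numpy as np
    from ich.algo import dynamicWindow

    coh = np.random.rand(200, 200).astype(np.float32)          # coherence
    lidar = 30 * np.random.rand(200, 200).astype(np.float32)   # lidar heights

    # initial_ws=10 with batch_size=10 means each submitted task covers
    # up to a 100 x 100 pixel tile of the input arrays.
    s, c, rmse, ht, cnt = dynamicWindow(coh, lidar, initial_ws=10,
                                        htl=0, htg=30, batch_size=10)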

---
 src/ich/__pycache__/algo.cpython-310.pyc | Bin 6822 -> 7409 bytes
 src/ich/algo.py                          |  59 ++++++++++++++++-------
 2 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/src/ich/__pycache__/algo.cpython-310.pyc b/src/ich/__pycache__/algo.cpython-310.pyc
index 0535df8f8f579b7ed10090f39bb9b9ff9fdff371..2c7ddafe22fdcf135f9ba2c7fc53c1a3e4ecae6b 100644
GIT binary patch
delta 1647
zcmZux&2Jk;6rVS{_O8Dj+l@bxIBDV}Hs&iSZPii^6%;s>3RIxQ6il+SHgVnBS#KyU
zGjSpPxDd5vrAkzM*r<YI4;FtwIdR|s5(l(5j))?J3O9s!V<#b{ozeW>o8QcvulL@3
zx%-cO;YKLrCs=MB4pfgnc`ZB&H?I}%wiz&5pDs<CT5W~_;w+1@Ar@O#OL<f46j;0q
zW}d+Tg1v{#yzGsTQjx{YqGZvg$Pzf3XB2DVqLL<PS=`nunmMy*=GSQHVYY(}Um&GN
zSdyhsKFao{Sq7EI*a#a%`8XS6S(I~ZoK2vdXESV)O<jP}E|z1{$UMR5QBs)geFFsw
z#oisI1+b_0mgfw>o4udBw*gM|Zs<M@t~)m)U&FV(QZxvlJJq2(>S|;)>Ku(V#tk0i
z`bl~fEQ5y_-O$3^H_&=N$9@CvulQK4mz^II2ivML3_6X$pyuf?<bnQRUBVf^$VWB2
z2%BXZl?b+{v9+G3gm6+LC#diW&4wD~UanOrY6^pOD)a|%zW;G;InjR)srHQn_i%#+
zgH^)l7a&5mr{nDsrlg1nSBZ$&8V@sd9j<&QqPEUFco51x39_a<L*95!4D~r}8+Sm&
z&@OK4ZSGkEoRkv@Ti?*f#11~h6PzYUl89j*<2yF=Bp>G9lf=5jlU(b|c*loavO>E6
zWO-@X;1Oh^9lssmaZWhkrV3YisH^v9yuSRB9kfF{vIfkH2W~TwVjhvU!;->-ydp9@
z(}{G67~vz>f9ix!jEXUl72{$;>=ct?O62$i*ZC+PyGVJKk6)w}dVuhq+;<;0$^G|n
zQy3vzw(Sk$91r|Q4!Gr8Jk6B7M5KAD6J?4#|LVwU#vba#x&&|g)iJ_|igir^a@sdX
zu0l)4V8t;YuVH(@cxwANULxtk69##p;=$0@jugQ4F#BxNbRx;worSnn#YnS;CDagH
ztx|6+*t_Ok&(djoZjW;~nUA=krqwXaR;#?YU>k2cXOp9cTv|0;&1lr?rePPRt#{-K
zXC%BU;h2Qu2qk~trrcUOXIkeZ`<{gNC4AuAP9A*G4Gm^gR_qnaY`JuC30uw87V8b;
zWO->*sg=~`;(}EQ8;w)Vx@nuNTx%@0T<^fp)zEmQZnvC?RKn{jt#Z?K_NV6BIgqJ{
z`}rZURdppuRT_i@O@Nex8ig>>^Yrftjs6u4tH|m!vt=XG*bgfBf!@vjL(gY_4^LWM
zT-eYMHcK3pAhv!6mk4PS%ma-RjI<d{WL@}df^P9`1uo{&nvOB(7Xc9zArTf45fwus
z#<dQh?*iz(5Xb1{QB0ax#n%sWm22ovI8vbfpTv3rw?uDVM{%z&#<w}cIxB}ik?^sE
z8wm2b4C_<b{Y=8=5-v#SNEq9KtkkX|yec15{#^2b@-z#o(&r0cT}HOB+ttct)-cLt
wS6{HI%>}DvS|9elPvrqVa{f#o!k<Mh<?PSQz_ZRmW){EL%b79Nzb`odF9&Fei~s-t

delta 1068
zcmZWnOOI4V6s}YE+1<BqKj%(A?hH6O^aBJ&LliR$MqIEmMixj1(wT7XbTSZu+Z|Cn
zWrO1wH$<5m6E`N^E{qFz9bK>i6a4`iR{VsCE({4KF3?jm6T+nHOMR!l@7$_ekMsA*
z?@#zwv)Q!ZmaArZCy(FtL)f}|`0+4u))x_&4`Ch)SiEC)8j&+<VrdLf17TUPYDEpL
z>Y~%eQq<Ni&uL?svkjDt@}^N0JhT0rYDX_c?WnOKJBM)wgU>|g2+m@K<I7mZ8por!
zSjUhD$8ZkkIUdIYc#z`~zJLoHUqN|Rv|gM%gqDQ%<f+jII5CN>8vqw4+x9O2S0+EY
zDF?nze$M>_zfP{?JpeoTsPJcUtPmu_;`sxs<dJ(>eg*L=Wl`>pd`j((CjS)w0(+Zp
zrvGlTU0xXmhAWG(i>3^q3G$ElbIxC~^!@F%d5(GRr@8kZfDk0s#RaJ>We;E^$s&7A
zjzKy5l)F!*_Nnx)8tzTZ^!Sdy1LX~jkvSG9D?(*^Liq!Wk73+_FTYW_fsH2rN^Tf$
ziHmiW-_iD-E~*0gBZqP$caWky6-W?}B!f)y#;GxYxPIpPsX=;>A%6qV;x)T!SVU8m
z2A;Nf(RWmt%A@R9Tu7-I6{uNNQB_q_brq^Ps*+1H6l_YGrOKvUlgol?<V<;WN=<no
zWv0A2N)M;Z63W+vDp3jTK%jY9#`mZ=^3mQo!TLFY!ene1Al`SEME^Ig4xw?L`&9Pf
z^I6`$!&}Qxdd=cObKZ_mC--L;gIGUpD;M`T#DS)Dro^f=t({UxmMh_r)_nYdPTkP-
zp{8@1-qUnb)5n@V)pR@gwz85q)%jXXS}RSTXc{tgjDGi8CplAHiWaz%`lr}SgZ~V!
z^rR_0$TJo|{+BmR@Sq0eXW%a&4f2+J5<Zo;!hij8yv2*OUVY?ryLk0dxBJLljeFNt
i<9-x>G0E1Okh~GDz*cfQJOpnhKZK3s_b`;+>;D7h<oF=~

diff --git a/src/ich/algo.py b/src/ich/algo.py
index bca2d9b..8f4a640 100644
--- a/src/ich/algo.py
+++ b/src/ich/algo.py
@@ -26,6 +26,7 @@ from concurrent.futures import ProcessPoolExecutor, as_completed
 import os
 from concurrent.futures import ThreadPoolExecutor
 from scipy.interpolate import PchipInterpolator
+import gc
 
 # Precompute XX and YY
 XX = np.linspace(0, np.pi, num=100, endpoint=True)
@@ -275,6 +276,7 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_):
             ht = arc_sinc(gama, parm[2]) * mask
             # ht = arc_sinc_fast(gama, parm[2]) * mask
             # print(lidarBlock.shape)
+            del lidarBlock, cohBlock
             return start_i, end_i, start_j, end_j, s_parm, c_parm, rmse_parm, ht, count
         else:
             S += 2
@@ -287,37 +289,58 @@
                 print(f"Unable to find finite block at position ({i}, {j}).")
                 return start_i, end_i, start_j, end_j, np.zeros_like(lidarBlock), np.zeros_like(lidarBlock), 0, np.zeros_like(lidarBlock), 0
 
-def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg):
+def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg, batch_size=10):
     rows, cols = cohArray.shape
-    c_parm = np.zeros((rows, cols))
-    s_parm = np.zeros((rows, cols))
-    rmse_parm = np.zeros((rows, cols))
-    count = np.zeros((rows, cols))
-    ht_ = np.zeros((rows, cols))
+    s_parm = np.zeros((rows, cols), dtype=np.float32)
+    c_parm = np.zeros((rows, cols), dtype=np.float32)
+    rmse_parm = np.zeros((rows, cols), dtype=np.float32)
+    count = np.zeros((rows, cols), dtype=np.int16)
+    ht_ = np.zeros((rows, cols), dtype=np.float32)
 
     parm_ = [0, 0, 0, 0, 0]
 
-    num_workers = os.cpu_count()-1
+    num_workers = os.cpu_count() - 1
 
     futures = []
     with ProcessPoolExecutor(max_workers=num_workers) as executor:
-        for i in range(0, rows, initial_ws):
-            for j in range(0, cols, initial_ws):
-                futures.append(executor.submit(process_block, i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_))
+        # Loop through rows and columns in batches
+        for i in range(0, rows, initial_ws * batch_size):
+            for j in range(0, cols, initial_ws * batch_size):
+                # Collect block coordinates (block_i, block_j) for the current batch
+                batch_blocks = []
+                for bi in range(batch_size):
+                    for bj in range(batch_size):
+                        block_i = i + bi * initial_ws
+                        block_j = j + bj * initial_ws
+                        if block_i < rows and block_j < cols:
+                            batch_blocks.append((block_i, block_j))
+
+                # Submit the batch as a single task
+                futures.append(executor.submit(process_batch, batch_blocks, cohArray, lidarArray, initial_ws, htl, htg, parm_))
 
         # Initialize the progress bar with the total number of futures
         with tqdm(total=len(futures)) as pbar:
             completed_jobs = 0
             for future in as_completed(futures):
-                start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt = future.result()
-                s_parm[start_i:end_i, start_j:end_j] = s_p
-                c_parm[start_i:end_i, start_j:end_j] = c_p
-                rmse_parm[start_i:end_i, start_j:end_j] = r_p
-                ht_[start_i:end_i, start_j:end_j] = ht
-                count[start_i:end_i, start_j:end_j] = cnt
+                # Process results from each batch
+                results = future.result()
+                for start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt in results:
+                    s_parm[start_i:end_i, start_j:end_j] = s_p
+                    c_parm[start_i:end_i, start_j:end_j] = c_p
+                    rmse_parm[start_i:end_i, start_j:end_j] = r_p
+                    ht_[start_i:end_i, start_j:end_j] = ht
+                    count[start_i:end_i, start_j:end_j] = cnt
 
                 completed_jobs += 1
-                if completed_jobs % 100 == 0:  # Update every 100 jobs
-                    pbar.update(100)
+                pbar.update(1)
+                del results, future  # release local references to this batch before the next iteration
+                gc.collect()
 
     return s_parm, c_parm, rmse_parm, ht_, count
+
+def process_batch(batch_blocks, cohArray, lidarArray, initial_ws, htl, htg, parm_):
+    results = []
+    for block_i, block_j in batch_blocks:
+        # Run process_block on each block in the batch and collect its outputs
+        start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt = process_block(block_i, block_j, cohArray, lidarArray, initial_ws, htl, htg, parm_)
+        results.append((start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt))
+    return results
-- 
GitLab