From 9f795912be0c493f682d31ac23627589130bbcc9 Mon Sep 17 00:00:00 2001
From: Narayana Rao Bhogapurapu <narayanarao.bhogapurapu@gmail.com>
Date: Sun, 13 Oct 2024 11:16:26 -0700
Subject: [PATCH] v0.5

Performance improvements
 - RAM optimization
 - Process blocks in parallel batches (10x10)
---
 algorithm_config.yaml                    |   2 +-
 src/ich/__pycache__/algo.cpython-310.pyc | Bin 7484 -> 7801 bytes
 src/ich/algo.py                          |  92 +++++++++++++----------
 3 files changed, 55 insertions(+), 39 deletions(-)

diff --git a/algorithm_config.yaml b/algorithm_config.yaml
index 59d38f2..3afd3cc 100644
--- a/algorithm_config.yaml
+++ b/algorithm_config.yaml
@@ -1,6 +1,6 @@
 description: Estimates canopy height from InSAR coherence and LiDAR data
 algo_name: ich
-version: 0.4.1
+version: 0.5
 environment: ubuntu
 repository_url: https://repo.maap-project.org/bnarayanarao/insar_forest_height.git
 docker_url: mas.maap-project.org/root/maap-workspaces/base_images/python:v4.1.0
diff --git a/src/ich/__pycache__/algo.cpython-310.pyc b/src/ich/__pycache__/algo.cpython-310.pyc
index 361785fd8a34583250cd10f5606ed605b7ecb91b..646093577efcb78c07a364cf7c0abfe40b23e6a8 100644
GIT binary patch
delta 1777
zcmb_c&2Jk;6rVS{YwtSt`rEtyi1Yp7)}c)yAfW~hEeM3@fr3h{qBiwx3bh-@>kVzu
z?55xb6{M;zgH+TDb`J=l3JIA5l|X=yxV3*k#HCk+xKyY@K*igHMkT#)U`I3Wz2AFp
z=J#f1=j$hb->Wwwkr2V}nif8B<Auw50j^)(`Nt{)M(dN6Nh^4AiUE$(EXLw2aZ#?6
ztYCKsOZLDjG1yDc+G&-1D^DtAmbA(~iaBMLLT`ytY^gOVOE9zifK|4}t+G{W)5;7>
zv&?x?d4w6vM7fL2W?2rE-7L=vC}-IqE27-PhS)I5N7*DBWn<@|@)#Ru6Nt?9Z^14K
z<^DaX1@J`QR!##P?f)G39$-maSD#nkmkH_6x61v$)GtB&7B;DUSHvSDQLuppW#SsH
z*@2~kxEm@{FC)eV`fy9!&}OwMnK<DNoc%(?!*45lI?$XDSHfpu5YxwCQp`n0V1NHm
z<QNTH!JTY<D87l#rznkq3I$MU7&0^i6l4&gm<7Lx<=98i5?Xvm-HUoLPx7b}?5aHi
zgb(mAk8qtQc#@}hn#UKlgT#@#8jtp%8|o2z4J{eF;v-Sg1E$pE)<p++^dxBm3)JKe
z*g0o_$9R-0JkXZi4917WrFc3WK~3iw%%U~g1N(Cx>_)|n_{h{v^u)NqlpY0t#W**5
z;2PV|gn&faNcd~<41+A`QEsG&o5Z+;m`TixCAqojX>WSc_iOYrp5d8(rJ|K+4Xt3M
zYK<b-6&~@~Y-l>yHZ`FnzYWt9)%9StQLkH7NBodHAd0EmfnWVSJ|dL#NI<13WI#-(
zhfQ7bRh1@SvrXfcPXoOvj-`$0B&I~M@AFd&l6W_LVU-6s%@M9(58l_XkeCzXI+HI#
z2QHP|xTEqIQ#P!Gn{YH1@bkEmB5i3udF?YS+{bW+E$m}Ab%MBQN8?HxFlHRL|Gqxx
z8hEf7PE)JIAg+mLY@p%mS=_fkjo4RtmIpUvx*I|)yNKrmr0F(K^LTf_3FECG9B@I2
z%RJIk-5if%WX_LrB0Sm#uGbXYP2JIb3n{$f=6Svw?Gd-Y3uw<D(N^6-x9ASJ!|sSX
z>W;bN?gSs<Dj(#<HOhzh@ETpDdkG)q+C$hF4?Tp9<8f}!oxtn74QqbK6Wth-daKA5
zdB=I)=k4X4L^tXG0nM+Pji$B5{H1NopE{!bKg8eIs_;KD$KSs25BGl&Mm%+hFd}1X
zOF)*iS<-=)jqeYZ1LUx{WMtqS@uTsyUV?SSX*dgY8}KHIq&Y`zO*~^xiKdxJtp{xD
zRO3y{uA$Se<4~FuSIug%64^*PdVImDzHYVFRqKpZU2z(AMP5F>AOhL>c&yquwOqFx
ziyd8DaaL?gyqbM^^%ySGTE>Y&klJeaAW<1iNe7+!sEiNPW{ZIi^c?-u42D3L!uWjU
zwz4u1l0p)NkRNe7Jo=!|L?6Xj=$+zE^jz_tK4)LUMXMs2%fhml$sOM95f40yPo3;3
zUcjMUY2&#I9wa3=P?GHne#tNT!)Ec~)7+VfPkj11f-(}ErbQ&IB6a%%gi2t)=ZnYt
p`||S;_p{aq9DAW^om!~Z8&z?>umsPEJB5<S4Gt=YLU#DszX9YGv^W3&

delta 1523
zcmZXTU5Fc16oBu!nMo$uWHR~x-DLl|O`86=YAfh2S}H;*RZ3gkra!vPoo?N1yUk?k
zPdRtDny%D`(zF*;d`duk5d=vQ6ruFNr#=WGKCEv(R|Ew`RP@ZQF15~O?m73n=gd7n
zcYd4x;&F2+5(zPS?j0GLuT0-Ci}1mX+V3lNU|~`K&0B>fVj_M`t5@tmuPPFKuqy(l
z8LhV26>n8w^_oc7HII@{O(bczA~=oYsxQaLvv%LAW>45PyVBwH?II=8S6F?A$cQY-
zogydlBzK8wK@>@ONDPY-$th75BP1UdlVVJaUxE4~VnS>o^621O*u|kX@M~>=J%da7
zs{p44Kl$$foKv@q{l*oIu`a(<9q7SZkj3;uOoZn_M1*;b14B(jo0{{BDn~|4=Pwb^
zn9Rs*7tR-yZ)m%k8XCY|btgC#+^R7*+=bUaR^{*;`mQdt#?<l9D_WO3dGa2IakVcp
z3VR23<R$KZhf3P+s=Lt{%i;zU!Oz2x=4sHlDq1#tt=_cWgQhCPww=Hs3}XaMjAH_m
zn8MhZ;9=(adI5~~p=b1&BWb14t9vBM`XKbC)?Rf1qYJD9!r#=o;6&UYS{Owg{T)rF
z$v&h$iKS9Qq=hj}UcAZs;CzUIUPOHt8$J9b?U<MmdY@`RdK5E#keLT4eRF|fTIe~}
z=a@+{87GT>DQwb&N7Zxj9l01%r3LMDHtbW&(Phhx_Y&$*EU!L`A1j}woQ)=@sLx`=
zi)&poF}R_rsl;vdXX0R(&o|Zrjitp!yWy(+$?yI5yvok1snjS>b2Xb9$!b0W(qQm7
zY>Wz(xdQwv)lFrhTgW^@@4j_6u0Bm&U2y|w3hf$P|3+G_A*fu)QBSgt?`d}OfQ+rP
zz<ul~8J}k|;TouSfNUmYa#J6c)G%rrPqGY?X=-BXA3cL<^f#IF7G^N8E)%^Vv6ajj
z>H@IV%dG}ly^uRZeP;-Cpbs^S^bMKCD4AwGtL5x+qIJA=$PK#@jC#GMz|YE@&}H5=
zJwgE;C68<`+GnzW1zOLX2+E=ymL*x1BXU%Z$#FS>BWPd|hgViPmax3a&+uu6qZsty
zf7lp?{>R3tVGr6((A9l_wFa0MzQ>r%W3Fcj-+Bs#<*Xa)#ruqs{s;@vUcysfV1j8h
zIzC|MgHx;vZA-nMEyIZVJo}{S0JW?NYr4DS&Mm4yZnxT-o7fp7>g@L>udPm<rZxY?
z3u4Z-owi3@kLZm>d(NqDQP*>g;F@8-W;ZUnOUla6q_hjC=bXCPSUPuM(ROX|;=<DD
zwmOwRGPjjZY|oN=2$Gw_@P)ZXKW~yJ>E{i@#<0KyewqJ~^@q40_|KN5(T>6Mn||Jn
z41oqA2=UU7{Br3Bb5hj`WjL)46pn3odC%t+0Rx9A$!DvYbJYv}pf_!%QB64S5}+h<
z>es^2@snPFVItmP6<(qvG;nSbsDkshCw327#Tn4NFR0tY=iwQ(vs6{bN(KG-&=cW3
Fe*;pZaMAz(

diff --git a/src/ich/algo.py b/src/ich/algo.py
index c3ac5f4..b820617 100644
--- a/src/ich/algo.py
+++ b/src/ich/algo.py
@@ -27,6 +27,7 @@ import os
 from concurrent.futures import ThreadPoolExecutor
 from scipy.interpolate import PchipInterpolator
 import gc
+import tracemalloc
 
 # Precompute XX and YY
 XX = np.linspace(0, np.pi, num=100, endpoint=True)
@@ -184,6 +185,7 @@ def cal_(temp_cor, temp_gedi, htl, htg):
         result = np.array(result)
         tempdf = pd.DataFrame(data={'N': result[:, 0], 'S': result[:, 1], 'C': result[:, 2],
                                     'r': result[:, 3], 'rmse': result[:, 4]})
+        del result
         tempdf.dropna(subset=['rmse'], inplace=True)
         if nn>6:
             tempdf = tempdf[tempdf['N'] > 3].sort_values(by=['rmse'], ascending=True)
@@ -193,7 +195,7 @@ def cal_(temp_cor, temp_gedi, htl, htg):
         else:
             sCoarse = np.round(tempdf.iloc[0]['S'], 2)
             cCoarse = np.round(tempdf.iloc[0]['C'], 2)
-
+            del tempdf
             result = []
             for S_param in np.arange(sCoarse - 0.1, sCoarse + 0.1, 0.02):
                 for C_param in np.arange(cCoarse - 1, cCoarse + 1, 0.2):
@@ -252,6 +254,8 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_):
             rmse_parm = 0
             ht = np.zeros(cohBlock.shape)
             # print(lidarBlock.shape)
+            del lidarBlock, cohBlock,lidarArray, cohArray
+            gc.collect()
             return start_i, end_i, start_j, end_j, s_parm, c_parm, rmse_parm, ht, count
         
         # elif (np.isfinite(lidarBlock).any() and np.count_nonzero(~np.isnan(lidarBlock)) > 1) or max(lidarBlock.shape)>512:
@@ -278,7 +282,7 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_):
             ht = arc_sinc(gama, parm[2]) * mask
             # ht = arc_sinc_fast(gama, parm[2]) * mask
             # print(lidarBlock.shape)
-            del lidarBlock, cohBlock
+            del lidarBlock, cohBlock,lidarArray, cohArray,parm,mask,gama
             gc.collect()
             return start_i, end_i, start_j, end_j, s_parm, c_parm, rmse_parm, ht, count
         else:
@@ -292,6 +296,7 @@ def process_block(i, j, cohArray, lidarArray, initial_ws, htl, htg, parm_):
                 print(f"Unable to find finite block at position ({i}, {j}).")
                 return start_i, end_i, start_j, end_j, np.zeros_like(lidarBlock), np.zeros_like(lidarBlock), 0, np.zeros_like(lidarBlock), 0
 
+## blocks (windows) are processed in parallel batches
 def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg, batch_size=10):
     rows, cols = cohArray.shape
     s_parm = np.zeros((rows, cols), dtype=np.float32)
@@ -300,36 +305,53 @@ def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg, batch_size=10):
     count = np.zeros((rows, cols), dtype=np.int16)
     ht_ = np.zeros((rows, cols), dtype=np.float32)
 
-
     parm_ = [0, 0, 0, 0, 0]
 
     num_workers = os.cpu_count() - 1
-
     futures = []
+
+    # Calculate the total number of batches (for progress bar)
+    total_batches = ((rows // initial_ws) // batch_size) * ((cols // initial_ws) // batch_size)
     with ProcessPoolExecutor(max_workers=num_workers) as executor:
         # Loop through rows and columns in batches
-        for i in range(0, rows, initial_ws * batch_size):
-            for j in range(0, cols, initial_ws * batch_size):
-                # Collect blocks for the current batch
-                batch_futures = []
-                for bi in range(batch_size):
-                    for bj in range(batch_size):
-                        block_i = i + bi * initial_ws
-                        block_j = j + bj * initial_ws
-                        if block_i < rows and block_j < cols:
-                            batch_futures.append((block_i, block_j))
-
-                future = executor.submit(process_batch, batch_futures, cohArray, lidarArray, initial_ws, htl, htg, parm_)
-
-                futures.append(future)
-                del future,batch_futures
-
-        # Initialize the progress bar with the total number of futures
-        with tqdm(total=len(futures)) as pbar:
-            completed_jobs = 0
-            for future in as_completed(futures):
-                # Process results from each batch
-                results = future.result()
+        with tqdm(total=total_batches) as pbar:  # Set progress bar for the total number of batches
+            for i in range(0, rows, initial_ws * batch_size):
+                for j in range(0, cols, initial_ws * batch_size):
+                    # Collect blocks for the current batch
+                    batch_futures = []
+                    for bi in range(batch_size):
+                        for bj in range(batch_size):
+                            block_i = i + bi * initial_ws
+                            block_j = j + bj * initial_ws
+                            if block_i < rows and block_j < cols:
+                                batch_futures.append((block_i, block_j))
+                    
+                    # Submit the batch to the executor (parallel execution)
+                    future = executor.submit(process_batch, batch_futures, cohArray, lidarArray, initial_ws, htl, htg, parm_)
+                    futures.append(future)
+
+                    # Once we accumulate enough futures, process them in parallel
+                    if len(futures) >= num_workers:
+                        for completed_future in as_completed(futures):
+                            results = completed_future.result()
+                            for start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt in results:
+                                s_parm[start_i:end_i, start_j:end_j] = s_p
+                                c_parm[start_i:end_i, start_j:end_j] = c_p
+                                rmse_parm[start_i:end_i, start_j:end_j] = r_p
+                                ht_[start_i:end_i, start_j:end_j] = ht
+                                count[start_i:end_i, start_j:end_j] = cnt
+
+                            # Update progress for each completed batch (not each block)
+                            pbar.update(1)
+
+                            # Free the future's memory once processed
+                            futures.remove(completed_future)
+                            del completed_future
+                            gc.collect()
+
+            # Ensure any remaining futures are processed
+            for completed_future in as_completed(futures):
+                results = completed_future.result()
                 for start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt in results:
                     s_parm[start_i:end_i, start_j:end_j] = s_p
                     c_parm[start_i:end_i, start_j:end_j] = c_p
@@ -337,12 +359,11 @@ def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg, batch_size=10):
                     ht_[start_i:end_i, start_j:end_j] = ht
                     count[start_i:end_i, start_j:end_j] = cnt
 
-                completed_jobs += 1
+                # Update progress for each completed batch
                 pbar.update(1)
-                del results,future
+
+                del completed_future
                 gc.collect()
-            # Clear the futures list to prevent accumulation
-            futures.clear()
 
     return s_parm, c_parm, rmse_parm, ht_, count
 
@@ -352,19 +373,14 @@ def process_batch(batch_futures, cohArray, lidarArray, initial_ws, htl, htg, par
         # Call process_block for each block in the batch
         start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt = process_block(block_i, block_j, cohArray, lidarArray, initial_ws, htl, htg, parm_)
         results.append((start_i, end_i, start_j, end_j, s_p, c_p, r_p, ht, cnt))
+
+    del batch_futures, cohArray, lidarArray, initial_ws, htl, htg, parm_
     # Invoke garbage collection
     gc.collect()
     return results
 
 
-
-
-
-
-
-
-
-
+## blocks (windows) are processed in parallel 
 # def dynamicWindow(cohArray, lidarArray, initial_ws, htl, htg):
 #     rows, cols = cohArray.shape
 #     c_parm = np.zeros((rows, cols))
-- 
GitLab