From e23ad560cdc6fed950a41e9c29ebf1d6bd329e86 Mon Sep 17 00:00:00 2001
From: Narayana Rao Bhogapurapu <narayanarao.bhogapurapu@gmail.com>
Date: Sun, 6 Oct 2024 11:14:14 -0700
Subject: [PATCH] arc_sinc_parallel

---
 src/ich/algo.py    |  76 ++++++++++
 src/ich/args_in.py | 347 +++++++++++++++++++++++----------------------
 2 files changed, 250 insertions(+), 173 deletions(-)

diff --git a/src/ich/algo.py b/src/ich/algo.py
index fc448c6..913af1d 100644
--- a/src/ich/algo.py
+++ b/src/ich/algo.py
@@ -38,6 +38,82 @@ def arc_sinc(x, c_param):
     interp_func = interpolate.interp1d(YY, XX * c_param, kind='slinear', bounds_error=False, fill_value=0)
     y = interp_func(x)
     return np.clip(y, 0, None)  # Ensure no negative heights
+
+from concurrent.futures import ThreadPoolExecutor
+import os
+
+
+def arc_sinc_block(x_chunk, c_param_chunk, XX, YY):
+    # Initialize result array for the chunk
+    y_chunk = np.zeros_like(x_chunk)
+    
+    # Handle the interpolation for each unique c_param value in the chunk
+    for unique_c in np.unique(c_param_chunk):
+        # Interpolation function for the current unique c_param
+        interp_func = interpolate.interp1d(YY, XX * unique_c, kind='slinear', bounds_error=False, fill_value=0)
+        
+        # Apply the interpolation
+        mask = (c_param_chunk == unique_c)
+        y_chunk[mask] = interp_func(x_chunk[mask])
+    
+    # Clip the result to ensure non-negative values
+    return np.clip(y_chunk, 0, None)
+
+
+
+
+def arc_sinc_vectorized_parallel(x, c_param, chunk_size=100000):
+    # Ensure x and c_param are numpy arrays
+    x = np.asarray(x)
+    c_param = np.asarray(c_param)
+    
+    # Clip x to the range [0, 1]
+    x = np.clip(x, 0, 1)
+    
+    # Create array of increments between 0 and pi
+    XX = np.linspace(0, np.pi, num=100, endpoint=True)
+    
+    # Set the first value of XX to eps to avoid division by zero issues
+    XX[0] = np.spacing(1)
+    
+    # Calculate sinc for XX and save it to YY
+    YY = np.sin(XX) / XX
+    
+    # Reset the first value of XX and YY
+    XX[0] = 0
+    YY[0] = 1
+    YY[-1] = 0
+    
+    # Reverse arrays for interpolation
+    XX = XX[::-1]
+    YY = YY[::-1]
+    
+    # Get the length of the input arrays
+    n = len(x)
+    
+    # Automatically get the number of available CPU cores
+    num_workers = os.cpu_count()
+    
+    # Split input arrays into chunks and process in parallel
+    y = np.zeros_like(x)
+    with ThreadPoolExecutor(max_workers=num_workers) as executor:
+        futures = []
+        
+        # Loop over chunks
+        for i in range(0, n, chunk_size):
+            x_chunk = x[i:i + chunk_size]
+            c_param_chunk = c_param[i:i + chunk_size]
+            
+            # Submit chunk processing to the thread pool
+            futures.append(executor.submit(arc_sinc_block, x_chunk, c_param_chunk, XX, YY))
+        
+        # Collect results
+        for idx, future in enumerate(futures):
+            i = idx * chunk_size
+            y[i:i + chunk_size] = future.result()
+    
+    return y
+
 def arc_sinc_vectorized(x, c_param):
     # Ensure x and c_param are numpy arrays
     x = np.asarray(x)
diff --git a/src/ich/args_in.py b/src/ich/args_in.py
index dd96c8a..8c935ff 100644
--- a/src/ich/args_in.py
+++ b/src/ich/args_in.py
@@ -26,7 +26,7 @@ from scipy.interpolate import interpn
 from empatches import EMPatches
 from sklearn.model_selection import train_test_split
 from plotting import density_scatter, plot_data, rmse
-from algo import arc_sinc, arc_sinc_vectorized, cal_, dynamicWindow
+from algo import arc_sinc, arc_sinc_vectorized, cal_, dynamicWindow,arc_sinc_vectorized_parallel
 from rst_io import read_bin, write_tif, write_bin
 from utils import blockshaped, unblockshaped, unpad, spatial_intp_lin, split_sample
 
@@ -47,176 +47,177 @@ def rvog_inverse(args):
     Args:
         args (argparse.Namespace): Command-line arguments containing file paths and parameters.
     """
-    try:
-        t0 = time.time()  # Record start time
-
-        # Print the arguments for debugging
-        print("-corFile {} -lidarFile {} -htl {} -htg {} -window_size {}  -validation {}".format(
-            args.corFile,
-            args.lidarFile,
-            args.htl,
-            args.htg,
-            args.window_size,
-            args.validation
-        ))
-
-        # Read input files
-        lidar_ht = read_bin(args.lidarFile)  # LiDAR height data
-        corFile = args.corFile
-        cor = read_bin(corFile)  # Correlation file
-
-        # Calculate padding needed to make dimensions divisible by window size
-        rPad, cPad = args.window_size - np.shape(lidar_ht)[0] % args.window_size, args.window_size - np.shape(lidar_ht)[1] % args.window_size
-        pad_width = [(0, rPad), (0, cPad)]
-
-        # Pad arrays to handle edge cases
-        if rPad != 0 or cPad != 0:
-            lidar_ht = np.pad(lidar_ht, pad_width, mode='constant', constant_values=0)
-            cor = np.pad(cor, pad_width, mode='constant', constant_values=0)
-
-        # Apply masks to filter out invalid or extreme values
-        lidar_ht[lidar_ht == 0] = np.nan
-
-        if args.htl==0 and args.htg==0:
-            """ Calculating the upper limit of the heights to be estimated from the LiDAR data 
-            Ceiling the upper limit to the closest multiple of 5 for the height at 99.9 percentile """
-            args.htl=0
-            pp99 = np.nanpercentile(lidar_ht,99.9)
-            args.htg = int( 5 * np.ceil( pp99 / 5. ))
-
-
-        lidar_ht[lidar_ht < args.htl] = np.nan
-        lidar_ht[lidar_ht > args.htg] = np.nan
-        cor[cor < 0.1] = np.nan
-
-        rows, cols = np.shape(cor)[0], np.shape(cor)[1]
-
-        ######################################################
-        # Handle cross-validation if specified
-        if args.validation > 0 and args.validation < 1:
-            temp_lidar = blockshaped(lidar_ht, args.window_size, args.window_size)
-            lidar_ht_cal = np.zeros(np.shape(temp_lidar))
-            lidar_ht_val = np.zeros(np.shape(temp_lidar))
-
-            for win in range(np.shape(temp_lidar)[0]):
-                if np.count_nonzero(~np.isnan(temp_lidar[win, :, :])) > 10:
-                    lidar_ht_cal[win, :, :], lidar_ht_val[win, :, :] = split_sample(temp_lidar[win, :, :], args.validation)
-                else:
-                    lidar_ht_cal[win, :, :] = temp_lidar[win, :, :]
-
-            lidar_ht_cal = unblockshaped(lidar_ht_cal, rows, cols)
-            lidar_ht_val = unblockshaped(lidar_ht_val, rows, cols)
-
-            lidar_ht_cal[lidar_ht_cal == 0] = np.nan
-            lidar_ht_val[lidar_ht_val == 0] = np.nan
-
-        elif args.validation == 0:
-            lidar_ht_cal = lidar_ht.copy()
-            lidar_ht_val = lidar_ht.copy()
-        else:
-            print("\n   Invalid validation fraction!!\n    Please enter validation fraction between 0 and 1; \n    -val=[0,1)")
-            sys.exit(1)
-
-        #########################################################
-        # Set up output directories and filenames
-        os.chdir(os.path.dirname(corFile))
-        outDir = r'../output'
-        if not os.path.exists(outDir):
-            os.mkdir(outDir)
-
-        # Create postfix string for output files
-        _postfix_str = f"_w{args.window_size}_{args.htl}_{args.htg}"
-
-        # Define output file paths
-        outcFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_c.tif')
-        outsFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_s.tif')
-        outciFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_ci.tif')
-        outsiFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_si.tif')
-        outrmseFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_rmse.tif')
-
-        outhtFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_ht.tif')
-        outcntFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_count.tif')
-        pltName = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '.png')
-        
-        if args.validation > 0 and args.validation < 1:
-            pltName = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + f"_cal_{int(100-args.validation*100)}.png")
-        
-        pltvalName = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + f"_val_{int(args.validation*100)}.png")
-
-        # Generate calibration parameters
-        print("[1/3] Generating calibration parameters... ",end=" ")
-        t1 = time.time()
-        s, c, rmse__, ht_, count = dynamicWindow(cor, lidar_ht_cal, args.window_size, args.htl, args.htg)
-
-        # Mask out invalid values
-        ht_[ht_ == 0] = np.nan
-        s[s == 0] = np.nan
-        c[c == 0] = np.nan
-        rmse__[rmse__ == 0] = np.nan
-
-        # Remove padding from arrays if applied
-        if rPad != 0 or cPad != 0:
-            lidar_ht_cal = unpad(lidar_ht_cal, pad_width)
-            c = unpad(c, pad_width)
-            s = unpad(s, pad_width)
-            ht_ = unpad(ht_, pad_width)
-            rmse__ = unpad(rmse__, pad_width)
-            count = unpad(count, pad_width)
-            temp_cor = unpad(cor, pad_width)
-
-        print("%0.2f sec"%(time.time()-t1))
-        # Plot calibration results
-        plot_data(lidar_ht_cal, ht_, args.htl, args.htg, 'Lidar height (m)', 'InSAR inverted (m)', pltName)
-        
-        t2 = time.time()
-        print("[2/3] Generating height map... ",end=" ")
-
-        # Interpolate calibration parameters
-        ci = spatial_intp_lin(c)
-        si = spatial_intp_lin(s)
-
-        # Clip extreme values
-        si[si > 1.6] = 1.6
-        ci[ci > 16] = 16
-        ci[ci < 0] = 0
-        si[si < 0] = 0
-
-        # Calculate gamma values and estimate height
-        gama = (temp_cor / si)
-        gama = gama.flatten()
-        gama[gama > 1] = 1
-        gama[gama < 0] = 0
-        c_temp = ci.flatten()
-        c_temp[c_temp < 0] = np.nan
-        height = []
-
-        height = arc_sinc_vectorized(gama, c_temp)
-        height = np.array(height).reshape(np.shape(temp_cor))
-        height[height < 0] = 0
-        print("%0.2f sec"%(time.time()-t2))
-
-        # If validation is enabled, save validation results
-        if args.validation > 0 and args.validation < 1:
-            lidar_ht_val = unpad(lidar_ht_val, pad_width)
-            write_tif(os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_val.tif'),
-                      lidar_ht_val, os.path.basename(args.corFile))
-            plot_data(lidar_ht_val, height, args.htl, args.htg, 'Lidar height (m)', 'InSAR inverted (m)', pltvalName)
-
-        
-        t3 = time.time()
-        # Write all output files
-        print("[3/3] Writing output... ",end=" ")
-        write_tif(outhtFile, height, os.path.basename(args.corFile))
-        write_tif(outcFile, c, os.path.basename(args.corFile))
-        write_tif(outsFile, s, os.path.basename(args.corFile))
-        write_tif(outrmseFile, rmse__, os.path.basename(args.corFile))
-        write_tif(outciFile, ci, os.path.basename(args.corFile))
-        write_tif(outsiFile, si, os.path.basename(args.corFile))
-        write_tif(outcntFile, count, os.path.basename(args.corFile))
-        print("%0.2f sec"%(time.time()-t3))
-
-        print("\n      Total computing time : %0.2f sec" % (time.time() - t0))
-
-    except Exception as e:
-        print("Task failed due to", e)
+    # try:
+    t0 = time.time()  # Record start time
+
+    # Print the arguments for debugging
+    print("-corFile {} -lidarFile {} -htl {} -htg {} -window_size {}  -validation {}".format(
+        args.corFile,
+        args.lidarFile,
+        args.htl,
+        args.htg,
+        args.window_size,
+        args.validation
+    ))
+
+    # Read input files
+    lidar_ht = read_bin(args.lidarFile)  # LiDAR height data
+    corFile = args.corFile
+    cor = read_bin(corFile)  # Correlation file
+
+    # Calculate padding needed to make dimensions divisible by window size
+    rPad, cPad = args.window_size - np.shape(lidar_ht)[0] % args.window_size, args.window_size - np.shape(lidar_ht)[1] % args.window_size
+    pad_width = [(0, rPad), (0, cPad)]
+
+    # Pad arrays to handle edge cases
+    if rPad != 0 or cPad != 0:
+        lidar_ht = np.pad(lidar_ht, pad_width, mode='constant', constant_values=0)
+        cor = np.pad(cor, pad_width, mode='constant', constant_values=0)
+
+    # Apply masks to filter out invalid or extreme values
+    lidar_ht[lidar_ht == 0] = np.nan
+
+    if args.htl==0 and args.htg==0:
+        """ Calculating the upper limit of the heights to be estimated from the LiDAR data 
+        Ceiling the upper limit to the closest multiple of 5 for the height at 99.9 percentile """
+        args.htl=0
+        pp99 = np.nanpercentile(lidar_ht,99.9)
+        args.htg = int( 5 * np.ceil( pp99 / 5. ))
+
+
+    lidar_ht[lidar_ht < args.htl] = np.nan
+    lidar_ht[lidar_ht > args.htg] = np.nan
+    cor[cor < 0.1] = np.nan
+
+    rows, cols = np.shape(cor)[0], np.shape(cor)[1]
+
+    ######################################################
+    # Handle cross-validation if specified
+    if args.validation > 0 and args.validation < 1:
+        temp_lidar = blockshaped(lidar_ht, args.window_size, args.window_size)
+        lidar_ht_cal = np.zeros(np.shape(temp_lidar))
+        lidar_ht_val = np.zeros(np.shape(temp_lidar))
+
+        for win in range(np.shape(temp_lidar)[0]):
+            if np.count_nonzero(~np.isnan(temp_lidar[win, :, :])) > 10:
+                lidar_ht_cal[win, :, :], lidar_ht_val[win, :, :] = split_sample(temp_lidar[win, :, :], args.validation)
+            else:
+                lidar_ht_cal[win, :, :] = temp_lidar[win, :, :]
+
+        lidar_ht_cal = unblockshaped(lidar_ht_cal, rows, cols)
+        lidar_ht_val = unblockshaped(lidar_ht_val, rows, cols)
+
+        lidar_ht_cal[lidar_ht_cal == 0] = np.nan
+        lidar_ht_val[lidar_ht_val == 0] = np.nan
+
+    elif args.validation == 0:
+        lidar_ht_cal = lidar_ht.copy()
+        lidar_ht_val = lidar_ht.copy()
+    else:
+        print("\n   Invalid validation fraction!!\n    Please enter validation fraction between 0 and 1; \n    -val=[0,1)")
         sys.exit(1)
+
+    #########################################################
+    # Set up output directories and filenames
+    os.chdir(os.path.dirname(corFile))
+    outDir = r'../output'
+    if not os.path.exists(outDir):
+        os.mkdir(outDir)
+
+    # Create postfix string for output files
+    _postfix_str = f"_w{args.window_size}_{args.htl}_{args.htg}"
+
+    # Define output file paths
+    outcFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_c.tif')
+    outsFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_s.tif')
+    outciFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_ci.tif')
+    outsiFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_si.tif')
+    outrmseFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_rmse.tif')
+
+    outhtFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_ht.tif')
+    outcntFile = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_count.tif')
+    pltName = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '.png')
+    
+    if args.validation > 0 and args.validation < 1:
+        pltName = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + f"_cal_{int(100-args.validation*100)}.png")
+    
+    pltvalName = os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + f"_val_{int(args.validation*100)}.png")
+
+    # Generate calibration parameters
+    print("[1/3] Generating calibration parameters... ",end=" ")
+    t1 = time.time()
+    s, c, rmse__, ht_, count = dynamicWindow(cor, lidar_ht_cal, args.window_size, args.htl, args.htg)
+
+    # Mask out invalid values
+    ht_[ht_ == 0] = np.nan
+    s[s == 0] = np.nan
+    c[c == 0] = np.nan
+    rmse__[rmse__ == 0] = np.nan
+
+    # Remove padding from arrays if applied
+    if rPad != 0 or cPad != 0:
+        lidar_ht_cal = unpad(lidar_ht_cal, pad_width)
+        c = unpad(c, pad_width)
+        s = unpad(s, pad_width)
+        ht_ = unpad(ht_, pad_width)
+        rmse__ = unpad(rmse__, pad_width)
+        count = unpad(count, pad_width)
+        temp_cor = unpad(cor, pad_width)
+
+    print("%0.2f sec"%(time.time()-t1))
+    # Plot calibration results
+    plot_data(lidar_ht_cal, ht_, args.htl, args.htg, 'Lidar height (m)', 'InSAR inverted (m)', pltName)
+    
+    t2 = time.time()
+    print("[2/3] Generating height map... ",end=" ")
+
+    # Interpolate calibration parameters
+    ci = spatial_intp_lin(c)
+    si = spatial_intp_lin(s)
+
+    # Clip extreme values
+    si[si > 1.6] = 1.6
+    ci[ci > 16] = 16
+    ci[ci < 0] = 0
+    si[si < 0] = 0
+
+    # Calculate gamma values and estimate height
+    gama = (temp_cor / si)
+    gama = gama.flatten()
+    gama[gama > 1] = 1
+    gama[gama < 0] = 0
+    c_temp = ci.flatten()
+    c_temp[c_temp < 0] = np.nan
+    height = []
+    print('...')
+    # height = arc_sinc_vectorized(gama, c_temp)
+    height = arc_sinc_vectorized_parallel(gama, c_temp)
+    height = np.array(height).reshape(np.shape(temp_cor))
+    height[height < 0] = 0
+    print("%0.2f sec"%(time.time()-t2))
+
+    # If validation is enabled, save validation results
+    if args.validation > 0 and args.validation < 1:
+        lidar_ht_val = unpad(lidar_ht_val, pad_width)
+        write_tif(os.path.join(outDir, os.path.basename(corFile).split('.tif')[0] + _postfix_str + '_val.tif'),
+                  lidar_ht_val, os.path.basename(args.corFile))
+        plot_data(lidar_ht_val, height, args.htl, args.htg, 'Lidar height (m)', 'InSAR inverted (m)', pltvalName)
+
+    
+    t3 = time.time()
+    # Write all output files
+    print("[3/3] Writing output... ",end=" ")
+    write_tif(outhtFile, height, os.path.basename(args.corFile))
+    write_tif(outcFile, c, os.path.basename(args.corFile))
+    write_tif(outsFile, s, os.path.basename(args.corFile))
+    write_tif(outrmseFile, rmse__, os.path.basename(args.corFile))
+    write_tif(outciFile, ci, os.path.basename(args.corFile))
+    write_tif(outsiFile, si, os.path.basename(args.corFile))
+    write_tif(outcntFile, count, os.path.basename(args.corFile))
+    print("%0.2f sec"%(time.time()-t3))
+
+    print("\n      Total computing time : %0.2f sec" % (time.time() - t0))
+
+    # except Exception as e:
+    #     print("Task failed due to", e)
+    #     sys.exit(1)
-- 
GitLab