Source code for moaap.utils.object_props

import numpy as np
from scipy import ndimage
import pickle
from pdb import set_trace as stop
import time
from tqdm import tqdm # pyright: ignore[reportMissingModuleSource]
from moaap.utils.profiling import timer

[docs] def calc_object_characteristics( var_objects, # feature object file var_data, # original file used for feature detection filename_out, # output file name and locaiton times, # timesteps of the data Lat, # 2D latidudes Lon, # 2D Longitudes grid_spacing, # average grid spacing grid_cell_area, min_tsteps=1, # minimum lifetime in data timesteps history = None # dict containing information of splitting and merging of objects ): """ Calculates comprehensive statistics for tracked objects, including size, intensity, velocity, and trajectory. Parameters ---------- var_objects : np.ndarray 3D labeled object array. var_data : np.ndarray Original data field used for detection (e.g., precipitation, pressure). filename_out : str Path to save the resulting dictionary as a pickle file. times : np.ndarray Array of datetime objects corresponding to the time axis. Lat, Lon : np.ndarray Latitude and Longitude grids. grid_spacing : float Average grid spacing (m). grid_cell_area : np.ndarray Area of grid cells. min_tsteps : int, optional Minimum timesteps an object must exist to be processed. history : dict, optional Dictionary containing object history information from a previous call of analyze_watershed_history. Returns ------- objects_charac : dict Dictionary where keys are object IDs and values are dictionaries containing: 'mass_center_loc', 'speed', 'tot', 'min', 'max', 'mean', 'size', 'times', 'track'. """ # ======== num_objects = int(var_objects.max()) # num_objects = len(np.unique(var_objects))-1 object_indices = ndimage.find_objects(var_objects) if num_objects >= 1: objects_charac = {} print(" Loop over " + str(num_objects) + " objects") for iobj in range(num_objects): if object_indices[iobj] == None: continue object_slice = np.copy(var_objects[object_indices[iobj]]) data_slice = np.copy(var_data[object_indices[iobj]]) time_idx_slice = object_indices[iobj][0] lat_idx_slice = object_indices[iobj][1] lon_idx_slice = object_indices[iobj][2] if len(object_slice) >= min_tsteps: data_slice[object_slice != (iobj + 1)] = np.nan grid_cell_area_slice = np.tile(grid_cell_area[lat_idx_slice, lon_idx_slice], (len(data_slice), 1, 1)) grid_cell_area_slice[object_slice != (iobj + 1)] = np.nan lat_slice = Lat[lat_idx_slice, lon_idx_slice] lon_slice = Lon[lat_idx_slice, lon_idx_slice] # calculate statistics obj_times = times[time_idx_slice] obj_size = np.nansum(grid_cell_area_slice, axis=(1, 2)) obj_min = np.nanmin(data_slice, axis=(1, 2)) obj_max = np.nanmax(data_slice, axis=(1, 2)) obj_mean = np.nanmean(data_slice, axis=(1, 2)) obj_tot = np.nansum(data_slice, axis=(1, 2)) # Track lat/lon obj_mass_center = \ np.array([ndimage.center_of_mass(object_slice[tt,:,:]==(iobj+1)) for tt in range(object_slice.shape[0])]) obj_track = np.full([len(obj_mass_center), 2], np.nan) iREAL = ~np.isnan(obj_mass_center[:,0]) try: obj_track[iREAL,0]=np.array([lat_slice[int(round(obj_loc[0])),int(round(obj_loc[1]))] for tstep, obj_loc in enumerate(obj_mass_center[iREAL,:]) if np.isnan(obj_loc[0]) != True]) obj_track[iREAL,1]=np.array([lon_slice[int(round(obj_loc[0])),int(round(obj_loc[1]))] for tstep, obj_loc in enumerate(obj_mass_center[iREAL,:]) if np.isnan(obj_loc[0]) != True]) except: stop() # obj_track = np.full([len(obj_mass_center), 2], np.nan) # try: # obj_track[:,0]=np.array([lat_slice[int(round(obj_loc[0])),int(round(obj_loc[1]))] for tstep, obj_loc in enumerate(obj_mass_center[:,:]) if np.isnan(obj_loc[0]) != True]) # obj_track[:,1]=np.array([lon_slice[int(round(obj_loc[0])),int(round(obj_loc[1]))] for tstep, obj_loc in enumerate(obj_mass_center[:,:]) if np.isnan(obj_loc[0]) != True]) # except: # stop() # if np.any(np.isnan(obj_track)): # raise ValueError("track array contains NaNs") obj_speed = (np.sum(np.diff(obj_mass_center,axis=0)**2,axis=1)**0.5) * (grid_spacing / 1000.0) this_object_charac = { "mass_center_loc": obj_mass_center, "speed": obj_speed, "tot": obj_tot, "min": obj_min, "max": obj_max, "mean": obj_mean, "size": obj_size, # 'rgrAccumulation':rgrAccumulation, "times": obj_times, "track": obj_track, } if history is not None: # add the history of this object this_object_charac["history"] = history[iobj + 1] try: objects_charac[iobj + 1] = this_object_charac except: raise ValueError ("Error asigning properties to final dictionary") if filename_out is not None: with open(filename_out+'.pkl', 'wb') as handle: pickle.dump(objects_charac, handle) return objects_charac
[docs] def ConnectLon_on_timestep(object_indices): """ This function connects objects over the date line on a time-step by time-step basis, which makes it different from the ConnectLon function. This function is needed when long-living objects are first split into smaller objects using the BreakupObjects function. Parameters ---------- object_indices : np.ndarray Array of object indices from ndimage.find_objects Returns ------- object_indices : np.ndarray Updated array of object indices with connected objects across the date line. """ for tt in range(object_indices.shape[0]): EDGE = np.append( object_indices[tt, :, -1][:, None], object_indices[tt, :, 0][:, None], axis=1 ) iEDGE = np.sum(EDGE > 0, axis=1) == 2 OBJ_Left = EDGE[iEDGE, 0] OBJ_Right = EDGE[iEDGE, 1] OBJ_joint_list = [] for ii in range(len(OBJ_Left)): if OBJ_Left[ii] is not None and OBJ_Right[ii] is not None: try: joint_val = OBJ_Left[ii].astype(str) + "_" + OBJ_Right[ii].astype(str) OBJ_joint_list.append(joint_val) except Exception: # Skip the pair if an error occurs during conversion or concatenation continue OBJ_joint = np.array(OBJ_joint_list) """ OBJ_joint = np.array( [ OBJ_Left[ii].astype(str) + "_" + OBJ_Right[ii].astype(str) for ii,_ in enumerate(OBJ_Left) ] ) """ NotSame = OBJ_Left != OBJ_Right try: OBJ_joint = OBJ_joint[NotSame] except: continue OBJ_unique = np.unique(OBJ_joint) # if len(OBJ_unique) >1: # stop() # set the eastern object to the number of the western object in all timesteps for obj,_ in enumerate(OBJ_unique): """ ObE = int(OBJ_unique[obj].split("_")[1].split()[0]) ObW = int(OBJ_unique[obj].split("_")[0].split()[0]) """ try: ObE = int(OBJ_unique[obj].split("_")[1]) ObW = int(OBJ_unique[obj].split("_")[0]) except: continue object_indices[tt,object_indices[tt,:] == ObE] = ObW return object_indices
### Break up long living objects by extracting the biggest object at each time
[docs] def BreakupObjects( DATA, # 3D matrix [time,lat,lon] containing the objects min_tsteps, # minimum lifetime in data timesteps dT,# time step in hours obj_history = False, # calculates how object start and end ): """ Splits long-living objects that may be artificially merged (e.g., distinct storms merging for one timestep). It analyzes the 2D connectivity over time and keeps the largest consistent overlap. Parameters ---------- DATA : np.ndarray 3D matrix [time, lat, lon] containing labeled objects. min_tsteps : int Minimum lifetime (in timesteps) required to keep an object. dT : int Time resolution of the data in hours. obj_history : bool, optional If True, calculates how objects start and end (merges/splits). Returns ------- DATA_fin : np.ndarray The re-labeled 3D object array. object_split : dict or None Dictionary containing splitting/merging history if obj_history is True. """ start = time.perf_counter() object_indices = ndimage.find_objects(DATA) MaxOb = np.max(DATA) MinLif = int(min_tsteps / dT) # min lifetime of object to be split AVmax = 1.5 obj_structure_2D = np.zeros((3, 3, 3)) obj_structure_2D[1, :, :] = 1 rgiObjects2D, nr_objects2D = ndimage.label(DATA, structure=obj_structure_2D) rgiObjNrs = np.unique(DATA)[1:] TT = np.zeros((MaxOb)) for obj in range(MaxOb): if object_indices[obj] != None: TT[obj] = object_indices[obj][0].stop - object_indices[obj][0].start TT = TT[rgiObjNrs-1] TT = TT.astype('int') # Sel_Obj = rgiObjNrs[TT > MinLif] # Average 2D objects in 3D objects? Av_2Dob = np.zeros((len(rgiObjNrs))) Av_2Dob[:] = np.nan ii = 1 object_split = {} # this directory holds information about splitting and merging of objects for obj in tqdm(range(len(rgiObjNrs))): iOb = rgiObjNrs[obj] if TT[obj] <= MinLif: # ignore short lived objects DATA[DATA == iOb] = 0 continue SelOb = rgiObjNrs[obj] - 1 DATA_ACT = np.copy(DATA[object_indices[SelOb]]) rgiObjects2D_ACT = np.copy(rgiObjects2D[object_indices[SelOb]]) rgiObjects2D_ACT[DATA_ACT != iOb] = 0 Av_2Dob[obj] = np.mean( np.array( [ len(np.unique(rgiObjects2D_ACT[tt, :, :])) - 1 for tt in range(DATA_ACT.shape[0]) ] ) ) if Av_2Dob[obj] <= AVmax: if obj_history == True: # this is a signle[ object object_split[str(iOb)] = [0] * TT[obj] if object_indices[SelOb][0].start == 0: # object starts when tracking starts object_split[str(iOb)][0] = -1 if object_indices[SelOb][0].stop == DATA.shape[0]-1: # object stops when tracking stops object_split[str(iOb)][-1] = -1 else: rgiObAct = np.unique(rgiObjects2D_ACT[0, :, :])[1:] for tt in range(1, rgiObjects2D_ACT[:, :, :].shape[0]): rgiObActCP = list(np.copy(rgiObAct)) for ob1 in rgiObAct: tt1_obj = list( np.unique( rgiObjects2D_ACT[tt, rgiObjects2D_ACT[tt - 1, :] == ob1] )[1:] ) if len(tt1_obj) == 0: # this object ends here rgiObActCP.remove(ob1) continue elif len(tt1_obj) == 1: rgiObjects2D_ACT[ tt, rgiObjects2D_ACT[tt, :] == tt1_obj[0] ] = ob1 else: VOL = [ np.sum(rgiObjects2D_ACT[tt, :] == tt1_obj[jj]) for jj,_ in enumerate(tt1_obj) ] rgiObjects2D_ACT[ tt, rgiObjects2D_ACT[tt, :] == tt1_obj[np.argmax(VOL)] ] = ob1 tt1_obj.remove(tt1_obj[np.argmax(VOL)]) rgiObActCP = rgiObActCP + list(tt1_obj) # make sure that mergers are assigned the largest object for ob2 in rgiObActCP: ttm1_obj = list( np.unique( rgiObjects2D_ACT[tt - 1, rgiObjects2D_ACT[tt, :] == ob2] )[1:] ) if len(ttm1_obj) > 1: VOL = [ np.sum(rgiObjects2D_ACT[tt - 1, :] == ttm1_obj[jj]) for jj,_ in enumerate(ttm1_obj) ] rgiObjects2D_ACT[tt, rgiObjects2D_ACT[tt, :] == ob2] = ttm1_obj[ np.argmax(VOL) ] # are there new object? NewObj = np.unique(rgiObjects2D_ACT[tt, :, :])[1:] NewObj = list(np.setdiff1d(NewObj, rgiObAct)) if len(NewObj) != 0: rgiObActCP = rgiObActCP + NewObj rgiObActCP = np.unique(rgiObActCP) rgiObAct = np.copy(rgiObActCP) rgiObjects2D_ACT[rgiObjects2D_ACT != 0] = np.copy( rgiObjects2D_ACT[rgiObjects2D_ACT != 0] + MaxOb ) MaxOb = np.max(DATA) # save the new objects to the original object array TMP = np.copy(DATA[object_indices[SelOb]]) TMP[rgiObjects2D_ACT != 0] = rgiObjects2D_ACT[rgiObjects2D_ACT != 0] DATA[object_indices[SelOb]] = np.copy(TMP) if obj_history == True: # ---------------------------------- # remember how objects start and end temp_obj = np.unique(TMP[DATA_ACT[:, :, :] == iOb]) for ob_ms in range(len(temp_obj)): t1_obj = temp_obj[ob_ms] sel_time = np.where(np.sum((TMP == t1_obj) > 0, axis=(1,2)) > 0)[0] obj_charac = [0] * len(sel_time) for kk in range(len(sel_time)): if sel_time[kk] == 0: # object starts when tracking starts obj_charac[kk] = -1 elif sel_time[kk]+1 == TMP.shape[0]: # object ends when tracking ends obj_charac[kk] = -1 # check if system starts from splitting t0_ob = TMP[sel_time[kk]-1,:,:][TMP[sel_time[kk],:,:] == t1_obj] unique_t0 = list(np.unique(t0_ob)) try: unique_t0.remove(0) except: pass try: unique_t0.remove(t1_obj) except: pass if len(unique_t0) == 0: # object has pure start or continues without interactions continue else: # Object merges with other object obj_charac[kk] = unique_t0[0] # check if object ends by merging if obj_charac[-1] != -1: if sel_time[-1]+1 == TMP.shape[0]: obj_charac[-1] = -1 else: t2_ob = TMP[sel_time[-1]+1,:,:][TMP[sel_time[-1],:,:] == t1_obj] unique_t2 = list(np.unique(t2_ob)) try: unique_t2.remove(0) except: pass try: unique_t2.remove(t1_obj) except: pass if len(unique_t2) != 0: obj_charac[-1] = unique_t2[0] object_split[str(t1_obj)] = obj_charac # clean up object matrix if obj_history == True: DATA_fin, object_split = clean_up_objects(DATA, dT, min_tsteps, obj_splitmerge = object_split) else: DATA_fin, object_split = clean_up_objects(DATA, dT, min_tsteps) end = time.perf_counter() timer(start, end) return DATA_fin, object_split
# from memory_profiler import profile # @profile_
[docs] def clean_up_objects(DATA, dT, min_tsteps = 0, obj_splitmerge = None): """ Function to remove objects that are too short lived and to numerrate the object from 1...N Parameters ---------- DATA : np.ndarray Labeled object array [time, lat, lon]. dT : int Time step (hours). min_tsteps : int Minimum lifetime of objects to keep (hours). obj_splitmerge : dict, optional Dictionary tracking object splits/merges. Default is None. Returns ------- objectsTMP : np.ndarray Cleaned labeled object array. obj_splitmerge_clean : dict Updated split/merge history dictionary. """ # 1. Find object slices (fast C-level operation) object_slices = ndimage.find_objects(DATA) max_label = len(object_slices) # 2. Create a Lookup Table (LUT) # Index = Old Label, Value = New Label # We initialize with 0 (background) lut = np.zeros(max_label + 1, dtype=DATA.dtype) min_duration = min_tsteps / dT new_label_counter = 1 # 3. Populate LUT (Pure metadata calculation, no array manipulation yet) # We iterate over slices, which is fast because we aren't touching the heavy 3D data for old_label_idx, slc in enumerate(object_slices): old_label = old_label_idx + 1 # find_objects ignores 0, so index 0 is label 1 if slc is not None: # Check duration (time is the first axis: index 0) duration = slc[0].stop - slc[0].start if duration >= min_duration: lut[old_label] = new_label_counter new_label_counter += 1 # else: lut[old_label] remains 0 (deleted) # else: lut[old_label] remains 0 (was missing) # 4. Apply LUT to the entire 3D array in one shot # This replaces the entire previous loop and masking logic objects_cleaned = lut[DATA] # 5. Handle Dictionary Optimization obj_splitmerge_clean = {} if obj_splitmerge is not None: # Vectorized update of the dictionary keys and values for old_key_str, split_list in obj_splitmerge.items(): old_key = int(old_key_str) # Get the new ID for this object if old_key <= max_label: new_key = lut[old_key] else: new_key = 0 # Out of bounds # Only keep entry if the main object survived (new_key > 0) if new_key > 0: # Update the values in the list using the same LUT # Filter out values that point to deleted objects (0) if desired, # or keep them. Here we update them. split_arr = np.array(split_list, dtype=int) # We need to handle IDs in split_list that might be larger than our current max_label # (though usually they shouldn't be). Clip or check bounds safe. valid_indices = split_arr <= max_label new_split_list = split_arr.copy() # Update valid IDs new_split_list[valid_indices] = lut[split_arr[valid_indices]] # Invalid/Deleted IDs become 0. You might want to filter 0s out: # new_split_list = new_split_list[new_split_list > 0] obj_splitmerge_clean[str(new_key)] = new_split_list.tolist() return objects_cleaned, obj_splitmerge_clean
# https://stackoverflow.com/questions/13542855/algorithm-to-find-the-minimum-area-rectangle-for-given-points-in-order-to-comput/33619018#33619018 import numpy as np from scipy.spatial import ConvexHull
[docs] def minimum_bounding_rectangle(points): """ Find the smallest bounding rectangle for a set of points. Returns a set of points representing the corners of the bounding box. :param points: an nx2 matrix of coordinates :rval: an nx2 matrix of coordinates """ from scipy.ndimage.interpolation import rotate pi2 = np.pi/2. # get the convex hull for the points hull_points = points[ConvexHull(points).vertices] # calculate edge angles edges = np.zeros((len(hull_points)-1, 2)) edges = hull_points[1:] - hull_points[:-1] angles = np.zeros((len(edges))) angles = np.arctan2(edges[:, 1], edges[:, 0]) angles = np.abs(np.mod(angles, pi2)) angles = np.unique(angles) # find rotation matrices # XXX both work rotations = np.vstack([ np.cos(angles), np.cos(angles-pi2), np.cos(angles+pi2), np.cos(angles)]).T rotations = rotations.reshape((-1, 2, 2)) # apply rotations to the hull rot_points = np.dot(rotations, hull_points.T) # find the bounding points min_x = np.nanmin(rot_points[:, 0], axis=1) max_x = np.nanmax(rot_points[:, 0], axis=1) min_y = np.nanmin(rot_points[:, 1], axis=1) max_y = np.nanmax(rot_points[:, 1], axis=1) # find the box with the best area areas = (max_x - min_x) * (max_y - min_y) best_idx = np.argmin(areas) # return the best box x1 = max_x[best_idx] x2 = min_x[best_idx] y1 = max_y[best_idx] y2 = min_y[best_idx] r = rotations[best_idx] rval = np.zeros((4, 2)) rval[0] = np.dot([x1, y2], r) rval[1] = np.dot([x2, y2], r) rval[2] = np.dot([x2, y1], r) rval[3] = np.dot([x1, y1], r) return rval
import cartopy.io.shapereader as shpreader import shapely.geometry as sgeom from shapely.ops import unary_union from shapely.prepared import prep land_geom = None land = None # prepared geometry def _ensure_land_geom(): global land_geom, land if land is not None: return # already prepared land_shp = shpreader.natural_earth( resolution="110m", category="physical", name="land" ) geoms = list(shpreader.Reader(land_shp).geometries()) if not geoms: raise RuntimeError("Natural Earth 'land' geometries are empty/unavailable.") land_geom = unary_union(geoms) land = prep(land_geom)
[docs] def is_land(x, y): _ensure_land_geom() return land.contains(sgeom.Point(float(x), float(y)))