Source code for moaap.trackers.cyclones

import numpy as np
from scipy import ndimage
from pdb import set_trace as stop
from moaap.utils.data_proc import smooth_uniform
from moaap.utils.segmentation import watershed_3d_overlap_parallel, analyze_watershed_history
from moaap.utils.object_props import clean_up_objects, BreakupObjects, ConnectLon_on_timestep
from moaap.utils.grid import radialdistance



[docs] def cy_acy_psl_tracking( slp, MaxPresAnCY, MinTimeCY, MinPresAnACY, MinTimeACY, dT, Gridspacing, connectLon, breakup = 'watershed', analyze_psl_history = False ): """ Tracks Cyclones (CY) and Anticyclones (ACY) based on Sea Level Pressure (SLP) anomalies. Parameters ---------- slp : np.ndarray Sea Level Pressure data [Pa]. MaxPresAnCY : float Maximum anomaly threshold for Cyclones. MinTimeCY : int Minimum lifetime for Cyclones (hours). MinPresAnACY : float Minimum anomaly threshold for Anticyclones. MinTimeACY : int Minimum lifetime for Anticyclones (hours). dT : int Time step (hours). Gridspacing : float Average grid spacing (m). connectLon : int 1 to connect across date line. breakup : str Method for object separation ('breakup' or 'watershed'). analyze_psl_history : bool, optional If True, computes watershed merge/split history. Default is False. Returns ------- CY_objects : np.ndarray Labeled Cyclone objects. ACY_objects : np.ndarray Labeled Anticyclone objects. """ import numpy as np print(' track cyclones') rgiObj_Struct=np.zeros((3,3,3)); rgiObj_Struct[:,:,:]=1 slp = slp/100. slp_smooth = smooth_uniform(slp, 1, int(100/(Gridspacing/1000.))) slpsmoothAn = smooth_uniform(slp, int(78/dT), int(int(3000/(Gridspacing/1000.)))) # set NaNs in smoothed fields nan = np.isnan(slp[0,:]) slp_smooth[:,nan] = np.nan slpsmoothAn[:,nan] = np.nan slp_Anomaly = slp_smooth-slpsmoothAn Pressure_anomaly = slp_Anomaly < MaxPresAnCY # 10 hPa depression | original setting was 12 rgiObjectsUD, nr_objectsUD = ndimage.label(Pressure_anomaly, structure=rgiObj_Struct) print(' '+str(nr_objectsUD)+' object found') CY_objects, _ = clean_up_objects(rgiObjectsUD, dT, min_tsteps=int(MinTimeCY/dT)) print(' break up long living CY objects using the '+breakup+' method') if breakup == 'breakup': CY_objects, object_split = BreakupObjects(CY_objects, int(MinTimeCY/dT), dT) if connectLon == 1: print(' connect cyclones objects over date line') CY_objects = ConnectLon_on_timestep(CY_objects) elif breakup == 'watershed': min_dist=int((1600 * 10**3)/Gridspacing) low_pres_an = np.copy(slp_Anomaly) low_pres_an[CY_objects == 0] = 0 low_pres_an[low_pres_an < -999999999] = 0 low_pres_an[low_pres_an > 999999999] = 0 CY_objects = watershed_3d_overlap_parallel( low_pres_an * -1, MaxPresAnCY * -1, MaxPresAnCY * -1.5, min_dist, dT, mintime = MinTimeCY, connectLon = connectLon, extend_size_ratio = 0.25 ) # CY_objects = watershed_field(anom_sel*-1, # Gridspacing, # min_dist, # threshold, # smooth_t, # smooth_xy) print(' track anti-cyclones') HighPressure_annomaly = slp_Anomaly > MinPresAnACY # 12 rgiObjectsUD, nr_objectsUD = ndimage.label(HighPressure_annomaly,structure=rgiObj_Struct) print(' '+str(nr_objectsUD)+' object found') ACY_objects, _ = clean_up_objects(rgiObjectsUD, dT, min_tsteps=int(MinTimeACY/dT)) print(' break up long living ACY objects that have many elements') if breakup == 'breakup': ACY_objects, object_split = BreakupObjects(ACY_objects, int(MinTimeCY/dT), dT) if connectLon == 1: # connect objects over date line ACY_objects = ConnectLon_on_timestep(ACY_objects) elif breakup == 'watershed': min_dist=int((1000 * 10**3 * 2)/Gridspacing) high_pres_an = np.copy(slp_Anomaly) high_pres_an[ACY_objects == 0] = 0 ACY_objects = watershed_3d_overlap_parallel( high_pres_an, MinPresAnACY, MinPresAnACY, min_dist, dT, mintime = MinTimeCY, connectLon = connectLon, extend_size_ratio = 0.15 ) if analyze_psl_history: min_dist=int((1000 * 10**3 * 2)/Gridspacing) print(f" Minimum distance between high_pres_anomaly maxima for watershed analysis: {min_dist} grid cells") union_array, events, histories, history_cy = analyze_watershed_history( CY_objects, min_dist, "cy_psl" ) union_array, events, histories, history_acy = analyze_watershed_history( ACY_objects, min_dist, "acy_psl" ) """ union_array_clean = {int(k): int(v) for k, v in union_array.items()} events_clean = [ { 'type': e['type'], 'time': int(e['time']), 'from_label': int(e['from_label']), 'to_label': int(e['to_label']), 'distance': float(e['distance']) } for e in events ] histories_clean = {int(root): [int(label) for label in labels] for root, labels in histories.items()} print(f" Printing union array: {dict(list(union_array_clean.items()))}") print(f" Printing events: {events_clean}") print(f" Printing histories: {dict(list(histories_clean.items()))}") """ else: history_cy = None history_acy = None return CY_objects, ACY_objects, history_cy, history_acy
[docs] def cy_acy_z500_tracking( z500, MinTimeCY, dT, Gridspacing, connectLon, z500_low_anom = -80, z500_high_anom = 70, breakup = 'breakup', analyze_z500_history = False ): """ Tracks mid-tropospheric cyclones and anticyclones based on Z500 anomalies. Parameters ---------- z500 : np.ndarray Geopotential height at 500 hPa [m or gpm]. MinTimeCY : int Minimum lifetime (hours). dT : int Time step (hours). Gridspacing : float Grid spacing (m). connectLon : int 1 to connect across date line. z500_low_anom : float Anomaly threshold for cyclones (e.g., -80 m). z500_high_anom : float Anomaly threshold for anticyclones (e.g., +70 m). breakup : str Method for object separation. Returns ------- cy_z500_objects : np.ndarray Labeled Z500 cyclone objects. acy_z500_objects : np.ndarray Labeled Z500 anticyclone objects. """ rgiObj_Struct=np.zeros((3,3,3)); rgiObj_Struct[:,:,:]=1 z500 = z500 / 9.81 z500_smooth = smooth_uniform(z500, 1, int(100/(Gridspacing/1000.))) z500smoothAn = smooth_uniform(z500, int(78/dT), int(int(3000/(Gridspacing/1000.)))) # set NaNs in smoothed fields nan = np.isnan(z500[0,:]) z500_smooth[:,nan] = np.nan z500smoothAn[:,nan] = np.nan z500_Anomaly = z500_smooth - z500smoothAn # ------------------------------------- print(' track 500 hPa cyclones') print(' break up long living cyclones using the '+breakup+' method') min_dist=int((1600 * 10**3)/Gridspacing) if breakup == 'breakup': cy_z500_objects, object_split = BreakupObjects(cy_z500_objects, int(MinTimeCY/dT), dT) elif breakup == 'watershed': cy_z500_objects = watershed_3d_overlap_parallel( z500_Anomaly * -1, z500_low_anom*-1, z500_low_anom*-1.1, min_dist, dT, mintime = MinTimeCY, ) if connectLon == 1: print(' connect cyclones objects over date line') cy_z500_objects = ConnectLon_on_timestep(cy_z500_objects) if analyze_z500_history: print(f" Minimum distance between z500_Anomaly minima for watershed analysis: {min_dist} grid cells") union_array, events, histories, history_cy500 = analyze_watershed_history( cy_z500_objects, min_dist, "cy_z500" ) """ union_array_clean = {int(k): int(v) for k, v in union_array.items()} events_clean = [ { 'type': e['type'], 'time': int(e['time']), 'from_label': int(e['from_label']), 'to_label': int(e['to_label']), 'distance': float(e['distance']) } for e in events ] histories_clean = {int(root): [int(label) for label in labels] for root, labels in histories.items()} print(f" Printing union array: {dict(list(union_array_clean.items()))}") print(f" Printing events: {events_clean}") print(f" Printing histories: {dict(list(histories_clean.items()))}") """ else: history_cy500 = None # ------------------------------------- print(' track 500 hPa anticyclones') print(' break up long living CY objects that heve many elements') if breakup == 'breakup': acy_z500_objects, object_split = BreakupObjects(acy_z500_objects, int(MinTimeCY/dT), dT) elif breakup == 'watershed': min_dist=int((1000 * 10**3 *2)/Gridspacing) # high_pres_an = np.copy(z500_Anomaly) # high_pres_an[acy_z500_objects == 0] = 0 acy_z500_objects = watershed_3d_overlap_parallel( z500_Anomaly, z500_high_anom, z500_high_anom * 1.5, min_dist, dT, mintime = MinTimeCY, extend_size_ratio = 0.25 ) if connectLon == 1: print(' connect cyclones objects over date line') acy_z500_objects = ConnectLon_on_timestep(acy_z500_objects) if analyze_z500_history: min_dist=int((1000 * 10**3)/Gridspacing) print(f" Minimum distance between z500_Anomaly minima for watershed analysis: {min_dist} grid cells") union_array, events, histories, history_acy500 = analyze_watershed_history( acy_z500_objects, min_dist, "acy_z500" ) """ union_array_clean = {int(k): int(v) for k, v in union_array.items()} events_clean = [ { 'type': e['type'], 'time': int(e['time']), 'from_label': int(e['from_label']), 'to_label': int(e['to_label']), 'distance': float(e['distance']) } for e in events ] histories_clean = {int(root): [int(label) for label in labels] for root, labels in histories.items()} print(f" Printing union array: {dict(list(union_array_clean.items()))}") print(f" Printing events: {events_clean}") print(f" Printing histories: {dict(list(histories_clean.items()))}") """ else: history_acy500 = None return cy_z500_objects, acy_z500_objects, history_cy500, history_acy500
[docs] def col_identification(cy_z500_objects, z500, u200, Frontal_Diagnostic, MinTimeC, dx, dy, Lon, Lat ): """ Identifies Cut-Off Lows (COLs) from tracked upper-level cyclones. Checks for isolation (Z500 gradient), flow reversal (poleward easterlies), and frontal separation. Parameters ---------- cy_z500_objects : np.ndarray Labeled 500hPa cyclone objects. z500 : np.ndarray Geopotential height at 500hPa. u200 : np.ndarray Zonal wind at 200hPa. Frontal_Diagnostic : np.ndarray Frontal parameter field. MinTimeC : int Minimum lifetime required. dx, dy : np.ndarray Grid spacing arrays. Returns ------- col_obj : np.ndarray Labeled Cut-Off Low objects. """ # area arround cyclone col_buffer = 500000 # m # check if cyclone is COL Objects=ndimage.find_objects(cy_z500_objects.astype(int)) col_obj = np.copy(cy_z500_objects); col_obj[:]=0 for ii in range(len(Objects)): if Objects[ii] == None: continue ObjACT = cy_z500_objects[Objects[ii]] == ii+1 if ObjACT.shape[0] < MinTimeC: continue dxObj = abs(np.mean(dx[Objects[ii][1],Objects[ii][2]])) dyObj = abs(np.mean(dy[Objects[ii][1],Objects[ii][2]])) col_buffer_obj_lo = int(col_buffer/dxObj) col_buffer_obj_la = int(col_buffer/dyObj) # add buffer to object slice tt_start = Objects[ii][0].start tt_stop = Objects[ii][0].stop lo_start = Objects[ii][2].start - col_buffer_obj_lo lo_stop = Objects[ii][2].stop + col_buffer_obj_lo la_start = Objects[ii][1].start - col_buffer_obj_la la_stop = Objects[ii][1].stop + col_buffer_obj_la if lo_start < 0: lo_start = 0 if lo_stop >= Lon.shape[1]: lo_stop = Lon.shape[1]-1 if la_start < 0: la_start = 0 if la_stop >= Lon.shape[0]: la_stop = Lon.shape[0]-1 LonObj = Lon[la_start:la_stop, lo_start:lo_stop] LatObj = Lat[la_start:la_stop, lo_start:lo_stop] z500_ACT = np.copy(z500[tt_start:tt_stop, la_start:la_stop, lo_start:lo_stop]) ObjACT = cy_z500_objects[tt_start:tt_stop, la_start:la_stop, lo_start:lo_stop] == ii+1 u200_ob = u200[tt_start:tt_stop, la_start:la_stop, lo_start:lo_stop] front_ob = Frontal_Diagnostic[tt_start:tt_stop, la_start:la_stop, lo_start:lo_stop] if LonObj[0,-1] - LonObj[0,0] > 358: sift_lo = 'yes' # object crosses the date line shift = int(LonObj.shape[1]/2) LonObj = np.roll(LonObj, shift, axis=1) LatObj = np.roll(LatObj, shift, axis=1) z500_ACT = np.roll(z500_ACT, shift, axis=2) ObjACT = np.roll(ObjACT, shift, axis=2) u200_ob = np.roll(u200_ob, shift, axis=2) front_ob = np.roll(front_ob, shift, axis=2) else: sift_lo = 'no' # find location of z500 minimum z500_ACT_obj = np.copy(z500_ACT) z500_ACT_obj[ObjACT == 0] = 999999999999. for tt in range(z500_ACT_obj.shape[0]): min_loc = np.where(z500_ACT_obj[tt,:,:] == np.nanmin(z500_ACT_obj[tt])) min_la = min_loc[0][0] min_lo = min_loc[1][0] la_0 = min_la - col_buffer_obj_la if la_0 < 0: la_0 = 0 lo_0 = min_lo - col_buffer_obj_lo if lo_0 < 0: lo_0 = 0 lat_reg = LatObj[la_0:min_la + col_buffer_obj_la+1, lo_0:min_lo + col_buffer_obj_lo+1] lon_reg = LonObj[la_0:min_la + col_buffer_obj_la+1, lo_0:min_lo + col_buffer_obj_lo+1] col_region = z500_ACT[tt, la_0:min_la + col_buffer_obj_la+1, lo_0:min_lo + col_buffer_obj_lo+1] obj_col_region = z500_ACT_obj[tt, la_0:min_la + col_buffer_obj_la+1, lo_0:min_lo + col_buffer_obj_lo+1] min_z500_obj = z500_ACT[tt,min_la,min_lo] u200_ob_region = u200_ob[tt, la_0:min_la + col_buffer_obj_la+1, lo_0:min_lo + col_buffer_obj_lo+1] front_ob_region = front_ob[tt, la_0:min_la + col_buffer_obj_la+1, lo_0:min_lo + col_buffer_obj_lo+1] # check if 350 km radius arround center has higher Z min_loc_tt = np.where(obj_col_region[:,:] == np.nanmin(z500_ACT_obj[tt])) min_la_tt = min_loc_tt[0][0] min_lo_tt = min_loc_tt[1][0] rdist = radialdistance(lat_reg[min_la_tt,min_lo_tt], lon_reg[min_la_tt,min_lo_tt], lat_reg, lon_reg) # COL should only occure between 20 and 70 degrees # https://journals.ametsoc.org/view/journals/clim/33/6/jcli-d-19-0497.1.xml if (abs(lat_reg[min_la_tt,min_lo_tt]) < 20) | (abs(lat_reg[min_la_tt,min_lo_tt]) > 70): ObjACT[tt,:,:] = 0 continue # remove cyclones that are close to the poles if np.max(np.abs(lat_reg)) > 88: ObjACT[tt,:,:] = 0 continue if np.nanmin(z500_ACT_obj[tt]) > 100000: # there is no object to process ObjACT[tt,:,:] = 0 continue # CRITERIA 1) at least 75 % of grid cells in ring have have 10 m higher Z than center ring = (rdist >= (350 - (dxObj/1000.)*2)) & (rdist <= (350 + (dxObj/1000.)*2)) if np.sum((min_z500_obj - col_region[ring]) < -10) < np.sum(ring)*0.75: ObjACT[tt,:,:] = 0 continue # CRITERIA 2) check if 200 hPa wind speed is eastward in the poleward direction of the cyclone if lat_reg[min_la_tt,min_lo_tt] > 0: east_flow = u200_ob_region[0 : min_la_tt, min_lo_tt] else: east_flow = u200_ob_region[min_la_tt : -1, min_lo_tt] try: if np.min(east_flow) > 0: ObjACT[tt,:,:] = 0 continue except: ObjACT[tt,:,:] = 0 continue # Criteria 3) frontal zone in eastern flank of COL front_test = np.sum(np.abs(front_ob_region[:, min_lo_tt:]) > 1) if front_test < 1: ObjACT[tt,:,:] = 0 continue if sift_lo == 'yes': ObjACT = np.roll(ObjACT, -shift, axis=2) ObjACT = ObjACT.astype('int') ObjACT[ObjACT > 0] = ii+1 ObjACT = ObjACT + col_obj[tt_start:tt_stop, la_start:la_stop, lo_start:lo_stop] col_obj[tt_start:tt_stop, la_start:la_stop, lo_start:lo_stop] = ObjACT return col_obj