import numpy as np
from scipy import ndimage
from moaap.utils.segmentation import (
watershed_3d_overlap_parallel,
analyze_watershed_history
)
from moaap.utils.data_proc import smooth_uniform
from moaap.utils.object_props import clean_up_objects, BreakupObjects, is_land
from tqdm import tqdm
from pdb import set_trace as stop
#from memory_profiler import profile
# @profile_
[docs]
def mcs_tb_tracking(
tb,
pr,
SmoothSigmaC,
Pthreshold,
CL_Area,
CL_MaxT,
Cthreshold,
MinAreaC,
MinTimeC,
MCS_minPR,
MCS_minTime,
MCS_Minsize,
dT,
Area,
connectLon,
Gridspacing,
breakup = 'watershed',
analyze_mcs_history = False,
):
"""
Tracks Mesoscale Convective Systems (MCS) using Brightness Temperature (Tb).
Verifies candidates using precipitation criteria.
Parameters
----------
tb : np.ndarray
Brightness temperature [K].
pr : np.ndarray
Precipitation [mm/h].
SmoothSigmaC : float
Smoothing factor for Tb.
Pthreshold : float
Precipitation threshold.
CL_Area, CL_MaxT : float
Cloud shield area and max temp thresholds.
Cthreshold : float
Tb threshold for initial cloud detection.
MinAreaC : float
Minimum cloud area for initial detection.
MinTimeC : int
Minimum cloud duration.
MCS_minPR : float
Minimum peak precipitation for MCS verification.
MCS_minTime : int
Minimum MCS duration.
MCS_Minsize : float
Minimum precipitation area for MCS verification.
dT : int
Data timestep (hours).
Area : np.ndarray
Grid cell area array.
connectLon : int
1 to connect objects across date line.
Gridspacing : float
Grid spacing in meters.
breakup : str
Method to handle merging objects ('breakup' or 'watershed').
analyze_mcs_history : bool
If True, computes watershed merge/split history.
Returns
-------
MCS_objects_Tb : np.ndarray
Labeled MCS objects based on Tb.
C_objects : np.ndarray
Labeled cloud objects (all clouds, not just MCS).
"""
print(' track clouds')
print(' break up long living cloud shield objects with '+breakup+' that have many elements')
if breakup == 'breakup':
C_objects, object_split = BreakupObjects(C_objects,
int(MinTimeC/dT),
dT)
elif breakup == 'watershed':
# C_objects = watersheding(C_objects,
# 6, # at least six grid cells apart
# 1)
threshold=1
min_dist=int((np.sqrt(CL_Area/np.pi))/(Gridspacing/1000))*4
C_objects = watershed_3d_overlap_parallel(
tb * -1,
Cthreshold * -1,
Cthreshold * -1, #CL_MaxT * -1,
min_dist,
dT,
mintime = MinTimeC,
connectLon = connectLon,
extend_size_ratio = 0.10
)
# check if precipitation object is from an MCS
object_indices = ndimage.find_objects(C_objects)
MCS_objects_Tb = np.zeros(C_objects.shape,dtype=int)
for iobj,_ in tqdm(enumerate(object_indices)):
if object_indices[iobj] is None:
continue
time_slice = object_indices[iobj][0]
lat_slice = object_indices[iobj][1]
lon_slice = object_indices[iobj][2]
tb_object_slice= C_objects[object_indices[iobj]]
tb_object_act = np.where(tb_object_slice==iobj+1,True,False)
if len(tb_object_act) < MCS_minTime:
continue
tb_slice = tb[object_indices[iobj]]
tb_act = np.copy(tb_slice)
tb_act[~tb_object_act] = np.nan
bt_object_slice = C_objects[object_indices[iobj]]
bt_object_act = np.copy(bt_object_slice)
bt_object_act[~tb_object_act] = 0
area_act = np.tile(Area[lat_slice, lon_slice], (tb_act.shape[0], 1, 1))
area_act[~tb_object_act] = 0
### Calculate cloud properties
tb_size = np.array(np.sum(area_act,axis=(1,2)))
tb_min = np.array(np.nanmin(tb_act,axis=(1,2)))
### Calculate precipitation properties
pr_act = np.copy(pr[object_indices[iobj]])
pr_act[tb_object_act == 0] = np.nan
pr_peak_act = np.array(np.nanmax(pr_act,axis=(1,2)))
pr_region_act = pr_act >= Pthreshold #*dT
area_act = np.tile(Area[lat_slice, lon_slice], (tb_act.shape[0], 1, 1))
area_act[~pr_region_act] = 0
pr_under_cloud = np.array(np.sum(area_act,axis=(1,2)))/1000**2
# Test if object classifies as MCS
tb_size_test = np.max(np.convolve((tb_size / 1000**2 >= CL_Area), np.ones(MCS_minTime), 'valid') / MCS_minTime) == 1
tb_overshoot_test = np.max((tb_min <= CL_MaxT )) == 1
pr_peak_test = np.max(np.convolve((pr_peak_act >= MCS_minPR ), np.ones(MCS_minTime), 'valid') / MCS_minTime) ==1
pr_area_test = np.max((pr_under_cloud >= MCS_Minsize)) == 1
MCS_test = (
tb_size_test
& tb_overshoot_test
& pr_peak_test
& pr_area_test
)
# assign unique object numbers
tb_object_act = np.array(tb_object_act).astype(int)
tb_object_act[tb_object_act == 1] = iobj + 1
if MCS_test == 1:
TMP = np.copy(MCS_objects_Tb[object_indices[iobj]])
TMP = TMP + tb_object_act
MCS_objects_Tb[object_indices[iobj]] = TMP
else:
continue
MCS_objects_Tb, _ = clean_up_objects(MCS_objects_Tb,
dT,
min_tsteps=int(MCS_minTime/dT))
# analyze MCS history with the watershed method
if analyze_mcs_history:
min_dist=int(((CL_Area/np.pi)**0.5)/(Gridspacing/1000))*2
print(f" Minimum distance between TB minima for watershed analysis: {min_dist} grid cells")
union_array, events, histories, history_MCSs = analyze_watershed_history(
MCS_objects_Tb, min_dist, "mcs"
)
"""
union_array_clean = {int(k): int(v) for k, v in union_array.items()}
events_clean = [
{
'type': e['type'],
'time': int(e['time']),
'from_label': int(e['from_label']),
'to_label': int(e['to_label']),
'distance': float(e['distance'])
}
for e in events
]
histories_clean = {int(root): [int(label) for label in labels] for root, labels in histories.items()}
print(f" Printing union array: {dict(list(union_array_clean.items()))}")
print(f" Printing events: {events_clean}")
print(f" Printing histories: {dict(list(histories_clean.items()))}")
union_array, events, histories, history_clouds = analyze_watershed_history(
C_objects, min_dist, "mcs"
)
"""
else:
history_MCSs = None
return MCS_objects_Tb, C_objects, history_MCSs
#from memory_profiler import profile
# @profile_
[docs]
def cloud_tracking(
tb,
pr,
# MCS_obj,
connectLon,
Gridspacing,
dT,
tb_threshold = 241,
tb_overshoot = 235,
erosion_disk = 1.5,
min_dist = 8,
analyze_cloud_history = False
):
"""
Tracks clouds from hourly or sub-hourly brightness temperature data.
Calculates cloud statistics, including their precipitation (pr) properties if pr is provided.
Parameters
----------
tb (float): brightness temperature of dimension [time,lat,lon]
pr (float): precipitation rate of dimension [time,lat,lon]
connectLon (bol): 1 means that clouds should be connected accross date line
Gridspacing (float): average horizontal grid spacing in [m]
dT (int): time step of data in hours
tb_threshold (float, optional): tb threshold to define cloud mask. Default is "241".
tb_overshoot (float, optional): tb threshold to find local minima for watershedding. Default is "235".
erosion_disk (float, optional): reduction of next timestep mask for temporal connection of features. Larger values result in more erosion and can remove smaller clouds. The default is "0.15".
min_dist (int, optional): minimum distance in grid cells between two tb minima (overshoots). The default is "8".
analyze_cloud_history (bool, optional): If True, computes watershed merge/split history. Default is False.
Returns
-------
cloud_objects (np.ndarray): labeled cloud objects of dimension [time,lat,lon]
"""
CL_Area = min_dist * Gridspacing
print(' track clouds')
print(' break up long living cloud shield objects with wathershedding')
min_dist=int(((CL_Area/np.pi)**0.5)/(Gridspacing/1000))*4
print(f" Minimum distance between TB minima for watershed analysis: {min_dist} grid cells")
cloud_objects = watershed_3d_overlap_parallel(
tb * -1,
tb_threshold * -1,
tb_overshoot * -1, #CL_MaxT * -1,
min_dist,
dT,
mintime = 0,
connectLon = connectLon,
extend_size_ratio = 0.10,
# erosion_disk = erosion_disk
)
print(" make sure that each object has at least one grid cell with more than min_pr threshold of precipitation")
min_pr = 2 * dT # minimum precipitation in [mm/h]
object_indices = ndimage.find_objects(cloud_objects)
for iobj,_ in tqdm(enumerate(object_indices)):
if object_indices[iobj] is None:
continue
pr_object_slice= cloud_objects[object_indices[iobj]]
pr_object_act = np.where(pr_object_slice==iobj+1,True,False)
pr_slice = pr[object_indices[iobj]]
pr_act = np.copy(pr_slice)
pr_act[~pr_object_act] = np.nan
if np.nanmax(pr_act) < min_pr:
cloud_objects[object_indices[iobj]][cloud_objects[object_indices[iobj]] == iobj+1] = 0
if analyze_cloud_history:
min_dist=8
print(f" Minimum distance between TB minima for watershed analysis: {min_dist} grid cells")
union_array, events, histories, history_clouds = analyze_watershed_history(
cloud_objects, min_dist, "cloud"
)
"""
union_array_clean = {int(k): int(v) for k, v in union_array.items()}
events_clean = [
{
'type': e['type'],
'time': int(e['time']),
'from_label': int(e['from_label']),
'to_label': int(e['to_label']),
'distance': float(e['distance'])
}
for e in events
]
histories_clean = {int(root): [int(label) for label in labels] for root, labels in histories.items()}
print(f" Printing union array: {dict(list(union_array_clean.items()))}")
print(f" Printing events: {events_clean}")
print(f" Printing histories: {dict(list(histories_clean.items()))}")
"""
else:
history_clouds = None
return cloud_objects, history_clouds