Table of Contents
Introduction
This article includes some Python functions for calculating the core climate extreme index. I post them here for my future reference, and share them with people who are interested in python or climate change. No guarantee of accuracy.
The definitions of the core climate extreme indices
1.Description of Climate Extreme Indexes by OCDP
2.Definitions of the core climate extreme indices by Canada.ca
Python functions for these climate extreme indices
Temperature indices
import numpy as np
#functions
def TN_index_TNn(tn):
TNn=np.amin(tn)
return TNn
def TN_index_FD(tn):
tn01=np.where(tn<0,1,0)
FD = np.sum(tn01, axis=0)
return FD
def TN_index_TN10LT(tn):
tn01=np.where(tn<=-10,1,0)
TN10LT = np.sum(tn01, axis=0)
return TN10LT
def TN_index_TR(tn):
tn01=np.where(tn>20,1,0)
TR = np.sum(tn01, axis=0)
return TR
def TN_index_TN10p(tn,tn10th):
dtn=tn-tn10th
dtn01=np.where(dtn<0,1,0)
TN10p=np.sum(dtn01)
return TN10p
def TN_index_TN90p(tn,tn90th):
dtn=tn-tn90th
dtn01=np.where(dtn>0,1,0)
TN90p=np.sum(dtn01)
return TN90p
def TN_index_TNCSDI(tn,tn10th):
dtn=tn-tn10th
dtn01=np.where(dtn<0,1,0)
dtn0100=np.insert(np.append(dtn01,0),0,0)
edges=np.diff(dtn0100)
rising=np.argwhere(edges==1)
falling=np.argwhere(edges==-1)
is_rising_empty=rising.size==0
isnot_rising_empty=rising.size>0
is_falling_empty=falling.size==0
isnot_falling_empty=falling.size>0
if isnot_rising_empty and isnot_falling_empty:
c=falling-rising
tnCSDI=np.sum(np.where(c>5,c,0))
elif isnot_falling_empty and is_rising_empty:
tnCSDI=dtn.size
else:
tnCSDI=0
return tnCSDI
def TX_index_SU(tx):
tx01=np.where(tx>25,1,0)
SU=np.sum(tx01)
return SU
def TX_index_TX30GE(tx):
tx01=np.where(tx>=30,1,0)
TX30GE=np.sum(tx01)
return TX30GE
def TX_index_TX35GE(tx):
tx01=np.where(tx>=35,1,0)
TX35GE=np.sum(tx01)
return TX35GE
def TX_index_ID(tx):
tx01=np.where(tx<0,1,0)
ID=np.sum(tx01)
return ID
def TX_index_TX10p(tx,tx10th):
dtx=tx-tx10th
dtx01=np.where(dtx<0,1,0)
TX10p=np.sum(dtx01)
return TX10p
def TX_index_TX90p(tx,tx90th):
dtx=tx-tx90th
dtx01=np.where(dtx>0,1,0)
TX90p=np.sum(dtx01)
return TX90p
def TX_index_TXx(tx):
TXx=np.amax(tx, axis=0)
return TXx
def TXTN_index_HDD(tx,tn):
tm=(tx+tn)/2.0
tm=18-tm
tmhdd=np.where(tm>0,tm,0)
HDD=np.sum(tmhdd)
return HDD
def TXTN_index_CDD(tx,tn):
tm=(tx+tn)/2
tm=tm-18
tmcdd=np.where(tm>0,tm,0)
CDD=np.sum(tmcdd)
return CDD
def TXTN_index_DTR(tx,tn):
txtndtr=tx-tn
DTR=np.mean(txtndtr)
return DTR
def TX_index_TXWSDI(tx,tx90th):
dtx=tx-tx90th
dtx01=np.where(dtx>0,1,0)
dtx0100=np.insert(np.append(dtx01,0),0,0)
edges=np.diff(dtx0100)
rising=np.argwhere(edges==1)
falling=np.argwhere(edges==-1)
is_rising_empty=rising.size==0
isnot_rising_empty=rising.size>0
is_falling_empty=falling.size==0
isnot_falling_empty=falling.size>0
if isnot_rising_empty and isnot_falling_empty:
c=falling-rising
TXWSDI=np.sum(np.where(c>5,c,0))
elif isnot_falling_empty and is_rising_empty:
TXWSDI=dtx.size
else:
TXWSDI=0
return TXWSDI
def TM_index_GSL(tm):
l=len(tm) #tm is 1-year data l=365 or 366
j1=l-1
j2=l-185
for j in range(l-6):
d1=tm[j:j+6]
nd1=np.where(d1>5,1,0)
sd1=sum(nd1)
if sd1==6:
j1=j
break
for j in np.arange(l-185,l-6):
d1=tm[j:j+6]
nd1=np.where(d1<5,1,0)
sd1=sum(nd1)
if sd1==6:
j2=j
break
GSL=j2-j1
GSL=np.where(GSL>0,GSL,0)
if GSL==0:
j1=0
j2=0
TM_index_GSL=(j1,j2,GSL)
return TM_index_GSL
def TX_HWDI_5Days5DegreeAboveClimatology(tx,txClimatology):
istxnan=np.sum(np.where(np.isnan(tx),1,0))
istxClimatologynan=np.sum(np.where(np.isnan(txClimatology),1,0))
if istxnan>0 or istxClimatologynan:
TX_HWDI_days=np.nan
TX_HWDI_strength=np.nan
TX_HWDI_period=np.nan
TX_HWDI_Numbers=np.nan
else:
dtx=tx-txClimatology-5
dtx01=np.where(dtx>0,1,0)
nohw=np.sum(dtx01)
if nohw<5:
TX_HWDI_days=0
TX_HWDI_strength=0
TX_HWDI_period=0
TX_HWDI_Numbers=0
else:
dtx0100=np.insert(np.append(dtx01,0),0,0)
edges=np.diff(dtx0100)
rising=np.argwhere(edges==1)
falling=np.argwhere(edges==-1)
c=falling-rising
#print(c)
TX_HWDI_days=np.max(np.where(c>4,c,0))
TX_HWDI_period=np.sum(np.where(c>4,c,0))
TX_HWDI_Numbers=np.sum(np.where(c>4,1,0))
if TX_HWDI_days>0:
ii=0
dtxsum=0
for i in c:
if i>4:
i1=rising[ii][0]
i2=i1+i[0]
#print((i1,i2))
dtxsum=dtxsum+np.sum(dtx[i1:i2])
ii=ii+1
else:
dtxsum=0
TX_HWDI_strength=np.round(dtxsum,2)
return (TX_HWDI_days,TX_HWDI_period,TX_HWDI_Numbers,TX_HWDI_strength)
def TX_HWDI_3DaysAbove30Degree(tx):
istxnan=np.sum(np.where(np.isnan(tx),1,0))
if istxnan>0:
TX_HWDI_days=np.nan
TX_HWDI_strength=np.nan
TX_HWDI_period=np.nan
TX_HWDI_Numbers=np.nan
else:
dtx=tx-30
dtx01=np.where(dtx>=0,1,0)
nohw=np.sum(dtx01)
if nohw<3:
TX_HWDI_days=0
TX_HWDI_strength=0
TX_HWDI_period=0
TX_HWDI_Numbers=0
else:
dtx0100=np.insert(np.append(dtx01,0),0,0)
edges=np.diff(dtx0100)
rising=np.argwhere(edges==1)
falling=np.argwhere(edges==-1)
c=falling-rising
#print(c)
TX_HWDI_days=np.max(np.where(c>2,c,0))
TX_HWDI_period=np.sum(np.where(c>2,c,0))
TX_HWDI_Numbers=np.sum(np.where(c>2,1,0))
if TX_HWDI_days>0:
ii=0
dtxsum=0
for i in c:
if i>2:
i1=rising[ii][0]
i2=i1+i[0]
#print((i1,i2))
dtxsum=dtxsum+np.sum(dtx[i1:i2])
ii=ii+1
else:
dtxsum=0
TX_HWDI_strength=np.round(dtxsum,2)
return (TX_HWDI_days,TX_HWDI_period,TX_HWDI_Numbers,TX_HWDI_strength)
Precipitation indices
## If missing more than 20% data, set the index nan
# Total cumulated precipitation
def PrecTOT(prDaily):
ind_PrecTOT=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if ((N2/N) < 0.8):
ind_PrecTOT = np.empty(1)
ind_PrecTOT = np.nan
else:
wetdayprDaily=np.where(prDaily_no_nan>=1,prDaily_no_nan,0)
ind_PrecTOT = np.nansum(wetdayprDaily)
return ind_PrecTOT
# No of wet days (precipitation ≥ 1 mm) [%]
def R1mm(prDaily):
ind_R1mm=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if (N2 == 0):
N2=1
if ((N2/N) < 0.8):
ind_R1mm = np.empty(1)
ind_R1mm = np.nan
else:
wetday=np.where(prDaily_no_nan>=1,1,0)
ind_R1mm=np.nansum(wetday)
return ind_R1mm
# No of wet days (precipitation ≥ 1 mm) [%]
def R10mm(prDaily):
ind_R10mm=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if (N2 == 0):
N2=1
if ((N2/N) < 0.8):
ind_R10mm = np.empty(1)
ind_R10mm = np.nan
else:
wetday=np.where(prDaily_no_nan>=10,1,0)
ind_R10mm=np.nansum(wetday)
return ind_R10mm
# No of wet days (precipitation ≥ 1 mm) [%]
def R20mm(prDaily):
ind_R20mm=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if (N2 == 0):
N2=1
if ((N2/N) < 0.8):
ind_R20mm = np.empty(1)
ind_R20mm = np.nan
else:
wetday=np.where(prDaily_no_nan>=20,1,0)
ind_R20mm=np.nansum(wetday)
return ind_R20mm
# Calculates the 95th percentile of precipitation (mm/day).
def pr95pDays(prDaily,pr95p):
ind_pr95pDays=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
# condition on available data >= 80%
if ((N2/N) < 0.8):
ind_pr95pDays = np.empty(1)
ind_pr95pDays = np.nan
else:
prDaily = np.where(prDaily > pr95p,1,0)
ind_pr95pDays=np.sum(prDaily)
return ind_pr95pDays
def pr95pTOT(prDaily,pr95p):
ind_pr95pTOT=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
# condition on available data >= 80%
if ((N2/N) < 0.8):
ind_pr95pTOT = np.empty(1)
ind_pr95pTOT = np.nan
else:
prDaily = np.where(prDaily > pr95p,prDaily,0)
ind_pr95pTOT=np.sum(prDaily)
return ind_pr95pTOT
def pr99pDays(prDaily,pr99p):
ind_pr99pDays=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
# condition on available data >= 80%
if ((N2/N) < 0.8):
ind_pr99pDays = np.empty(1)
ind_pr99pDays = np.nan
else:
prDaily = np.where(prDaily > pr99p,1,0)
ind_pr99pDays=np.sum(prDaily)
return ind_pr99pDays
def pr99pTOT(prDaily,pr99p):
ind_pr99pTOT=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
# condition on available data >= 80%
if ((N2/N) < 0.8):
ind_pr99pTOT = np.empty(1)
ind_pr99pTOT = np.nan
else:
prDaily = np.where(prDaily > pr99p,prDaily,0)
ind_pr99pTOT=np.sum(prDaily)
return ind_pr99pTOT
# Calculates the simple daily intensity of precipitation of the input signal.
def SDII(prDaily):
ind_SDII=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if ((N2/N) < 0.8):
ind_SDII = np.empty(1)
ind_SDII = np.nan
else:
prDaily = prDaily[prDaily > 1]
ind_SDII = np.round(np.nanmean(prDaily),2)
return ind_SDII
# Maximum number of dry consecutive days
def CDD(prDaily):
ind_CDD=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if ((N2/N) < 0.8):
ind_CDD = np.empty(1)
ind_CDD = np.nan
else:
temp = 0
ind_CDD = 0
j =0
while (j < N2):
while (j < N2 ) and (prDaily_no_nan[j] < 1.0 ):
j += 1
temp +=1
if ind_CDD < temp:
ind_CDD = temp
temp = 0
j += 1
return ind_CDD
# Maximum number of wet consecutive days
def CWD(prDaily):
ind_CWD=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if ((N2/N) < 0.8):
ind_CWD = np.empty(1)
ind_CWD = np.nan
else:
temp = 0
ind_CWD = 0
j =0
while (j < N2):
while (j < N2 ) and (prDaily_no_nan[j] > 1.0 ):
j += 1
temp +=1
if ind_CWD < temp:
ind_CWD = temp
temp = 0
j += 1
return ind_CWD
# Calculates the maximum total precipitation in 1 day (mm).
def Rx1day(prDaily):
ind_Rx1day=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if ((N2/N) < 0.8):
ind_Rx1day = np.empty(1)
ind_Rx1day = np.nan
else:
temp = 0
ind_Rx1day = 0
for i in range(0,N):
if (~np.isnan(prDaily[i])):
temp = prDaily[i]
if ind_Rx1day < temp:
ind_Rx1day = temp
return ind_Rx1day
# Calculates the maximum total precipitation cumulated in 5 consecutive days (mm).
def Rx5day(prDaily):
ind_Rx5day=[]
prDaily_no_nan = prDaily[~np.isnan(prDaily)]
N = len(prDaily)
N2 = len(prDaily_no_nan)
if ((N2/N) < 0.8):
ind_Rx5day = np.empty(1)
ind_Rx5day = np.nan
else:
temp = 0
ind_Rx5day = 0
for i in range(0,N-4):
if (~np.isnan(prDaily[i])) and (~np.isnan(prDaily[i+1])) and (~np.isnan(prDaily[i+2])) and (~np.isnan(prDaily[i+3])) and (~np.isnan(prDaily[i+4])):
temp = prDaily[i] + prDaily[i+1] + prDaily[i+2] + prDaily[i+3] + prDaily[i+4]
if ind_Rx5day < temp:
ind_Rx5day = temp
return ind_Rx5day
Examples
The following code uses two different definitions to calculate the heat wave index.
import np as np
import pandas as pd
#txClimatology= # provide 365 values for each grid points
gridID=pd.read_csv('data/LAMPS_Ontario_8964Pts.csv')
gridID.set_index('ID',inplace=True)
TX_HWDI_def0=gridID.copy()
TX_HWDI_def1=gridID.copy()
cols=['TX_HWDI_days','TX_HWDI_period','TX_HWDI_Numbers','TX_HWDI_strength']
TX_HWDI_def0[cols] =np.empty(shape=(8964,4))
TX_HWDI_def1[cols] =np.empty(shape=(8964,4))
theyear=str(year)
csvfile='data/csvFiles/daily_maximum_temperature_8964_ERA5_1985.csv'
df_max=pd.read_csv(csvfile)
for ip in range(8964): #there are 8964 grid points in the daily maximum temperature data
tx=df_max.iloc[ip,3:368] # data for the 365 days in a year
txClimatology=mtx25yrs1[ip,:]
hw1=TX_HWDI_5Days5DegreeAboveClimatology(tx,txClimatology)
hw2=TX_HWDI_3DaysAbove30Degree(tx)
TX_HWDI_def0.loc[ip,cols]=hw1
TX_HWDI_def1.loc[ip,cols]=hw2
f1='data/csvFiles/TX_HWDI_def0_'+theyear+'.csv'
f2='data/csvFiles/TX_HWDI_def1_'+theyear+'.csv'
TX_HWDI_def0.to_csv(f1)
TX_HWDI_def1.to_csv(f2)
Reference
Definitions of the core climate extreme indices. There are 27 core climate extreme indices.
These 27 climate extremes indices are defined by the Expert Team on Climate Change Detection and Indices (ETCCDI). ETCCDI is a joint international effort that includes the participation of Environment and Climate Change Canada among other organizations.
Description of Climate Extreme Indexes by OCDP
https://lamps.math.yorku.ca/OntarioClimate/index_app_documents.htm#/indexDefinationTable
Compute Extreme Precipitation Indices using Python
https://www.guillaumedueymes.com/post/extreme_precipitation/