How to calculate the core climate extreme indices with python

  Data, Numpy

Introduction

This article includes some Python functions for calculating the core climate extreme index. I post them here for my future reference, and share them with people who are interested in python or climate change. No guarantee of accuracy.

The definitions of the core climate extreme indices

1.Description of Climate Extreme Indexes by OCDP

2.Definitions of the core climate extreme indices by Canada.ca

Python functions for these climate extreme indices

Temperature indices


import numpy as np
#functions
def TN_index_TNn(tn):
    TNn=np.amin(tn)
    return TNn
def TN_index_FD(tn):
    tn01=np.where(tn<0,1,0)
    FD = np.sum(tn01, axis=0)
    return FD
def TN_index_TN10LT(tn):
    tn01=np.where(tn<=-10,1,0)
    TN10LT = np.sum(tn01, axis=0)
    return TN10LT    
        
def TN_index_TR(tn):
    tn01=np.where(tn>20,1,0)
    TR = np.sum(tn01, axis=0)
    return TR
def TN_index_TN10p(tn,tn10th):
    dtn=tn-tn10th
    dtn01=np.where(dtn<0,1,0)
    TN10p=np.sum(dtn01)
    return TN10p
def TN_index_TN90p(tn,tn90th):
    dtn=tn-tn90th
    dtn01=np.where(dtn>0,1,0)
    TN90p=np.sum(dtn01)
    return TN90p    
def TN_index_TNCSDI(tn,tn10th):
    dtn=tn-tn10th
    dtn01=np.where(dtn<0,1,0)
    dtn0100=np.insert(np.append(dtn01,0),0,0)
    edges=np.diff(dtn0100)
    rising=np.argwhere(edges==1)
    falling=np.argwhere(edges==-1)
    is_rising_empty=rising.size==0
    isnot_rising_empty=rising.size>0
    is_falling_empty=falling.size==0
    isnot_falling_empty=falling.size>0
    if isnot_rising_empty and isnot_falling_empty:
        c=falling-rising
        tnCSDI=np.sum(np.where(c>5,c,0))
    elif isnot_falling_empty and is_rising_empty:
        tnCSDI=dtn.size
    else:
        tnCSDI=0
    return tnCSDI   
    
def TX_index_SU(tx):
    tx01=np.where(tx>25,1,0)
    SU=np.sum(tx01)
    return SU
def TX_index_TX30GE(tx):
    tx01=np.where(tx>=30,1,0)
    TX30GE=np.sum(tx01)
    return TX30GE
def TX_index_TX35GE(tx):
    tx01=np.where(tx>=35,1,0)
    TX35GE=np.sum(tx01)
    return TX35GE
def TX_index_ID(tx):
    tx01=np.where(tx<0,1,0)
    ID=np.sum(tx01)
    return ID
def TX_index_TX10p(tx,tx10th):
    dtx=tx-tx10th
    dtx01=np.where(dtx<0,1,0)
    TX10p=np.sum(dtx01)
    return TX10p
def TX_index_TX90p(tx,tx90th):
    dtx=tx-tx90th
    dtx01=np.where(dtx>0,1,0)
    TX90p=np.sum(dtx01)
    return TX90p    
def TX_index_TXx(tx):
    TXx=np.amax(tx, axis=0)
    return TXx
def TXTN_index_HDD(tx,tn):
    tm=(tx+tn)/2.0
    tm=18-tm
    tmhdd=np.where(tm>0,tm,0)
    HDD=np.sum(tmhdd)
    return HDD
def TXTN_index_CDD(tx,tn):
    tm=(tx+tn)/2
    tm=tm-18
    tmcdd=np.where(tm>0,tm,0)
    CDD=np.sum(tmcdd)
    return CDD
def TXTN_index_DTR(tx,tn):
    txtndtr=tx-tn
    DTR=np.mean(txtndtr)
    return DTR
def TX_index_TXWSDI(tx,tx90th):
    dtx=tx-tx90th
    dtx01=np.where(dtx>0,1,0)
    dtx0100=np.insert(np.append(dtx01,0),0,0)
    edges=np.diff(dtx0100)
    rising=np.argwhere(edges==1)
    falling=np.argwhere(edges==-1)
    is_rising_empty=rising.size==0
    isnot_rising_empty=rising.size>0
    is_falling_empty=falling.size==0
    isnot_falling_empty=falling.size>0
    if isnot_rising_empty and isnot_falling_empty:
        c=falling-rising
        TXWSDI=np.sum(np.where(c>5,c,0))
    elif isnot_falling_empty and is_rising_empty:
        TXWSDI=dtx.size
    else:
        TXWSDI=0
    return TXWSDI
def TM_index_GSL(tm):
    l=len(tm) #tm is 1-year data l=365 or 366
    j1=l-1
    j2=l-185
    for j in range(l-6):
        d1=tm[j:j+6]
        nd1=np.where(d1>5,1,0)
        sd1=sum(nd1)
        if sd1==6:
            j1=j
            break
    for j in np.arange(l-185,l-6):
        d1=tm[j:j+6]
        nd1=np.where(d1<5,1,0)
        sd1=sum(nd1)
        if sd1==6:
            j2=j
            break
    GSL=j2-j1
    GSL=np.where(GSL>0,GSL,0)
    if GSL==0:
        j1=0
        j2=0
    TM_index_GSL=(j1,j2,GSL)
    return TM_index_GSL
    
def TX_HWDI_5Days5DegreeAboveClimatology(tx,txClimatology):
    istxnan=np.sum(np.where(np.isnan(tx),1,0))
    istxClimatologynan=np.sum(np.where(np.isnan(txClimatology),1,0))
    if istxnan>0 or istxClimatologynan:
        TX_HWDI_days=np.nan
        TX_HWDI_strength=np.nan
        TX_HWDI_period=np.nan
        TX_HWDI_Numbers=np.nan
    else:
        dtx=tx-txClimatology-5
        dtx01=np.where(dtx>0,1,0)
        nohw=np.sum(dtx01)
        if nohw<5:
            TX_HWDI_days=0
            TX_HWDI_strength=0
            TX_HWDI_period=0
            TX_HWDI_Numbers=0
        else:
            dtx0100=np.insert(np.append(dtx01,0),0,0)
            edges=np.diff(dtx0100)
            rising=np.argwhere(edges==1)
            falling=np.argwhere(edges==-1)
            c=falling-rising
            #print(c)
            TX_HWDI_days=np.max(np.where(c>4,c,0))
            TX_HWDI_period=np.sum(np.where(c>4,c,0))
            TX_HWDI_Numbers=np.sum(np.where(c>4,1,0))
            if TX_HWDI_days>0:
                ii=0
                dtxsum=0
                for i in c:
                    if i>4:
                        i1=rising[ii][0]
                        i2=i1+i[0]
                        #print((i1,i2))
                        dtxsum=dtxsum+np.sum(dtx[i1:i2])
                    ii=ii+1
            else:
                dtxsum=0
            TX_HWDI_strength=np.round(dtxsum,2)
    return (TX_HWDI_days,TX_HWDI_period,TX_HWDI_Numbers,TX_HWDI_strength)
def TX_HWDI_3DaysAbove30Degree(tx):
    istxnan=np.sum(np.where(np.isnan(tx),1,0))
    if istxnan>0:
        TX_HWDI_days=np.nan
        TX_HWDI_strength=np.nan
        TX_HWDI_period=np.nan
        TX_HWDI_Numbers=np.nan
    else:
        dtx=tx-30
        dtx01=np.where(dtx>=0,1,0)
        nohw=np.sum(dtx01)
        if nohw<3:
            TX_HWDI_days=0
            TX_HWDI_strength=0
            TX_HWDI_period=0
            TX_HWDI_Numbers=0
        else:
            dtx0100=np.insert(np.append(dtx01,0),0,0)
            edges=np.diff(dtx0100)
            rising=np.argwhere(edges==1)
            falling=np.argwhere(edges==-1)
            c=falling-rising
            #print(c)
            TX_HWDI_days=np.max(np.where(c>2,c,0))
            TX_HWDI_period=np.sum(np.where(c>2,c,0))
            TX_HWDI_Numbers=np.sum(np.where(c>2,1,0))
            if TX_HWDI_days>0:
                ii=0
                dtxsum=0
                for i in c:
                    if i>2:
                        i1=rising[ii][0]
                        i2=i1+i[0]
                        #print((i1,i2))
                        dtxsum=dtxsum+np.sum(dtx[i1:i2])
                    ii=ii+1
            else:
                dtxsum=0 
            TX_HWDI_strength=np.round(dtxsum,2)    
    return (TX_HWDI_days,TX_HWDI_period,TX_HWDI_Numbers,TX_HWDI_strength)

Precipitation indices


## If missing more than 20% data, set the index nan
# Total cumulated precipitation
def PrecTOT(prDaily):
    ind_PrecTOT=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if ((N2/N) < 0.8):
        ind_PrecTOT = np.empty(1)
        ind_PrecTOT = np.nan
    else:
        wetdayprDaily=np.where(prDaily_no_nan>=1,prDaily_no_nan,0)
        ind_PrecTOT = np.nansum(wetdayprDaily)        
    return ind_PrecTOT 
 
# No of wet days (precipitation ≥ 1 mm) [%]
def R1mm(prDaily):
    ind_R1mm=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if (N2 == 0):
        N2=1
    if ((N2/N) < 0.8): 
        ind_R1mm = np.empty(1)
        ind_R1mm = np.nan
    else:
        wetday=np.where(prDaily_no_nan>=1,1,0)
        ind_R1mm=np.nansum(wetday)
    return ind_R1mm
# No of wet days (precipitation ≥ 1 mm) [%]
def R10mm(prDaily):
    ind_R10mm=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if (N2 == 0):
        N2=1
    if ((N2/N) < 0.8): 
        ind_R10mm = np.empty(1)
        ind_R10mm = np.nan
    else:
        wetday=np.where(prDaily_no_nan>=10,1,0)
        ind_R10mm=np.nansum(wetday)
    return ind_R10mm
# No of wet days (precipitation ≥ 1 mm) [%]
def R20mm(prDaily):
    ind_R20mm=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if (N2 == 0):
        N2=1
    if ((N2/N) < 0.8): 
        ind_R20mm = np.empty(1)
        ind_R20mm = np.nan
    else:
        wetday=np.where(prDaily_no_nan>=20,1,0)
        ind_R20mm=np.nansum(wetday)
    return ind_R20mm
# Calculates the 95th percentile of precipitation (mm/day).
def pr95pDays(prDaily,pr95p):
    ind_pr95pDays=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    # condition on available data >= 80%
    if ((N2/N) < 0.8): 
        ind_pr95pDays = np.empty(1)
        ind_pr95pDays = np.nan
    else:
        prDaily = np.where(prDaily > pr95p,1,0)
        ind_pr95pDays=np.sum(prDaily)
    return ind_pr95pDays
def pr95pTOT(prDaily,pr95p):
    ind_pr95pTOT=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    # condition on available data >= 80%
    if ((N2/N) < 0.8): 
        ind_pr95pTOT = np.empty(1)
        ind_pr95pTOT = np.nan
    else:
        prDaily = np.where(prDaily > pr95p,prDaily,0)
        ind_pr95pTOT=np.sum(prDaily)
    return ind_pr95pTOT
def pr99pDays(prDaily,pr99p):
    ind_pr99pDays=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    # condition on available data >= 80%
    if ((N2/N) < 0.8): 
        ind_pr99pDays = np.empty(1)
        ind_pr99pDays = np.nan
    else:
        prDaily = np.where(prDaily > pr99p,1,0)
        ind_pr99pDays=np.sum(prDaily)
    return ind_pr99pDays
def pr99pTOT(prDaily,pr99p):
    ind_pr99pTOT=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    # condition on available data >= 80%
    if ((N2/N) < 0.8): 
        ind_pr99pTOT = np.empty(1)
        ind_pr99pTOT = np.nan
    else:
        prDaily = np.where(prDaily > pr99p,prDaily,0)
        ind_pr99pTOT=np.sum(prDaily)
    return ind_pr99pTOT
# Calculates the simple daily intensity of precipitation of the input signal.
def SDII(prDaily):
    ind_SDII=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if ((N2/N) < 0.8): 
        ind_SDII = np.empty(1)
        ind_SDII = np.nan
    else:
        prDaily = prDaily[prDaily > 1]        
        ind_SDII = np.round(np.nanmean(prDaily),2)
    return ind_SDII       
# Maximum number of dry consecutive days
def CDD(prDaily):
    ind_CDD=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if ((N2/N) < 0.8): 
        ind_CDD = np.empty(1)
        ind_CDD = np.nan
    else:
        temp = 0
        ind_CDD = 0 
        j =0
        while (j < N2):
            while (j < N2 ) and (prDaily_no_nan[j] < 1.0 ):
                    j += 1
                    temp +=1
            if ind_CDD < temp:
                 ind_CDD = temp
            temp = 0
            j += 1 
    return ind_CDD      
# Maximum number of wet consecutive days
def CWD(prDaily):
    ind_CWD=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if ((N2/N) < 0.8): 
        ind_CWD = np.empty(1)
        ind_CWD = np.nan
    else:
        temp = 0
        ind_CWD = 0 
        j =0
        while (j < N2):
            while (j < N2 ) and (prDaily_no_nan[j] > 1.0 ):
                j += 1
                temp +=1
            if ind_CWD < temp:
                ind_CWD = temp
            temp = 0
            j += 1 
    return ind_CWD   
# Calculates the maximum total precipitation in 1 day (mm).
def Rx1day(prDaily):
    ind_Rx1day=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if ((N2/N) < 0.8): 
        ind_Rx1day = np.empty(1)
        ind_Rx1day = np.nan
    else:
        temp = 0
        ind_Rx1day = 0 
        for i in range(0,N):
            if (~np.isnan(prDaily[i])):
                temp = prDaily[i]
            if ind_Rx1day < temp:
                ind_Rx1day = temp
    return ind_Rx1day         
# Calculates the maximum total precipitation cumulated in 5 consecutive days (mm).
def Rx5day(prDaily):
    ind_Rx5day=[]
    prDaily_no_nan = prDaily[~np.isnan(prDaily)]
    N = len(prDaily)
    N2 = len(prDaily_no_nan)
    if ((N2/N) < 0.8): 
        ind_Rx5day = np.empty(1)
        ind_Rx5day = np.nan
    else:
        temp = 0
        ind_Rx5day = 0 
        for i in range(0,N-4):
            if (~np.isnan(prDaily[i])) and (~np.isnan(prDaily[i+1])) and (~np.isnan(prDaily[i+2])) and (~np.isnan(prDaily[i+3])) and (~np.isnan(prDaily[i+4])):
                temp = prDaily[i] + prDaily[i+1] + prDaily[i+2] + prDaily[i+3] + prDaily[i+4]
            if ind_Rx5day < temp:
                ind_Rx5day = temp
    return ind_Rx5day         

Examples

The following code uses two different definitions to calculate the heat wave index.


import np as np
import pandas as pd

#txClimatology= # provide 365 values for each grid points
gridID=pd.read_csv('data/LAMPS_Ontario_8964Pts.csv')
gridID.set_index('ID',inplace=True)

TX_HWDI_def0=gridID.copy()
TX_HWDI_def1=gridID.copy()
cols=['TX_HWDI_days','TX_HWDI_period','TX_HWDI_Numbers','TX_HWDI_strength']
TX_HWDI_def0[cols]  =np.empty(shape=(8964,4))
TX_HWDI_def1[cols]  =np.empty(shape=(8964,4))
theyear=str(year)
csvfile='data/csvFiles/daily_maximum_temperature_8964_ERA5_1985.csv'
df_max=pd.read_csv(csvfile)

for ip in range(8964): #there are 8964 grid points in the daily maximum temperature data
    tx=df_max.iloc[ip,3:368] # data for the 365 days in a year
    txClimatology=mtx25yrs1[ip,:]
    hw1=TX_HWDI_5Days5DegreeAboveClimatology(tx,txClimatology)
    hw2=TX_HWDI_3DaysAbove30Degree(tx)
    TX_HWDI_def0.loc[ip,cols]=hw1
    TX_HWDI_def1.loc[ip,cols]=hw2
f1='data/csvFiles/TX_HWDI_def0_'+theyear+'.csv'
f2='data/csvFiles/TX_HWDI_def1_'+theyear+'.csv'
TX_HWDI_def0.to_csv(f1)
TX_HWDI_def1.to_csv(f2)

Reference

Definitions of the core climate extreme indices. There are 27 core climate extreme indices.

These 27 climate extremes indices are defined by the Expert Team on Climate Change Detection and Indices (ETCCDI). ETCCDI is a joint international effort that includes the participation of Environment and Climate Change Canada among other organizations.

Description of Climate Extreme Indexes by OCDP

https://lamps.math.yorku.ca/OntarioClimate/index_app_documents.htm#/indexDefinationTable

Compute Extreme Precipitation Indices using Python

https://www.guillaumedueymes.com/post/extreme_precipitation/