Category : PySpark

from pyspark.sql import SparkSession
from pyspark.sql.types import *          # data types
from pyspark.sql import functions as F   # functions

spark = SparkSession.builder.appName('XIU-Daily').getOrCreate()
input_fn = 's-p-tsx-60-futures_01.csv'
df = spark.read.csv(input_fn, header=True, inferSchema=True)
df.show(3)

+-------------------+-----+
|               date|value|
+-------------------+-----+
|1999-09-07 00:00:00|416.5|
|1999-09-08 00:00:00|417.2|
|1999-09-09 00:00:00|421.5|
+-------------------+-----+

df = df.withColumn('Date', F.date_format('date', 'yyyy-MM-dd'))   # change date format
df = df.withColumn('current_date', F.current_date())              # current date
df = df.withColumn('year', F.year('date'))
df = df.withColumn('month', F.month('date'))
df = df.withColumn('dayofmonth', F.dayofmonth('date'))
df = df.withColumn('minute', F.minute('date'))
df = df.withColumn('second', F.second('date'))
df = df.withColumn('dayofyear', F.dayofyear('date'))
df = df.withColumn('dayofweek', F.dayofweek('date'))
df = df.withColumn('weekofyear', F.weekofyear('date'))
df = df.withColumn('quarter', F.quarter('date'))
df = df.withColumn('next_day_Mon', F.next_day('date', 'Mon'))
df = df.withColumn('next_day_Tue', F.next_day('date', 'Tue'))
df = df.withColumn('next_day_Wed', F.next_day('date', 'Wed'))
..
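As a side note, newer Spark versions can build all of these calendar columns in one pass. A minimal sketch, assuming Spark 3.3+ for DataFrame.withColumns and using a one-row toy frame in place of the futures CSV:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('date-parts-sketch').getOrCreate()

# Toy stand-in for the CSV read above
df = spark.createDataFrame([('1999-09-07 00:00:00', 416.5)], ['date', 'value'])
df = df.withColumn('date', F.to_timestamp('date'))

# One withColumns call instead of a chain of withColumn calls
parts = {
    'year': F.year('date'),
    'month': F.month('date'),
    'dayofmonth': F.dayofmonth('date'),
    'dayofweek': F.dayofweek('date'),
    'weekofyear': F.weekofyear('date'),
    'quarter': F.quarter('date'),
}
df.withColumns(parts).show()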


df.orderBy('colname1', 'colname2', ascending=False)

from pyspark.sql.functions import sort_array

df = spark.createDataFrame([([2, 1, 3],), ([1],), ([],)], ['data'])
df.show()

+---------+
|     data|
+---------+
|[2, 1, 3]|
|      [1]|
|       []|
+---------+

df0 = spark.createDataFrame(df.select(sort_array(df.data).alias('r')).collect(), ['data'])
df0.show()

+---------+
|     data|
+---------+
|[1, 2, 3]|
|      [1]|
|       []|
+---------+

df1 = spark.createDataFrame(df.select(sort_array(df.data, asc=False).alias('r')).collect(), ['data'])
df1.show()

+---------+
|     data|
+---------+
|[3, 2, 1]|
|      [1]|
|       []|
+---------+
..
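Incidentally, sort_array can be applied in place with select or withColumn; the collect()-and-recreate round trip above is not required. A minimal sketch on the same toy data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sort_array

spark = SparkSession.builder.appName('sort-array-sketch').getOrCreate()
df = spark.createDataFrame([([2, 1, 3],), ([1],), ([],)], ['data'])

df.withColumn('data', sort_array('data')).show()             # ascending: [1, 2, 3]
df.withColumn('data', sort_array('data', asc=False)).show()  # descending: [3, 2, 1]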


Rename columns x1 to x3, x2 to x4

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rename columns').getOrCreate()
data = spark.createDataFrame([(1, 2), (3, 4)], ['x1', 'x2'])
data.show()
data = data.withColumnRenamed('x1', 'x3') \
           .withColumnRenamed('x2', 'x4')
d..
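For more than a couple of columns, chained withColumnRenamed calls get long; a positional toDF or a rename mapping does the same job. A minimal sketch of both approaches:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rename-sketch').getOrCreate()
data = spark.createDataFrame([(1, 2), (3, 4)], ['x1', 'x2'])

# Option 1: replace every column name positionally in one call
renamed = data.toDF('x3', 'x4')
renamed.show()

# Option 2: rename selectively from an old-name -> new-name mapping
mapping = {'x1': 'x3', 'x2': 'x4'}
for old, new in mapping.items():
    data = data.withColumnRenamed(old, new)
data.show()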


Examples for pivot_table of Pandas and crosstab of PySpark, from my work directory: pyWorkDir/Bigdata/Pyspark/DataForYuanPei.ipynb

pivot_table:

casepandas = indcases.toPandas()
casetable1 = pd.pivot_table(casepandas, values='VALUE',
                            index=['Case identifier number'],
                            columns=['Case information'],
                            aggfunc=np.sum)

crosstab:

casetable = casedf.crosstab('case_Date', 'province')
casetable = casetable.toPandas()
casetable = casetable.sort_values('case_Date_province')
cumsum_casetable = casetable.set_index('case_Date_province').cumsum()
cumsum_casetable['CA'] = cumsum_casetable.sum(axis=1)
casedftable = casedf.crosstab('case_Date', 'health_region')
health_region_table = casedftable.select(['case_Date_health_region', 'Toronto', 'Montréal', 'Vancouver Coastal', ..
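To make the crosstab naming concrete: the first output column is named '<col1>_<col2>', which is why the sort and set_index above key on 'case_Date_province'. A minimal sketch with made-up toy rows (the dates and province codes here are illustrative, not the post's dataset):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('crosstab-sketch').getOrCreate()
casedf = spark.createDataFrame(
    [('2020-03-01', 'ON'), ('2020-03-01', 'QC'), ('2020-03-02', 'ON')],
    ['case_Date', 'province'])

casetable = casedf.crosstab('case_Date', 'province')
casetable.show()
# Columns: case_Date_province plus one count column per distinct province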


Based on my code: Canada_COVID19_cases_information.ipynb

I like this way to convert a string to a date:

from datetime import datetime
from pyspark.sql.types import *             # data types
from pyspark.sql.functions import udf, col

func = udf(lambda x: datetime.strptime(x, '%d/%m/%Y'), DateType())
df = df.withColumn('newDate', func(col('Date')))

Calculate the difference in days between two dates. Some good examples:

from pyspark.sql import functions as F

df = df.withColumn('startDay', F.lit('2020-01-01').cast('Date'))
df = df.withColumn('Days_from_01_Jan', F.datediff(F.col('newDate'), F.col('startDay')))

Convert pandas ..
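The same conversion also works without a Python UDF: Spark's built-in to_date accepts a format string, which avoids the serialization overhead of udf. A minimal sketch on a one-row toy frame (column names mirror the snippet above):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('to-date-sketch').getOrCreate()
df = spark.createDataFrame([('07/09/1999',)], ['Date'])

df = df.withColumn('newDate', F.to_date('Date', 'dd/MM/yyyy'))   # built-in parse, no UDF
df = df.withColumn('startDay', F.lit('2020-01-01').cast('date'))
df = df.withColumn('Days_from_01_Jan', F.datediff(F.col('newDate'), F.col('startDay')))
df.show()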
