Basic PySpark commands

Would you like to start using Databricks or a Synapse Spark pool? Let's review useful PySpark commands for working with Apache Spark DataFrames and transforming data.

The documentation is divided into 7 sections:

FILES – walk through folders and files in Databricks
READ – load data into a dataframe
TRANSFORM – basic transformations on a dataframe
WRITE – write a dataframe to a destination folder
METASTORE – table and view registration in Databricks
DELTA – useful Delta functionalities
OTHERS – schema, explain, partitioning etc.

FILES

The FILES section shows how to walk through folders and files in Databricks.

1. List files in folder

The dbutils.fs.ls function lists all files in a particular folder of the mounted storage.

dbutils.fs.ls("source_path")

source_path – path of the folder to be listed
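
dbutils.fs.ls returns a list of FileInfo objects, so the result can be iterated. A minimal sketch, assuming a hypothetical mount point /mnt/bronze/:

 # print name and size (in bytes) of every object in the folder
 for file_info in dbutils.fs.ls("/mnt/bronze/"):
     print(file_info.name, file_info.size)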

2. List all files in folder and all subfolders of mounted Storage

The following Python script builds a dictionary of files in a folder and all of its subfolders on the mounted storage.

 import os
 # dictionary: folder path -> list of file names found under the mount point
 file_list = {}
 for subdir, dirs, files in os.walk("/dbfs/mnt/"):
     for filename in files:
         if subdir not in file_list:
             file_list[subdir] = []
         file_list[subdir].append(filename)

mnt – name of the mounted Data Lake

3. Remove files

The dbutils.fs.rm function removes files from a particular folder of the mounted storage; recurse=True deletes the folder contents recursively.

 dbutils.fs.rm("source_path", recurse=True)

READ

The READ section shows how to load data into a dataframe.

1. Read file to dataframe

The spark.read function loads data from storage into cluster memory.

 df=spark.read.format("format").load("source_path")

source_path – path of the file or folder to be loaded (e.g. /mnt/prd/bronze/dbr/training/2021/08/03/)
format – parquet, delta, csv etc.
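
For example, assuming the folder above contains CSV files with a header row (the format and options here are an illustration, not part of the template):

 df = spark.read.format("csv")\
  .option("header", True)\
  .option("inferSchema", True)\
  .load("/mnt/prd/bronze/dbr/training/2021/08/03/")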

2. Read table to dataframe

The spark.table function loads data from a Databricks table into cluster memory.

 df=spark.table("schema_name.table_name")

schema_name – schema in Databricks
table_name – table in Databricks

3. Create dataframe from values

 df=spark.createDataFrame([("value1", value2), ("value1", value2)], ["column_name1", "column_name2"])

value1 – value for first column (string)
value2 – value for second column (numeric)
column_name1 – name of first column
column_name2 – name of second column
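
A concrete example with made-up values:

 # two rows, a string column and a numeric column
 df = spark.createDataFrame([("Alice", 34), ("Bob", 45)], ["name", "age"])
 df.show()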

TRANSFORM

This section covers simple transformations that can be implemented in Databricks.

1. Filter

The filter/where functions filter the data in a dataframe. Several ways to use them are shown below.

 df.filter(df.column_name=="value")

 from pyspark.sql.functions import col
 df.where(col("column_name")=="value")

 df.where(col("column_name").isNotNull())

 df.where((col("column_name")=="value") & (col("column_name2")=="value2"))

 df.where((col("column_name")=="value") | (col("column_name2")=="value2"))

column_name – column that should be filtered
value – value that should be used in filter
& – AND
| – OR
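
Combining the variants above, a sketch with hypothetical country and amount columns:

 from pyspark.sql.functions import col
 # Polish rows with a non-null amount greater than 100
 df.where((col("country")=="PL") & col("amount").isNotNull() & (col("amount")>100))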

2. Join

The join function merges two dataframes into a single one. There are two ways to join, depending on whether you want the join column to appear twice in the result.

 first_dataframe.join(second_dataframe, first_dataframe.column_name==second_dataframe.column_name, "join_type")

 first_dataframe.join(second_dataframe, ["column_name"],"join_type")

join_type – inner, left_outer, right_outer etc.
column_name – join column name
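
For example, with hypothetical orders and customers dataframes joined on customer_id, the first form keeps the join column from both sides, while the second returns it only once:

 # customer_id appears twice in the result
 orders.join(customers, orders.customer_id==customers.customer_id, "inner")

 # customer_id appears once in the result
 orders.join(customers, ["customer_id"], "inner")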

There are 5 join strategies in Spark:

– broadcast hash join (BHJ) – one small dataset (below the broadcast threshold, 10 MB by default) and one larger dataset; see the broadcast hint sketch after this list,
– shuffle hash join (SHJ),
– shuffle sort merge join (SMJ) – two large datasets with a common key that is sortable, unique, and can be assigned to or stored in the same partition,
– broadcast nested loop join (BNLJ),
– shuffle-and-replicated nested loop join (Cartesian product join)
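
When one side is small enough to broadcast, you can request a broadcast hash join explicitly with the broadcast hint. A minimal sketch, assuming hypothetical large_df and small_df dataframes:

 from pyspark.sql.functions import broadcast
 # send the small dataframe to every executor instead of shuffling the large one
 large_df.join(broadcast(small_df), ["column_name"], "inner")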

3. Add column

The withColumn function adds a new column to a dataframe.

 from pyspark.sql.functions import current_timestamp, lit
 df.withColumn("alias_name", current_timestamp())\
  .withColumn("alias_name2", lit("test"))

alias_name – name of the new column

4. Case when

The when/expr functions add a new column based on conditional (if/else) logic.

 df.select("column_name1", F.when(df.column_name2 > X, Y).otherwise(Z).alias(alias_name2))

 from pyspark.sql.functions import expr
 df.withColumn("column_name", expr("CASE WHEN column_name > X THEN Y ELSE Z END"))

5. Select column

The select function selects the columns of interest from a dataframe.

 from pyspark.sql.functions import col
 df.select(col("column_name").cast("type_of_column").alias("alias_name"),\
  "column_name2",\
  col("column_name3").cast("type_of_column3").alias("alias_name3"))

column_name – column that should be selected
type_of_column – type of transformed column
alias_name – new column name

Use the col function when you want to change the type or the name of a column.

6. Drop column

The drop function removes columns that are not needed from a dataframe.

 df.drop(df.column_name).drop(df.column_name2)

 df.drop("column_name","column_name2")

7. Aggregate

The groupBy function aggregates measures at a particular level.

 df.groupBy("grouping_column_name").function("column_name")

 from pyspark.sql.functions import function
 df.groupBy("grouping_column_name")\
 .agg(function("column_name").alias("alias_name"),\
 function("column_name").alias("alias_name"))

grouping_column_name – column used in grouping
column_name – KPI used to aggregate
function – max, min, sum, avg etc.
alias_name – KPI alias name

The .agg() function is used to calculate multiple aggregations at the same time.
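
A filled-in sketch, assuming hypothetical country and amount columns:

 import pyspark.sql.functions as F
 # total and maximum amount per country
 df.groupBy("country")\
  .agg(F.sum("amount").alias("total_amount"),\
  F.max("amount").alias("max_amount"))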

8. Sort

The sort/orderBy functions sort data in ascending or descending order based on the selected columns.

 df.sort(df.column_name.desc())

 df.sort("column_name", ascending=False)

 df.orderBy(["column_name", "column_name2"], ascending=[0,1])

9. Remove duplicates

 df.dropDuplicates()

 df.distinct()
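
dropDuplicates also accepts a list of columns, so duplicates can be removed based on selected keys only (the column name below is illustrative):

 # keep one row per customer_id, regardless of the other columns
 df.dropDuplicates(["customer_id"])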

10. Spark SQL

spark.sql lets you run a SQL query against tables or views registered in Databricks.

 spark.sql("""SELECT column_name
 FROM schema_name.table_name
 WHERE column_name = 'ORD'
 ORDER by column_name DESC""")

11. Datetime Functions

 from pyspark.sql.functions import date_format, month, to_date, date_add, col
 df.withColumn("date string", date_format("column_name", "MMMM dd, yyyy"))\
  .withColumn("time string", date_format("column_name", "HH:mm:ss.SSSSSS"))\
  .withColumn("month", month(col("column_name")))\
  .withColumn("date", to_date(col("column_name")))\
  .withColumn("plus_two_days",date_add(col("column_name"),2))

WRITE

The WRITE section describes how to save transformed data to a destination folder.

1. Overwrite, append

 df.write.format("format")\
 .mode("write_mode")\
 .option("mergeSchema", True)\
 .partitionBy("partitioned_column")\
 .save("dest_path")

format – parquet, delta, csv etc.
write_mode – overwrite, append
dest_path – path where transformed data should be written
partitioned_column – column that should be used for partitioning
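
A filled-in sketch, assuming a hypothetical destination path and a year partition column:

 # append new delta files partitioned by year
 df.write.format("delta")\
  .mode("append")\
  .option("mergeSchema", True)\
  .partitionBy("year")\
  .save("/mnt/prd/silver/dbr/training/")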

2. Merge (Delta)

 from delta.tables import DeltaTable
 deltaTable = DeltaTable.forPath(spark, "dest_path")
 deltaTable.alias("t").merge(df.alias("s"), "s.ID=t.ID")\
 .whenMatchedUpdateAll()\
 .whenNotMatchedInsertAll()\
 .execute()

dest_path – path of existing files (target) in Delta Lake
df – dataframe (source) that will be merged with target data
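
If only selected columns should be updated or inserted, whenMatchedUpdate and whenNotMatchedInsert accept explicit column mappings. A sketch with hypothetical ID and value columns:

 from delta.tables import DeltaTable
 deltaTable = DeltaTable.forPath(spark, "dest_path")
 deltaTable.alias("t").merge(df.alias("s"), "s.ID=t.ID")\
  .whenMatchedUpdate(set={"value": "s.value"})\
  .whenNotMatchedInsert(values={"ID": "s.ID", "value": "s.value"})\
  .execute()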

METASTORE

Registering a table in the metastore allows you to read the underlying files using SQL.

1. Create table

The CREATE TABLE statement creates a metastore table based on existing files in the Delta Lake.

 %sql
 CREATE TABLE dest_schema_name.dest_table_name 
 (column_name column_type)
 USING DELTA
 LOCATION "dest_path"

dest_path – path of existing files in Delta Lake
dest_schema_name – schema in Databricks
dest_table_name – table in Databricks

2. Save as table

The saveAsTable option creates a metastore table in Databricks while saving the files to the Data Lake.

 df.write.format("delta")\
 .option("path", "dest_path")\
 .option("mergeSchema", True)\
 .saveAsTable("dest_schema_name.dest_table_name")

dest_path – path where transformed data should be written
dest_schema_name – schema in Databricks
dest_table_name – table in Databricks

3. Register as view

The createGlobalTempView statement creates a view that is shared across all sessions on the same cluster; it is registered in the global_temp database.

The createOrReplaceTempView statement creates a temporary view that can be used within the same session.

 df.createGlobalTempView("table_name")

 df.createOrReplaceTempView("table_name")
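
Once registered, both views can be queried with spark.sql. Note that global temporary views live in the global_temp database:

 spark.sql("SELECT * FROM global_temp.table_name")
 spark.sql("SELECT * FROM table_name")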

DELTA

1. Audit

The history function shows a summary of the Delta transaction log.

 from delta.tables import DeltaTable
 deltaTable=DeltaTable.forPath(spark,"path")
 deltaTable.history(no_records)

path – path of delta files
no_records – number of most recent operations to return

 %sql
 DESCRIBE HISTORY schema_name.table_name LIMIT no_records

schema_name – schema in Databricks
table_name – table in Databricks
no_records – number of most recent operations to return

2. Time Travel

The restore functions bring back an old version of the delta files.

 from delta.tables import DeltaTable
 deltaTable=DeltaTable.forPath(spark,"path")
 deltaTable.restoreToVersion(X)

path – path of delta files
X – version to be restored

 %sql
 RESTORE TABLE schema_name.table_name TO VERSION AS OF X
 -- or restore to a point in time
 RESTORE TABLE schema_name.table_name TO TIMESTAMP AS OF 'DATE'

X – version to be restored
DATE – date of version to be restored
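
An old version can also be read without restoring it, using the versionAsOf (or timestampAsOf) read option:

 # load an old snapshot of the delta files into a dataframe
 df = spark.read.format("delta").option("versionAsOf", X).load("path")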

3. Files Retention

The vacuum function removes old delta files from the storage.

 %sql
 VACUUM schema_name.table_name RETAIN X HOURS

 from delta.tables import DeltaTable
 deltaTable=DeltaTable.forPath(spark,"path")
 deltaTable.vacuum(X)

X – retention period in hours; files older than this are removed (the default retention is 168 hours, i.e. 7 days)

OTHERS

1. Schema definition

 from pyspark.sql.types import *
 schema = StructType([StructField("author", StringType(), False),
 StructField("title", StringType(), False),
 StructField("pages", IntegerType(), False)])

 # equivalent schema as a DDL string
 schema = "author STRING, title STRING, pages INT"
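
Either form of the schema can then be passed to spark.read or spark.createDataFrame. A minimal sketch with made-up values:

 # enforce the schema while reading instead of inferring it
 df = spark.read.format("csv").schema(schema).load("source_path")
 # or build a dataframe directly with the schema
 df = spark.createDataFrame([("Some Author", "Some Title", 100)], schema)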

2. Explain

The explain function shows the execution plan.

 df.explain(mode="format_mode")

mode – simple, formatted, extended, codegen, cost

3. Repartitioning

Partitioning allows for efficient parallelism. A distributed scheme of breaking up data into chunks or partitions allows Spark executors to process only data that is close to them, minimizing network bandwidth. The size of a partition in Spark is dictated by spark.sql.files.maxPartitionBytes. The default is 128 MB. You can decrease the size, but that may result in what’s known as the “small file problem”—many small partition files, introducing an inordinate amount of disk I/O and performance degradation thanks to filesystem operations such as opening, closing, and listing directories, which on a distributed filesystem can be slow. Partitions are also created when you explicitly use certain methods of the DataFrame API. For example, while creating a large DataFrame or reading a large file from disk, you can explicitly instruct Spark to create a certain number of partitions.

 df.repartition(X)

X – number of partitions
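
repartition can also take a column, and coalesce reduces the number of partitions without a full shuffle:

 # shuffle the data into X partitions, co-locating rows with the same column value
 df.repartition(X, "column_name")
 # reduce the number of partitions without a full shuffle
 df.coalesce(X)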
