PySpark User Defined Functions (UDF)

Code description

User defined functions (UDF) in PySpark can be used to extend the built-in function library with extra functionality, for example, a function that extracts values from XML (a sketch of this use case is included after the main snippet below).

This code snippet shows you how to implement a UDF in PySpark. It demonstrates two slightly different approaches: one uses the udf decorator and the other wraps a plain Python function with udf().

Output:

    +--------------+-------+--------+--------+
    |       str_col|int_col|str_len1|str_len2|
    +--------------+-------+--------+--------+
    |Hello Kontext!|    100|      14|      14|
    |Hello Context!|    100|      14|      14|
    +--------------+-------+--------+--------+  
    

Code snippet

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    from pyspark.sql.functions import udf
    
    app_name = "PySpark  Exmaple"
    master = "local"
    
    spark = SparkSession.builder \
        .appName(app_name) \
        .master(master) \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("WARN")
    
    data = [['Hello Kontext!', 100], ['Hello Context!', 100]]
    
    # Define the schema for the input data
    schema = StructType([StructField('str_col', StringType(), nullable=True),
                         StructField('int_col', IntegerType(), nullable=True)])
    
    # Create a DataFrame with the schema provided
    df = spark.createDataFrame(data=data, schema=schema)
    
    
    # Approach 1: decorate the function with udf and an explicit return type
    @udf(IntegerType())
    def custom_udf1(s):
        return len(s)
    
    
    # Approach 2: define a plain Python function ...
    def custom_func2(s):
        return len(s)
    
    
    # ... and wrap it with udf; the return type can also be given as a DDL type string
    custom_udf2 = udf(custom_func2, returnType="int")
    
    # Apply both UDFs to derive the string length columns
    df = df.withColumn('str_len1', custom_udf1(df.str_col)).withColumn(
        'str_len2', custom_udf2(df.str_col))
    
    df.show()
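
The same UDF can also be registered for use in Spark SQL. The following is a minimal sketch that reuses the SparkSession, imports and custom_func2 defined above; the registered function name str_len and the view name str_table are made up for illustration.

    # Register the plain Python function as a SQL function (hypothetical name 'str_len')
    spark.udf.register('str_len', custom_func2, returnType=IntegerType())
    
    # Call it from Spark SQL against a temporary view of the DataFrame
    df.createOrReplaceTempView('str_table')
    spark.sql("SELECT str_col, str_len(str_col) AS str_len3 FROM str_table").show()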
    
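As mentioned in the description, a common use case for UDFs is extracting values from XML strings. Below is a minimal sketch using Python's built-in xml.etree.ElementTree module; it assumes the SparkSession and imports from the snippet above, and the column name xml_col and tag name name are hypothetical.

    import xml.etree.ElementTree as ET
    
    
    # Sketch of a UDF that pulls the text of a <name> element out of an XML string.
    # Returns None for null or unparsable input so the UDF stays null-safe.
    @udf(StringType())
    def extract_name(xml_str):
        if xml_str is None:
            return None
        try:
            root = ET.fromstring(xml_str)
            node = root.find('name')
            return node.text if node is not None else None
        except ET.ParseError:
            return None
    
    
    xml_df = spark.createDataFrame(
        [('<person><name>Kontext</name></person>',)], ['xml_col'])
    xml_df.withColumn('name', extract_name(xml_df.xml_col)).show()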