I am trying to compare two columns in a DataFrame to find the rate of change of a value. I wrote a UDF to do this, but I get an error when executing it.

Below is the data structure in the dataframe.

| NUM_ID  | TIME          | PREVIOUS_SG1 | SG1_V |
| XXXXX01 | 1570167499000 | null         | 79.0  |
| XXXXX01 | 1570167502000 | 79.0         | 88.0  |
| XXXXX01 | 1570167503000 | 88.0         | 99.0  |
| XXXXX01 | 1570179810000 | 99.0         | null  |
| XXXXX01 | 1570179811000 | null         | 100.0 |

Below is the schema for this dataframe.

scala> castDF.printSchema
 |-- NUM_ID: string (nullable = true)
 |-- TIME: long (nullable = true)
 |-- PREVIOUS_SG1: double (nullable = true)
 |-- SG1_V: double (nullable = true)

Below is the UDF written.

def UDF_D: UserDefinedFunction = udf((PREV: Double, CURR: Double) => {
  if (PREV != null || PREV != 0) {
    val out = ((CURR - PREV) / PREV) * 100
    out
  }
})

And the Scala code that calls the UDF:

val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))

When I execute it, I get the error below.

scala> val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))
java.lang.UnsupportedOperationException: Schema for type AnyVal is not supported

Is any casting needed to call the UDF, or are the null values causing the issue? As far as I can tell, I am passing Double values and not dealing with any other type.

No, no casting is needed to call the UDF, although the UDF parameter types and the column types should match. The null values are not causing the issue either.

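The problem is in the UDF: it must return a value of the same type on every path. Without an else branch, the if expression yields Unit when the condition is false, so Scala infers the return type as AnyVal, which Spark cannot map to a schema. Add an else branch that returns a Double for the case where the input is null or 0: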

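def UDF_D: UserDefinedFunction = udf((PREV: java.lang.Double, CURR: java.lang.Double) => {
    // Boxed parameters make the null check meaningful; a primitive Double can never be null.
    if (PREV != null && CURR != null && PREV.doubleValue != 0.0) {
      val (p, c) = (PREV.doubleValue, CURR.doubleValue)
      ((c - p) / p) * 100
    } else 0.0 // both branches are Double, so the result type is no longer AnyVal
})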
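
As a rough illustration of why the missing else matters, here is a plain Scala 2 sketch that needs no Spark. The object and method names (IfElseTypes, withoutElse, withElse) are made up for this example and do not come from the question.

```scala
object IfElseTypes {
  // No else branch: the result is a Double when the condition holds and
  // Unit otherwise, so the only common type Scala 2 can infer is AnyVal.
  def withoutElse(prev: Double, curr: Double): AnyVal =
    if (prev != 0) ((curr - prev) / prev) * 100

  // With an else branch that also returns a Double, the result type is Double.
  def withElse(prev: Double, curr: Double): Double =
    if (prev != 0) ((curr - prev) / prev) * 100 else 0.0

  def main(args: Array[String]): Unit = {
    println(withoutElse(0.0, 88.0)) // prints ()
    println(withElse(79.0, 88.0))   // percentage change from 79.0 to 88.0
    println(withElse(0.0, 88.0))    // prints 0.0
  }
}
```

Spark derives the UDF's result schema from the function's return type, which is why the AnyVal variant fails while the Double variant works.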

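You don't need a UDF to do this; the built-in column functions are enough:

df.withColumn("SG1_DIFF", when('PREV.isNotNull && 'CURR.isNotNull && 'PREV =!= 0, (('CURR - 'PREV) / 'PREV) * 100).otherwise(0))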

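And as a function:

 def compareCols(PREV: Column, CURR: Column): Column = {
    // Compute the percentage change only when both values exist and PREV is non-zero.
    when(PREV.isNotNull && CURR.isNotNull && PREV =!= 0, ((CURR - PREV) / PREV) * 100).otherwise(0)
 }

 val diffDF = df.withColumn("SG1_DIFF", compareCols('PREV, 'CURR))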
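
For anyone who wants to try the UDF-free approach end to end, here is a minimal sketch under a few assumptions: a local SparkSession, the sample rows copied from the question, and hypothetical names (RateOfChangeSketch, pctChange) that are not part of the original code. Treating null or zero inputs as 0 follows the answers above; adjust it if null is the more useful result for your data.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

object RateOfChangeSketch {
  // Percentage change from prev to curr.
  // Returns 0.0 when prev is null or zero, or when curr is null.
  def pctChange(prev: Column, curr: Column): Column =
    when(prev.isNotNull && curr.isNotNull && prev =!= 0, (curr - prev) / prev * 100)
      .otherwise(lit(0.0))

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session, just for trying the expression out.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("rate-of-change-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample rows from the question; Option[Double] models the nullable columns.
    val castDF = Seq(
      ("XXXXX01", 1570167499000L, Option.empty[Double], Option(79.0)),
      ("XXXXX01", 1570167502000L, Option(79.0), Option(88.0)),
      ("XXXXX01", 1570167503000L, Option(88.0), Option(99.0)),
      ("XXXXX01", 1570179810000L, Option(99.0), Option.empty[Double]),
      ("XXXXX01", 1570179811000L, Option.empty[Double], Option(100.0))
    ).toDF("NUM_ID", "TIME", "PREVIOUS_SG1", "SG1_V")

    val diffDF = castDF.withColumn("SG1_DIFF", pctChange(col("PREVIOUS_SG1"), col("SG1_V")))
    diffDF.show(false)

    spark.stop()
  }
}
```

Because the whole computation stays a Column expression, Spark can optimize it and handles nulls natively, which a Scala UDF cannot offer.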
