scala - Compare two columns in a dataframe and find the rate of change of values

I am trying to compare two columns in a dataframe to find the rate of change of a value. I wrote a UDF to do this, but I get an error when executing it.

Below is the data structure in the dataframe.

+-------+-------------+------------+-----+
|NUM_ID |TIME         |PREVIOUS_SG1|SG1_V|
+-------+-------------+------------+-----+
|XXXXX01|1570167499000|null        |79.0 |
|XXXXX01|1570167502000|79.0        |88.0 |
|XXXXX01|1570167503000|88.0        |99.0 |
|XXXXX01|1570179810000|99.0        |null |
|XXXXX01|1570179811000|null        |100.0|
+-------+-------------+------------+-----+

Below is the schema for this dataframe.

scala> castDF.printSchema
root
 |-- NUM_ID: string (nullable = true)
 |-- TIME: long (nullable = true)
 |-- PREVIOUS_SG1: double (nullable = true)
 |-- SG1_V: double (nullable = true)

Below is the UDF I wrote.

def UDF_D:UserDefinedFunction=udf((PREV: Double,CURR: Double)=>{
  if(PREV != null || PREV !=0){
  val out = ((CURR-PREV)/PREV)*100
  out
  }})

And the Scala code that calls the UDF:

val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))

While executing it, I get the error below.

scala> val diffDF = castDF.withColumn("SG1_DIFF", (UDF_D(col("PREVIOUS_SG1"),col("SG1_V"))))
java.lang.UnsupportedOperationException: Schema for type AnyVal is not supported

Is there any casting to be done to call the UDF, or are the null values causing the issue? As far as I can tell I am passing Double values and not dealing with any other type.

No casting is needed to call the UDF, although the UDF parameter types and the column types should match. The null values are not causing this error either.

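The problem is in the UDF itself: it must return a value on every path. Without an else branch the function's result type is AnyVal rather than Double, and Spark cannot map AnyVal to a column type. Add an else branch for the case where the previous value is 0. Note that a primitive Double parameter can never be null, so null checks on it have no effect: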

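import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Both branches return Double, so Spark can infer the result schema.
// PREV and CURR are primitives and cannot be null here; per the Spark docs,
// a Scala UDF with primitive parameters yields null when an input is null.
def UDF_D: UserDefinedFunction = udf((PREV: Double, CURR: Double) => {
  if (PREV != 0) ((CURR - PREV) / PREV) * 100
  else 0.0
})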
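
The AnyVal in the original error message comes from how Scala types an if without an else, and it can be reproduced without Spark. The following is a minimal sketch, assuming Scala 2 (the version Spark builds against); the object and value names are illustrative and do not come from the question.

```scala
object IfWithoutElse {
  // No else branch: the compiler supplies `else ()`, so Scala 2 infers the
  // result type as the common supertype of Double and Unit, i.e. AnyVal.
  val withoutElse = (prev: Double, curr: Double) =>
    if (prev != 0) ((curr - prev) / prev) * 100

  // Both branches are Double, so the result type is Double.
  val withElse: (Double, Double) => Double = (prev, curr) =>
    if (prev != 0) ((curr - prev) / prev) * 100 else 0.0

  def main(args: Array[String]): Unit = {
    println(withoutElse(0.0, 88.0)) // prints "()", the Unit value, not a number
    println(withElse(88.0, 99.0))   // prints "12.5"
  }
}
```

Spark derives the UDF's return column type from the function's result type, which is why the first form is rejected and the second is accepted.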

You don't need a UDF to do this:

df.select(when('PREV.isNotNull && 'PREV =!= 0, (('CURR - 'PREV) / 'PREV) * 100).otherwise(0))

And as a function:

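import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.when

// Compute the percentage change only when PREV is present and non-zero.
def compareCols(PREV: Column, CURR: Column): Column = {
  when(PREV.isNotNull && PREV =!= 0, ((CURR - PREV) / PREV) * 100).otherwise(0)
}

val diffDF = df.withColumn("SG1_DIFF", compareCols('PREV, 'CURR))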
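
Applied to the column names from the question, the column-expression approach might look like the sketch below. It assumes a local SparkSession and rebuilds the sample rows by hand. The names RateOfChangeSketch and pctChange are illustrative, and leaving SG1_DIFF null where no rate can be computed is a choice made here, not something the question specifies.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, when}

object RateOfChangeSketch {
  // Percentage change from prev to curr. With no otherwise(...) clause the
  // result is null when prev is null or 0, or when curr is null.
  def pctChange(prev: Column, curr: Column): Column =
    when(prev.isNotNull && curr.isNotNull && prev =!= 0, (curr - prev) / prev * 100)

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session, used only for this example.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rate-of-change-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample rows copied from the question; Option models the nullable doubles.
    val castDF = Seq[(String, Long, Option[Double], Option[Double])](
      ("XXXXX01", 1570167499000L, None, Some(79.0)),
      ("XXXXX01", 1570167502000L, Some(79.0), Some(88.0)),
      ("XXXXX01", 1570167503000L, Some(88.0), Some(99.0)),
      ("XXXXX01", 1570179810000L, Some(99.0), None),
      ("XXXXX01", 1570179811000L, None, Some(100.0))
    ).toDF("NUM_ID", "TIME", "PREVIOUS_SG1", "SG1_V")

    val diffDF = castDF.withColumn("SG1_DIFF", pctChange(col("PREVIOUS_SG1"), col("SG1_V")))
    diffDF.show()

    spark.stop()
  }
}
```

If 0 is preferred over null for rows without a computable rate, appending .otherwise(0) to the when(...) expression restores that behaviour.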