I'm trying to make year and month columns from a column named logtimestamp (of type TimeStampType) in spark. The data source is cassandra. I am using sparkshell to perform these steps, here is the code I have written -
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.types._
var logsDF = spark.read.cassandraFormat("tableName", "cw").load()
var newlogs = logsDF.withColumn("year", year(col("logtimestamp")))
.withColumn("month", month(col("logtimestamp")))
newlogs.write.cassandraFormat("tableName_v2", "cw")
.mode("Append").save()
But these steps do not succeed, I end up with the following error
java.lang.ArithmeticException: long overflow
at java.lang.Math.multiplyExact(Math.java:892)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:205)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:166)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:327)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:325)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:252)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:242)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:426)
at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:34)
at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:21)
at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.$anonfun$getIterator$2(CassandraScanPartitionReaderFactory.scala:110)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.next(CassandraScanPartitionReaderFactory.scala:66)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I thought it was something to do with null values in the table so I ran the following
scala> logsDF.filter("logtimestamp is null").show()
But this too gave the same long overflow error.
How come there is an overflow in spark but not in cassandra when both have timestamps of 8 bytes? What could be the issue here and how do I extract year and month from timestamp correctly?
Turns out one of the cassandra table had a timestamp value that was greater than the highest value allowed by spark but not large enough to overflow in cassandra. The timestamp had been manually edited to get around the upserting that is done by default in cassandra, but this led to some large values being formed during development. Ran a python script to find this out.