Spark DataFrame External Schema Error

apache-spark

#1


@dgadiraju
Dear Itversity,

I tried to create a DataFrame with an external schema; my data is in a .txt file.
The DataFrame gets created, but accessing it throws a schema mismatch error.

Data

1-venkatesh-10-10000
2-Aarthi-20-3000
3-Anu-30-40000
4-Muthu-40-40000
5-venkatesh-00-2000
6-Anu-00-40000
7-Aarthi-30-40000

Code

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object DuplicateDetection {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Duplicate Detection")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Read the raw text file and split each line on "-"
    val rawData = spark.sparkContext.textFile("src/test/Resources/DataSets/Emp.txt")
    val rowRD = rawData.map(line => line.split("-"))
    val rowRDD = rowRD.map(r => Row(r(0).toInt, r(1), r(2).toInt, r(3)))

    // rowRDD.collect().foreach(println)

    // External schema applied to the Row RDD
    val schema = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("sal", IntegerType, true)
    ))
    val df = spark.createDataFrame(rowRDD, schema)
    df.printSchema()
    df.show()
  }
}

Error

18/04/21 20:21:15 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, id), IntegerType) AS id#0
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, id), IntegerType)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null

Please correct me where I went wrong. I suspect the problem is with my schema mapping,
but I don't know how to parse the data correctly.

Regards
Venkatesh


#2

In the code below, sal is of type Int:

val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("sal", IntegerType, true)
))

But you are not converting r(3) to Int in this snippet:

val rowRDD = rowRD.map(r => Row(r(0).toInt, r(1), r(2).toInt, r(3)))
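For reference, here is a minimal sketch of the corrected mapping, with r(3) also converted so that all four Row fields match the external types the schema expects (Int, String, Int, Int):

val rowRDD = rowRD.map(r => Row(r(0).toInt, r(1), r(2).toInt, r(3).toInt)) // cast sal to Int to match IntegerType

With that change the encoder no longer sees a String where the schema declares an int, which is exactly what the "java.lang.String is not a valid external type for schema of int" error is complaining about.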


#3

Thanks Durga.

It's working now and I got the results. 🙂