Unable to partition the data properly while reading a huge RDBMS table into Spark


#1

I am trying to read a 400 GB table from Postgres and insert its data into a Hive table on HDFS. This is the code I wrote to read it:
val conf = new SparkConf().setAppName("Spark-JDBC")
  .set("spark.executor.heartbeatInterval", "120s")
  .set("spark.network.timeout", "12000s")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  .set("spark.sql.orc.filterPushdown", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "512m")
  .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.yarn.driver.memoryOverhead", "7168")
  .set("spark.yarn.executor.memoryOverhead", "7168")
  .set("spark.sql.shuffle.partitions", "61")
  .set("spark.default.parallelism", "60")
  .set("spark.memory.storageFraction", "0.5")
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "16g")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
val spark = SparkSession.builder().config(conf).master("yarn")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                   dataMapper: Map[String, String], partition_columns: Array[String],
                   spark: SparkSession): DataFrame = {
  val colList = allColumns.split(",").toList
  val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
  val queryCols = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
  val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
  val yearDF = spark.read.format("jdbc").option("url", connectionUrl)
    .option("dbtable", s"(${execQuery}) as year2017")
    .option("user", devUserName).option("password", devPassword)
    .option("partitionColumn", "cast_id")
    .option("lowerBound", 358005799618L).option("upperBound", 1152921420511546551L)
    .option("numPartitions", 200).load()
  val totalCols: List[String] = splitColumns ++ textList
  val cdt = new ChangeDataTypes(totalCols, dataMapper)
  hiveDataTypes = cdt.gpDetails()
  val fc = prepareHiveTableSchema(hiveDataTypes, partition_columns)
  val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
  val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
  val resultDF = yearDF.select(allCols: _*)
  val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
  val finalDF = stringColumns.foldLeft(resultDF) { (tempDF, colName) =>
    tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
  }
  finalDF
}
val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
dataDF.write.format("orc").mode(SaveMode.Overwrite)
  .partitionBy("source_system_name", "period_year", "period_num")
  .saveAsTable("default.tablename")
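For context on how the read above is split up: as I understand it, Spark's JDBC source divides the range between lowerBound and upperBound into numPartitions equal strides and issues one query per stride. The following is a standalone sketch of that stride logic (my own illustrative function, not Spark's API), using the exact bounds and partition count from my code:

```scala
// Illustrative sketch of how Spark's JDBC source turns
// (partitionColumn, lowerBound, upperBound, numPartitions) into
// per-partition WHERE clauses. jdbcPredicates is a hypothetical
// helper name, not part of the Spark API.
def jdbcPredicates(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  // Stride is computed from the bounds, independent of the data distribution.
  val stride = upper / numPartitions - lower / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0) s"$column < $hi OR $column IS NULL"          // first partition is open below
    else if (i == numPartitions - 1) s"$column >= $lo"       // last partition is open above
    else s"$column >= $lo AND $column < $hi"
  }
}

val preds = jdbcPredicates("cast_id", 358005799618L, 1152921420511546551L, 200)
preds.take(2).foreach(println)
println(preds.last)
```

With bounds this far apart, each stride spans roughly 5.7 quadrillion ids, so if most cast_id values cluster far below upperBound, nearly all rows fall into the first one or two partitions and a single task ends up reading most of the 400 GB.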

Spark-submit:
SPARK_MAJOR_VERSION=2 spark-submit \
  --conf spark.ui.port=4090 \
  --driver-class-path /home/surfer/jars/postgresql-42.1.4.jar \
  --jars /home/surfer/jars/postgresql-42.1.4.jar \
  --num-executors 120 --executor-cores 15 \
  --executor-memory 30G --driver-memory 30G --driver-cores 3 \
  --class com.partition.source.DataRead \
  --master=yarn --deploy-mode=cluster \
  --keytab /home/surfer/surfer.keytab --principal surfer@DEV.COM \
  --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
  --name Reader \
  --conf spark.executor.extraClassPath=/home/surfer/jars/postgresql-42.1.4.jar \
  reader_2.11-0.1.jar
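As a sanity check on the sizing, here is the back-of-the-envelope arithmetic for what that spark-submit requests from YARN (the 7 GB overhead figure comes from spark.yarn.executor.memoryOverhead=7168 in the SparkConf above):

```scala
// Tally of the cluster resources requested by the spark-submit above.
val numExecutors     = 120
val coresPerExecutor = 15
val executorMemGb    = 30
val overheadGb       = 7   // spark.yarn.executor.memoryOverhead = 7168 MB

val totalCores = numExecutors * coresPerExecutor              // total executor cores
val totalMemGb = numExecutors * (executorMemGb + overheadGb)  // total YARN memory for executors

println(s"$totalCores cores, $totalMemGb GB requested")
```

So the job asks for 1800 cores and over 4 TB of executor memory in total, which is why I expected capacity not to be the bottleneck.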

The above code used to work on a table of about 1.5 GB, but the same logic fails on the 400 GB table. I set partitionColumn and the lower/upper bounds correctly, based on the actual values in the table, and scaled up memory, executors, and cores accordingly.
But the job fails with a GC overhead limit exceeded exception when I submit it. Could anyone tell me what mistake I am making here? Any help is much appreciated.