How to split a dataframe of multiple files into smaller dataframes (by checking the data row-wise)?

apache-spark


In a directory, I have nearly 20k files. My requirement is to work only on the files that were created yesterday. To do that, I came up with the code below, which gets me the files in that directory that were created on the current day:

    import java.io.File
    import java.text.SimpleDateFormat
    import java.time.{Duration, Instant}
    import java.util.Date
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("yarn").appName("AutoCheck").enableHiveSupport().getOrCreate()
    import spark.implicits._

    // Formats epoch milliseconds as yyyy-MM-dd (JVM default time zone)
    val t = (x: Long) => new SimpleDateFormat("yyyy-MM-dd").format(new Date(x))

    // Lazily walks a directory tree: the directory itself, then everything below it
    def getFileTree(f: File): Stream[File] =
      f #:: (if (f.isDirectory) f.listFiles().toStream.flatMap(getFileTree)
             else Stream.empty)

    val simpDate = new SimpleDateFormat("yyyy-MM-dd")
    val currDate = simpDate.format(new Date())
    val now = Instant.now                    // current instant, e.g. 2017-12-13T09:40:29.920Z
    val today = now.toEpochMilli
    val yesterday = now.minus(Duration.ofDays(1))
    val yesterdayMilliSec = yesterday.toEpochMilli
    val todaySimpDate = t(today)
    val yesterdaySimpDate = t(yesterdayMilliSec)
    val local: String = "file://"

    // Steps to get the folders of the current date
    val folders = getFileTree(new File("/tmp/hive_audits/")).filterNot(_.getName.endsWith(".log"))
    // Pairs each folder with its last-modified date, newest first
    val folderCrtDateDesc = folders.toList.map(y => (y, y.lastModified)).sortBy(-_._2)
    val latestFolder = folderCrtDateDesc.map(y => (y._1, t(y._2)))
    val folderToday = latestFolder.filter(y => y._2 == todaySimpDate)

    // Steps to get the .log files of the current date
    val localFiles = getFileTree(new File("/tmp/hive_audits/")).filter(_.getName.endsWith(".log"))
    val fileCrtDateDesc = localFiles.toList.map(y => (y, y.lastModified)).sortBy(-_._2)
    val latestFiles = fileCrtDateDesc.map(y => (y._1, t(y._2)))
    // Compare with yesterdaySimpDate instead to pick up yesterday's files
    val filesToday = latestFiles.filter(y => y._2 == todaySimpDate)

    // Copy every selected file from the local disk into HDFS
    val hadoopConf = new Configuration()
    val hdfs = FileSystem.get(hadoopConf)
    filesToday.foreach { case (file, _) =>
      val localPath = new Path(local + file.getAbsolutePath)
      val hdfsPath = new Path(s"hdfs://fdldev/user/fdlhdpetl/dailylogs/${file.getName}")
      hdfs.copyFromLocalFile(localPath, hdfsPath)
    }
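Since the requirement is yesterday's files, here is a minimal sketch of that selection using `java.time` instead of comparing `SimpleDateFormat` strings. It assumes "yesterday" means the previous calendar day in a chosen time zone (UTC in the usage, to match the log timestamps); `YesterdayFiles`, `modifiedDay`, `listFiles` and `filesFromYesterday` are hypothetical names. Note that `File.lastModified` is the last-modification time, not a creation time; `BasicFileAttributes.creationTime` exposes a creation time, but on file systems that do not record one it may fall back to the last-modified time or the epoch.

```scala
import java.io.File
import java.time.{Instant, LocalDate, ZoneId}

object YesterdayFiles {
  // Calendar day (in `zone`) on which the file was last modified.
  // Assumption: last-modified time is an acceptable stand-in for creation time.
  def modifiedDay(f: File, zone: ZoneId): LocalDate =
    Instant.ofEpochMilli(f.lastModified).atZone(zone).toLocalDate

  // Recursively collects regular files under `dir` whose name ends with `suffix`.
  // listFiles() returns null for unreadable directories, hence the Option.
  def listFiles(dir: File, suffix: String): List[File] = {
    val children = Option(dir.listFiles()).map(_.toList).getOrElse(Nil)
    children.flatMap { c =>
      if (c.isDirectory) listFiles(c, suffix)
      else if (c.getName.endsWith(suffix)) List(c)
      else Nil
    }
  }

  // .log files under `dir` last modified on the calendar day before `today`.
  def filesFromYesterday(dir: File, today: LocalDate, zone: ZoneId): List[File] = {
    val yesterday = today.minusDays(1)
    listFiles(dir, ".log").filter(f => modifiedDay(f, zone) == yesterday)
  }
}
```

A call such as `YesterdayFiles.filesFromYesterday(new File("/tmp/hive_audits/"), LocalDate.now(ZoneId.of("UTC")), ZoneId.of("UTC"))` would replace the `filesToday` filter. Comparing `LocalDate` values avoids the subtlety that `Instant.now` minus 24 hours is not always "yesterday" in the zone used for formatting.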

Now I have all the log files that are present in the directory /user/fdlhdpetl/dailylogs in the dataframe fileDF.

The content of the dataframe looks like this (I am showing the data of only three log files so the question doesn't get too long).
There are three types of status in the log files: error, failure and success.
Each file starts with the line “JobID” and ends with “Updating the job keeper…”, as can be seen below:

    JobID: 454
    [Wed Dec 27 05:38:47 UTC 2017] INFO: Starting Auditing for : baseTable1
    [Wed Dec 27 05:38:49 UTC 2017] SEVERE: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:261 Invalid table alias or column
    [Wed Dec 27 05:38:49 UTC 2017] INFO:
    Completed Auditing for : baseTable1
    [Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
    JobID: 455
    [Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable2
    [Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 16
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Success
    Completed Auditing for : baseTable2
    [Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
    JobID: 547
    [Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable3
    [Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 5
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Failed. Invalid data found.
    Completed Auditing for : baseTable3
    [Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
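Because every job starts at a `JobID:` line, one approach is to group lines into per-job blocks before classifying them. Below is a minimal plain-Scala sketch of that grouping; `JobBlocks`, `JobBlock` and `split` are hypothetical names. It assumes the lines of one job arrive contiguous and in file order. That is not something to rely on once the rows of a DataFrame are processed in parallel, so in Spark it is safer to apply it to one file's lines at a time, for example each file's content from `spark.sparkContext.wholeTextFiles` split on newlines.

```scala
object JobBlocks {
  // One job: its id and the lines that follow the "JobID: N" header.
  final case class JobBlock(jobId: String, lines: List[String])

  // Pattern matching with a Regex requires the whole (trimmed) line to match.
  private val Header = """JobID:\s*(\d+)""".r

  // Each "JobID:" line opens a new block; following non-empty lines belong to
  // the most recent block. Lines before the first header are ignored.
  def split(lines: Seq[String]): List[JobBlock] =
    lines.map(_.trim).foldLeft(List.empty[JobBlock]) { (blocks, line) =>
      line match {
        case Header(id)        => JobBlock(id, Nil) :: blocks
        case _ if line.isEmpty => blocks
        case _ => blocks match {
          case b :: rest => b.copy(lines = line :: b.lines) :: rest
          case Nil       => Nil
        }
      }
    }.reverse.map(b => b.copy(lines = b.lines.reverse)) // restore file order
}
```

Blocks and lines are prepended during the fold and reversed once at the end, which keeps each step constant-time on immutable lists.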

I am trying to extract the three cases into three different dataframes, one each for “error”, “failure” and “success”. Could anyone tell me whether this is possible and, if so, how I can achieve it?
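Since each file holds exactly one job, one possible approach is to classify each file's whole content by the status markers visible in the sample: `SEVERE: Error` for error, `INFO: Failed` for failure and `INFO: Success` for success. Below is a minimal plain-Scala sketch; `StatusSplit`, `status` and `splitByStatus` are hypothetical names, and the marker strings are assumptions taken from the three sample logs that may need adjusting for the real files.

```scala
object StatusSplit {
  // Status markers copied from the sample logs (assumption: real logs use the same text).
  // Matching is case-sensitive, so the "FAILED:" inside a SemanticException
  // does not count as the "INFO: Failed" marker.
  private val markers = Seq(
    "error"   -> "SEVERE: Error",
    "failure" -> "INFO: Failed",
    "success" -> "INFO: Success"
  )

  // First status whose marker occurs in one job's log text, if any.
  def status(jobText: String): Option[String] =
    markers.collectFirst { case (name, marker) if jobText.contains(marker) => name }

  // Groups (fileName, content) pairs by status; jobs with no marker go to "unknown".
  def splitByStatus(jobs: Seq[(String, String)]): Map[String, Seq[(String, String)]] =
    jobs.groupBy { case (_, content) => status(content).getOrElse("unknown") }
}
```

In Spark, the same idea could be applied to `spark.sparkContext.wholeTextFiles("hdfs://fdldev/user/fdlhdpetl/dailylogs")`, which yields `(path, content)` pairs. After `.toDF("path", "content")` (with `spark.implicits._` imported), filtering with `col("content").contains(...)` (from `org.apache.spark.sql.functions`) once per marker would give the error, failure and success DataFrames.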