Spark accumulator

I have these entries in a log file. Can someone help me get the count of the term ‘error’ from this file using an accumulator? Thanks.

Log file entries:

[Sun Mar 7 20:04:35 2004] [info] [client 64.242.88.10] (104)Connection reset by peer: client stopped connection before send body completed
[Sun Mar 7 20:11:33 2004] [info] [client 64.242.88.10] (104)Connection reset by peer: client stopped connection before send body completed
[Sun Mar 7 20:12:55 2004] [info] [client 64.242.88.10] (104)Connection reset by peer: client stopped connection before send body completed
[Sun Mar 7 20:25:31 2004] [info] [client 64.242.88.10] (104)Connection reset by peer: client stopped connection before send body completed
[Sun Mar 7 20:44:48 2004] [info] [client 64.242.88.10] (104)Connection reset by peer: client stopped connection before send body completed
[Sun Mar 7 20:58:27 2004] [info] [client 64.242.88.10] (104)Connection reset by peer: client stopped connection before send body completed
[Sun Mar 7 21:16:17 2004] [error] [client 24.70.56.49] File does not exist: /home/httpd/twiki/view/Main/WebHome
[Sun Mar 7 21:16:17 2004] [error] [client 24.70.56.49] File does not exist: /home/httpd/twiki/flow/Main/WebHome
[Sun Mar 7 21:16:17 2004] [error] [client 24.70.56.49] File does not exist: /home/httpd/twiki/subjects/Main/WebHome
[Sun Mar 7 21:16:17 2004] [error] [client 24.70.56.49] File does not exist: /home/httpd/twiki/practise/Main/WebHome

Where are you running this? Is it on the lab or in your own environment?

@itversity - I am trying it in my own environment, sir.

Hope the example below helps you … I did something similar to find the bad data packets in a purchase log file.

package com.spark2.examples

import org.apache.spark.sql.SparkSession

object PurchaseLogAnalysis {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("Spark_Accumulator")
      .getOrCreate()

    val sc = sparkSession.sparkContext

    // One long accumulator per condition we want to count
    val badPkts = sc.longAccumulator("bad.packets")
    val zeroValueSales = sc.longAccumulator("Zero.Value.Sales")
    val missingFields = sc.longAccumulator("Missing.Fields")
    val blankLines = sc.longAccumulator("Blank.Lines")

    // Accumulators are updated on the executors inside the foreach action
    sc.textFile("input/purchases.log", 4)
      .foreach { line =>
        if (line.length() == 0) blankLines.add(1)
        else if (line.contains("Bad data packet")) badPkts.add(1)
        else {
          val fields = line.split("\t")
          if (fields.length != 4) missingFields.add(1)
          else if (fields(3).toFloat == 0) zeroValueSales.add(1)
        }
      }

    // Accumulator values are read back on the driver after the action completes
    println("Purchase Log Analysis Counters:")
    println(s"\tBad Data Packets=${badPkts.value}")
    println(s"\tZero Value Sales=${zeroValueSales.value}")
    println(s"\tMissing Fields=${missingFields.value}")
    println(s"\tBlank Lines=${blankLines.value}")

    sc.stop()
  }
}
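
For your specific question - counting the term ‘error’ in the Apache log you posted - the same pattern applies. Below is a minimal sketch along those lines; the file path input/apache.log is just a placeholder, and it simply matches the literal [error] level tag on each line.

import org.apache.spark.sql.SparkSession

object ErrorCountAnalysis {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("Error_Count_Accumulator")
      .getOrCreate()

    val sc = spark.sparkContext

    // Long accumulator incremented once per line whose level is [error]
    val errorCount = sc.longAccumulator("error.count")

    sc.textFile("input/apache.log")   // placeholder path to your log file
      .foreach { line =>
        if (line.contains("[error]")) errorCount.add(1)
      }

    // Accumulator values are only reliable on the driver after an action
    // (the foreach above) has completed
    println(s"Number of 'error' entries = ${errorCount.value}")

    sc.stop()
  }
}

With the ten lines you posted this would print 4, since four entries carry the [error] level.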

@RevanthReddy - thank you so much for your help.
Could you also share the sample log file you used, if possible?

Thanks in advance.

Here is the sample log file.

and the code …

@RevanthReddy - Thanks a lot for your help. 🙂