Exercise 04 - Convert nyse data to parquet



Before attempting these questions, make sure you prepare by going through the appropriate material.


  • Details - Duration 10 minutes
    • Data is available in local file system under /data/nyse (ls -ltr /data/nyse)
    • Fields (stockticker:string, transactiondate:string, openprice:float, highprice:float, lowprice:float, closeprice:float, volume:bigint)
    • Convert file format to parquet
    • Save it to /user/<YOUR_USER_ID>/nyse_parquet
  • Validation
  • Solution
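The core of the exercise is parsing each comma-separated NYSE record into the typed fields listed above. Here is a minimal, Spark-independent sketch of that per-record conversion; the function name and the sample record are made up for illustration:

```python
def parse_record(line):
    """Parse one NYSE CSV line into the typed tuple
    (stockticker, transactiondate, openprice, highprice,
     lowprice, closeprice, volume)."""
    f = line.split(",")
    return (f[0], f[1], float(f[2]), float(f[3]),
            float(f[4]), float(f[5]), int(f[6]))

# Made-up sample record, matching the field layout above
rec = parse_record("AAPL,2009-01-02,85.88,91.04,85.16,90.75,26643400")
print(rec)  # ('AAPL', '2009-01-02', 85.88, 91.04, 85.16, 90.75, 26643400)
```

In Spark, this same logic is what gets applied inside a map() before calling toDF, as the solutions in this thread do.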


In Scala using Data Frames

val nData = sc.textFile("file:///home/cloudera/nyse").
  map(r => { val d = r.split(","); (d(0), d(1), d(2).toFloat, d(3).toFloat, d(4).toFloat, d(5).toFloat, d(6).toLong) })

case class nycc(
  stockticker: String, transactiondate: String, openprice: Float, highprice: Float, lowprice: Float, closeprice: Float, volume: Long
)

val nDF = nData.map(r => nycc(r._1, r._2, r._3, r._4, r._5, r._6, r._7)).toDF




Hello @Raman_Kumar_Jha,
Sorry for tagging you specifically, but I need help. I have an exam in a few days.

Can you help me with this problem? How do I read multiple files from the local file system and create a DF?



Hi @itversity

When solving this problem, I executed the below code using itversity labs:

val nyse = sc.textFile("/user/geenicholas/nyse/nyse_2009.csv")
val nysedf = nyse.map(rec => (rec.split(",")(0).toString, rec.split(",")(1).toString, rec.split(",")(2).toFloat, rec.split(",")(3).toFloat, rec.split(",")(4).toFloat, rec.split(",")(5).toFloat,
rec.split(",")(6).toLong)).toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")

I got the below error:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Also, when running with all the files (nyse_2009.csv, nyse_2010.csv, nyse_2011.csv, …) in the nyse folder, I am facing the same error.

Could you please look into this.

Thanks in advance.


You may not be able to load local files, since Spark expects those files to be available at the same location on all of its nodes.
I uploaded the data into an HDFS folder and loaded it from there in the program.

I am in the process of prepping too, so I am not sure whether this program is accurate.
Anyway, here is what I came up with. I finally got 21 parquet output files.

nyseRaw = sc.textFile(input path)

def convert(x):
    # cast the fields so the DataFrame columns match the exercise types
    # (otherwise every column comes out as a string)
    s = x.split(",")
    return (s[0], s[1], float(s[2]), float(s[3]), float(s[4]), float(s[5]), int(s[6]))

nyseMap = nyseRaw.map(lambda x: convert(x))
df = nyseMap.toDF(schema=["stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume"])
df.write.parquet(output path)




To read files in formats other than plain text, you can use sqlContext.read.format("json").load(path) (or the older sqlContext.load(path, "json")).


du -sh /data/nyse

hdfs dfs -copyFromLocal /data/nyse /user/rjshekar90/nyse

hdfs dfs -ls /user/rjshekar90/nyse

hdfs fsck -blocks /user/rjshekar90/nyse

pyspark --master yarn \
  --conf spark.ui.port=21456 \
  --num-executors 3 \
  --executor-cores 4 \
  --executor-memory 1G

nyseRDD = sc.textFile("/user/rjshekar90/nyse")
for i in nyseRDD.take(10): print(i)

nyseMap = nyseRDD.map(lambda x: x.split(","))
for i in nyseMap.take(10): print(i)

nyseRDDMap = nyseMap.map(lambda x: (str(x[0]), str(x[1]), float(x[2]), float(x[3]), float(x[4]), float(x[5]), int(x[6])))
for i in nyseRDDMap.take(10): print(i)

nyseDF = nyseRDDMap.toDF(schema=["stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume"])


nyseDF.write.parquet("/user/rjshekar90/pblm4/solution")

for row in sqlContext.read.parquet("/user/rjshekar90/pblm4/solution").take(10): print(row)