Exercise 04 - Convert NYSE data to Parquet

pyspark
spark-shell
apache-spark
scala
python

#1

Before attempting these questions, make sure you prepare by going through appropriate material.


  • Details - Duration 10 minutes
    • Data is available in the local file system under /data/nyse (ls -ltr /data/nyse)
    • Fields (stockticker:string, transactiondate:string, openprice:float, highprice:float, lowprice:float, closeprice:float, volume:bigint)
    • Convert the file format to Parquet
    • Save it to /user/<YOUR_USER_ID>/nyse_parquet
  • Validation (see the sketch below)
  • Solution
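
For the validation step, a minimal PySpark sketch is to read the saved output back and check the schema and record count; the path is the placeholder from the task description above:

# read the converted data back (placeholder path from the task description)
nyse_parquet = sqlContext.read.parquet("/user/<YOUR_USER_ID>/nyse_parquet")
nyse_parquet.printSchema()   # should list the seven fields above
nyse_parquet.count()         # should match the number of input records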

#2

In Scala using Data Frames

val nData = sc.textFile("file:///home/cloudera/nyse").
  map(r => {
    val d = r.split(",")
    (d(0), d(1), d(2).toFloat, d(3).toFloat, d(4).toFloat, d(5).toFloat, d(6).toLong)
  })

case class nycc(
  stockticker: String, transactiondate: String, openprice: Float, highprice: Float, lowprice: Float, closeprice: Float, volume: Long
)

val nDF = nData.map(r => nycc(r._1, r._2, r._3, r._4, r._5, r._6, r._7)).toDF

sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

nDF.repartition(1).write.parquet("/user/cloudera/nyse_parquet")


#3

Hello @Raman_Kumar_Jha,
Sorry for tagging you specifically, but I need help. I have an exam in a few days.

Can you help me with this problem? How do I read multiple files from the local file system and create a DataFrame?

Thanks
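
For reference, a minimal PySpark sketch (placeholder paths, not an official answer): copy the local directory to HDFS first, then point sc.textFile at the whole directory or a glob.

# copy the local files to HDFS first (shell command)
#   hdfs dfs -copyFromLocal /data/nyse /user/<YOUR_USER_ID>/nyse
# sc.textFile accepts a directory or a glob, so all files are read at once
nyse = sc.textFile("/user/<YOUR_USER_ID>/nyse")
nyseDF = nyse.map(lambda r: r.split(",")).toDF(
    ["stockticker", "transactiondate", "openprice",
     "highprice", "lowprice", "closeprice", "volume"])   # all columns come out as strings; cast as needed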


#4

Hi @itversity

When solving this problem, I executed the below code using itversity labs:

val nyse = sc.textFile("/user/geenicholas/nyse/nyse_2009.csv")
val nysedf = nyse.map(rec => (rec.split(",")(0), rec.split(",")(1), rec.split(",")(2).toFloat, rec.split(",")(3).toFloat,
  rec.split(",")(4).toFloat, rec.split(",")(5).toFloat, rec.split(",")(6).toLong)).toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")
nysedf.save("/user/geenicholas/nyse_parquet", "parquet")

I got the below error:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Also, when running with all the files (nyse_2009.csv, nyse_2010.csv, nyse_2011.csv, …) in the nyse folder, I am getting the error
'java.lang.ArrayIndexOutOfBoundsException'.

Could you please look into this.

Thanks in advance.
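
The SLF4J lines are only warnings about a missing logger binding and can be ignored. The ArrayIndexOutOfBoundsException usually means some input lines do not split into all seven fields (blank lines or headers, for example); a hedged PySpark sketch (placeholder path) that filters such lines before casting:

# keep only lines that actually have seven comma-separated fields
nyse = sc.textFile("/user/<YOUR_USER_ID>/nyse")
fields = nyse.map(lambda r: r.split(",")).filter(lambda f: len(f) == 7)
typed = fields.map(lambda f: (f[0], f[1], float(f[2]), float(f[3]),
                              float(f[4]), float(f[5]), int(f[6])))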


#5

You may not be able to load local files, since Spark expects the files to be available at the same local path on every node.
I uploaded the data into an HDFS folder and loaded it from there in the program.

I am in the process of preparing too, so I am not sure this program is accurate.
Anyway, here is what I came up with. I ended up with 21 Parquet output files.

nyseRaw = sc.textFile(input path)

def convert(x):
    s = x.split(",")
    s[2] = float(s[2])
    s[3] = float(s[3])
    s[4] = float(s[4])
    s[5] = float(s[5])
    s[6] = int(s[6])
    return s

nyseMap = nyseRaw.map(lambda x: convert(x))
df = nyseMap.toDF(schema=["stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume"])
df.write.parquet(output path)
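
The 21 output files simply mirror the number of partitions of the input RDD; if fewer files are wanted, a coalesce before the write (as the later replies do) is enough, e.g.:

# optional: write a single part file (output path is the same placeholder as above)
df.coalesce(1).write.parquet(output path)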


#6

#pyspark

To read files in formats other than plain text, use sqlContext.load(path, "json") (or sqlContext.read.format("json").load(path)) instead of sc.textFile.
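
For example, a minimal sketch with a placeholder path (sqlContext.read is the newer form of sqlContext.load):

# read JSON through the DataFrame reader instead of sc.textFile (placeholder path)
df = sqlContext.read.format("json").load("/user/<YOUR_USER_ID>/some_json_dir")
df.show()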


du -sh /data/nyse

hdfs dfs -copyFromLocal /data/nyse /user/rjshekar90/nyse

hdfs dfs -ls /user/rjshekar90/nyse

hdfs fsck -blocks /user/rjshekar90/nyse

pyspark --master yarn \
  --conf spark.ui.port=21456 \
  --num-executors 3 \
  --executor-cores 4 \
  --executor-memory 1G

nyseRDD = sc.textFile("/user/rjshekar90/nyse")
for i in nyseRDD.take(10): print(i)

nyseMap = nyseRDD.map(lambda x: x.split(","))
for i in nyseMap.take(10): print(i)

nyseRDDMap = nyseMap.map(lambda x: (str(x[0]), str(x[1]), float(x[2]), float(x[3]), float(x[4]), float(x[5]), int(x[6])))
for i in nyseRDDMap.take(10): print(i)

nyseDF = nyseRDDMap.toDF(schema=["stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume"])

nyseDF.coalesce(1).write. \
  format("parquet"). \
  save("/user/rjshekar90/pblm4/solution")

for i in sc.textFile("/user/rjshekar90/pblm4/solution").take(10):
    print(i)

sqlContext.load("/user/rjshekar90/pblm4/solution", "parquet").show()


#7

Here is my code:
hdfs dfs -copyFromLocal /data/nyse /user/pricillap2001/nyse

hdfs dfs -ls /user/pricillap2001/nyse

hdfs fsck -blocks /user/pricillap2001/nyse

pyspark --master yarn --conf spark.ui.port=12345

nyse=sc.textFile("/user/pricillap2001/nyse")

for i in nyse.take(10) : print(i)

nyseMap = nyse.map(lambda x: x.split(","))
for i in nyseMap.take(10): print(i)
# target fields: stockticker:string, transactiondate:string, openprice:float, highprice:float, lowprice:float, closeprice:float, volume:bigint

from pyspark.sql import Row
N=nyse.map(lambda i : Row(stockticker=i.split(",")[0],transactiondate=i.split(",")[1],openprice=i.split(",")[2],highprice=i.split(",")[3],
lowprice=i.split(",")[4],closeprice=i.split(",")[5],volume=i.split(",")[6])).toDF()

for i in N.take(10): print(i)

N.coalesce(1).write.parquet("/user/pricillap2001/nyse_parquet")

N.coalesce(1).write.format("parquet").save("/user/pricillap2001/pblm4/solution")

sqlContext.load("/user/pricillap2001/pblm4/solution", "parquet").show()

My problem is that I am not getting the columns in the proper order. Any suggestions?


#8

Hello Pricilla, a Row built from keyword arguments orders its fields alphabetically, so re-select the columns in the order you want with DF2 = DF1.select(<columns in order>), like this:

PYSPARK

pyspark --master yarn \
  --conf spark.ui.port=12890 \
  --num-executors 4 \
  --executor-memory 512M

from pyspark.sql import Row

nyseRaw = sc.textFile("/user/root/nyse/nyse*")

nyse_DF = nyseRaw.map(lambda o:
    Row(stockticker = str(o.split(",")[0]),
        transactiondate = str(o.split(",")[1]),
        openprice = float(o.split(",")[2]),
        highprice = float(o.split(",")[3]),
        lowprice = float(o.split(",")[4]),
        closeprice = float(o.split(",")[5]),
        volume = int(o.split(",")[6])
        )
    ).toDF()

nyseDF = nyse_DF.select("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")

nyseDF.coalesce(1).write. \
  format("parquet"). \
  save("/user/root/tp4")

sqlContext.read.load("/user/root/tp4", "parquet").show()