Exercise 04 - Convert NYSE data to Parquet

Before attempting these questions, make sure you prepare by going through the appropriate material.


Here are the Udemy coupons for our certification courses. Our coupons include 1 month lab access as well.

  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Python.
  • Click here for $35 coupon for CCA 175 Spark and Hadoop Developer using Scala.
  • Click here for $25 coupon for HDPCD:Spark using Python.
  • Click here for $25 coupon for HDPCD:Spark using Scala.
  • Click here for access to state of the art 13 node Hadoop and Spark Cluster

  • Details - Duration 10 minutes
    • Data is available in local file system under /data/nyse (ls -ltr /data/nyse)
    • Fields (stockticker:string, transactiondate:string, openprice:float, highprice:float, lowprice:float, closeprice:float, volume:bigint)
    • Convert file format to parquet
    • Save it to /user/<YOUR_USER_ID>/nyse_parquet
  • Validation
  • Solution (a minimal sketch follows below; community solutions are in the replies)
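A minimal sketch of one possible approach in PySpark (assuming the data has already been copied into HDFS under /user/<YOUR_USER_ID>/nyse; the community solutions in the replies below follow the same pattern):

nyse = sc.textFile("/user/<YOUR_USER_ID>/nyse")
# parse each line and cast the fields to the types listed above
nyseParsed = nyse.map(lambda r: r.split(","))
nyseTyped = nyseParsed.map(lambda d: (d[0], d[1], float(d[2]), float(d[3]), float(d[4]), float(d[5]), int(d[6])))
nyseDF = nyseTyped.toDF(["stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume"])
nyseDF.write.parquet("/user/<YOUR_USER_ID>/nyse_parquet")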

In Scala, using DataFrames:

val nData = sc.textFile("file:///home/cloudera/nyse").map(r => { val d = r.split(","); (d(0), d(1), d(2).toFloat, d(3).toFloat, d(4).toFloat, d(5).toFloat, d(6).toLong) })

case class nycc (
stockticker: String, transactiondate: String, openprice: Float, highprice: Float, lowprice: Float, closeprice: Float, volume: Long
)

val nDF = nData.map( r => nycc(r._1, r._2, r._3, r._4, r._5, r._6, r._7) ).toDF

sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

nDF.repartition(1).write.parquet("/user/cloudera/nyse_parquet")


Hello @Raman_Kumar_Jha,
Sorry for tagging you specifically, but I need help. I have my exam in a few days.

Can you help me with this problem? How do I read multiple files from the local file system and create a DF?

Thanks
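(Not an official answer, just a sketch: sc.textFile accepts comma-separated paths and glob patterns, so several local files can be read in one call, provided Spark runs in local mode or the files exist at the same path on every worker node. The paths below are illustrative.)

# read every file under /data/nyse from the local file system (local mode assumed)
nyseRaw = sc.textFile("file:///data/nyse/*")
# or list specific files, comma-separated
# nyseRaw = sc.textFile("file:///data/nyse/nyse_2009.csv,file:///data/nyse/nyse_2010.csv")
nyseDF = nyseRaw.map(lambda r: r.split(",")).toDF(["stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume"])  # all columns still strings; cast as needed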

Hi @itversity

When solving this problem, I executed the below code using itversity labs:

val nyse = sc.textFile("/user/geenicholas/nyse/nyse_2009.csv")
val nysedf = nyse.map(rec => (rec.split(",")(0).toString, rec.split(",")(1).toString, rec.split(",")(2).toFloat, rec.split(",")(3).toFloat, rec.split(",")(4).toFloat, rec.split(",")(5).toFloat, rec.split(",")(6).toLong)).toDF("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")
nysedf.save("/user/geenicholas/nyse_parquet", "parquet")

I got the below error:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Also, when running with data from all the files (nyse_2009.csv, nyse_2010.csv, nyse_2011.csv, ...) in the nyse folder, I am facing the error
'java.lang.ArrayIndexOutOfBoundsException'.

Could you please look into this?

Thanks in advance.

You may not be able to load local files since Spark expects all those files to be locally available in the same location on all its nodes.
I uploaded the data into an HDFS folder and loaded it from there in the program.
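(For reference, that upload can be done with hdfs dfs -put; the target path below is illustrative.)

hdfs dfs -put /data/nyse /user/<YOUR_USER_ID>/nyse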

I am in the process of prep too, so I am not sure whether this program is accurate.
Anyway, here is what I came up with. I ended up with 21 parquet output files.

nyseRaw = sc.textFile(input path)

def convert(x):
    s = x.split(",")
    s[2] = float(s[2])
    s[3] = float(s[3])
    s[4] = float(s[4])
    s[5] = float(s[5])
    s[6] = int(s[6])
    return s

nyseMap = nyseRaw.map(lambda x: convert(x))
df = nyseMap.toDF(schema=["stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume"])
df.write.parquet(output path)

#python

#pyspark

To read files in formats other than plain text (where you would use textFile), use sqlContext.load("", "json").
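For example (a sketch; the path is illustrative, and sqlContext.read is the equivalent, non-deprecated form):

jsonDF = sqlContext.read.format("json").load("/user/<YOUR_USER_ID>/some_data.json")
jsonDF.show()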


du -sh /data/nyse

hdfs dfs -copyFromLocal /data/nyse /user/rjshekar90/nyse

hdfs dfs -ls /user/rjshekar90/nyse

hdfs fsck -blocks /user/rjshekar90/nyse

pyspark --master yarn \
  --conf spark.ui.port=21456 \
  --num-executors 3 \
  --executor-cores 4 \
  --executor-memory 1G

nyseRDD = sc.textFile("/user/rjshekar90/nyse")
for i in nyseRDD.take(10): print(i)

nyseMap = nyseRDD.map(lambda x: x.split(","))
for i in nyseMap.take(10): print(i)

nyseRDDMap = nyseMap.map(lambda x: (str(x[0]), str(x[1]), float(x[2]), float(x[3]), float(x[4]), float(x[5]), int(x[6])))
for i in nyseRDDMap.take(10): print(i)

nyseDF = nyseRDDMap.toDF(schema=["stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume"])

nyseDF.coalesce(1).write. \
    format("parquet"). \
    save("/user/rjshekar90/pblm4/solution")

for i in sc.textFile("/user/rjshekar90/pblm4/solution").take(10):
    print(i)

sqlContext.load("/user/rjshekar90/pblm4/solution",“parquet”).show()


Here is my code :)
hdfs dfs -copyFromLocal /data/nyse /user/pricillap2001/nyse

hdfs dfs -ls /user/pricillap2001/nyse

hdfs fsck -blocks /user/pricillap2001/nyse

pyspark --master yarn --conf spark.ui.port=12345

nyse=sc.textFile("/user/pricillap2001/nyse")

for i in nyse.take(10) : print(i)

nyseMap = nyse.map(lambda x: x.split(","))
for i in nyseMap.take(10): print(i)
# Fields: stockticker:string, transactiondate:string, openprice:float, highprice:float, lowprice:float, closeprice:float, volume:bigint

from pyspark.sql import Row
N = nyse.map(lambda i: Row(stockticker=i.split(",")[0], transactiondate=i.split(",")[1], openprice=i.split(",")[2],
                           highprice=i.split(",")[3], lowprice=i.split(",")[4], closeprice=i.split(",")[5],
                           volume=i.split(",")[6])).toDF()

for i in N.take(10): print(i)

N.coalesce(1).write.parquet("/user/pricillap2001/nyse_parquet")

N.coalesce(1).write.format("parquet").save("/user/pricillap2001/pblm4/solution")

sqlContext.load("/user/pricillap2001/pblm4/solution",“parquet”).show()

My problem is that I am not getting the columns in the proper order. Any suggestions?

Hello Pricilla, you can use DF2 = DF1.select(order_wanted) like this:
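For instance, with the column order from the exercise (DF1 and DF2 standing in for your input and output DataFrames):

DF2 = DF1.select("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")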

PYSPARK

pyspark --master yarn \
  --conf spark.ui.port=12890 \
  --num-executors 4 \
  --executor-memory 512M

from pyspark.sql import Row

nyseRaw = sc.textFile("/user/root/nyse/nyse*")

nyse_DF = nyseRaw. \
    map(lambda o:
        Row(stockticker = str(o.split(",")[0]),
            transactiondate = str(o.split(",")[1]),
            openprice = float(o.split(",")[2]),
            highprice = float(o.split(",")[3]),
            lowprice = float(o.split(",")[4]),
            closeprice = float(o.split(",")[5]),
            volume = int(o.split(",")[6])
            )
        ).toDF()

nyseDF = nyse_DF.select("stockticker", "transactiondate", "openprice", "highprice", "lowprice", "closeprice", "volume")

nyseDF.coalesce(1).write. \
    format("parquet"). \
    save("/user/root/tp4")

sqlContext.read.load("/user/root/tp4", "parquet").show()

Note: invoke pyspark in local mode to be able to read local files for RDD/DF creation.
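That is, something along these lines (a sketch; in local mode the driver and executors share one machine, so file:// paths resolve):

pyspark --master local[*]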

from pyspark.sql import Row
dataDF = sc.textFile("file:///data/nyse"). \
    map(lambda rec: Row(stockticker=rec.split(",")[0],
                        transactiondate=rec.split(",")[1],
                        openprice=float(rec.split(",")[2]),
                        highprice=float(rec.split(",")[3]),
                        lowprice=float(rec.split(",")[4]),
                        closeprice=float(rec.split(",")[5]),
                        volume=int(rec.split(",")[6]))).toDF()
sqlContext.sql("set spark.sql.shuffle.partitions=1")
dataDF.coalesce(1).write.format("parquet").save("/tmp/p4")

nyse = sc.textFile("/usr/nysedata/")
nyseMap = nyse.map(lambda o: (o.split(",")[0], o.split(",")[1], float(o.split(",")[2]), float(o.split(",")[3]),
                              float(o.split(",")[4]), float(o.split(",")[5]), int(o.split(",")[6])))
nyseDF = nyseMap.toDF(schema=['stockticker', 'tradedate', 'openprice', 'highprice', 'lowprice', 'closeprice', 'volume'])

-- To further process the data, I have to assign this schema back to an RDD:
nyseRDD=sc.parallelize(nyseDF)
After executing, I am getting the error:
Method __getnewargs__([]) does not exist
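(A likely cause, as a side note: sc.parallelize expects a local Python collection, not a DataFrame, so PySpark fails while trying to pickle the DataFrame, which is where the __getnewargs__ error comes from. A DataFrame already wraps an RDD, so a sketch of the usual way back:)

nyseRDD = nyseDF.rdd                                   # RDD of Row objects, no parallelize needed
# e.g. nyseRDD.map(lambda row: row.stockticker).take(5)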

My solution is:
In the local file system:
cd /data/nyse
hdfs dfs -put NY* /user/guptarab/filetran/nyse    (copy the data to HDFS)

In pyspark2:
stock = spark.read.format("csv"). \
    schema("stockticker string, transactiondate string, openprice float, highprice float, lowprice float, closeprice float, volume int"). \
    load("/user/guptarab/filetran/nyse")

dd = stock.repartition(1)
dd.write.parquet("/user/guptarab/filetran/nysq.parquet")


package examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object exer_4 {
def main(Args: Array[String]){
val spark = SparkSession.builder().appName("CCA175 Exercise-4").master("local[*]").getOrCreate()

val nyseschema = StructType(List(
                             StructField("StockTicker",StringType,true),
                             StructField("transactiondate",StringType,true),
                             StructField("openprice",FloatType,true),
                             StructField("highprice",FloatType,true),
                             StructField("lowprice",FloatType,true),
                             StructField("closeprice",FloatType,true),
                             StructField("volume",IntegerType,true)                                 
                            )
                       )
val data = spark.read.format("csv").schema(nyseschema).load("C:\\Users\\Vamsi_Mannava\\Downloads\\My_Stuff\\BigData-Spark\\CCA175-Course\\nyse\\*.csv")  
data.coalesce(1).write.mode("overwrite").format("parquet").save("C:\\Users\\Vamsi_Mannava\\Downloads\\My_Stuff\\BigData-Spark\\CCA175-Course\\Exercises\\outputs\\nyse")

println("***************************File Write Completed******************************")

}
}

Hello,
Please, I need help.
When I try to save the parquet file (with this command: nyse.save("/user/xxlosoxx/nyse_parquet", "parquet")) I get this error:

Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://nn01.itversity.com:8020/user/xxlosoxx/user/xxlosoxx/nyse

The path is "duplicated": /user/xxlosoxx/user/xxlosoxx
Why? The path should be just /user/xxlosoxx.

Please help
Thanks
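(A guess, since the read command isn't shown: an HDFS path without a leading slash is resolved relative to your home directory /user/xxlosoxx, which produces exactly that doubled /user/xxlosoxx/user/xxlosoxx prefix. A sketch with an absolute path:)

nyse = sc.textFile("/user/xxlosoxx/nyse")   # leading slash = absolute HDFS path
# not: sc.textFile("user/xxlosoxx/nyse"), which resolves under /user/xxlosoxx/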

Run using a bash script:

hdfs dfs -copyFromLocal /data/nyse /user/swarajnewar/.

pyspark --master yarn --conf spark.ui.port=0

nyse = spark.read.format('csv').schema('''stockticker string,
transactiondate string,
openprice float,
highprice float,
lowprice float,
closeprice float,
volume bigint''').load('/user/swarajnewar/nyse')

nyse.coalesce(1).write.mode('overwrite').format('parquet').save('/user/swarajnewar/nyse_parquet')