After joining two datasets, how do I access the tuple elements?

apache-spark
pyspark
#1

Hi, I joined the daily revenue and total orders results as in the example. Now how do I access the elements of this tuple? What am I doing wrong here?

for i in data.take(10): print(i)

(u'2013-08-23 00:00:00.0', (99616.169999999925, 169))
(u'2014-04-06 00:00:00.0', (56192.119999999981, 85))
(u'2013-11-29 00:00:00.0', (136296.62999999995, 220))
(u'2014-03-18 00:00:00.0', (122522.78999999994, 208))
(u'2014-02-04 00:00:00.0', (72887.27999999997, 124))
(u'2014-06-17 00:00:00.0', (75506.049999999974, 115))
(u'2014-01-23 00:00:00.0', (108224.99999999993, 176))
(u'2014-02-26 00:00:00.0', (69276.279999999984, 123))
(u'2013-11-18 00:00:00.0', (94744.589999999938, 148))
(u'2013-12-19 00:00:00.0', (62375.89999999998, 102))

dataR = data.map(lambda x: x[0])
for i in dataR.take(10): print(i)

(
(
(
(
(
(
(
(
(
(

0 Likes

#2

@TheBest… In the snippet you provided I don't see any issues; ideally it should work. I am not sure why it is not printing the date field.

0 Likes

#3

@TheBest - Could you please paste the full code?

0 Likes

#4

I used join as in the joining example for the final result:
finalJoinRDD = totalOrdersPerDay.join(totalRevenuePerDay)
Then I saved this as a text file:
finalJoinRDD.saveAsTextFile("hdfs///user/cloudera/sqoop_import/final")
Then:
data = sc.textFile("hdfs///user/cloudera/sqoop_import/final")
After that, as you can see in the code:
for i in data.take(10): print(i)

(u'2013-08-23 00:00:00.0', (99616.169999999925, 169))
(u'2014-04-06 00:00:00.0', (56192.119999999981, 85))
(u'2013-11-29 00:00:00.0', (136296.62999999995, 220))
(u'2014-03-18 00:00:00.0', (122522.78999999994, 208))
(u'2014-02-04 00:00:00.0', (72887.27999999997, 124))
(u'2014-06-17 00:00:00.0', (75506.049999999974, 115))
(u'2014-01-23 00:00:00.0', (108224.99999999993, 176))
(u'2014-02-26 00:00:00.0', (69276.279999999984, 123))
(u'2013-11-18 00:00:00.0', (94744.589999999938, 148))
(u'2013-12-19 00:00:00.0', (62375.89999999998, 102))
dataR = data.map(lambda x: x[0])
for i in dataR.take(10): print(i)

(
(
(
(
(
(
(
(
(
(
I think it is because I saved the tuples as a text file; is that why?
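A quick way to check this (just a sketch, using the data RDD loaded above) would be to print the type of each record:

for i in data.take(3):
    print(type(i))  # likely str, since saveAsTextFile writes plain text lines
    print(i[0])     # if so, i[0] is just the first character of the line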

0 Likes

#5

@TheBest - After saving to HDFS you can refer directly to the user/cloudera/sqoop_import/final folder for reference, since the data looks good before storing.

0 Likes

#6

I was trying to practice something like finding the top 10 dates with the maximum total orders, or the top 10 dates with the maximum revenue, using this already-stored data, but I don't know why I am not able to access the tuple elements.

0 Likes

#7

Can you provide the complete script?
When you save data in tuples to a text file, it will not be stored as tuples; it will be stored as plain text.

When you reload the data from final, each record is a plain string, not a tuple.
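If you do want to work with the data reloaded from the text file, one option (a minimal sketch, assuming each saved line looks like a Python tuple literal such as (u'2013-08-23 00:00:00.0', (99616.17, 169))) is to parse each string back into a tuple:

def parse_line(line):
    import ast  # import inside the function so it is available on the executors too
    return ast.literal_eval(line)  # turns the saved string back into (date, (revenue, count))

parsed = data.map(parse_line)
dates = parsed.map(lambda x: x[0])  # now x[0] is the date, not the first character
for i in dates.take(10): print(i)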

0 Likes

#8

orderR = sc.textFile("/user/cloudera/sqoop_import/orders")
orderItemsR = sc.textFile("/user/cloudera/sqoop_import/order_items")
for i in orderR.take(5): print(i)

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE

for i in orderItemsR.take(5): print(i)

1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99

orderMapR = orderR.map(lambda x: (int(x.split(",")[0]),x.split(",")[1]))
for i in orderMapR.take(5): print(i)

(1, u'2013-07-25 00:00:00.0')
(2, u'2013-07-25 00:00:00.0')
(3, u'2013-07-25 00:00:00.0')
(4, u'2013-07-25 00:00:00.0')
(5, u'2013-07-25 00:00:00.0')

orderItemsMapR = orderItemsR.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))

for i in orderItemsMapR.take(5): print(i)

(1, 299.98000000000002)
(2, 199.99000000000001)
(2, 250.0)
(2, 129.99000000000001)
(4, 49.979999999999997)

orderJoinR = orderItemsMapR.join(orderMapR)
for i in orderJoinR.take(5): print(i)

(32768, (199.99000000000001, u'2014-02-12 00:00:00.0'))
(32768, (129.99000000000001, u'2014-02-12 00:00:00.0'))
(32768, (299.98000000000002, u'2014-02-12 00:00:00.0'))
(32768, (399.98000000000002, u'2014-02-12 00:00:00.0'))
(49152, (299.98000000000002, u'2014-05-27 00:00:00.0'))

totalOrderMapR = orderJoinR.map(lambda x: (x[1][1],1))
for i in totalOrderMapR.take(5): print(i)

(u'2014-02-12 00:00:00.0', 1)
(u'2014-02-12 00:00:00.0', 1)
(u'2014-02-12 00:00:00.0', 1)
(u'2014-02-12 00:00:00.0', 1)
(u'2014-05-27 00:00:00.0', 1)

totalOrderPerDay = totalOrderMapR.reduceByKey(lambda x,y: x + y)
totalOrderPerDay.count()
364

for i in totalOrderPerday.sortByKey().take(10): print(i)

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'totalOrderPerday' is not defined

for i in totalOrderPerDay.sortByKey().take(10): print(i)

(u'2013-07-25 00:00:00.0', 339)
(u'2013-07-26 00:00:00.0', 694)
(u'2013-07-27 00:00:00.0', 503)
(u'2013-07-28 00:00:00.0', 438)
(u'2013-07-29 00:00:00.0', 666)
(u'2013-07-30 00:00:00.0', 540)
(u'2013-07-31 00:00:00.0', 641)
(u'2013-08-01 00:00:00.0', 636)
(u'2013-08-02 00:00:00.0', 558)
(u'2013-08-03 00:00:00.0', 485)

orderRevenueMapR = orderJoinR.map(lambda x: (x[1][1],float(x[1][0])))

for i in orderRevenueMapR.take(5): print(i)

(u'2014-02-12 00:00:00.0', 199.99000000000001)
(u'2014-02-12 00:00:00.0', 129.99000000000001)
(u'2014-02-12 00:00:00.0', 299.98000000000002)
(u'2014-02-12 00:00:00.0', 399.98000000000002)
(u'2014-05-27 00:00:00.0', 299.98000000000002)

orderRevenuePerDay = orderRevenueMapR.reduceByKey(lambda x,y: x + y)
orderRevenuePerDay.count()
364

for i in orderRevenuePerDay.sortByKey().take(5): print(i)

(u'2013-07-25 00:00:00.0', 68153.829999999973)
(u'2013-07-26 00:00:00.0', 136520.16999999993)
(u'2013-07-27 00:00:00.0', 101074.33999999992)
(u'2013-07-28 00:00:00.0', 87123.079999999958)
(u'2013-07-29 00:00:00.0', 137287.08999999991)

orderFinal = orderRevenuePerDay.join(totalOrderPerDay)
for i in orderFinal.sortByKey().take(5): print(i)

(u'2013-07-25 00:00:00.0', (68153.829999999973, 339))
(u'2013-07-26 00:00:00.0', (136520.16999999993, 694))
(u'2013-07-27 00:00:00.0', (101074.33999999992, 503))
(u'2013-07-28 00:00:00.0', (87123.079999999958, 438))
(u'2013-07-29 00:00:00.0', (137287.08999999991, 666))

orderFinal.saveAsTextFile("/user/cloudera/sqoop_import/orderFinal")
dataR = sc.textFile("/user/cloudera/sqoop_import/orderFinal")
for i in dataR.take(5): print(i)

(u'2013-08-23 00:00:00.0', (99616.169999999925, 495))
(u'2014-04-06 00:00:00.0', (56192.119999999981, 269))
(u'2013-11-29 00:00:00.0', (136296.62999999995, 676))
(u'2014-03-18 00:00:00.0', (122522.78999999994, 620))
(u'2014-02-04 00:00:00.0', (72887.27999999997, 362))

dataMapR = dataR.map(lambda x: x[0])
for i in dataMapR.take(5): print(i)

(
(
(
(
(

0 Likes

#9

As mentioned earlier, once tuples are written to a file and read back, the records are of string type, not tuple type.

So x[0] is just reading the first character of each line.

Also, it is not good practice to write all intermediate RDDs to files and read them back.
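For example, the top-10 questions from post #6 can be computed straight from the in-memory orderFinal RDD built in the script above, instead of the reloaded text file; a rough sketch:

# orderFinal records look like (date, (revenue, orderCount))
top10ByRevenue = orderFinal.takeOrdered(10, key=lambda x: -x[1][0])  # highest revenue first
top10ByOrders = orderFinal.takeOrdered(10, key=lambda x: -x[1][1])   # highest order count first
for i in top10ByRevenue: print(i)
for i in top10ByOrders: print(i)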

1 Like

#10

For some reason my machine keeps shutting down, so I just saved the data to use later, but I have learned the lesson.
Thank you, sir. Your work is awesome :)

1 Like