Arun's Blog - Problem2 - pyspark



Hello everybody,

I’m trying to solve the second scenario in Arun’s blog.
Since I don’t have access to the lab, I imported the products file into my VM; you can find it here:
hdfs dfs -ls /user/cloudera/products
hdfs dfs -tail /user/cloudera/products/products

hdfs dfs -mv /user/cloudera/products/* /user/cloudera/problem2/products/
hdfs dfs -ls /user/cloudera/problem2/products/

hdfs dfs -chmod 765 /user/cloudera/problem2/products/part-00000
hdfs dfs -ls /user/cloudera/problem2/products/
I tried to run the same tasks Arun asked for in his blog, using the following code:


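# The names productMap and productDF and the groupBy column were lost from this post;
# they are assumed here from the surrounding code and the problem statement.
prodFilt = productRdd.filter(lambda x: float(x.split(",")[4]) < 100)
prodFilt.take(5)
productSplit = prodFilt.map(lambda x: x.split(","))
productSplit.take(10)
productMap = productSplit.map(lambda x: (int(x[0]), int(x[1]), float(x[4])))
from pyspark.sql import Row
productDF = productMap.map(lambda x: Row(table_id=(x[0]), prod_cat_id=(x[1]), prod_price=(x[2]))).toDF()

from pyspark.sql.functions import *

productDF.groupBy('prod_cat_id'). \
agg(max('prod_price').alias('Max_Price'), countDistinct('table_id'), avg('prod_price').alias('Avg_Price'), min('prod_price').alias('Min_Price'))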

But I’m facing this problem:

ValueError: could not convert string to float:

at org.apache.spark.api.python.PythonRunner$$anon$
at org.apache.spark.api.python.PythonRunner$$anon$
at org.apache.spark.api.python.PythonRunner$$anon$
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.Expand$$anonfun$doExecute$1$$anonfun$3$$anon$
at org.apache.spark.sql.execution.Expand$$anonfun$doExecute$1$$anonfun$3$$anon$
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:512)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.executor.Executor$
at java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.util.concurrent.ThreadPoolExecutor$
... 1 more




@sneha_ananthan please help me find the solution to this problem.


Can you paste the sqoop import query here? sc.textFile reads only text files. If you try to read other formats such as Avro or Parquet with sc.textFile, you will face this error.
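
One way to check which kind of file is sitting in HDFS is to look at its first few bytes (for example by piping `hdfs dfs -cat` into `head -c 4`). The sketch below is plain Python, not part of Spark; `sniff_format` is a hypothetical helper name, and the text sample is made up. It relies only on the magic bytes that the Avro and Parquet formats put at the start of a file.

```python
# Magic bytes that the Avro and Parquet file formats place at the start of a file.
AVRO_MAGIC = b"Obj\x01"
PARQUET_MAGIC = b"PAR1"


def sniff_format(header):
    """Guess the file format from the first bytes of a file."""
    if header.startswith(AVRO_MAGIC):
        return "avro"
    if header.startswith(PARQUET_MAGIC):
        return "parquet"
    # Anything else is treated as text here; this is a guess, not a proof.
    return "probably text"


print(sniff_format(b"Obj\x01rest-of-file"))
print(sniff_format(b"PAR1rest-of-file"))
print(sniff_format(b"1,2,Some product,,59.98,http://example.com/img"))
```

If the header says "avro" or "parquet", sc.textFile would hand back binary garbage as lines, and a float() call on a split field could fail in the way shown above.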


Hi @naveenraj,
thank you for the answer.
I didn’t use the sqoop command since I don’t have access to the MySQL table (and I don’t know how to import the MySQL databases and tables into the QuickStart VM).

I used the file you can find at the link I mentioned in my post:

I hope it helps you figure out the solution to this problem.



Can you please confirm during which action it is failing?


Exactly as I posted it.


@itversity please, can you help me solve this problem?



I believe the error is due to this line of code: map(lambda x: Row(table_id=(x[0]), prod_cat_id=(x[1]), prod_price=(x[2]))).toDF()

Here, can you typecast the individual column values to their respective types, as in: map(lambda x: Row(table_id=int(x[0]), prod_cat_id=int(x[1]), prod_price=float(x[2]))).toDF()

so that when you apply DataFrame APIs like max('prod_price'), avg('prod_price'), etc., you don’t end up computing on strings.
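
To illustrate why computing on strings is risky, here is a small plain-Python sketch (not Spark, and with made-up sample prices). It only shows that a maximum over text values follows character order rather than numeric order.

```python
prices_as_text = ["9.99", "100.0", "59.98"]  # made-up sample values

# Strings compare character by character, so "9.99" beats "100.0".
max_text = max(prices_as_text)

# After casting, the comparison is numeric.
max_number = max(float(p) for p in prices_as_text)

print(max_text, max_number)
```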


@sneha_ananthan thank you.

I also tried running the script as you suggested, but it didn’t work.

I don’t know why.


Well, I forgot about one of the videos where Sir had mentioned data issues in the datasets.

The filter below made it work.

prodFilt=productRdd. \
filter(lambda p: p.split(",")[4] != ''). \
filter(lambda x: float(x.split(",")[4]) < 100)
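
A slightly more defensive variant of the same idea, as a sketch: instead of testing only for an empty string, catch any price field that does not parse. `price_below` is a hypothetical helper name and the sample lines are made up; the comma separator and field index 4 are taken from the code above.

```python
def price_below(line, limit=100.0, sep=",", price_index=4):
    """Return True when the price field parses as a float below `limit`."""
    fields = line.split(sep)
    if len(fields) <= price_index:
        return False  # record is too short to contain a price
    try:
        return float(fields[price_index]) < limit
    except ValueError:
        # Empty or non-numeric price field: skip the record.
        return False


sample = [
    "1,2,Product A,,59.98,http://example.com/a",
    "2,2,Product B,,,http://example.com/b",       # empty price field
    "3,3,Product C,,129.99,http://example.com/c",  # price above the limit
]
kept = [line for line in sample if price_below(line)]
print(kept)
```

Since the function takes a single line once the defaults are applied, it could presumably be passed straight to productRdd.filter(price_below).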

Suggestion: It’s better to also practise the sqoop commands, as mentioned in Arun’s blog. On the Cloudera VM, there is a MySQL instance running at:

Host: quickstart.cloudera
Port: 3306
DB name: retail_db


Yes @sneha_ananthan, I did it using sqoop in the Cloudera VM and it works without any issue.

May I ask how you solved the problem via the Spark API (RDD)?


In pyspark - using RDDs:

products = sc.textFile("/user/snehaananthan/problem2/products")

productsFiltered = products.filter(lambda p: float(p.split("|")[4]) < 100.00)

productsMap = productsFiltered. \
map(lambda p: (int(p.split("|")[1]), float(p.split("|")[4])))

# zero value is (highest, count, sum, lowest); the tuple-unpacking lambdas need Python 2
results = productsMap. \
aggregateByKey(
	(0.0, 0, 0.0, 101.00), 
	lambda (highest, count, sum, lowest), input_price: 
		(input_price if input_price > highest else highest, 
		count + 1, 
		sum + input_price, 
		input_price if input_price < lowest else lowest), 
	# keep the running sum when merging; the average is sum / count afterwards
	lambda (highest1, count1, sum1, lowest1), (highest2, count2, sum2, lowest2): 
		(highest1 if highest1 > highest2 else highest2, 
		count1 + count2, 
		sum1 + sum2, 
		lowest1 if lowest1 < lowest2 else lowest2))

from pyspark.sql import Row
results_DF = results.map(lambda r: Row(product_category_id=r[0], 

results_DF.write. \
format("com.databricks.spark.avro"). \
option("codec", ""). \


@sneha_ananthan thank you very much.

I did something that doesn’t work… can you check it?

productRddResult.aggregateByKey((0.0,0,0.0,0.0),lambda x,v: (x[0] if (x[0] > v[0]) else v[0], x[1]+1, x[0]+v[0], x[0] if (x[0] < v[0]) else v[0]), lambda x,y: (x[0] if (x[0] > y[0]) else y[0], x[1]+y[1], x[0]+y[0],x[0] if (x[0] < y[0]) else y[0]))

I have one more question: why did you use that script to save as an Avro file?
Is it equal to


How did you come up with that smart way to write the aggregateByKey code?

Thank you


I did not quite understand this piece of code, because when we use aggregateByKey(), only the value from each (key, value) pair of the RDD is passed into the functions given to aggregateByKey(). If you understand the code I have posted, you can come up with your own version based on it, if you’re comfortable.
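
To make that point concrete, here is a plain-Python stand-in for what aggregateByKey() does, written as a sketch under my own assumptions: `aggregate_by_key`, `seq_op` and `comb_op` are hypothetical names, the two "partitions" are made-up data, and real Spark decides partitioning and merge order itself. The thing to notice is that `seq_op` receives the accumulator and the bare value (the price), never the key.

```python
def seq_op(acc, price):
    """Fold one value (a price) into the accumulator within a partition."""
    highest, count, total, lowest = acc
    return (max(highest, price), count + 1, total + price, min(lowest, price))


def comb_op(a, b):
    """Merge two accumulators that were built in different partitions."""
    return (max(a[0], b[0]), a[1] + b[1], a[2] + b[2], min(a[3], b[3]))


def aggregate_by_key(partitions, zero, seq, comb):
    """Plain-Python imitation of RDD.aggregateByKey, for illustration only."""
    per_partition = []
    for part in partitions:
        accs = {}
        for key, value in part:
            # Each key starts from the zero value inside every partition.
            accs[key] = seq(accs.get(key, zero), value)
        per_partition.append(accs)
    merged = {}
    for accs in per_partition:
        for key, acc in accs.items():
            merged[key] = comb(merged[key], acc) if key in merged else acc
    return merged


# Made-up (category_id, price) pairs split over two partitions.
partitions = [
    [(2, 10.0), (2, 30.0), (3, 5.0)],
    [(2, 20.0), (3, 15.0)],
]
result = aggregate_by_key(partitions, (0.0, 0, 0.0, 101.00), seq_op, comb_op)
print(result)
```

The average per key is then the third slot divided by the second, computed after the merge rather than inside it.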

NOTE: To be honest, solving such problems using RDDs is very time-consuming, and I’m pretty sure we cannot afford to spend so much time on a single problem like this.


Hi again,

One more question: why did you put 101.00 as the lowest “zero value”? What if it were 0.0?


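So, according to the problem, all prices are under 100.00 after applying the filter. In the aggregateByKey() function, the lowest slot of the zero value is set to 101.00 so that the first ‘product_price’ seen always passes the input_price < lowest condition and replaces it. If it were 0.0, no price would ever be lower, and the minimum would stay at 0.0.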
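
A tiny plain-Python sketch of that effect, with made-up prices; `lowest_price` is a hypothetical helper that repeats only the "lowest" comparison from the aggregation.

```python
def lowest_price(prices, zero):
    """Running minimum that starts from the given zero value."""
    lowest = zero
    for price in prices:
        lowest = price if price < lowest else lowest
    return lowest


prices = [59.98, 12.5, 99.0]  # made-up prices, all under 100.00

with_101 = lowest_price(prices, 101.00)  # starts above every price
with_0 = lowest_price(prices, 0.0)       # starts below every price

print(with_101, with_0)
```

With 101.00 the result is the real minimum of the data; with 0.0 the zero value itself survives as the "minimum".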


This is part of the problem2 scenario in Arun’s blog:


You are right.
I didn’t specify that I have something like this in my RDD:
(2, (59.979999999999997, 1)) -> (key, (price, 1 for the count))
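
Assuming the values really are (price, 1) pairs like that, one possible correction is sketched below in plain Python. In the snippet posted earlier, the running sum is built from x[0] + v[0] and the minimum compares x[0], so the highest-price slot leaks into both; the lowest slot also starts at 0.0. The names and sample values here are made up, and float("inf") is my own choice of starting minimum rather than something from the thread.

```python
def seq_op(acc, value):
    """acc = (highest, count, total, lowest); value = (price, 1)."""
    price, one = value
    return (max(acc[0], price), acc[1] + one, acc[2] + price, min(acc[3], price))


def comb_op(a, b):
    """Merge two accumulators slot by slot."""
    return (max(a[0], b[0]), a[1] + b[1], a[2] + b[2], min(a[3], b[3]))


zero = (0.0, 0, 0.0, float("inf"))  # lowest starts above any possible price

# Made-up values for a single key, as they would arrive in one partition.
acc = zero
for v in [(50.0, 1), (20.0, 1), (80.0, 1)]:
    acc = seq_op(acc, v)

# A second accumulator, as if it came from another partition.
other = seq_op(zero, (10.0, 1))
merged = comb_op(acc, other)
print(acc, merged)
```

The same two functions could presumably be passed to aggregateByKey(zero, seq_op, comb_op) on the RDD described above.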


In this case I don’t understand your answer. Sorry!


Why did you post Arun’s solution video?