DS Stream to DataFrame error -return r._jrdd

pyspark

#1

I am trying to create a temaporary table from Kafka stream and query it. My code is as follows. I am getting the following error:

File “/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/streaming/util.py”, line 67, in call
return r._jrdd
AttributeError: ‘function’ object has no attribute ‘_jrdd’

Can anyone please let me know, what is going wrong and how to fix it?

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import Row

from operator import add

conf = SparkConf().
setAppName(“Flume-Kafka-Spark Streaming WordCount”).
setMaster(“yarn-client”)

sc = SparkContext(conf = conf)

ssc = StreamingContext(sc, 10)

#sqlContext = SQLContext(sc)

Lazily instantiated global instance of SQLContext

def getSqlContextInstance(sparkContext):
if (‘sqlContextSingletonInstance’ not in globals()):
globals()[‘sqlContextSingletonInstance’] = SQLContext(sparkContext)
return globals()[‘sqlContextSingletonInstance’]

def process(time,rdd):

print("========= %s =========" % str(time))

	try:

	    # Get the singleton instance of SQLContext
    	sqlContext = getSqlContextInstance(rdd.context)

	    rowRDD = rdd.map(lambda rec: Row(RequestID=rec.split(' ')[0],Date=split(' ')[3].replace('[',''),Product=rec.split(' ')[6].split('/')[2]))

	    rdsDataFrame = sqlContext.createDataFrame(rowRdd)
	# Register as table
    	wordsDataFrame.registerTempTable("Product_Cnt")
    	wordCountsDataFrame = sqlContext.sql("select count(*) as total from Product_Cnt")
    	wordCountsDataFrame.show()

    except:
    	pass

if name == ‘main’:

topic = ["project-ProductCount"]
brokers = {"metadata.broker.list": "wn01.itversity.com:6667,wn02.itversity.com:6667,wn03.itversity.com:6667,wn04.itversity.com:6667"}
directKafkaStream = KafkaUtils. \
createDirectStream(ssc,topic,brokers)
messages = directKafkaStream.map(lambda rec:rec[1])
departmentMessages = messages.filter(lambda rec:rec.split(' ')[6].split('/')[1] == 'department')
departmentMessages.foreachRDD(lambda rdd:process)
ssc.start()
ssc.awaitTermination()