Parse JSON schema from Kafka message in Spark

pyspark
apache-spark
spark-streaming

#1

I am reading messages from a Kafka topic:

messageDFRaw = spark.readStream\
                .format("kafka")\
                .option("kafka.bootstrap.servers", "localhost:9092")\
                .option("subscribe", "test-message")\
                .load()

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")

When I print the data frame from the above query, I get the console output below.

+----------+------------------------------------------------------------------+
|key       |dict                                                              |
+----------+------------------------------------------------------------------+
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"} |
+----------+------------------------------------------------------------------+

How can I create a data frame from the DataStreamReader so that I end up with a dataframe with the columns |key|channel|username|message|?

I tried defining a schema, but I get `Expected type 'StructField', got 'StructType' instead` in from_json():

from pyspark.sql.types import StructType, StructField, StringType

struct = StructType([
    StructField("channel", StringType()),
    StructField("username", StringType()),
    StructField("message", StringType()),
])

from pyspark.sql.functions import from_json, col

# from_json expects a Column, not a SQL expression string
messageDFRaw.select(from_json(col("value").cast("string"), struct))