Unable to create ForeachWriter to load data from Kafka to HBase using Structured Streaming



Hi,

I need to insert data into HBase from Kafka. I am getting errors while creating the ForeachWriter.

Code:

import java.util.concurrent.ExecutorService
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val writer = new ForeachWriter[Row] {
  var hBaseConf: Configuration = _
  var connection: Connection = _

  override def open(partitionId: Long, version: Long) = {
    hBaseConf = HBaseConfiguration.create()

    hBaseConf.set("hbase.zookeeper.quorum", config.getString("zookeeper.quorum"))
    hBaseConf.set("hbase.zookeeper.property.clientPort", config.getString("zookeeper.port"))
    hBaseConf.set("zookeeper.znode.parent", "/hbase-unsecure")
    hBaseConf.set("hbase.cluster.distributed", "true")

    connection = ConnectionFactory.createConnection(hBaseConf)
    true
  }

  override def process(value: Row) = {
    val table = connection.getTable(TableName.valueOf("hbase_table_name"))
    table.put(value)
  }

  override def close(errorOrNull: Throwable) = {
    connection.close()
  }
}

Error in spark-shell -

:44: error: not found: type Configuration
var hBaseConf: Configuration = _
^
:60: error: overloaded method value put with alternatives:
(x$1: java.util.List[org.apache.hadoop.hbase.client.Put])Unit
(x$1: org.apache.hadoop.hbase.client.Put)Unit
cannot be applied to (org.apache.spark.sql.Row)
table.put(value)
^

Any help is appreciated.
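
For reference, the two compiler errors point at two separate issues: the type Configuration is never imported (it lives in org.apache.hadoop.conf.Configuration), and table.put is being handed a Row where the HBase client expects a Put built from the row's columns. A minimal sketch of a corrected writer is below. The row key column "key", the value column "payload", the column family "cf", and the config object holding the ZooKeeper settings are assumptions taken from the snippet above for illustration, not a verified fix for this exact setup.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  var hBaseConf: Configuration = _
  var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    hBaseConf = HBaseConfiguration.create()
    // same config object as in the snippet above
    hBaseConf.set("hbase.zookeeper.quorum", config.getString("zookeeper.quorum"))
    hBaseConf.set("hbase.zookeeper.property.clientPort", config.getString("zookeeper.port"))
    hBaseConf.set("zookeeper.znode.parent", "/hbase-unsecure")
    hBaseConf.set("hbase.cluster.distributed", "true")
    connection = ConnectionFactory.createConnection(hBaseConf)
    true
  }

  override def process(value: Row): Unit = {
    val table = connection.getTable(TableName.valueOf("hbase_table_name"))
    // table.put expects a Put, not a Row: build one from the row's columns.
    // "key", "payload" and column family "cf" are placeholders -- adjust to the actual schema.
    val put = new Put(Bytes.toBytes(value.getAs[String]("key")))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("payload"),
      Bytes.toBytes(value.getAs[String]("payload")))
    table.put(put)
    table.close()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (connection != null) connection.close()
  }
}

The writer would then be attached to the streaming query with something like kafkaDF.writeStream.foreach(writer).start(), where kafkaDF is the DataFrame read from the Kafka source.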

