Reading data from one hbase table and inserting data again into hbase table


#1

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object DerivedCMTSApp {

val logger: Logger = Logger.getLogger(getClass.getName)

def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName(“DerivedCMTS-W”)
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
conf.set(“hbase.zookeeper.quorum”, “port.com”)
conf.set(“hbase.zookeeper.property.clientPort”, “2181”)
conf.set(TableInputFormat.INPUT_TABLE, “entity”)
conf.set(TableOutputFormat.OUTPUT_TABLE, “test:DerivedCMTS”)

 val outputConf = new Configuration(conf)
 outputConf.set("mapreduce.job.output.key.class", classOf[ImmutableBytesWritable].getName)
 outputConf.set("mapreduce.job.output.value.class", classOf[Put].getName)
 outputConf.set("mapreduce.job.outputformat.class", classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[Put]].getName)

 val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
                classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
                classOf[org.apache.hadoop.hbase.client.Result])

 val hbase = hbaseRDD.filter(r => {
	               Bytes.toString(r._2.getValue(Bytes.toBytes("p"), Bytes.toBytes("Billing_entityType"))) == "CableModem"
	               Bytes.toString(r._2.getValue(Bytes.toBytes("p"), Bytes.toBytes("Billing_enabled"))) == "1"
	         }). 
           map(r => { val node_hk = Bytes.toString(r._2.getValue(Bytes.toBytes("p"), Bytes.toBytes("Billing_node_hashKey"))) 
           val cmts_hk = Bytes.toString(r._2.getValue(Bytes.toBytes("p"), Bytes.toBytes("ModemScanner_cmts_hashKey")))
           (node_hk, cmts_hk) }).map(r => r._1 + "," + r._2)

 val derived_path = "/tmp/hbase_DerivedCMTS"
 FileSystem.get(conf).delete(new Path(derived_path), true)

 hbase.distinct.saveAsTextFile("/tmp/hbase_DerivedCMTS") // done as saving in text format

 **def buildPut(record: String) = {**

** val attr = record.split(",") // CMTS_HK**
** val key = Bytes.toBytes(attr(1))**
** val row = new Put(key)**
** row.addColumn(Bytes.toBytes(“p”), Bytes.toBytes(“node_hk”), Bytes.toBytes(attr(0)))**
** }**


** //val inputpath = args(1)**
** val connection = ConnectionFactory.createConnection(conf)**
** val table = connection.getTable(TableName.valueOf(“test:DerivedCMTS”))**
** val DerivedData = Source.fromFile("/user/tvelaga/DERIVEDCMTS-d").getLines**
** DerivedData.foreach(record => {**
** table.put(buildPut(record))**
** })**

 table.close()
 connection.close()

}
}

Reading data from one hbase table and inserting data again into hbase table .

But am unable to load data into hbase table ( steps highlighted with **)
can anyone help me in this.