2 Flink 读取 HBase
HbaseUtil（Scala 读写 HBase 工具类）

package com.atguigu.flink.utils

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

/**
 * Helper object for reading and writing HBase from Scala.
 *
 * A `Connection` is heavyweight and thread-safe; callers create one via
 * [[getHbaseConn]] and are responsible for closing it. `Table` and `Admin`
 * handles are lightweight and are closed by the methods that open them.
 */
object HbaseUtil {

  def main(args: Array[String]): Unit = {
    val conf = getHbaseConf()
    val hbconn = getHbaseConn(conf)
    val table: Table = hbconn.getTable(TableName.valueOf("student"))
    try {
      // Fetch the row whose rowkey is "1001".
      getSingleRow(table, "1001")
      // Full-table scan (disabled by default):
      // queryAll(table)
    } finally {
      // BUGFIX: the Table handle was previously never closed; close it
      // before the connection, even if the query above throws.
      table.close()
      hbconn.close()
    }
  }

  /** Build an HBase client configuration pointing at the ZooKeeper quorum. */
  def getHbaseConf(): Configuration = {
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum",
      "192.168.1.122:2181,192.168.1.133:2181,192.168.1.144:2181")
    configuration
  }

  /** Create an HBase connection from `conf`. The caller must close it. */
  def getHbaseConn(conf: Configuration): Connection = {
    ConnectionFactory.createConnection(conf)
  }

  /**
   * Create table `tableName` with the given column families, if it does not
   * already exist. Prints a status message either way.
   */
  def createTable(conn: Connection, tableName: String, columnFamilys: Array[String]) = {
    val adminTable: Admin = conn.getAdmin
    val tName = TableName.valueOf(tableName)
    try {
      if (!adminTable.tableExists(tName)) {
        // Build the table descriptor with one descriptor per column family.
        val descriptor = new HTableDescriptor(tName)
        for (columnFamily <- columnFamilys) {
          descriptor.addFamily(new HColumnDescriptor(columnFamily))
        }
        adminTable.createTable(descriptor)
        println("create successful!!")
      } else {
        print("table already existed")
      }
    } finally {
      // BUGFIX: close the Admin even when tableExists/createTable throws.
      adminTable.close()
    }
  }

  /**
   * Drop table `tableName` if it exists (a table must be disabled before it
   * can be deleted). Prints a message when the table is missing.
   */
  def dropTable(conn: Connection, tableName: String) = {
    val adminTable: Admin = conn.getAdmin
    val tName = TableName.valueOf(tableName)
    try {
      if (adminTable.tableExists(tName)) {
        adminTable.disableTable(tName)
        adminTable.deleteTable(tName)
      } else {
        print("table does not exist")
      }
    } finally {
      // BUGFIX: close the Admin even when disable/delete throws.
      adminTable.close()
    }
  }

  /**
   * Look up table `tableName` and return a handle to it, or `null` when the
   * table does not exist (kept for interface compatibility — callers must
   * null-check the result).
   */
  def getHbaseTable(conn: Connection, tableName: String): Table = {
    val adminTable = conn.getAdmin
    val tName = TableName.valueOf(tableName)
    try {
      if (adminTable.tableExists(tName)) {
        conn.getTable(tName)
      } else {
        print("table does not exist")
        null
      }
    } finally {
      adminTable.close()
    }
  }

  /**
   * Put a single cell (`columnFamily:quorm` = `value`) into the row `rowKey`.
   * A `null` value is stored as the empty string.
   */
  def addRowData(table: Table, rowKey: String, columnFamily: String, quorm: String, value: String): Unit = {
    val rowPut: Put = new Put(Bytes.toBytes(rowKey))
    if (value == null) {
      rowPut.addColumn(columnFamily.getBytes, quorm.getBytes, "".getBytes())
    } else {
      rowPut.addColumn(columnFamily.getBytes, quorm.getBytes, value.getBytes)
    }
    table.put(rowPut)
  }

  /**
   * Scan the whole table, printing each rowkey and its "_"-joined cell values.
   *
   * NOTE: this method consumes the scanner while printing, so the returned
   * `ResultScanner`'s iterator is already exhausted; treat the return value
   * as a handle to close, not as data to re-read.
   */
  def queryAll(table: Table): ResultScanner = {
    // A ranged scan includes the start row and excludes the stop row; to
    // include the stop row, append a low byte, e.g.:
    //   new Scan("10".getBytes(), "10000\001".getBytes())
    val s = new Scan()
    val results: ResultScanner = table.getScanner(s)
    val iterator = results.iterator()
    while (iterator.hasNext) {
      val result = iterator.next()
      val rowKey = Bytes.toString(result.getRow)
      print("rowkey", rowKey)
      val sb: StringBuffer = new StringBuffer()
      // Walk every cell in this row and join the values with "_".
      for (cell: Cell <- result.listCells().asScala) {
        val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        sb.append(value).append("_")
      }
      // BUGFIX: the joined values were built but never output (dead work).
      println(sb)
    }
    results
  }

  /**
   * Fetch the single row `rowKey` and print family, qualifier, timestamp,
   * rowkey and value for every cell. Returns the raw `Result`.
   */
  def getSingleRow(table: Table, rowKey: String): Result = {
    // Get for exactly one row key.
    val get: Get = new Get(Bytes.toBytes(rowKey))
    val result: Result = table.get(get)
    // To read one specific column instead:
    //   val value = result.getValue("info".getBytes, "age".getBytes)
    for (rowKv <- result.rawCells()) {
      println("Famiily:" + new String(rowKv.getFamilyArray, rowKv.getFamilyOffset, rowKv.getFamilyLength, "UTF-8"))
      println("Qualifier:" + new String(rowKv.getQualifierArray, rowKv.getQualifierOffset, rowKv.getQualifierLength, "UTF-8"))
      println("TimeStamp:" + rowKv.getTimestamp)
      println("rowkey:" + new String(rowKv.getRowArray, rowKv.getRowOffset, rowKv.getRowLength, "UTF-8"))
      println("Value:" + new String(rowKv.getValueArray, rowKv.getValueOffset, rowKv.getValueLength, "UTF-8"))
    }
    result
  }
}
HbaseSource
package com.atguigu.flink.source

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.utils.HbaseUtil
import org.apache.flink.configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

/**
 * Flink source that performs one full scan over the HBase table "sensor"
 * and emits every row as a [[SensorReading]], then finishes (bounded source).
 */
class HbaseSource extends RichSourceFunction[SensorReading] {

  private var conn: Connection = null
  private var table: Table = null
  private var scan: Scan = null

  /**
   * Open the HBase connection and the "sensor" table once per subtask.
   * @param parameters Flink runtime configuration (unused here)
   */
  override def open(parameters: configuration.Configuration): Unit = {
    val conf = HbaseUtil.getHbaseConf()
    conn = HbaseUtil.getHbaseConn(conf)
    table = conn.getTable(TableName.valueOf("sensor"))
  }

  /**
   * Scan the table and emit one SensorReading per row.
   *
   * `run` comes from the Java SourceFunction interface, so IDEA's Ctrl+O
   * may not offer it directly; write the override by hand.
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    scan = new Scan()
    val results: ResultScanner = table.getScanner(scan)
    val iterator = results.iterator()
    val family = "info".getBytes
    while (iterator.hasNext) {
      val result = iterator.next()
      // BUGFIX: the scan Result already carries all of the row's cells; the
      // old code re-fetched the same row with an extra Get RPC per record.
      val id: String = new String(result.getValue(family, "id".getBytes))
      val curTime = new String(result.getValue(family, "timestamp".getBytes)).toLong
      val timepreture = new String(result.getValue(family, "timepreture".getBytes)).toDouble
      // Hand the record to Flink.
      sourceContext.collect(SensorReading(id, curTime, timepreture))
    }
  }

  /** Required override; nothing to interrupt — the scan loop ends on its own. */
  override def cancel(): Unit = {
  }

  /** Close the table handle and the HBase connection, swallowing nothing silently. */
  override def close(): Unit = {
    try {
      if (table != null) {
        table.close()
      }
      if (conn != null) {
        conn.close()
      }
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
}
主程序 HbaseSourceSinkApp
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.HbaseSink
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._
import com.atguigu.flink.source.HbaseSource

/**
 * Pipeline entry point: scan SensorReading rows out of HBase via
 * [[HbaseSource]], write them back through [[HbaseSink]], and echo
 * each record to stdout.
 */
object HbaseSourceSinkApp {

  def main(args: Array[String]): Unit = {
    // One subtask each for source, sink and print.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Input side: bounded HBase scan.
    val sensorStream: scala.DataStream[SensorReading] = env.addSource(new HbaseSource)

    // Output side: buffered writes into HBase.
    sensorStream.addSink(new HbaseSink)

    // Mirror the stream to the console for inspection.
    sensorStream.print()

    // Launch the job.
    env.execute()
  }
}
3 Flink 写入 HBase
HbaseSink
package com.atguigu.flink.sink

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.utils.HbaseUtil
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/1 1:34
 *
 * HBase sink, variant 1: extend RichSinkFunction and override its methods.
 *
 * Flink delivers records one at a time; flushing HBase on every record would
 * put heavy pressure on the cluster and spawn many threads, so production
 * jobs must throttle the flush rate. Here a BufferedMutator buffers writes
 * (auto-flushing at 1 MB) and we additionally flush every 2 records.
 */
class HbaseSink extends RichSinkFunction[SensorReading] {

  private var conn: Connection = null
  // Unused fields kept for interface stability with earlier revisions.
  private var table: Table = null
  private var scan: Scan = null
  var mutator: BufferedMutator = null
  // Records written since the last explicit flush.
  var count = 0
  // Monotonically increasing rowkey seed.
  // NOTE(review): plain field, not checkpointed state — rowkeys restart at
  // 2000 after a failure/restart; confirm whether that is acceptable.
  var rowKey_test = 2000

  /**
   * Open the HBase connection and a BufferedMutator on table "psensor".
   *
   * @param parameters Flink runtime configuration (unused here)
   */
  override def open(parameters: Configuration): Unit = {
    val conf = HbaseUtil.getHbaseConf()
    conn = HbaseUtil.getHbaseConn(conf)
    val tableName: TableName = TableName.valueOf("psensor")
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // 1 MB write buffer: the mutator auto-flushes once this much is buffered.
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  /**
   * Buffer one SensorReading as a Put, flushing every 2 records.
   */
  override def invoke(value: SensorReading): Unit = {
    val cf1 = "info"
    val id = value.id
    val curtime = value.timestamp.toString
    val timperature = value.timepreture.toString
    val put: Put = new Put(Bytes.toBytes(rowKey_test.toString))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("id"), Bytes.toBytes(id))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("timestamp"), Bytes.toBytes(curtime))
    // NOTE(review): qualifier "timeperature" differs from the "timepreture"
    // qualifier HbaseSource reads — confirm which spelling the data uses.
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("timeperature"), Bytes.toBytes(timperature))
    mutator.mutate(put)
    // Flush every 2 records.
    // BUGFIX: the counter was incremented after the threshold check, so the
    // old code actually flushed every 3rd record despite the comment.
    count = count + 1
    if (count >= 2) {
      mutator.flush()
      count = 0
    }
    rowKey_test = rowKey_test + 1
  }

  /**
   * Release HBase resources.
   */
  override def close(): Unit = {
    // BUGFIX: the BufferedMutator was never flushed or closed, so up to 1 MB
    // of buffered mutations could be silently dropped on shutdown.
    if (mutator != null) {
      mutator.flush()
      mutator.close()
    }
    if (conn != null) conn.close()
  }
}
主程序 HbaseSourceSinkApp
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.HbaseSink
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._
import com.atguigu.flink.source.HbaseSource

/**
 * Job entry point wiring HbaseSource -> HbaseSink with a console print
 * on the side: reads SensorReading rows from HBase and writes them back.
 */
object HbaseSourceSinkApp {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Keep everything on a single subtask.
    env.setParallelism(1)

    // Source: full scan of the "sensor" table.
    val readings: scala.DataStream[SensorReading] = env.addSource(new HbaseSource)

    // Sink: buffered writes into the "psensor" table.
    readings.addSink(new HbaseSink)

    // Also print each record for debugging.
    readings.print()

    // Submit and run the job graph.
    env.execute()
  }
}