W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗(yàn)值獎勵
Spark Streaming是一個(gè)基于Spark構(gòu)建的微批處理流處理框架。HBase和Spark Streaming是一個(gè)很好的搭檔,因?yàn)镠Base可以與Spark Streaming一起提供以下好處:
HBase-Spark模塊與Spark Streaming的集成點(diǎn)類似于其常規(guī)的Spark集成點(diǎn),因?yàn)橐韵旅羁梢灾苯油ㄟ^Spark Streaming DStream實(shí)現(xiàn)。
bulkPut
用于向HBase大規(guī)模并行發(fā)送put
bulkDelete
用于向HBase大規(guī)模并行發(fā)送delete
bulkGet
用于大規(guī)模并行發(fā)送get到HBase以創(chuàng)建一個(gè)新的RDD
mapPartition
使用Connection對象執(zhí)行Spark Map函數(shù),以允許完全訪問HBase
hBaseRDD
簡化分布式掃描以創(chuàng)建RDD
帶有DStream的bulkPut示例
下面是使用DStreams的bulkPut示例。RDD批量放置的感覺非常接近。
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))
val rdd1 = ...
val rdd2 = ...
val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
Array[Byte], Array[Byte])])]]()
queue += rdd1
queue += rdd2
val dStream = ssc.queueStream(queue)
dStream.hbaseBulkPut(
hbaseContext,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
})
該hbaseBulkPut功能有三個(gè)輸入:帶有配置Boardcast信息的hbaseContext將我們鏈接到執(zhí)行程序中的HBase Connections、我們將數(shù)據(jù)放入的表的表名、將DStream中的記錄轉(zhuǎn)換為HBase Put對象的函數(shù)。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: