Spark Streaming

2018-08-23 14:45 更新

Spark Streaming

Spark Streaming是一個(gè)基于Spark構(gòu)建的微批處理流處理框架。HBase和Spark Streaming是一個(gè)很好的搭檔,因?yàn)镠Base可以與Spark Streaming一起提供以下好處:

  • 即時(shí)獲取參考數(shù)據(jù)或配置文件數(shù)據(jù)的地方
  • 以支持僅一次處理的Spark Streaming承諾的方式存儲計(jì)數(shù)或聚合的位置。

HBase-Spark模塊與Spark Streaming的集成點(diǎn)類似于其常規(guī)的Spark集成點(diǎn),因?yàn)橐韵旅羁梢灾苯油ㄟ^Spark Streaming DStream實(shí)現(xiàn)。

bulkPut

用于向HBase大規(guī)模并行發(fā)送put

bulkDelete

用于向HBase大規(guī)模并行發(fā)送delete

bulkGet

用于大規(guī)模并行發(fā)送get到HBase以創(chuàng)建一個(gè)新的RDD

mapPartition

使用Connection對象執(zhí)行Spark Map函數(shù),以允許完全訪問HBase

hBaseRDD

簡化分布式掃描以創(chuàng)建RDD

帶有DStream的bulkPut示例

下面是使用DStreams的bulkPut示例。RDD批量放置的感覺非常接近。

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))

val rdd1 = ...
val rdd2 = ...

val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
    Array[Byte], Array[Byte])])]]()

queue += rdd1
queue += rdd2

val dStream = ssc.queueStream(queue)

dStream.hbaseBulkPut(
  hbaseContext,
  TableName.valueOf(tableName),
  (putRecord) => {
   val put = new Put(putRecord._1)
   putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
   put
  })

該hbaseBulkPut功能有三個(gè)輸入:帶有配置Boardcast信息的hbaseContext將我們鏈接到執(zhí)行程序中的HBase Connections、我們將數(shù)據(jù)放入的表的表名、將DStream中的記錄轉(zhuǎn)換為HBase Put對象的函數(shù)。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號