使用Spark將數(shù)據(jù)批量加載到HBase

2018-08-24 15:43 更新

使用Spark將數(shù)據(jù)批量加載到HBase有兩種選擇。有一些基本的批量加載功能適用于行具有數(shù)百萬列的情況和未整合列的情況,以及Spark批量加載過程的映射側(cè)之前的分區(qū)。

Spark還有一個精簡記錄批量加載選項,第二個選項是為每行少于10k列的表設(shè)計的。第二個選項的優(yōu)點是Spark shuffle操作的吞吐量更高,而且負載更少。

這兩種實現(xiàn)的工作方式或多或少類似于MapReduce批量加載過程,因為分區(qū)器根據(jù)區(qū)域拆分對rowkeys進行分區(qū),并且行鍵按順序發(fā)送到reducer,以便HFiles可以直接從reduce階段寫出。

在Spark術(shù)語中,批量加載將圍繞Spark repartitionAndSortWithinPartitions實現(xiàn),然后是Spark foreachPartition。

首先讓我們看一下使用基本批量加載功能的示例

批量加載實例

以下示例顯示了Spark的批量加載:

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("3"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
   val rowKey = t._1
   val family:Array[Byte] = t._2(0)._1
   val qualifier = t._2(0)._2
   val value = t._2(0)._3

   val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

   Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

該hbaseBulkLoad函數(shù)需要三個必需參數(shù):

  1. 我們打算批量加載的表的表名
  2. 將RDD中的記錄轉(zhuǎn)換為元組鍵值par的函數(shù)。該元組鍵是KeyFamilyQualifer對象,值是單元格值。KeyFamilyQualifer對象將保存RowKey,列族和列限定符。shuffle將在RowKey上進行分區(qū),但將按所有三個值排序。
  3. HFile的臨時路徑也將被寫出來

在Spark bulk load命令之后,使用HBase的LoadIncrementalHFiles對象將新創(chuàng)建的HFile加載到HBase中。

使用Spark進行批量加載的附加參數(shù)

您可以在hbaseBulkLoad上使用其他參數(shù)選項設(shè)置以下屬性。

  • HFiles的最大文件大小
  • 從壓縮中排除HFile的標志
  • compression,bloomType,blockSize和dataBlockEncoding的列族設(shè)置

使用附加參數(shù):

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("3"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")

familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
   val rowKey = t._1
   val family:Array[Byte] = t._2(0)._1
   val qualifier = t._2(0)._2
   val value = t._2(0)._3

   val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

   Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath,
  familyHBaseWriterOptions,
  compactionExclude = false,
  HConstants.DEFAULT_MAX_FILE_SIZE)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

現(xiàn)在讓我們看看如何調(diào)用精簡記錄大批量加載實現(xiàn)

使用精簡記錄批量加載示例:

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      ("1",
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      ("3",
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

rdd.hbaseBulkLoadThinRows(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1

        val familyQualifiersValues = new FamiliesQualifiersValues
        t._2.foreach(f => {
          val family:Array[Byte] = f._1
          val qualifier = f._2
          val value:Array[Byte] = f._3

          familyQualifiersValues +=(family, qualifier, value)
        })
        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
      },
      stagingFolder.getPath,
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude = false,
      20)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

請注意, 在使用精簡記錄批量加載時, 函數(shù)會返回一個元組,其中第一個值為行鍵,第二個值為FamiliesQualifiersValues對象,它將包含所有列族此行的所有值。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號