SparkSQL / DataFrames

2018-09-04 17:48 更新

SparkSQL / DataFrames

HBase-Spark連接器(在HBase-Spark模塊中)利用Spark-1.2.0中引入的DataSource API (SPARK-3247),彌補(bǔ)了簡單HBase KV存儲(chǔ)和復(fù)雜關(guān)系SQL查詢之間的差距,使用戶能夠使用Spark在HBase上執(zhí)行復(fù)雜的數(shù)據(jù)分析工作。HBase Dataframe是標(biāo)準(zhǔn)的Spark Dataframe,能夠與任何其他數(shù)據(jù)源(如Hive,Orc,Parquet,JSON等)進(jìn)行交互。HBase-Spark Connector應(yīng)用關(guān)鍵技術(shù),如分區(qū)修剪,列修剪,謂詞疊加和數(shù)據(jù)局部性。

要使用HBase-Spark連接器,用戶需要為HBase和Spark表之間的模式映射定義Catalog,準(zhǔn)備數(shù)據(jù)并填充HBase表,然后加載HBase DataFrame。之后,用戶可以使用SQL查詢?cè)贖Base表中進(jìn)行集成查詢和訪問記錄。以下說明了基本程序。

定義目錄

def catalog = s"""{
       |"table":{"namespace":"default", "name":"table1"},
       |"rowkey":"key",
       |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
         |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
         |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
         |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
       |}
     |}""".stripMargin

Catalog定義了HBase和Spark表之間的映射。該目錄有兩個(gè)關(guān)鍵部分。一個(gè)是rowkey定義,另一個(gè)是Spark中的表列與HBase中的列族和列限定符之間的映射。上面定義了一個(gè)HBase表的模式,其名稱為table1,行鍵為key,列數(shù)為col1 -col8。請(qǐng)注意,還必須將r??owkey詳細(xì)定義為column (col0),該列具有特定的cf(rowkey)。

保存DataFrame

case class HBaseRecord(
   col0: String,
   col1: Boolean,
   col2: Double,
   col3: Float,
   col4: Int,       
   col5: Long,
   col6: Short,
   col7: String,
   col8: Byte)

object HBaseRecord
{                                                                                                             
   def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""       
      HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,  
      i,
      i.toLong,
      i.toShort,  
      s"String$i: $t",      
      i.toByte)
  }
}

val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
 .format("org.apache.hadoop.hbase.spark ")
 .save()

用戶準(zhǔn)備的data是一個(gè)本地Scala集合,它有256個(gè)HBaseRecord對(duì)象。 sc.parallelize(data)函數(shù)分配data以形成RDD。toDF返回一個(gè)DataFrame。 writefunction返回一個(gè)DataFrameWriter,它用于將DataFrame寫入外部存儲(chǔ)系統(tǒng)(例如,HBase)。給定具有指定模式的DataFrame catalog,save函數(shù)將創(chuàng)建一個(gè)包含5個(gè)區(qū)域的HBase表,并將DataFrame保存在其中。

加載DataFrame

def withCatalog(cat: String): DataFrame = {
  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.hadoop.hbase.spark")
  .load()
}
val df = withCatalog(catalog)

在'withCatalog'函數(shù)中,sqlContext是SQLContext的變量,它是在Spark中處理結(jié)構(gòu)化數(shù)據(jù)(行和列)的入口點(diǎn)。 read返回一個(gè)DataFrameReader,可用于以DataFrame的形式讀取數(shù)據(jù)。 option函數(shù)將基礎(chǔ)數(shù)據(jù)源的輸入選項(xiàng)添加到DataFrameReader,format函數(shù)指定DataFrameReader的輸入數(shù)據(jù)源格式。該load()函數(shù)將輸入作為DataFrame加載。withCatalog函數(shù)返回的日期框df可用于訪問HBase表。

語言綜合查詢

val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show

DataFrame可以執(zhí)行各種操作,例如join,sort,select,filter,orderBy等。上面的df.filter使用給定的SQL表達(dá)式過濾行。select選擇一組列: col0,col1和col4。

SQL查詢

df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show

registerTempTabledf使用表名將DataFrame注冊(cè)為臨時(shí)表table1。此臨時(shí)表的生命周期與用于創(chuàng)建df的SQLContext相關(guān)聯(lián)。sqlContext.sql函數(shù)允許用戶執(zhí)行SQL查詢。

其他

示例-使用不同時(shí)間戳的查詢

在HBaseSparkConf中,可以設(shè)置與時(shí)間戳相關(guān)的四個(gè)參數(shù)。它們分別是TIMESTAMP,MIN_TIMESTAMP,MAX_TIMESTAMP和MAX_VERSIONS。用戶可以使用MIN_TIMESTAMP和MAX_TIMESTAMP查詢具有不同時(shí)間戳或時(shí)間范圍的記錄。與此同時(shí),在下面的示例中使用具體值而不是tsSpecified和oldMs。

下面的示例顯示了如何使用不同的時(shí)間戳加載df DataFrame。tsSpecified由用戶指定。HBaseTableCatalog定義HBase和Relation關(guān)系模式。writeCatalog定義模式映射的目錄。

val df = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
      .format("org.apache.hadoop.hbase.spark")
      .load()

下面的示例顯示了如何加載具有不同時(shí)間范圍的df DataFrame。oldMs由用戶指定。

val df = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
        HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
      .format("org.apache.hadoop.hbase.spark")
      .load()

加載df DataFrame后,用戶可以查詢數(shù)據(jù)。

df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show

示例-本地Avro支持

HBase-Spark Connector支持不同的數(shù)據(jù)格式,如Avro,Jason等。下面的用例顯示了spark是如何支持Avro的。用戶可以直接將Avro記錄保存到HBase中。在內(nèi)部,Avro模式自動(dòng)轉(zhuǎn)換為本機(jī)Spark Catalyst數(shù)據(jù)類型。請(qǐng)注意,HBase表中的兩個(gè)鍵值部分都可以用Avro格式定義。

1)定義模式映射的目錄:

def catalog = s"""{
                     |"table":{"namespace":"default", "name":"Avrotable"},
                      |"rowkey":"key",
                      |"columns":{
                      |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                      |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
                      |}
                      |}""".stripMargin

catalog是名為Avrotable的HBase表的模式。行鍵作為鍵和一列col1。還必須將r??owkey詳細(xì)定義為column (col0),該列具有特定的cf(rowkey)。

2)準(zhǔn)備數(shù)據(jù):

object AvroHBaseRecord {
   val schemaString =
     s"""{"namespace": "example.avro",
         |   "type": "record",      "name": "User",
         |    "fields": [
         |        {"name": "name", "type": "string"},
         |        {"name": "favorite_number",  "type": ["int", "null"]},
         |        {"name": "favorite_color", "type": ["string", "null"]},
         |        {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
         |        {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
         |      ]    }""".stripMargin

   val avroSchema: Schema = {
     val p = new Schema.Parser
     p.parse(schemaString)
   }

   def apply(i: Int): AvroHBaseRecord = {
     val user = new GenericData.Record(avroSchema);
     user.put("name", s"name${"%03d".format(i)}")
     user.put("favorite_number", i)
     user.put("favorite_color", s"color${"%03d".format(i)}")
     val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
     favoriteArray.add(s"number${i}")
     favoriteArray.add(s"number${i+1}")
     user.put("favorite_array", favoriteArray)
     import collection.JavaConverters._
     val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
     user.put("favorite_map", favoriteMap)
     val avroByte = AvroSedes.serialize(user, avroSchema)
     AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
   }
 }

 val data = (0 to 255).map { i =>
    AvroHBaseRecord(i)
 }

首先定義schemaString,然后解析得到avroSchema。avroSchema用于生成AvroHBaseRecord。用戶準(zhǔn)備的data是一個(gè)包含256個(gè)AvroHBaseRecord對(duì)象的本地Scala集合。

3)保存DataFrame:

 sc.parallelize(data).toDF.write.options(
     Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
     .format("org.apache.spark.sql.execution.datasources.hbase")
     .save()

給定具有指定模式的數(shù)據(jù)框catalog,上面將創(chuàng)建一個(gè)包含5個(gè)區(qū)域的HBase表,并將數(shù)據(jù)框保存在其中。

4)加載DataFrame

def avroCatalog = s"""{
            |"table":{"namespace":"default", "name":"avrotable"},
            |"rowkey":"key",
            |"columns":{
              |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
              |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
            |}
          |}""".stripMargin

 def withCatalog(cat: String): DataFrame = {
     sqlContext
         .read
         .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
         .format("org.apache.spark.sql.execution.datasources.hbase")
         .load()
 }
 val df = withCatalog(catalog)

在withCatalog函數(shù)中,read返回一個(gè)DataFrameReader,可用于以DataFrame的形式讀取數(shù)據(jù)。該option函數(shù)將基礎(chǔ)數(shù)據(jù)源的輸入選項(xiàng)添加到DataFrameReader。有兩個(gè)選項(xiàng):一個(gè)是設(shè)置avroSchema為AvroHBaseRecord.schemaString,一個(gè)是設(shè)置HBaseTableCatalog.tableCatalog為avroCatalog。該load()函數(shù)將輸入作為DataFrame加載。withCatalog函數(shù)返回的日期框df可用于訪問HBase表。

5)SQL查詢:

 df.registerTempTable("avrotable")
 val c = sqlContext.sql("select count(1) from avrotable").

加載df DataFrame后,用戶可以查詢數(shù)據(jù)。registerTempTable使用表名avrotable將df DataFrame注冊(cè)為臨時(shí)表。sqlContext.sql函數(shù)允許用戶執(zhí)行SQL查詢。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)