DStream中的轉(zhuǎn)換

2018-02-24 15:57 更新

DStream中的轉(zhuǎn)換(transformation)

和RDD類似,transformation允許從輸入DStream來的數(shù)據(jù)被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示:

Transformation Meaning
map(func) 利用函數(shù)func處理原DStream的每個(gè)元素,返回一個(gè)新的DStream
flatMap(func) 與map相似,但是每個(gè)輸入項(xiàng)可用被映射為0個(gè)或者多個(gè)輸出項(xiàng)
filter(func) 返回一個(gè)新的DStream,它僅僅包含源DStream中滿足函數(shù)func的項(xiàng)
repartition(numPartitions) 通過創(chuàng)建更多或者更少的partition改變這個(gè)DStream的并行級(jí)別(level of parallelism)
union(otherStream) 返回一個(gè)新的DStream,它包含源DStream和otherStream的聯(lián)合元素
count() 通過計(jì)算源DStream中每個(gè)RDD的元素?cái)?shù)量,返回一個(gè)包含單元素(single-element)RDDs的新DStream
reduce(func) 利用函數(shù)func聚集源DStream中每個(gè)RDD的元素,返回一個(gè)包含單元素(single-element)RDDs的新DStream。函數(shù)應(yīng)該是相關(guān)聯(lián)的,以使計(jì)算可以并行化
countByValue() 這個(gè)算子應(yīng)用于元素類型為K的DStream上,返回一個(gè)(K,long)對(duì)的新DStream,每個(gè)鍵的值是在原DStream的每個(gè)RDD中的頻率。
reduceByKey(func, [numTasks]) 當(dāng)在一個(gè)由(K,V)對(duì)組成的DStream上調(diào)用這個(gè)算子,返回一個(gè)新的由(K,V)對(duì)組成的DStream,每一個(gè)key的值均由給定的reduce函數(shù)聚集起來。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。你可以用numTasks參數(shù)設(shè)置不同的任務(wù)數(shù)
join(otherStream, [numTasks]) 當(dāng)應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)對(duì),一個(gè)包含(K,W)對(duì)),返回一個(gè)包含(K, (V, W))對(duì)的新DStream
cogroup(otherStream, [numTasks]) 當(dāng)應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)對(duì),一個(gè)包含(K,W)對(duì)),返回一個(gè)包含(K, Seq[V], Seq[W])的元組
transform(func) 通過對(duì)源DStream的每個(gè)RDD應(yīng)用RDD-to-RDD函數(shù),創(chuàng)建一個(gè)新的DStream。這個(gè)可以在DStream中的任何RDD操作中使用
updateStateByKey(func) 利用給定的函數(shù)更新DStream的狀態(tài),返回一個(gè)新"state"的DStream。

最后兩個(gè)transformation算子需要重點(diǎn)介紹一下:

UpdateStateByKey操作

updateStateByKey操作允許不斷用新信息更新它的同時(shí)保持任意狀態(tài)。你需要通過兩步來使用它

  • 定義狀態(tài)-狀態(tài)可以是任何的數(shù)據(jù)類型
  • 定義狀態(tài)更新函數(shù)-怎樣利用更新前的狀態(tài)和從輸入流里面獲取的新值更新狀態(tài)

讓我們舉個(gè)例子說明。在例子中,你想保持一個(gè)文本數(shù)據(jù)流中每個(gè)單詞的運(yùn)行次數(shù),運(yùn)行次數(shù)用一個(gè)state表示,它的類型是整數(shù)

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

這個(gè)函數(shù)被用到了DStream包含的單詞上

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函數(shù)將會(huì)被每個(gè)單詞調(diào)用,newValues擁有一系列的1(從 (詞, 1)對(duì)而來),runningCount擁有之前的次數(shù)。要看完整的代碼,見例子

Transform操作

transform操作(以及它的變化形式如transformWith)允許在DStream運(yùn)行任何RDD-to-RDD函數(shù)。它能夠被用來應(yīng)用任何沒在DStream API中提供的RDD操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。例如,連接數(shù)據(jù)流中的每個(gè)批(batch)和另外一個(gè)數(shù)據(jù)集的功能并沒有在DStream API中提供,然而你可以簡單的利用transform方法做到。如果你想通過連接帶有預(yù)先計(jì)算的垃圾郵件信息的輸入數(shù)據(jù)流來清理實(shí)時(shí)數(shù)據(jù),然后過了它們,你可以按如下方法來做:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

事實(shí)上,你也可以在transform方法中用機(jī)器學(xué)習(xí)圖計(jì)算算法

窗口(window)操作

Spark Streaming也支持窗口計(jì)算,它允許你在一個(gè)滑動(dòng)窗口數(shù)據(jù)上應(yīng)用transformation算子。下圖闡明了這個(gè)滑動(dòng)窗口。

滑動(dòng)窗口

如上圖顯示,窗口在源DStream上滑動(dòng),合并和操作落入窗內(nèi)的源RDDs,產(chǎn)生窗口化的DStream的RDDs。在這個(gè)具體的例子中,程序在三個(gè)時(shí)間單元的數(shù)據(jù)上進(jìn)行窗口操作,并且每兩個(gè)時(shí)間單元滑動(dòng)一次。這說明,任何一個(gè)窗口操作都需要指定兩個(gè)參數(shù):

  • 窗口長度:窗口的持續(xù)時(shí)間
  • 滑動(dòng)的時(shí)間間隔:窗口操作執(zhí)行的時(shí)間間隔

這兩個(gè)參數(shù)必須是源DStream的批時(shí)間間隔的倍數(shù)。

下面舉例說明窗口操作。例如,你想擴(kuò)展前面的例子用來計(jì)算過去30秒的詞頻,間隔時(shí)間是10秒。為了達(dá)到這個(gè)目的,我們必須在過去30秒的pairs DStream上應(yīng)用reduceByKey操作。用方法reduceByKeyAndWindow實(shí)現(xiàn)。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些常用的窗口操作如下所示,這些操作都需要用到上文提到的兩個(gè)參數(shù):窗口長度和滑動(dòng)的時(shí)間間隔

Transformation Meaning
window(windowLength, slideInterval) 基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù)計(jì)算一個(gè)新的DStream
countByWindow(windowLength, slideInterval) 返回流中元素的一個(gè)滑動(dòng)窗口數(shù)
reduceByWindow(func, windowLength, slideInterval) 返回一個(gè)單元素流。利用函數(shù)func聚集滑動(dòng)時(shí)間間隔的流的元素創(chuàng)建這個(gè)單元素流。函數(shù)必須是相關(guān)聯(lián)的以使計(jì)算能夠正確的并行計(jì)算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 應(yīng)用到一個(gè)(K,V)對(duì)組成的DStream上,返回一個(gè)由(K,V)對(duì)組成的新的DStream。每一個(gè)key的值均由給定的reduce函數(shù)聚集起來。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。你可以用numTasks參數(shù)設(shè)置不同的任務(wù)數(shù)
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable to only "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 應(yīng)用到一個(gè)(K,V)對(duì)組成的DStream上,返回一個(gè)由(K,V)對(duì)組成的新的DStream。每個(gè)key的值都是它們?cè)诨瑒?dòng)窗口中出現(xiàn)的頻率。
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)