Spark Streaming groupByKey and updateStateByKey implementation

I am trying to run stateful Spark Streaming computations over (fake) Apache web server logs read from Kafka. The goal is to "sessionize" the web traffic, similar to this blog post.

The only difference is that I want to "sessionize" each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context.

Log lines are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)), that is:

(IP, (requestPage, time, time))

I then call groupByKey() on these K/V pairs. In batch mode, this would produce a:

(String, CompactBuffer((String, Long, Long), ...)) or

(IP, CompactBuffer((requestPage, time, time), ...))

In a StreamingContext, it produces a:

(String, ArrayBuffer((String, Long, Long), ...)) like so:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

However, when the next micro-batch (the next RDD in the DStream) arrives, this information is discarded.

Ultimately what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact and to run some computations on its data to "sessionize" the page time.

I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark and Scala); any help is appreciated.

Thus far:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)

def updateGroupByKey(
    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

}

I think you are looking for something like this:

import scala.collection.mutable.ArrayBuffer

def updateGroupByKey(
    newValues: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    currentValue: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
  // Collect the buffers carried by the new values
  val buffs: Seq[ArrayBuffer[(String, Long, Long)]] = for (v <- newValues) yield v._2
  // Prepend the buffer from the existing state, if any (+: works on any Seq; :: is List-only)
  val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 +: buffs
  // Merge all buffers into a single state buffer; no data means no state
  if (buffs2.isEmpty) None else {
    val key = if (currentValue.isEmpty) newValues.head._1 else currentValue.get._1
    Some((key, buffs2.foldLeft(new ArrayBuffer[(String, Long, Long)])((v, a) => v ++ a)))
  }
}

Gabor's answer got me started down the right path, but here is an answer that produces the expected output.

First, for the output I want:

(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000)))

I don't need groupByKey(). updateStateByKey already hands the update function each key's values as a Seq, so adding groupByKey is unnecessary (and expensive). Spark users strongly suggest avoiding groupByKey where possible.

Here is the code that worked:

def updateValues(
    newValues: Seq[(String, Long, Long)],
    currentValue: Option[Seq[(String, Long, Long)]]
): Option[Seq[(String, Long, Long)]] = {
  Some(currentValue.getOrElse(Seq.empty) ++ newValues)
}

val grouped = ipTimeStamp.updateStateByKey(updateValues)

Here updateStateByKey is passed a function (updateValues) that receives the values that arrived for a key in the current batch (newValues) and an Option holding the state accumulated for that key so far (currentValue). It returns the two combined as the new state. getOrElse is needed because currentValue is None the first time a key is seen. Credit to https://twitter.com/granturing for the correct code.
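To see how the state builds up from batch to batch without starting a cluster, the update function can be driven by hand. This is only a rough sketch in plain Scala: the `step` helper and the `UpdateStateSketch` object are made up for illustration and only mimic what updateStateByKey does per key; they are not part of Spark.

```scala
object UpdateStateSketch {
  type Hit = (String, Long, Long) // (requestPage, time, time)

  // Same logic as the update function passed to updateStateByKey
  def updateValues(newValues: Seq[Hit], currentValue: Option[Seq[Hit]]): Option[Seq[Hit]] =
    Some(currentValue.getOrElse(Seq.empty) ++ newValues)

  // Hypothetical stand-in for one micro-batch: group the batch by key, then
  // call the update function for every key that has state or new values.
  def step(state: Map[String, Seq[Hit]], batch: Seq[(String, Hit)]): Map[String, Seq[Hit]] = {
    val byKey = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ byKey.keySet).flatMap { k =>
      // A None result would drop the key from the state
      updateValues(byKey.getOrElse(k, Seq.empty), state.get(k)).map(v => k -> v)
    }.toMap
  }
}
```

Calling `step` first with an empty state and a batch containing one hit for an IP, then again with the result and a second batch for the same IP, leaves that IP mapped to both hits in arrival order. In a real job, stateful operators such as updateStateByKey also need a checkpoint directory, set with the StreamingContext's checkpoint method.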

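Once each IP has its list of (requestPage, time, time) entries, the per-page "sessionizing" described in the question still has to be computed from that list. One possible rule, which is an assumption and not something the thread specifies, is to merge consecutive hits on the same page when they are no more than some gap apart. The `sessionizePages` function and its `maxGapMs` parameter below are hypothetical names for that rule.

```scala
object SessionizeSketch {
  type Hit = (String, Long, Long) // (requestPage, startMillis, endMillis)

  // Assumed rule: consecutive hits on the same page that are at most
  // maxGapMs apart collapse into one entry spanning first to last hit.
  def sessionizePages(hits: Seq[Hit], maxGapMs: Long): Seq[Hit] =
    hits.sortBy(_._2).foldLeft(Vector.empty[Hit]) { (acc, hit) =>
      acc.lastOption match {
        case Some((page, start, end)) if page == hit._1 && hit._2 - end <= maxGapMs =>
          // Extend the previous entry instead of appending a new one
          acc.init :+ ((page, start, math.max(end, hit._3)))
        case _ =>
          acc :+ hit
      }
    }
}
```

Applied to the three entries in the expected output above with a 30-minute gap, the two hits on / collapse into one entry running from 1418934075000 to 1418934105000, and /contactus.html stays as its own entry. The same function could be called on the state inside the update function or in a map over the resulting stream; which is better depends on how much history needs to be kept.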