Spark Streaming groupByKey和updateStateByKey实现

我试图通过从Kafka读取的(假的)apache Web服务器日志运行有状态的Spark Streaming计算。 目标是“会话”类似于此博客帖子的网络流量

唯一的区别是,我想“会话”每个网页的IP命中,而不是整个会话。 我能够在批处理模式下使用Spark从假Web流量文件中读取数据,但现在我想在流式上下文中进行此操作。

从Kafka读取日志文件并将其解析为(String, (String, Long, Long))或者K/V

(IP, (requestPage, time, time))

然后我在这个K/V pair上调用groupByKey() 。 在批处理模式下,这会产生一个:

(String, CollectionBuffer((String, Long, Long), ...)或者

(IP, CollectionBuffer((requestPage, time, time), ...)

在StreamingContext中,它产生一个:

(String, ArrayBuffer((String, Long, Long), ...)如下所示:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

但是,随着下一个microbatch(DStream)到达,这些信息将被丢弃。

最终,我想要的是随着给定的IP持续交互并对其数据运行一些计算以“会话”页面时间, ArrayBuffer随着时间的推移而填满。

我相信运营商做到这一点是“ updateStateByKey 。 我对这个操作员有些麻烦(我对Spark和Scala都是新手);

任何帮助表示赞赏。

迄今:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) 


    def updateGroupByKey(
                          a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
                          b: Option[(String, ArrayBuffer[(String, Long, Long)])]
                          ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

  }

我认为你正在寻找这样的东西:

 def updateGroupByKey(
                          newValues: Seq[(String, ArrayBuffer[(String, Long, Long)])],
                          currentValue: Option[(String, ArrayBuffer[(String, Long, Long)])]
                          ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
     //Collect the values
     val buffs: Seq[ArrayBuffer[(String, Long, Long)]] = (for (v <- newValues) yield v._2)
     val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 :: buffs
     //Convert state to buffer
     if (buffs2.isEmpty) None else {
        val key = if (currentValue.isEmpty) newValues(0)._1 else currentValue.get._1
        Some((key, buffs2.foldLeft(new ArrayBuffer[(String, Long, Long)])((v, a) => v++a)))
     }
  }

Gabor的回答让我开始了正确的道路,但这是一个能够产生预期结果的答案。

首先,为了我想要的输出:

(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000)))

我不需要groupByKey()updateStateByKey已将这些值累加到Seq中,因此不需要添加groupByKey (并且代价昂贵)。 Spark用户强烈建议不要使用groupByKey

以下是有效的代码:

def updateValues( newValues: Seq[(String, Long, Long)],
                      currentValue: Option[Seq[ (String, Long, Long)]]
                      ): Option[Seq[(String, Long, Long)]] = {

  Some(currentValue.getOrElse(Seq.empty) ++ newValues)

  }


val grouped = ipTimeStamp.updateStateByKey(updateValues)

在这里, updateStateByKey传递一个函数(updateValues),该函数具有随时间而变化的值(newValues)以及流中当前值(currentValue)的选项。 然后它返回这些的组合。 getOrElse是必需的,因为currentValue有时可能是空的。 感谢https://twitter.com/granturing获取正确的代码。

链接地址: http://www.djcxy.com/p/83001.html

上一篇: Spark Streaming groupByKey and updateStateByKey implementation

下一篇: Custom QIcon using QIconEngine and transparency