Using partitionBy to split and efficiently compute RDD groups by key

I've implemented a solution to group an RDD[K, V] by key and to compute data for each group (K, RDD[V]), using partitionBy and Partitioner. However, I'm not sure it is really efficient and I'd like your point of view.

Here is a sample case: given a list of [K: Int, V: Int], compute the mean of the Vs for each group of K, knowing that it should be distributed and that the number of V values may be very large. That should give:

List[K, V] => (K, mean(V))

The simple Partitioner class:

import org.apache.spark.Partitioner

class MyPartitioner(maxKey: Int) extends Partitioner {

  def numPartitions: Int = maxKey

  // Partial match: a key that is not an Int below maxKey will throw a MatchError
  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

The partitioning code:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))

val rdd = sc.parallelize(l)
val p = rdd.partitionBy(new MyPartitioner(4)).cache()

p.foreachPartition(x => {
  try {
    val r = sc.parallelize(x.toList)
    val id = r.first() // get the K partition id
    val v = r.map(x => x._2)
    println(id._1 + "->" + mean(v))
  } catch {
    case e: UnsupportedOperationException => 0
  }
})

The output is:

1->13, 2->4, 3->7
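
As an aside, the layout produced by partitionBy can be inspected with mapPartitionsWithIndex; here is a minimal sketch (not part of the original post), assuming the p RDD built above:

// Sketch (not from the original post): inspect the layout produced by partitionBy.
p.mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.toList)) }
  .collect()
  .foreach { case (idx, records) => println(s"partition $idx -> $records") }
// With MyPartitioner(4), key 1 should land in partition 1, key 2 in partition 2, etc.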

My questions are:

  • What really happens when calling partitionBy? (Sorry, I didn't find enough specs on it.)
  • Is it really efficient to map by partition, knowing that in my production case there would be few keys (around 50) but a very large number of values per key (around 1 million)?
  • What is the cost of parallelize(x.toList)? Is it reasonable to do it? (I need an RDD as input to mean().)
  • How would you do it yourself?

Regards


    Your code should not work: you cannot pass the SparkContext object to the executors (it is not Serializable). Also, I don't see why you would need to.
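
    If per-partition processing is really wanted, the iterator passed to foreachPartition can be consumed with plain Scala collection operations instead of a nested RDD; a minimal sketch (not from the original answer), assuming the partitioned RDD p and the one-key-per-partition layout from the question:

    // Sketch under the question's assumptions: each non-empty partition holds a single key.
    p.foreachPartition { it =>
      val records = it.toList
      if (records.nonEmpty) {
        val key = records.head._1
        val mean = records.map(_._2).sum.toDouble / records.size
        println(key + " -> " + mean)   // printed in the executor logs
      }
    }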

    To calculate the mean, you need to calculate the sum and the count and take their ratio. The default partitioner will do fine.

    import org.apache.spark.rdd.RDD

    def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
      case class SumCount(sum: Double, count: Double)
      val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
        (sc, v) => SumCount(sc.sum + v, sc.count + 1.0),
        (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))
      // keep the key and take the per-key ratio of sum and count
      sumCounts.mapValues(sc => sc.sum / sc.count)
    }
    

    This is an efficient single-pass calculation that generalizes well.
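
    For reference, a minimal usage sketch on the sample data from the question (output ordering is not guaranteed):

    val rdd = sc.parallelize(List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)))
    meanByKey(rdd).collect().foreach { case (k, m) => println(k + " -> " + m) }
    // expected: 1 -> 13.0, 2 -> 4.5, 3 -> 7.0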
