value pair into a key

I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn) into one Key-Multivalue pair (K, [V1, V2, ..., Vn]) . I feel like I should be able to do this using the reduceByKey function with something of the flavor:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

The error that I get when this occurs is:

'NoneType' object has no attribue 'append'.

My keys are integers and values V1,...,Vn are tuples. My goal is to create a single pair with the key and a list of the values (tuples).


Map and ReduceByKey

Input type and output type of reduce must be the same, therefore if you want to aggregate a list, you have to map the input to lists. Afterwards you combine the lists into one list.

Combining lists

You'll need a method to combine lists into one list. Phyton provides some methods to combine lists.

append modifies the first list and will always return None .

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend does the same, but unwraps lists:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

Both methods return None , but you'll need a method that returns the combined list, therefore just use the plus sign.

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]

Spark

file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) 
         .map(lambda actor: (actor.split(",")[0], actor))  

         # transform each value into a list
         .map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) 

         # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
         .reduceByKey(lambda a, b: a + b)

CombineByKey

It's also possible to solve this with combineByKey , which is used internally to implement reduceByKey , but it's more complex and "using one of the specialized per-key combiners in Spark can be much faster". Your use case is simple enough for the upper solution.

GroupByKey

It's also possible to solve this with groupByKey , but it reduces parallelization and therefore could be much slower for big data sets.


我谈话迟到了,但这是我的建议:

>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]

You can use the RDD groupByKey method.

Input:

data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()

Output:

[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
链接地址: http://www.djcxy.com/p/22572.html

上一篇: 列表和元组有什么区别?

下一篇: 值对组合成一个密钥