使用Spark的值数据库
我无法理解Spark如何与存储进行交互。
我想创建一个Spark集群,从RocksDB数据库(或任何其他键值存储)中获取数据。 然而,在这个时候,我能做的最好的是从数据库中将整个数据集提取到每个群集节点的内存中(例如,映射到一个映射中),并从该对象构建一个RDD。
我需要做什么才能获取必要的数据(如Spark对HDFS的使用)? 我已阅读了Hadoop输入格式和记录读者,但我并没有完全理解我应该实现的内容。
我知道这是一个广泛的问题,但我真的很感谢帮助我开始。 先谢谢你。
这是一个可能的解决方案。 我假设你有一个你想访问的键值存储(你的情况下是RocksDB)的客户端库。
KeyValuePair
表示一个bean类,表示来自键值存储的一个键值对。
类
/*Lazy iterator to read from KeyValue store*/
class KeyValueIterator implements Iterator<KeyValuePair> {
public KeyValueIterator() {
//TODO initialize your custom reader using java client library
}
@Override
public boolean hasNext() {
//TODO
}
@Override
public KeyValuePair next() {
//TODO
}
}
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair>() {
@Override
public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
//ignore empty 'keyValuePair' object
return new KeyValueIterator();
}
}
创建KeyValue RDD
/*list with a dummy KeyValuePair instance*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());
注意:
以上解决方案默认创建一个带有两个分区的RDD(其中一个将为空)。 在对keyValuePairRDD
应用任何转换之前增加分区,以便跨处理器执行处理。 增加分区的不同方法:
keyValuePairRDD.repartition(partitionCounts)
//OR
keyValuePairRDD.partitionBy(...)
链接地址: http://www.djcxy.com/p/94179.html