如何使用Pyspark和Dataframe查询Elasticsearch索引
Elasticsaerch的文档仅涵盖加载Spark的完整索引。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()
你如何执行查询以从Elasticsearch索引中返回数据并使用pyspark将它们作为DataFrame加载到Spark?
以下是我如何做到的。
常规环境设置和命令:
export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2
./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar
码:
from pyspark import SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
q ="""{
"query": {
"filtered": {
"filter": {
"exists": {
"field": "label"
}
},
"query": {
"match_all": {}
}
}
}
}"""
es_read_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "titanic/passenger",
"es.query" : q
}
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)
sqlContext.createDataFrame(es_rdd).collect()
您还可以定义数据帧列。 请参阅此处了解更多信息。
希望它有帮助!
我使用pyspark在亚马逊的EMR群集中运行我的代码。 然后,我按照以下步骤开展工作:
1)将此引导操作放入集群创建中(以创建localhost elasticsearch服务器):
s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb
2)我运行这些命令来用一些数据填充elasticsearch数据库:
curl -XPUT "http://localhost:9200/movies/movie/1" -d' {
"title": "The Godfather",
"director": "Francis Ford Coppola",
"year": 1972
}'
如果您愿意,您也可以运行其他卷曲命令,如:
curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}}
3)我使用以下参数导入了pyspark:
pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar
我以前下载过elasticsearch python客户端
4)我运行以下代码:
from pyspark import SparkConf
from pyspark.sql import SQLContext
q ="""{
"query": {
"match_all": {}
}
}"""
es_read_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "movies/movie",
"es.query" : q
}
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)
sqlContext.createDataFrame(es_rdd).collect()
然后我终于从命令中获得了成功的结果。
我遇到类似这样的问题,要将地理过滤数据导入PySpark DataFrame。 我在Spark版本2.1.1和ES版本5.2中使用elasticsearch-spark-20_2.11-5.2.2.jar。 通过在创建DataFrame时将我的查询指定为选项,我能够将数据加载到DataFrame中
我的地理查询
q ="""{
"query": {
"bool" : {
"must" : {
"match_all" : {}
},
"filter" : {
"geo_distance" : {
"distance" : "100km",
"location" : {
"lat" : 35.825,
"lon" : -87.99
}
}
}
}
}
}"""
我使用以下命令将数据加载到DataFrame中
spark_df = spark.read.format("es").option("es.query", q).load("index_name")
链接地址: http://www.djcxy.com/p/92471.html
上一篇: How to query an Elasticsearch index using Pyspark and Dataframes