如何在PySpark中读取Avro文件
我正在使用python写一个spark工作。 但是,我需要阅读大量的avro文件。
这是我在Spark的示例文件夹中找到的最接近的解决方案。 但是,您需要使用spark-submit提交此python脚本。 在spark-submit的命令行中,您可以指定驱动程序类,在这种情况下,您所有的avrokey,avrovalue类将被定位。
avro_rdd = sc.newAPIHadoopFile(
path,
"org.apache.avro.mapreduce.AvroKeyInputFormat",
"org.apache.avro.mapred.AvroKey",
"org.apache.hadoop.io.NullWritable",
keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
conf=conf)
在我的情况下,我需要在Python脚本中运行所有内容,我试图创建一个环境变量来包含jar文件,手指交叉Python会将jar添加到路径中,但显然不是,它给了我意想不到的类错误。
os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"
任何人都可以帮助我如何在一个python脚本中读取avro文件吗?
你可以使用spark-avro
库。 首先让我们创建一个示例数据集:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
schema_string ='''{"namespace": "example.avro",
"type": "record",
"name": "KeyValue",
"fields": [
{"name": "key", "type": "string"},
{"name": "value", "type": ["int", "null"]}
]
}'''
schema = avro.schema.parse(schema_string)
with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
wrt.append({"key": "foo", "value": -1})
wrt.append({"key": "bar", "value": 1})
使用spark-csv
读取它非常简单:
df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()
## +---+-----+
## |key|value|
## +---+-----+
## |foo| -1|
## |bar| 1|
## +---+-----+
以前的解决方案需要安装第三方Java依赖项,这不是大多数Python开发人员所满意的。 但是如果你只想用给定的模式解析你的Avro文件,你并不需要外部库。 你可以阅读二进制文件,并用你最喜欢的Python Avro包解析它们。
例如,您可以使用fastavro
加载Avro文件:
from io import BytesIO
import fastavro
schema = {
...
}
rdd = sc.binaryFiles("/path/to/dataset/*.avro")
.flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))
print(rdd.collect())
链接地址: http://www.djcxy.com/p/84583.html