从Apache Spark访问公共可用的Amazon S3文件

我有一个公共的可用Amazon S3资源(文本文件),并希望从spark中访问它。 这意味着 - 我没有任何亚马逊凭证 - 如果我只想下载它,它工作正常:

val bucket = "<my-bucket>"
val key = "<my-key>"

val client = new AmazonS3Client
val o = client.getObject(bucket, key)
val content = o.getObjectContent // <= can be read and used as input stream

但是,当我尝试从spark上下文访问相同的资源时

val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
val f = sc.textFile(s"s3a://$bucket/$key")
println(f.count())

我收到以下stacktrace错误:

Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
    at com.example.Main$.main(Main.scala:14)
    at com.example.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

我不想提供任何AWS证书 - 我只是想匿名访问资源(现在) - 如何实现这一目标? 我可能需要使它像AnonymousAWSCredentialsProvider - 但如何把它放在火花或hadoop?

PS我的build.sbt以防万一

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.hadoop" % "hadoop-aws" % "2.7.1"
)

更新:在我做了一些调查之后 - 我看到了它没有工作的原因。

首先,S3AFileSystem使用以下凭证顺序创建AWS客户端:

AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
    new BasicAWSCredentialsProvider(accessKey, secretKey),
    new InstanceProfileCredentialsProvider(),
    new AnonymousAWSCredentialsProvider()
);

“accessKey”和“secretKey”值取自spark conf实例(键值必须是“fs.s3a.access.key”和“fs.s3a.secret.key”或org.apache.hadoop.fs.s3a.Constants .ACCESS_KEY和org.apache.hadoop.fs.s3a.Constants.SECRET_KEY常量,这样更方便)。

其次 - 你可能会看到AnonymousAWSCredentialsProvider是第三个选项(最后优先) - 那有什么可能是错误的? 查看AnonymousAWSCredentials的实现:

public class AnonymousAWSCredentials implements AWSCredentials {

    public String getAWSAccessKeyId() {
        return null;
    }

    public String getAWSSecretKey() {
        return null;
    }
}

它只是为访问密钥和密钥返回null。 听起来很合理。 但看看AWSCredentialsProviderChain:

AWSCredentials credentials = provider.getCredentials();

if (credentials.getAWSAccessKeyId() != null &&
    credentials.getAWSSecretKey() != null) {
    log.debug("Loading credentials from " + provider.toString());

    lastUsedProvider = provider;
    return credentials;
}

如果两个密钥均为空,它不会选择提供者 - 这意味着匿名凭证无法工作。 看起来像aws-java-sdk-1.7.4中的一个bug。 我试图使用最新版本 - 但它与hadoop-aws-2.7.1不兼容。

任何其他想法?


我个人从未访问过Spark的公共数据。 您可以尝试使用虚拟凭证,或仅为此用途创建虚拟凭证。 直接在SparkConf对象上设置它们。

val sparkConf: SparkConf = ???
val accessKeyId: String = ???
val secretAccessKey: String = ???
sparkConf.set("spark.hadoop.fs.s3.awsAccessKeyId", accessKeyId)
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", accessKeyId)
sparkConf.set("spark.hadoop.fs.s3.awsSecretAccessKey", secretAccessKey)
sparkConf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", secretAccessKey)

作为替代方案,请阅读DefaultAWSCredentialsProviderChain的文档以查看凭据的查找位置。 清单(顺序很重要)是:

  • 环境变量 - AWS_ACCESS_KEY_ID和AWS_SECRET_KEY
  • Java系统属性 - aws.accessKeyId和aws.secretKey
  • Credential配置文件文件位于所有AWS开发工具包和AWS CLI共享的默认位置(〜/ .aws / credentials)
  • 通过Amazon EC2元数据服务提供的实例配置文件凭据
  • 链接地址: http://www.djcxy.com/p/27599.html

    上一篇: Access public available Amazon S3 file from Apache Spark

    下一篇: Does JavaScript promise create memory leaks when not rejected or resolved?