配置接收器elasticsearch apache

这是我第一次来这里,非常抱歉,如果我没有发表正确的话,并且抱歉我的英语不好。

我正在尝试配置Apache Flume和Elasticsearch接收器。 一切都好,看起来运行良好,但我启动代理时有两个警告; 以下几点:

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies

我的代理配置:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

它启动netcat,一切都很好,但我害怕这些警告,我不明白。


我找到了一个理由,似乎Apache Flume 1.6.0和Elasticsearch 2.0无法正确沟通。

我从我修改的第三人那里找到了一个很好的水槽。

链接在这里

这是我最后的配置,

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
a1.sinks.k1.hostNames = 127.0.0.1:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这个对我有用。

感谢您的回答。

PS是的,我必须移动库。


关注日志,有些缺失的依赖关系存在问题。

如果您查看ElasticSearchSink文档,您将看到以下内容:

您的环境所需的elasticsearch和lucene-core jar必须放置在Apache Flume安装的lib目录中。 Elasticsearch要求客户端JAR的主要版本与服务器的主要版本相匹配,并且两者都运行相同次要版本的JVM。 如果这是不正确的,SerializationException将会出现。 要首先选择所需版本,请确定elasticsearch的版本和目标群集正在运行的JVM版本。 然后选择一个与主版本匹配的elasticsearch客户端库。 0.19.x客户端可以与0.19.x群集交谈; 0.20.x可以和0.20.x交谈,0.90.x可以和0.90.x交谈。 一旦确定了elasticsearch版本,然后阅读pom.xml文件以确定要使用的正确的lucene-core JAR版本。 运行ElasticSearchSink的Flume代理还应该将目标群集正在运行的JVM与次要版本匹配。

很可能你没有放置所需的Java jar,或者该版本不适合。


仅在flume / lib目录下添加了2个JAR,并且它可以工作,不必添加所有其他Lucene JAR:

elasticsearch-1.7.1.jar

Lucene的核心- 4.10.4.jar

命令启动水槽:

bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console

请务必在下面添加到flume-env.sh

export JAVA_HOME=/usr/java/default

export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/;/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib"

Flume aggregator配置来加载ES中的数据: flume-aggregator.conf

agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1

################################################
# Describe Source
################################################

# Source Avro
agent2.sources.source1.type = avro
agent2.sources.source1.bind = 0.0.0.0 
agent2.sources.source1.port = 9997

################################################
# Describe Interceptors
################################################
# an example of nginx access log regex match
# agent2.sources.source1.interceptors = interceptor1
# agent2.sources.source1.interceptors.interceptor1.type = regex_extractor
# 
# agent2.sources.source1.interceptors.interceptor1.regex = "^(S+) [(.*?)] "(.*?)" (S+) (S+)( "(.*?)" "(.*?)")?"
# 
# # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z.@-+_%]+) ([a-zA-Z.@-+_%]+) [(.*)] "(POST|GET) ([A-Za-z0-9$.+@#%_/-]*)??(.*) (.*)" ([a-zA-Z0-9./s-]*) (.*) ([0-9]+) ([0-9]+) ([0-9.]+)
# # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13
# 
# agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8
# agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip
# agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime
# agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method
# agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request
# agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response
# agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status
# agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes
# agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime
#  

################################################
# Describe Sink
################################################

# Sink ElasticSearch
# Elasticsearch lib ---> flume/lib
# elasticsearch/config/elasticsearch.yml cluster.name clusterName. data/clustername data.
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300
agent2.sinks.sink1.indexName = pdi
agent2.sinks.sink1.indexType = pdi_metrics
agent2.sinks.sink1.clusterName = My-ES-CLUSTER
agent2.sinks.sink1.batchSize = 1000
agent2.sinks.sink1.ttl = 2
#this serializer is crucial in order to use kibana
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer



################################################
# Describe Channel
################################################

# Channel Memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 10000000
agent2.channels.channel1.transactionCapacity = 1000

################################################
# Bind the source and sink to the channel
################################################

agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1
链接地址: http://www.djcxy.com/p/88969.html

上一篇: Configure sink elasticsearch apache

下一篇: In POSIX, can a main(void) recover command line arguments?