Configure an Elasticsearch sink for Apache Flume
This is my first time here, so I apologize if I don't post this correctly, and sorry for my English.
I'm trying to configure Apache Flume with an Elasticsearch sink. Everything seems to run fine, but the following error and warning appear when I start the agent:
2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies
My agent configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The netcat source starts and everything seems fine, but these messages worry me and I don't understand them.
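For what it's worth, this is how I send test events to the netcat source while the agent is running (the source answers OK for each line it accepts):
telnet localhost 44444
Hello world!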
I found the reason: it seems that Apache Flume 1.6.0 and Elasticsearch 2.0 can't communicate properly. The stock sink calls the InetSocketTransportAddress(String, int) constructor (see the stack trace above), which no longer exists in the Elasticsearch 2.0 client API, hence the NoSuchMethodError.
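You can check which client JARs Flume is actually loading by listing its lib directory (the path here is just my install location; adjust it to yours):
ls /usr/local/apache-flume-1.6.0-bin/lib | grep -E 'elasticsearch|lucene'
If the elasticsearch JAR there does not match the major version of your server, you get exactly this kind of error.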
I found a good third-party sink that supports Elasticsearch 2.x, which I modified slightly.
Here is the link
And this is my final configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
a1.sinks.k1.hostNames = 127.0.0.1:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
It works for me.
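To verify that events actually reach Elasticsearch, I check the indices (with the TimeBasedIndexNameBuilder above, the index should show up as items-yyyy-MM-dd):
curl 'http://127.0.0.1:9200/_cat/indices?v'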
Thanks for the answers.
P.S. Yes, I had to move the libraries into Flume's lib directory.
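Roughly like this; the exact JAR names depend on how you built the third-party sink and which Elasticsearch 2.x client you use, so treat these as placeholders:
cp target/flume-ng-elasticsearch2-sink.jar /usr/local/apache-flume-1.6.0-bin/lib/
cp elasticsearch-2.0.0.jar /usr/local/apache-flume-1.6.0-bin/lib/
The ES 2.x transport client also pulls in its own dependencies (Lucene, Guava, etc.), which have to go into lib as well.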
Judging by the logs, there is a problem with a missing dependency.
If you have a look at the ElasticSearchSink documentation, you'll see the following:
The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version.
Most probably you did not place the required JARs in Flume's lib directory, or their version is not the appropriate one.
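A quick way to check both things the documentation mentions (adjust host and port to your cluster):
# Elasticsearch server version (see version.number in the JSON reply)
curl 'http://127.0.0.1:9200'
# JVM version on the Flume host
java -version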
I added only the two JARs below to the flume/lib directory and it worked; you do not have to add all the other Lucene JARs:
elasticsearch-1.7.1.jar
lucene-core-4.10.4.jar
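For example, assuming the two JARs are in the current directory and Flume lives where my FLUME_CLASSPATH below points:
cp elasticsearch-1.7.1.jar /usr/flume/flume1.6/apache-flume-1.6.0-bin/lib/
cp lucene-core-4.10.4.jar /usr/flume/flume1.6/apache-flume-1.6.0-bin/lib/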
Command to start Flume:
bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console
Make sure to add the following to flume-env.sh:
export JAVA_HOME=/usr/java/default
export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/:/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib"
Flume aggregator config to load data into ES (flume-aggregator.conf):
agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1
################################################
# Describe Source
################################################
# Source Avro
agent2.sources.source1.type = avro
agent2.sources.source1.bind = 0.0.0.0
agent2.sources.source1.port = 9997
################################################
# Describe Interceptors
################################################
# an example of nginx access log regex match
# agent2.sources.source1.interceptors = interceptor1
# agent2.sources.source1.interceptors.interceptor1.type = regex_extractor
#
# agent2.sources.source1.interceptors.interceptor1.regex = ^(\S+) \[(.*?)\] "(.*?)" (\S+) (\S+)( "(.*?)" "(.*?)")?
#
# # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z.@\-+_%]+) ([a-zA-Z.@\-+_%]+) \[(.*)\] "(POST|GET) ([A-Za-z0-9$.+@#%_/-]*)\??(.*) (.*)" ([a-zA-Z0-9./\s-]*) (.*) ([0-9]+) ([0-9]+) ([0-9.]+)
# # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13
#
# agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8
# agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip
# agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime
# agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method
# agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request
# agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response
# agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status
# agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes
# agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime
#
################################################
# Describe Sink
################################################
# Sink ElasticSearch
# Put the Elasticsearch client libs into flume/lib.
# cluster.name in elasticsearch/config/elasticsearch.yml must match clusterName below;
# node data is stored under data/<cluster.name>.
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300
agent2.sinks.sink1.indexName = pdi
agent2.sinks.sink1.indexType = pdi_metrics
agent2.sinks.sink1.clusterName = My-ES-CLUSTER
agent2.sinks.sink1.batchSize = 1000
agent2.sinks.sink1.ttl = 2
# this serializer is crucial in order to use Kibana
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
################################################
# Describe Channel
################################################
# Channel Memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 10000000
agent2.channels.channel1.transactionCapacity = 1000
################################################
# Bind the source and sink to the channel
################################################
agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1
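Once the aggregator is up, you can push a test file through the Avro source with Flume's built-in client (host, port, and file path here are just examples):
bin/flume-ng avro-client --conf conf -H localhost -p 9997 -F /tmp/test-events.log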