Logstash output to Elasticsearch index and mapping
I'm trying to have Logstash output to Elasticsearch, but I'm not sure how to use the mapping I defined in Elasticsearch.
In Kibana, I created an index and mapping like this:
PUT /kafkajmx2
{
  "mappings": {
    "kafka_mbeans": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "integer"
        },
        "host": {
          "type": "keyword"
        },
        "metric_path": {
          "type": "text"
        },
        "type": {
          "type": "keyword"
        },
        "path": {
          "type": "text"
        },
        "metric_value_string": {
          "type": "keyword"
        },
        "metric_value_number": {
          "type": "float"
        }
      }
    }
  }
}
I can write data to it like this:
POST /kafkajmx2/kafka_mbeans
{
  "metric_value_number":159.03478490788203,
  "path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf",
  "@timestamp":"2017-02-12T23:08:40.934Z",
  "@version":"1","host":"localhost",
  "metric_path":"node1.kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec.FifteenMinuteRate",
  "type":null
}
Now my Logstash config looks like this:
input {
        kafka {
                kafka details here
        }
}
output {
    elasticsearch {
            hosts => "http://elasticsearch:9050"
            index => "kafkajmx2"
    }
}
It writes to the kafkajmx2 index but doesn't use the mapping. When I query it like this in Kibana:
GET /kafkajmx2/kafka_mbeans/_search?q=*
{
}
I get this back:
      {
        "_index": "kafkajmx2",
        "_type": "logs",
        "_id": "AVo34xF_j-lM6k7wBavd",
        "_score": 1,
        "_source": {
          "@timestamp": "2017-02-13T14:31:53.337Z",
          "@version": "1",
          "message": """
{"metric_value_number":0,"path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf","@timestamp":"2017-02-13T14:31:52.654Z","@version":"1","host":"localhost","metric_path":"node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count","type":null}
"""
        }
      }
How do I tell Logstash to use the kafka_mbeans mapping in the output?
-----EDIT-----
I tried my output like this but still get the same results:
output {
        elasticsearch {
                hosts => "http://10.204.93.209:9050"
                index => "kafkajmx2"
                template_name => "kafka_mbeans"
                codec => plain {
                        format => "%{message}"
                }
        }
}
The data in Elasticsearch should look like this:
{
  "@timestamp": "2017-02-13T14:31:52.654Z", 
  "@version": "1", 
  "host": "localhost", 
  "metric_path": "node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count", 
  "metric_value_number": 0, 
  "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf", 
  "type": null
}
--------EDIT 2--------------
I at least got the message to parse as JSON by adding a filter like this:
input {
        kafka {
                ...kafka details....
        }
}
filter {
        json {
                source => "message"
                remove_field => ["message"]
        }
}
output {
        elasticsearch {
                hosts => "http://node1:9050"
                index => "kafkajmx2"
                template_name => "kafka_mbeans"
        }
}
It still doesn't use the template, but it at least parses the JSON correctly, so now I get this:
  {
    "_index": "kafkajmx2",
    "_type": "logs",
    "_id": "AVo4a2Hzj-lM6k7wBcMS",
    "_score": 1,
    "_source": {
      "metric_value_number": 0.9967205071482902,
      "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
      "@timestamp": "2017-02-13T16:54:16.701Z",
      "@version": "1",
      "host": "localhost",
      "metric_path": "kafka1.kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent.Value",
      "type": null
    }
  }
What you need to change is very simple. First, use the json codec in your kafka input. You no longer need the json filter, so you can remove it.
    kafka {
            ...kafka details....
            codec => "json"
    }
Then, in your elasticsearch output, you're missing the mapping type (the document_type parameter below). This is important: without it, the type defaults to logs (as you can see in the _type field of your results), which doesn't match your kafka_mbeans mapping type. Moreover, you don't really need to use a template, since your index already exists. Make the following modification:
    elasticsearch {
            hosts => "http://node1:9050"
            index => "kafkajmx2"
            document_type => "kafka_mbeans"
    }
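Putting the two changes together, the whole pipeline might look like the sketch below. It reuses the host and index from the question and leaves the Kafka settings elided as in the original. It assumes Logstash 5.x, where the elasticsearch output accepts document_type.

```
input {
        kafka {
                ...kafka details....
                # Decode each Kafka message as JSON at the input,
                # so no separate json filter is needed
                codec => "json"
        }
}
output {
        elasticsearch {
                hosts => "http://node1:9050"
                index => "kafkajmx2"
                # Must match the mapping type defined in the index;
                # without it, events are indexed under the default type "logs"
                document_type => "kafka_mbeans"
        }
}
```

After restarting Logstash, new documents should show "_type": "kafka_mbeans" instead of "logs" in the search results.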
 This is defined with the template_name parameter on the elasticsearch output.  
elasticsearch {
        hosts         => "http://elasticsearch:9050"
        index         => "kafkajmx2"
        template_name => "kafka_mbeans"
}
One warning, though. If you want to start creating indexes that are boxed on time, such as one index per week, you will have to take a few more steps to ensure your mapping stays with each one. You have a couple of options there:
Create an index template in Elasticsearch, and define it to apply to indexes matching a glob such as kafkajmx2-*
Use the template parameter on the output, which specifies a JSON file that defines the mapping to be used with all indexes created through that output.
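As a rough sketch of the approach that uses the template parameter on the output, the config below writes to one index per week and points Logstash at a template file. The file path is hypothetical, and the weekly pattern %{+xxxx.ww} is only one possible naming scheme. template, template_name, and template_overwrite are options of the elasticsearch output plugin, and document_type is assumed to be available as in Logstash 5.x. The JSON file itself is assumed to contain an index template that matches kafkajmx2-* and carries the kafka_mbeans mapping from the question.

```
output {
        elasticsearch {
                hosts => "http://elasticsearch:9050"
                # One index per ISO week, e.g. kafkajmx2-2017.07
                # for an event timestamped 2017-02-13
                index => "kafkajmx2-%{+xxxx.ww}"
                document_type => "kafka_mbeans"
                # Hypothetical path to the JSON index template file
                template => "/etc/logstash/templates/kafka_mbeans.json"
                # Name under which the template is stored in Elasticsearch
                template_name => "kafka_mbeans"
                # Replace a template of the same name if one already exists
                template_overwrite => true
        }
}
```

A template only takes effect when an index is created, so an index that already exists keeps its old mapping. Each new weekly index picks up the mapping automatically.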