elastic adapter
The elastic adapter can be used to read and write data from a running elasticsearch instance. The version of elasticsearch we currently verify against with integration tests is 2.3.3. We'll be adding verification of a few other versions shortly; if you want to see support for a specific version, please let us know on our GitHub issue tracker.
read
Reading from the elastic adapter is done using the read source with the following options available at this time:
```python
read('elastic',
     index=None,
     type='metric',
     host='localhost',
     port=9200,
     time='time',
     filter=None,
     batch=1024) | ...
```
Argument | Description | Required? |
---|---|---|
index | elasticsearch index to read from | No, default: _all |
type | elasticsearch document type to read | No, default: metric |
host | hostname of the elasticsearch instance | No, default: localhost |
port | port of the elasticsearch instance | No, default: 9200 |
time | field name that contains a valid timestamp | No, default: time |
filter | filter expression to run against the es data (*) | No, default: None |
batch | the read/write batch size | No, default: 1024 |
- (*) filter expressions are further explained here
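As a quick sketch of these options in use (the hostname and time field below are made up for illustration), reading from a remote instance whose documents keep their timestamp in a custom field might look like:

```python
from flume import read, write

(
    read('elastic',
         index='test-index',
         host='es.example.com',   # hypothetical remote hostname
         port=9200,
         time='timestamp',        # hypothetical custom timestamp field
         batch=512)               # smaller read batch size
    | write('stdio')
).execute()
```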
write
Writing to the elastic adapter is done using the write sink with the following options available at this time:
```python
... | write('elastic',
            index='_all',
            type='metric',
            host='localhost',
            port=9200,
            time='time',
            filter=None,
            batch=1024)
```
Argument | Description | Required? |
---|---|---|
index | elasticsearch index to write to | No, default: _all |
type | elasticsearch document type to write | No, default: metric |
host | hostname of the elasticsearch instance | No, default: localhost |
port | port of the elasticsearch instance | No, default: 9200 |
time | field name that contains a valid timestamp | No, default: time |
batch | the read/write batch size | No, default: 1024 |
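The write sink accepts the same connection options as read; as a sketch (the index, type, and port below are made up for illustration), writing hourly points to a specific index and document type might look like:

```python
from flume import emit, write

(
    emit(limit=5, start='2016-01-01', every='hour')
    | write('elastic',
            index='hourly-metrics',  # hypothetical index name
            type='sample',           # hypothetical document type
            port=9201)               # hypothetical non-default port
).execute()
```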
writing a few points to elastic
```python
from flume import emit, write

(
    emit(limit=10, start='2013-01-01', every='day')
    | write('elastic', index='test-index')
).execute()
```
To test the above against a throwaway elasticsearch instance, spin one up with docker:
```bash
docker run -p 9200:9200 elasticsearch:2.3.3
```
Then you can execute the program. It produces no output, but you can easily verify your data was stored to that local instance by hitting:

http://localhost:9200/test-index/_search

The JSON response should have 10 hits, and the `_source` field of each hit should contain one of the timestamps from 2013-01-01T00:00:00.000Z to 2013-01-10T00:00:00.000Z (they're not sorted at this point).
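If you'd rather verify from Python, here's a minimal sketch using only the standard library (assuming Python 3; the response layout shown is elasticsearch 2.x's):

```python
import json
import urllib.request

# fetch the stored documents from the local elasticsearch instance
with urllib.request.urlopen('http://localhost:9200/test-index/_search') as response:
    body = json.load(response)

print(body['hits']['total'])  # should print 10
for hit in body['hits']['hits']:
    print(hit['_source'])     # the points written above, unsorted
```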
reading points from elastic
```python
from flume import read, write

(
    read('elastic', index='test-index')
    | write('stdio')
).execute()
```
Which produces the following output:
```
> python test.py
{"time": "2013-01-01T00:00:00.000Z"}
{"time": "2013-01-02T00:00:00.000Z"}
{"time": "2013-01-03T00:00:00.000Z"}
{"time": "2013-01-04T00:00:00.000Z"}
{"time": "2013-01-05T00:00:00.000Z"}
{"time": "2013-01-06T00:00:00.000Z"}
{"time": "2013-01-07T00:00:00.000Z"}
{"time": "2013-01-08T00:00:00.000Z"}
{"time": "2013-01-09T00:00:00.000Z"}
{"time": "2013-01-10T00:00:00.000Z"}
```