first steps
For most of the programs below we'll use the flume CLI since it allows us to quickly write and iterate on a program while immediately seeing the result of executing that program.
your first flume program
Let's start by writing a program that generates some points using a special source called emit, which can emit points to simulate real-world data:
flume "emit(limit=10) | write('stdio')"
The above emits 10 points to the write sink, which pushes them through the stdio adapter. The output will look like so (each point is emitted in real time):
{"time": "2016-07-17T19:55:40.853Z"}
{"time": "2016-07-17T19:55:41.853Z"}
{"time": "2016-07-17T19:55:42.853Z"}
{"time": "2016-07-17T19:55:43.853Z"}
{"time": "2016-07-17T19:55:44.853Z"}
{"time": "2016-07-17T19:55:45.853Z"}
{"time": "2016-07-17T19:55:46.853Z"}
{"time": "2016-07-17T19:55:47.853Z"}
{"time": "2016-07-17T19:55:48.853Z"}
{"time": "2016-07-17T19:55:49.853Z"}
Those points emit in real time, one by one, because the default value for the start argument is now. If you set start to something such as 2013-01-01, the points would be emitted immediately.
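For example, something like the following should emit all 10 points immediately (the exact quoting of the start date here is my assumption, based on the description above):
flume "emit(limit=10, start='2013-01-01') | write('stdio')"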
reading from a log file
Now let's do something more interesting and read actual data from a real source, such as the syslog file in examples/grok/syslog (if you aren't working from a copy of the flume source, download the file and change the path in the file argument below):
flume "read('stdio', format='grok', pattern='%{SYSLOGLINE}', file='examples/grok/syslog') | write('stdio')"
The above uses quite a few things to achieve the desired result of parsing the syslog file into data points in the flume stream: the read processor uses the stdio adapter to parse the file examples/grok/syslog with the grok stream parser. We don't actually have a time field in our data, so we should tell the read processor which field in our data is the time field, like so:
flume "read('stdio', format='grok', pattern='%{SYSLOGLINE}', file='examples/grok/syslog', time='timestamp') | write('stdio')"
We could now do something interesting with our data, such as calculating how many log lines we have per hour in this file. For that there is the reduce processor, which is used to calculate reductions on our stream. Nothing like an example to better showcase how reduce is used:
flume "read('stdio', format='grok', pattern='%{SYSLOGLINE}', file='examples/grok/syslog', time='timestamp') | reduce(count=count(), every='1h') | write('stdio')"
Our program is getting a little difficult to write on a single command line, so it would be nice to shorthand certain parts of our pipeline. Since this is all Python code, such shorthands already exist: simply define new Python functions that wrap existing flume processors, like so:
def syslog(filename):
    return read('stdio', format='grok', pattern='%{SYSLOGLINE}', file=filename, time='timestamp')
To use your new syslog helper/alias, you simply need to create a local file named .flumerc.py, which can contain utilities for use when running the flume command line tool. The .flumerc.py file should look like so:
from flume import *

def syslog(file):
    return read('stdio', format='grok', pattern='%{SYSLOGLINE}', file=file, time='timestamp')
The .flumerc.py file can be used to define anything you'd like to expose globally to your flume command line programs. With the above .flumerc.py file in your current working directory, or globally accessible in your home directory (~/), you can now run the earlier program like so:
> flume "syslog('examples/grok/syslog') | reduce(count=count(), every='1h') | write('stdio')"
{"count": 27, "time": "2016-07-17T13:59:44.000Z"}
{"count": 1, "time": "2016-07-17T14:59:44.000Z"}
{"count": 110, "time": "2016-07-17T15:59:44.000Z"}
{"count": 8, "time": "2016-07-17T16:59:44.000Z"}
{"count": 118, "time": "2016-07-17T17:59:44.000Z"}
{"count": 10, "time": "2016-07-17T18:59:44.000Z"}
That is a lot easier to read and write on the command line, and it highlights the main reason I wanted flume to be just an extension of the Python runtime: you can simply use existing, familiar constructs to build parts of the flume pipeline.
Now what if we actually wanted to get the count of lines generated by each program writing to the syslog file? This can be easily achieved using the by argument to the reduce processor, like so:
> flume "syslog('examples/grok/syslog') | reduce(count=count(), by=['program']) | write('stdio')"
{"count": 178, "program": "kernel", "time": "2016-07-17T13:59:44.000Z"}
{"count": 21, "program": "laptop-mode", "time": "2016-07-17T13:59:44.000Z"}
{"count": 18, "program": "wpa_supplicant", "time": "2016-07-17T13:59:44.000Z"}
{"count": 14, "program": "anacron", "time": "2016-07-17T13:59:44.000Z"}
{"count": 19, "program": "CRON", "time": "2016-07-17T13:59:44.000Z"}
{"count": 3, "program": "cinnamon-screensaver-dialog", "time": "2016-07-17T13:59:44.000Z"}
{"count": 18, "program": "NetworkManager", "time": "2016-07-17T13:59:44.000Z"}
{"count": 3, "program": "console-kit-daemon", "time": "2016-07-17T13:59:44.000Z"}
To make the above easier to read, we could sort by count using the sort processor and get something like so:
> flume "syslog('examples/grok/syslog') | reduce(count=count(), by=['program']) | sort('count') | write('stdio')"
{"count": 3, "program": "cinnamon-screensaver-dialog"}
{"count": 3, "program": "console-kit-daemon"}
{"count": 14, "program": "anacron"}
{"count": 18, "program": "wpa_supplicant"}
{"count": 18, "program": "NetworkManager"}
{"count": 19, "program": "CRON"}
{"count": 21, "program": "laptop-mode"}
{"count": 178, "program": "kernel"}
That makes it easy to see that the kernel is responsible for the majority of log lines in our syslog file. For those wondering why the time field just disappeared from our output: it's because we can't continue to do other things downstream with points once they're no longer in chronological order.
At this point I'd bet some of you are saying, "well, I can totally do all of the above with my GNU command line tools." Of course you can, and it would probably look something like so:
> cat examples/grok/syslog | awk '{print $5}' | sed 's/\[[0-9]*\]//g' | sort | uniq -c | sort -n
3 cinnamon-screensaver-dialog:
3 console-kit-daemon:
14 anacron:
18 NetworkManager:
18 wpa_supplicant:
19 CRON:
21 laptop-mode:
178 kernel:
It is actually shorter than using flume but I doubt you'll find many people who can read that in a single pass and understand what it does.
reading from an HTTP request
We could have presented this earlier, since with the http adapter you can read the syslog file mentioned in the previous section directly from GitHub, like so:
flume "read('http', url='https://raw.githubusercontent.com/rlgomes/flume/master/examples/grok/syslog', format='grok', pattern='%{SYSLOGLINE}', time='timestamp') | write('stdio')"
The above is getting a bit out of hand in terms of the length of the flume program, so one could shorten it by creating a .flumerc.py helper like so:
def http_syslog(url):
    return read('http', url=url, format='grok', pattern='%{SYSLOGLINE}', time='timestamp')
That allows us to shorten the previous command line to simply:
flume "http_syslog('https://raw.githubusercontent.com/rlgomes/flume/master/examples/grok/syslog') | write('stdio')"
This example really highlights how simple the transition is from reading data out of a local log file to reading from an HTTP endpoint in order to get real data into your pipeline.
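As a sketch that combines the pieces we've already seen, the per-program counts from the previous section should work just as well against the remote copy of the file:
flume "http_syslog('https://raw.githubusercontent.com/rlgomes/flume/master/examples/grok/syslog') | reduce(count=count(), by=['program']) | sort('count') | write('stdio')"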
running flume in Python
The previous examples focused on showing off how to use flume from the command line, but it's ultimately intended to be used from your own Python source code, so let's dive into how that works. Flume itself is just a Python module that exports the various sources, sinks and procs so you can use them when building a flume pipeline. To start, you must import the elements you want from flume, like so:
from flume import emit, count, put, write
Then you can construct your flume pipeline as you did on the command line:
from flume import emit, count, put, write
pipeline = emit(limit=5) | put(count=count()) | write('stdio')
Now if you execute the above Python program you won't get any output, and that is because all you did above was construct a flume pipeline; it won't execute until you tell it to do so using the execute() method, like so:
from flume import emit, count, put, write
pipeline = emit(limit=5) | put(count=count()) | write('stdio')
pipeline.execute()
Now executing the above will produce, in real time, the following output:
> python test.py
{"count": 1, "time": "2016-07-30T10:40:50.068Z"}
{"count": 2, "time": "2016-07-30T10:40:51.068Z"}
{"count": 3, "time": "2016-07-30T10:40:52.068Z"}
{"count": 4, "time": "2016-07-30T10:40:53.068Z"}
{"count": 5, "time": "2016-07-30T10:40:54.068Z"}
There we have our first flume pipeline in Python, which can be rerun by simply calling the execute() method on the pipeline we constructed. The main reason for this type of construct is that it allows you to control the order of execution of multiple pipelines, or even calculate a set of values in one pipeline before you execute another pipeline that depends on the previous one.
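Here is a minimal sketch of that idea, using only the constructs shown above; the two pipelines here are independent, but the same pattern applies when the second pipeline depends on values produced by the first:

from flume import emit, count, put, write

# construct both pipelines up front; nothing runs yet
first = emit(limit=2) | put(count=count()) | write('stdio')
second = emit(limit=2) | write('stdio')

# the first pipeline runs to completion before the second starts
first.execute()
second.execute()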