first steps

For most of the programs below we'll use the flume CLI since it allows us to quickly write and iterate on a program while immediately seeing the result of executing that program.

your first flume program

Let's start by writing a program that generates some points using a special source called emit, which can emit points to simulate real-world data:

flume "emit(limit=10) | write('stdio')"

The above emits 10 points to the write sink, which pushes them out through the stdio adapter. The output will look like so (each point is emitted in real time):

{"time": "2016-07-17T19:55:40.853Z"}
{"time": "2016-07-17T19:55:41.853Z"}
{"time": "2016-07-17T19:55:42.853Z"}
{"time": "2016-07-17T19:55:43.853Z"}
{"time": "2016-07-17T19:55:44.853Z"}
{"time": "2016-07-17T19:55:45.853Z"}
{"time": "2016-07-17T19:55:46.853Z"}
{"time": "2016-07-17T19:55:47.853Z"}
{"time": "2016-07-17T19:55:48.853Z"}
{"time": "2016-07-17T19:55:49.853Z"}

Those points are emitted in real time, one by one, because the default value of the start argument is now. If you set start to a past date, such as 2013-01-01, the points are emitted immediately.
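For example, something like the following (passing start as a plain date string here is an assumption about the argument format) should print all 10 points immediately, with timestamps starting at that date:

flume "emit(limit=10, start='2013-01-01') | write('stdio')"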

reading from a log file

Now let's do something more interesting and read actual data from a real source, such as the syslog file in examples/grok/syslog (if you aren't working from a copy of the flume source, download the file and change the path in the file argument below):

flume "read('stdio', format='grok', pattern='%{SYSLOGLINE}', file='examples/grok/syslog') | write('stdio')"

The above uses quite a few things to achieve the desired result of parsing the syslog file into data points in the flume stream. The read processor uses the stdio adapter to read the file examples/grok/syslog and parses it with the grok stream parser. We don't have an actual time field in our data, so we should tell the read processor which field in our data is the time field, like so:

flume "read('stdio', format='grok', pattern='%{SYSLOGLINE}', file='examples/grok/syslog', time='timestamp') | write('stdio')"

We can now do something interesting with our data, such as calculating how many log lines per hour this file contains. For that there is the reduce processor, which is used to calculate reductions on our stream. Nothing like an example to showcase how reduce is used:

flume "read('stdio', format='grok', pattern='%{SYSLOGLINE}', file='examples/grok/syslog', time='timestamp') | reduce(count=count(), every='1h') | write('stdio')"

Our program is getting a little difficult to write on a single command line, so it would be nice to have a way to shorthand certain parts of the pipeline. Since this is all Python code, such a shorthand already exists: simply define new Python functions that wrap existing flume processors, like so:

def syslog(filename):
    return read('stdio', format='grok', pattern='%{SYSLOGLINE}', file=filename, time='timestamp')

To use your new syslog helper/alias, simply create a local file named .flumerc.py containing any utilities you want available when running the flume command line tool. The .flumerc.py file should look like so:

from flume import *

def syslog(file):
    return read('stdio', format='grok', pattern='%{SYSLOGLINE}', file=file, time='timestamp')

The .flumerc.py file can be used to define anything you'd like to expose globally to your flume command line programs. With the above .flumerc.py file in your current working directory, or in your home (~/) directory so it's globally accessible, you can now run the earlier program like so:

> flume "syslog('examples/grok/syslog') | reduce(count=count(), every='1h') | write('stdio')"
{"count": 27, "time": "2016-07-17T13:59:44.000Z"}
{"count": 1, "time": "2016-07-17T14:59:44.000Z"}
{"count": 110, "time": "2016-07-17T15:59:44.000Z"}
{"count": 8, "time": "2016-07-17T16:59:44.000Z"}
{"count": 118, "time": "2016-07-17T17:59:44.000Z"}
{"count": 10, "time": "2016-07-17T18:59:44.000Z"}

That is a lot easier to read and write on the command line, and it also highlights the main reason I wanted flume to be just an extension of the Python runtime: you can use existing, familiar constructs to build parts of the flume pipeline.

Now, what if we wanted to get the count of lines generated by each program writing to the syslog file? This can be easily achieved using the by argument to the reduce processor, like so:

> flume "syslog('examples/grok/syslog') | reduce(count=count(), by=['program']) | write('stdio')"
{"count": 178, "program": "kernel", "time": "2016-07-17T13:59:44.000Z"}
{"count": 21, "program": "laptop-mode", "time": "2016-07-17T13:59:44.000Z"}
{"count": 18, "program": "wpa_supplicant", "time": "2016-07-17T13:59:44.000Z"}
{"count": 14, "program": "anacron", "time": "2016-07-17T13:59:44.000Z"}
{"count": 19, "program": "CRON", "time": "2016-07-17T13:59:44.000Z"}
{"count": 3, "program": "cinnamon-screensaver-dialog", "time": "2016-07-17T13:59:44.000Z"}
{"count": 18, "program": "NetworkManager", "time": "2016-07-17T13:59:44.000Z"}
{"count": 3, "program": "console-kit-daemon", "time": "2016-07-17T13:59:44.000Z"}

To make the above easier to read, we can sort by count using the sort processor and get something like so:

> flume "syslog('examples/grok/syslog') | reduce(count=count(), by=['program']) | sort('count') | write('stdio')"
{"count": 3, "program": "cinnamon-screensaver-dialog"}
{"count": 3, "program": "console-kit-daemon"}
{"count": 14, "program": "anacron"}
{"count": 18, "program": "wpa_supplicant"}
{"count": 18, "program": "NetworkManager"}
{"count": 19, "program": "CRON"}
{"count": 21, "program": "laptop-mode"}
{"count": 178, "program": "kernel"}

This makes it easy to see that the kernel is responsible for the majority of log lines in our syslog file. For those wondering why the time field disappeared from the output: once points are sorted by count they are no longer in chronological order, and we can't continue to do other things downstream with points that are out of time order.

At this point I'd bet some of you are saying, "well, I can totally do all of the above with my GNU command line tools." Of course you can, and it would probably look something like so:

> cat examples/grok/syslog | awk '{print $5}' | sed 's/\[[0-9]*\]//g' | sort | uniq -c | sort -n
      3 cinnamon-screensaver-dialog:
      3 console-kit-daemon:
     14 anacron:
     18 NetworkManager:
     18 wpa_supplicant:
     19 CRON:
     21 laptop-mode:
    178 kernel:

It is actually shorter than using flume but I doubt you'll find many people who can read that in a single pass and understand what it does.

reading from an HTTP request

We could have presented this earlier, since with the http adapter you can read the syslog file mentioned in the previous section directly from GitHub, like so:

flume "read('http', url='https://raw.githubusercontent.com/rlgomes/flume/master/examples/grok/syslog', format='grok', pattern='%{SYSLOGLINE}', time='timestamp') | write('stdio')"

The above is getting a bit out of hand in terms of the length of the flume program so one could shorten it by creating a .flumerc.py helper like so:

def http_syslog(url):
    return read('http', url=url, format='grok', pattern='%{SYSLOGLINE}', time='timestamp')

This allows us to shorten the previous command line to simply:

flume "http_syslog('https://raw.githubusercontent.com/rlgomes/flume/master/examples/grok/syslog') | write('stdio')"

This example really highlights how simple the transition is from reading data out of a local log file to reading from an HTTP endpoint in order to get real data into your pipeline.
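For instance, the per-program counting from the earlier section should work unchanged against the remote copy of the file (assuming the http_syslog helper above is in your .flumerc.py):

flume "http_syslog('https://raw.githubusercontent.com/rlgomes/flume/master/examples/grok/syslog') | reduce(count=count(), by=['program']) | sort('count') | write('stdio')"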

running flume in Python

The previous examples focused on showing how to use flume from the command line, but it's ultimately intended to be used from your own Python source code, so let's dive into how that works. Flume itself is just a Python module that exports the various sources, sinks and procs so you can use them when building a flume pipeline. To start, you must import the elements you want from flume, like so:

from flume import emit, count, put, write

Then you can construct your flume pipeline as you did on the command line:

from flume import emit, count, put, write

pipeline = emit(limit=5) | put(count=count()) | write('stdio')

Now, if you run the above Python program you won't get any output. That's because all you did above was construct a flume pipeline; it won't execute until you tell it to do so with the execute() method, like so:

from flume import emit, count, put, write

pipeline = emit(limit=5) | put(count=count()) | write('stdio')
pipeline.execute()

Executing the above will produce, in real time, the following output:

> python test.py 
{"count": 1, "time": "2016-07-30T10:40:50.068Z"}
{"count": 2, "time": "2016-07-30T10:40:51.068Z"}
{"count": 3, "time": "2016-07-30T10:40:52.068Z"}
{"count": 4, "time": "2016-07-30T10:40:53.068Z"}
{"count": 5, "time": "2016-07-30T10:40:54.068Z"}

There we have our first flume pipeline in Python, which can be rerun by simply calling the execute() method on the pipeline we constructed. The main reason for this design is that it allows you to control the order of execution of multiple pipelines, or even calculate a set of values in one pipeline before executing another pipeline that depends on it.
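As a minimal sketch of that idea, using only the constructs we've already seen, the following builds two independent pipelines and runs them one after the other; the second pipeline only starts once the first has finished executing:

from flume import emit, count, put, write

# construct two pipelines; nothing runs yet
first = emit(limit=3) | put(count=count()) | write('stdio')
second = emit(limit=2) | write('stdio')

# each pipeline runs only when execute() is called, so we control the order
first.execute()
second.execute()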