overview
basics
A flume program is a pipeline of flume nodes: it reads points from a source, processes those points with various flume procs (short for processors), and pushes the resulting points to a sink. Here's the basic structure of a flume program:
source | proc1 | ... | procN | sink
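To make the `source | proc1 | ... | procN | sink` shape concrete, here is a minimal Python sketch that models it with generators. It is not flume's API: the names `source`, `only_positive`, `double` and `run` are hypothetical, points are plain dicts, and a list stands in for the sink.

```python
def source(values):
    """Hypothetical source: wraps each raw value in a point (a dict)."""
    for v in values:
        yield {"value": v}


def only_positive(points):
    """Hypothetical proc: drops points whose value is not positive."""
    for p in points:
        if p["value"] > 0:
            yield p


def double(points):
    """Hypothetical proc: emits a copy of each point with its value doubled."""
    for p in points:
        yield {**p, "value": p["value"] * 2}


def run(src, *procs):
    """Chain procs left to right, as in `source | proc1 | ... | procN`,
    then drain the stream into a list that plays the role of the sink."""
    stream = src
    for proc in procs:
        stream = proc(stream)
    return list(stream)


print(run(source([1, -2, 3]), only_positive, double))
# [{'value': 2}, {'value': 6}]
```

Because each stage is a generator, points flow through one at a time instead of being collected between procs.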
That basic structure is a simplified view of the world, since a flume program can in fact consist of multiple sources and sinks, and can even split and rejoin streams of data at various points in the pipeline. For example, you can read from multiple sources and merge their points like so:
(source1 ; source2 ; source3) | ... | sink
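Merging several sources can be pictured as a k-way merge of time-sorted streams, assuming each source yields its points in time order and the merged stream should be ordered by time as well. The Python sketch below uses the standard library's `heapq.merge`; the integer times, the `src` tags and the `merge_sources` name are illustrative only.

```python
import heapq

# Each hypothetical source yields points already sorted by "time"
# (plain integers stand in for timestamps).
source1 = [{"time": 1, "src": "a"}, {"time": 4, "src": "a"}]
source2 = [{"time": 2, "src": "b"}, {"time": 3, "src": "b"}]
source3 = [{"time": 0, "src": "c"}, {"time": 5, "src": "c"}]


def merge_sources(*sources):
    """Lazily interleave time-sorted sources into one time-sorted stream."""
    return heapq.merge(*sources, key=lambda p: p["time"])


merged = list(merge_sources(source1, source2, source3))
print([p["src"] for p in merged])
# ['c', 'a', 'b', 'b', 'a', 'c']
```

`heapq.merge` only holds one pending point per source at a time, which keeps the merge streaming rather than requiring every source to be read up front.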
Parentheses express parallel pipelines in flume. At the end of the parenthesized block, the points from all branches are merged together in order, unless you use one of the set procs, such as union, intersect or diff, to create the new stream by applying the specific set operation you want. The same ability to handle N sources also lets you split the stream at any point in your pipeline. Here's a somewhat less abstract example:
source
| (
filter('value % 2 == 0') | put(even=count()),
filter('value % 2 != 0') | put(odd=count())
) | sink
The counts above can't be calculated without splitting the stream, because each branch has to filter out exactly the points the other branch needs to count. In a more imperative style you'd also split the stream, by keeping the intermediate results in two separate variables: one tracking the even count and one tracking the odd count.
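Both approaches can be compared in a small Python sketch over plain integers. The function names are hypothetical, and each branch is reduced to a final total for simplicity. In the stream version, `itertools.tee` copies the stream so each branch can filter independently, loosely mirroring the two `filter(...)` branches in the flume program.

```python
from itertools import tee


def imperative_counts(values):
    """Imperative version: one pass, two separate accumulators."""
    even = odd = 0
    for v in values:
        if v % 2 == 0:
            even += 1
        else:
            odd += 1
    return {"even": even, "odd": odd}


def split_counts(values):
    """Stream version: duplicate the stream, then filter and count each
    branch on its own, like the even and odd branches above."""
    left, right = tee(values)
    even = sum(1 for v in left if v % 2 == 0)
    odd = sum(1 for v in right if v % 2 != 0)
    return {"even": even, "odd": odd}


values = [1, 2, 3, 4, 5]
print(imperative_counts(values))    # {'even': 2, 'odd': 3}
print(split_counts(iter(values)))   # {'even': 2, 'odd': 3}
```

Draining one `tee` branch completely before the other forces `tee` to buffer the whole stream for the second branch. That is harmless here, but it is the kind of cost a streaming engine has to manage when branches advance at different rates.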
streams and reducers
Flume can handle streams of points with or without a valid time field. The main reason to have a time field is to calculate reductions over your stream of points. Without a time field you can still do things such as adding fields to your data or joining with other streams to decorate your points with missing information. Let's start by looking at what a stream with a time field looks like:
emit(limit=10) | write('stdio')
The flume program above uses the emit proc to create points that contain only a time field, starting from the current time and creating one point every second until a total of 10 points have been "emitted". Running it with the flume CLI produces output like this:
> flume "emit(limit=10) | write('stdio')"
{"time": "2016-07-30T15:18:15.562Z"}
{"time": "2016-07-30T15:18:16.562Z"}
{"time": "2016-07-30T15:18:17.562Z"}
{"time": "2016-07-30T15:18:18.562Z"}
{"time": "2016-07-30T15:18:19.562Z"}
{"time": "2016-07-30T15:18:20.562Z"}
{"time": "2016-07-30T15:18:21.562Z"}
{"time": "2016-07-30T15:18:22.562Z"}
{"time": "2016-07-30T15:18:23.562Z"}
{"time": "2016-07-30T15:18:24.562Z"}
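The shape of that output can be reproduced with a short Python sketch. `emit` and `to_jsonlines` here are hypothetical stand-ins, not flume code: unlike a live emitter, this one does not wait a second between points, and the start time can be passed in so the result is deterministic.

```python
import json
from datetime import datetime, timedelta, timezone


def emit(limit, start=None, every=timedelta(seconds=1)):
    """Hypothetical stand-in for emit(limit=N): yield `limit` points holding
    only a time field, spaced `every` apart, starting at `start` (default: now)."""
    if start is None:
        start = datetime.now(timezone.utc)
    for i in range(limit):
        yield {"time": start + i * every}


def to_jsonlines(points):
    """Serialize each point as one JSON object per line. Times are assumed
    to be UTC and are written with millisecond precision and a Z suffix."""
    for p in points:
        t = p["time"]
        stamp = t.strftime("%Y-%m-%dT%H:%M:%S") + f".{t.microsecond // 1000:03d}Z"
        yield json.dumps({"time": stamp})


start = datetime(2016, 7, 30, 15, 18, 15, 562000, tzinfo=timezone.utc)
for line in to_jsonlines(emit(limit=3, start=start)):
    print(line)
# {"time": "2016-07-30T15:18:15.562Z"}
# {"time": "2016-07-30T15:18:16.562Z"}
# {"time": "2016-07-30T15:18:17.562Z"}
```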
The output format is jsonlines, which allows for a true stream of data since each line is a valid JSON object on its own. The stream produced by emit has a time field, so we can use the reduce proc to calculate time-aligned reductions. This is done by specifying the reduction we want to calculate and the new field that will hold its result. Here's a simple example:
> flume "emit(limit=10) | reduce(count=count()) | write('stdio')"
{"count": 10, "time": "2016-07-30T15:22:30.033Z"}
The reducer used here is count, which calculates the number of points that passed through the reduce proc during a given interval. The interval itself is set with the every argument to the reduce proc. Since we didn't set it in this example, the reduction covers the full length of the stream, which is why the output is a single point with a count of 10.
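A rough Python model of a count reduction, with and without an interval, is sketched below. It assumes intervals are consecutive windows of length `every` measured from the first point's time, and stamps each result with the start of its window. Flume's actual alignment rules and the format of its every argument may differ; `reduce_count` is a hypothetical name and `every` is a `timedelta` only in this sketch.

```python
from datetime import datetime, timedelta, timezone


def reduce_count(points, every=None):
    """Count time-sorted points per interval.

    every=None reduces over the whole stream and yields a single result;
    otherwise each point lands in window (time - first_time) // every.
    Empty windows produce no output in this sketch."""
    counts = {}  # window index -> count; dicts keep insertion order
    origin = None
    for p in points:
        t = p["time"]
        if origin is None:
            origin = t  # windows are measured from the first point
        index = 0 if every is None else (t - origin) // every
        counts[index] = counts.get(index, 0) + 1
    return [
        {"count": n, "time": origin if every is None else origin + i * every}
        for i, n in counts.items()
    ]


start = datetime(2016, 7, 30, 15, 22, 30, 33000, tzinfo=timezone.utc)
points = [{"time": start + timedelta(seconds=i)} for i in range(10)]

print([r["count"] for r in reduce_count(points)])  # [10]
print([r["count"] for r in reduce_count(points, every=timedelta(seconds=5))])  # [5, 5]
```

Without `every`, the whole stream collapses into one point, matching the single `count: 10` line in the CLI example; with a five-second interval the same ten points split into two windows.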