reduce

The reduce proc is used to calculate reductions over the stream of points using reducers and a few arguments to get the desired calculation done.

...  | reduce(every=moment.forever(),
              reset=True,
              by=None,
              *field_assignments) | ...
Argument Description Required?
every the interval at which to compute the reduction over. No, default: moment.forever()
reset specifies if we should reset the reducers at the end of every interval No, default: True
by specifies the list of fields to calculate the reductions over No, default: time
*field_assignments list of field assignments to make to each point No, default: None

A field assignment can be as simple as foo='bar' or can also be an assignment to a reducer such as foo=count().

count days in each month

from flume import *

(
    emit(limit=365, start='2015-01-01', every='1d')
    | reduce(count=count(), every='1 month')
    | write('stdio')
).execute()

The above would produce the output:

{"count": 31, "time": "2015-01-01T00:00:00.000Z"}
{"count": 28, "time": "2015-02-01T00:00:00.000Z"}
{"count": 31, "time": "2015-03-01T00:00:00.000Z"}
{"count": 30, "time": "2015-04-01T00:00:00.000Z"}
{"count": 31, "time": "2015-05-01T00:00:00.000Z"}
{"count": 30, "time": "2015-06-01T00:00:00.000Z"}
{"count": 31, "time": "2015-07-01T00:00:00.000Z"}
{"count": 31, "time": "2015-08-01T00:00:00.000Z"}
{"count": 30, "time": "2015-09-01T00:00:00.000Z"}
{"count": 31, "time": "2015-10-01T00:00:00.000Z"}
{"count": 30, "time": "2015-11-01T00:00:00.000Z"}
{"count": 31, "time": "2015-12-01T00:00:00.000Z"}