reduce
The reduce proc is used to calculate reductions over the stream of points
using reducers and a few arguments to get the desired calculation
done.
...  | reduce(every=moment.forever(),
              reset=True,
              by=None,
              *field_assignments) | ...
| Argument | Description | Required? | 
|---|---|---|
| every | the interval at which to compute the reduction over. | No, default: moment.forever() | 
| reset | specifies if we should reset the reducers at the end of everyinterval | No, default: True | 
| by | specifies the list of fields to calculate the reductions over | No, default: time | 
| *field_assignments | list of field assignments to make to each point | No, default: None | 
A field assignment can be as simple as foo='bar' or can also be an assignment
to a reducer such as foo=count().
count days in each month
from flume import *
(
    emit(limit=365, start='2015-01-01', every='1d')
    | reduce(count=count(), every='1 month')
    | write('stdio')
).execute()
The above would produce the output:
{"count": 31, "time": "2015-01-01T00:00:00.000Z"}
{"count": 28, "time": "2015-02-01T00:00:00.000Z"}
{"count": 31, "time": "2015-03-01T00:00:00.000Z"}
{"count": 30, "time": "2015-04-01T00:00:00.000Z"}
{"count": 31, "time": "2015-05-01T00:00:00.000Z"}
{"count": 30, "time": "2015-06-01T00:00:00.000Z"}
{"count": 31, "time": "2015-07-01T00:00:00.000Z"}
{"count": 31, "time": "2015-08-01T00:00:00.000Z"}
{"count": 30, "time": "2015-09-01T00:00:00.000Z"}
{"count": 31, "time": "2015-10-01T00:00:00.000Z"}
{"count": 30, "time": "2015-11-01T00:00:00.000Z"}
{"count": 31, "time": "2015-12-01T00:00:00.000Z"}