intersect
The intersect
proc is part of the set operators that can be used to join two
streams back into one but applying some set operator to the items of the
stream. In this case only allow points that appear in both streams to pass.
...
( ... , ....) | intersect(*fieldnames) | ...
Argument | Description | Required? |
---|---|---|
*fieldnames | list or tuple of field names to calculate the intersect on | No, default: time |
intersecting two streams
from flume import *
(
(emit(points=[
{'time': '2010-01-01T00:00:00.000Z', 'a': 0},
{'time': '2010-01-01T00:01:00.000Z', 'a': 1},
{'time': '2010-01-01T00:02:00.000Z', 'a': 2}
]),
emit(points=[
{'time': '2010-01-01T00:01:00.000Z', 'a': 1},
{'time': '2010-01-01T00:02:00.000Z', 'a': 2},
{'time': '2010-01-01T00:05:00.000Z', 'a': 3}
]))
| intersect()
| write('stdio')
).execute()
The above would produce the output:
{"a": 1, "time": "2010-01-01T00:01:00.000Z"}
{"a": 2, "time": "2010-01-01T00:02:00.000Z"}