The first part of the Kafka Streams API blog series covered stateless functions such as filter, map, etc. In this part, we will explore stateful operations in the Kafka Streams DSL API. It focuses on aggregation operations such as aggregate, count, and reduce, along with a discussion of related concepts.
Aggregation
An aggregation operation is applied to records with the same key. Kafka Streams supports the following aggregations: aggregate, count, and reduce. As mentioned in the previous blog, grouping is a prerequisite for aggregation. You can run groupBy (or its variations) on a KStream or a KTable, which results in a KGroupedStream or a KGroupedTable respectively.
KTable grouping was not covered in the stateless operations blog.
aggregate
The aggregate function has two key components: Initializer and Aggregator. When the first record for a key is received, the Initializer is invoked and its result is used as the starting point for the Aggregator. For subsequent records, the Aggregator uses the current record along with the aggregate computed so far for its calculation. Conceptually, this is a stateful computation being performed on an infinite data set - it is stateful because calculating the new state takes into account the current record (the key-value pair) along with the latest state (the current aggregate). This can be used for scenarios such as moving average, sum, count, etc.
Here is an example of how you can calculate the count, i.e. the number of times a specific key was received.
Code examples are available on GitHub.
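StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

// Count is assumed to be a custom class holding (key, count); a Serde for it must be
// configured (e.g. as the default value serde) because the aggregate lives in a state store.
KTable<String, Count> aggregate = stream.groupByKey()
    .aggregate(new Initializer<Count>() {
        @Override
        public Count apply() {
            // starting point: no key yet, zero records seen
            return new Count("", 0);
        }
    }, new Aggregator<String, String, Count>() {
        @Override
        public Count apply(String k, String v, Count aggKeyCount) {
            // add one to the count computed so far for this key
            Integer currentCount = aggKeyCount.getCount();
            return new Count(k, currentCount + 1);
        }
    });

// emit (key, count) pairs to the output topic
aggregate.toStream()
    .map((k, v) -> new KeyValue<>(k, v.getCount()))
    .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));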
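To make the interplay between the Initializer and the Aggregator more concrete, here is a minimal plain-Java sketch of the same per-key fold. It is only a model of the semantics and not the Kafka Streams API: the AggregateModel class, its KeyedAggregator interface and the record helper are names made up for this illustration, and an in-memory map stands in for the state store.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of a per-key aggregation; NOT the Kafka Streams API.
class AggregateModel {

    // Hypothetical stand-in for the Aggregator: (key, value, aggregate so far) -> new aggregate.
    interface KeyedAggregator<V, A> {
        A apply(String key, V value, A aggregate);
    }

    // Hypothetical helper to build a (key, value) record.
    static Map.Entry<String, String> record(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Folds the records into one aggregate per key.
    static <V, A> Map<String, A> aggregate(List<Map.Entry<String, V>> records,
                                           Supplier<A> initializer,
                                           KeyedAggregator<V, A> aggregator) {
        Map<String, A> store = new LinkedHashMap<>(); // stands in for the state store
        for (Map.Entry<String, V> r : records) {
            String key = r.getKey();
            // First record for this key: start from the initializer's value.
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, r.getValue(), current));
        }
        return store;
    }

    // Counting is an aggregation that starts at 0 and adds 1 per record.
    static Map<String, Integer> countPerKey(List<Map.Entry<String, String>> records) {
        return AggregateModel.<String, Integer>aggregate(records, () -> 0, (k, v, count) -> count + 1);
    }
}
```

Calling AggregateModel.countPerKey with three records, two of which share a key, yields a count of 2 for that key and 1 for the other. The real DSL does the same work incrementally, one record at a time, as data arrives.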
count
count is such a commonly used form of aggregation that it is offered as a first-class operation. Once you have the stream records grouped by key (KGroupedStream), you can count the number of records of a specific key by using this operation.
The aggregate way of doing things can be replaced by a single method call!
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey().count();
reduce
You can use reduce to combine the stream of values. The aggregate operation which was covered earlier is a generalized form of reduce: with reduce, the result has the same type as the values being combined. You can implement functionality such as sum, min, max, etc. Here is an example of max:
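StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.Long()));

stream.groupByKey()
    // keep the larger of the running maximum and the incoming value
    .reduce(new Reducer<Long>() {
        @Override
        public Long apply(Long currentMax, Long v) {
            return Math.max(currentMax, v);
        }
    })
    .toStream()
    // state the serdes explicitly so the Long values do not depend on the default serde
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

// this snippet is assumed to live inside a method that returns the Topology
return builder.build();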
Note that all the aggregation operations ignore records with a null key, which is expected since the very goal of this set of functions is to operate on records of a specific key.
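As a rough illustration of how reduce behaves, the following plain-Java sketch keeps a running maximum per key and skips records without a key. It models the semantics only and is not the Kafka Streams API: ReduceModel and its record helper are hypothetical names, and the sketch also skips null values as a safety choice because Map.merge does not accept them.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of a per-key reduce; NOT the Kafka Streams API.
class ReduceModel {

    // Hypothetical helper to build a (key, value) record; the key may be null.
    static Map.Entry<String, Long> record(String key, Long value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Combines all values of a key into a single value of the same type.
    static Map<String, Long> reduce(List<Map.Entry<String, Long>> records,
                                    BinaryOperator<Long> reducer) {
        Map<String, Long> store = new LinkedHashMap<>(); // stands in for the state store
        for (Map.Entry<String, Long> r : records) {
            if (r.getKey() == null || r.getValue() == null) {
                continue; // nothing to group on (or nothing to combine): skip the record
            }
            // The first value of a key is stored as-is; later values go through the reducer.
            store.merge(r.getKey(), r.getValue(), reducer);
        }
        return store;
    }
}
```

For example, ReduceModel.reduce(records, Long::max) returns the largest value seen per key, and swapping in Long::sum or Long::min gives sum or min without touching the rest of the code.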
Aggregation and state stores
In the above examples, the aggregated values were pushed to an output topic - this is not mandatory though. It is also possible to keep the aggregation results in a named local state store. Here is an example:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey().count(Materialized.as("count-store"));
In the above example, the call to count also creates a local state store named count-store which can then be introspected using Interactive Queries.
These state stores can either be in-memory or stored on disk using RocksDB. This allows for scalability since each state store is present locally in the specific Kafka Streams application instance, which processes inputs from its own subset of a topic's partitions - thus, the overall state is distributed across (potentially) multiple instances of your application (except in the case of GlobalKTables). Another key property is high availability: the contents of these state stores are backed up into Kafka as changelog (aka compacted) topics (although this can be disabled), so if an app instance crashes, its state store contents can be restored from Kafka itself.
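The restore step can be pictured as replaying the changelog into an empty store. The sketch below is a simplified plain-Java model of that idea and not how Kafka Streams is implemented internally: ChangelogRestoreModel and its change helper are hypothetical names, and a record with a null value is treated as a delete marker (tombstone), as on a compacted topic.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of rebuilding a state store from its changelog; NOT Kafka Streams internals.
class ChangelogRestoreModel {

    // Hypothetical helper: one changelog record; a null value marks a deleted key.
    static Map.Entry<String, Long> change(String key, Long value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Replays the changelog in order; the latest record per key wins.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> store = new HashMap<>();
        for (Map.Entry<String, Long> c : changelog) {
            if (c.getValue() == null) {
                store.remove(c.getKey());           // tombstone: the key was deleted
            } else {
                store.put(c.getKey(), c.getValue()); // newer value overwrites the older one
            }
        }
        return store;
    }
}
```

Because only the latest value per key matters for the end result, compacting the changelog topic does not change what a replay produces.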
KGroupedTable
A KGroupedTable is obtained when groupBy operations are invoked on a KTable. Just like KGroupedStream, having a KGroupedTable is a prerequisite for applying aggregation on a KTable. aggregate, count and reduce work the same way in KGroupedTable as they do with a KGroupedStream. But there is an important difference that needs to be highlighted.
A KTable is conceptually different from a KStream in the sense that it represents a snapshot of the data at a point in time (very much like a database table). It is a mutable entity as opposed to a KStream, which represents an immutable + infinite sequence of records. To account for this difference, the aggregate and reduce functions in a KGroupedTable take an additional function (an Aggregator for aggregate, a Reducer for reduce), often known as a subtractor, which is invoked to remove the old value's contribution when a key is updated or a null value is received.
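The role of the subtractor is easier to see with a small model. The plain-Java sketch below keeps a table of user -> region and a count of users per region; when a user's region changes or the user is deleted, the old region's count is reduced first. This is an illustration of the semantics under made-up names (GroupedTableModel, update, count), not the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of aggregating a grouped table with an adder and a subtractor;
// NOT the Kafka Streams API.
class GroupedTableModel {
    private final Map<String, String> table = new HashMap<>();       // user -> region (the table)
    private final Map<String, Long> countByRegion = new HashMap<>(); // region -> number of users

    // Applies one table update; newRegion == null models a delete (null value).
    void update(String user, String newRegion) {
        String oldRegion = (newRegion == null) ? table.remove(user) : table.put(user, newRegion);
        if (oldRegion != null) {
            // "subtractor": retract the contribution of the row's previous value
            countByRegion.merge(oldRegion, -1L, Long::sum);
        }
        if (newRegion != null) {
            // "adder": add the contribution of the row's new value
            countByRegion.merge(newRegion, 1L, Long::sum);
        }
    }

    long count(String region) {
        return countByRegion.getOrDefault(region, 0L);
    }
}
```

Without the subtract step, moving a user from one region to another would leave that user counted in both regions, which is exactly the problem the extra function solves for table aggregations.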
Windowing
Stateful Kafka Streams operations also support Windowing. This allows you to scope your stream processing pipelines to a specific time window/range, e.g. track the number of link clicks per minute or the number of unique page views per hour.
To perform Windowed aggregations on a group of records, you will have to create a KGroupedStream (as explained above) using groupBy on a KStream and then use the windowedBy operation (available in two overloaded forms). You can choose between traditional windows (tumbling, hopping or sliding) or session-based time windows.
Using windowedBy(Windows<W> windows) on a KGroupedStream returns a TimeWindowedKStream on top of which you can invoke the above mentioned aggregate operations. For example, if you want the number of clicks over a specific time range (say 5 minutes), choose a tumbling time window. This will ensure that the records are clearly segregated across the given time boundaries, i.e. clicks from user1 from 10:00 AM up to (but not including) 10:05 AM will be aggregated (counted) together, and a new time block (window) starts at 10:05 AM, for which the clicks counter starts from zero again.
`
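StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
TimeWindowedKStream<String, String> windowed = stream.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5)));
// results are keyed by Windowed<String>; turn the key into "key@windowStart" so plain serdes can be used
windowed.count().toStream()
    .map((windowedKey, count) -> KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));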
`
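To see where a record lands with a 5-minute tumbling window, it helps to look at the arithmetic. The sketch below is a plain-Java model (TumblingWindowModel and windowStart are names chosen for this illustration, not Kafka Streams classes) which assumes windows are aligned to the epoch and cover the range from their start (inclusive) to start + size (exclusive).

```java
import java.time.Duration;

// Plain-Java model of tumbling window assignment; NOT the Kafka Streams API.
class TumblingWindowModel {

    // Returns the start (in ms) of the single tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        // round the timestamp down to the nearest multiple of the window size
        return timestampMs - (timestampMs % sizeMs);
    }
}
```

With a 5-minute size, a click just before 10:05 AM still maps to the window starting at 10:00 AM, while a click at exactly 10:05 AM maps to the next window.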
Window types include:
- Tumbling time windows, which never overlap, i.e. a record will only be part of one window...
- ... in contrast to Hopping time windows, where a record can be present in one or more time ranges/windows
- Sliding time windows, which are meant for use with Joining operations
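The difference between tumbling and hopping windows can be sketched with a little arithmetic. In the plain-Java model below (HoppingWindowModel and windowStarts are hypothetical names, and epoch-aligned windows with an inclusive start and exclusive end are assumed), a hopping window has a size and a smaller advance interval, so one timestamp can fall into several windows. When the advance equals the size, the windows no longer overlap and behave like tumbling windows.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of hopping window assignment; NOT the Kafka Streams API.
class HoppingWindowModel {

    // Returns the start times (ms, newest first) of every window [start, start + size)
    // that contains the timestamp, for windows that begin every advanceMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // latest window that starts at or before the timestamp
        long latestStart = timestampMs - (timestampMs % advanceMs);
        // walk backwards while the window still covers the timestamp
        for (long start = latestStart; start > timestampMs - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a 5-minute window advancing every minute, a record at 10:01:30 AM belongs to five windows (those starting at 9:57 through 10:01), whereas with a 5-minute advance it belongs to exactly one.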
There is another type of stateful operation, which is Joining. It is an extensive topic which deserves an entire post (or maybe another series?) by itself.
If you want to take into account the "session", i.e. a period of activity separated by a defined gap of inactivity, please use windowedBy(SessionWindows windows), which returns a SessionWindowedKStream.
`
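StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey()
    .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
    .count() // an aggregation is needed: SessionWindowedKStream itself has no toStream()
    .toStream()
    // results are keyed by Windowed<String>; turn the key into "key@sessionStart" so plain serdes can be used
    .map((windowedKey, count) -> KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();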
`
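Unlike time windows, session boundaries depend on the data itself. The following plain-Java sketch models how the activity of a single key could be split into sessions by an inactivity gap. SessionWindowModel and sessions are names invented for this illustration (not Kafka Streams classes), and treating a pause exactly equal to the gap as part of the same session is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of session windows for ONE key; NOT the Kafka Streams API.
class SessionWindowModel {

    // Groups timestamps (ms) into sessions, returned as {start, end} pairs.
    // A new session begins when the pause since the previous record exceeds gapMs.
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted); // records may arrive out of order
        List<long[]> sessions = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && ts - last[1] <= gapMs) {
                last[1] = ts;                        // still active: extend the current session
            } else {
                sessions.add(new long[] {ts, ts});   // inactivity gap exceeded: open a new session
            }
        }
        return sessions;
    }
}
```

With a 5-minute gap, records at minutes 0, 2 and 4 form one session and records at minutes 15 and 16 form another, regardless of where any fixed 5-minute boundaries would fall.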
That's all for this part of the Kafka Streams blog series. Stay tuned for the next part, which will demonstrate how to test Kafka Streams applications using the built-in test utilities.
References
Please don't forget to check out the following resources for Kafka Streams