How to use stateful operations in Kafka Streams?

Abhishek Gupta - Mar 11 '20 - Dev Community

The first part of the Kafka Streams API blog series covered stateless operations such as filter and map. In this part, we will explore stateful operations in the Kafka Streams DSL API, focusing on aggregation operations such as aggregate, count, and reduce, along with a discussion of related concepts.

Aggregation

An aggregation operation is applied to records of the same key. Kafka Streams supports the following aggregations: aggregate, count, and reduce. As mentioned in the previous blog, grouping is a prerequisite for aggregation. You can run groupBy (or one of its variations) on a KStream or a KTable, which results in a KGroupedStream or a KGroupedTable, respectively.

KTable grouping was not covered in the stateless operations blog
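
To make the grouping step concrete, here is a minimal sketch (reusing the hypothetical INPUT_TOPIC with String keys and values from the examples below) contrasting groupByKey, which keeps the existing key, with groupBy, which derives a new one:

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        // groupByKey keeps the existing key of each record
        KGroupedStream<String, String> groupedByKey = stream.groupByKey();

        // groupBy derives a new key (here, the record value) before grouping;
        // changing the key triggers a repartition of the data
        KGroupedStream<String, String> groupedByValue =
                stream.groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()));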

aggregate

The aggregate function has two key components: an Initializer and an Aggregator. When the first record for a key is received, the Initializer is invoked and its result is used as the starting point for the Aggregator. For each subsequent record, the Aggregator combines the current record with the aggregate computed so far. Conceptually, this is a stateful computation over an infinite data set - it is stateful because computing the new aggregate takes into account the incoming key-value record along with the latest state (the current aggregate). This can be used for scenarios such as moving average, sum, count, etc.

Here is an example of how you can calculate the count, i.e. the number of times a specific key was received:

code examples are available on GitHub

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        KTable<String, Count> aggregate = stream.groupByKey()
                .aggregate(new Initializer<Count>() {
                    // invoked once per key to provide the starting value
                    @Override
                    public Count apply() {
                        return new Count("", 0);
                    }
                }, new Aggregator<String, String, Count>() {
                    // invoked for every record, combining it with the aggregate so far
                    @Override
                    public Count apply(String k, String v, Count aggKeyCount) {
                        Integer currentCount = aggKeyCount.getCount();
                        return new Count(k, currentCount + 1);
                    }
                });

        // publish the per-key counts to an output topic
        aggregate.toStream()
                 .map((k,v) -> new KeyValue<>(k, v.getCount()))
                 .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));

count

count is such a commonly used form of aggregation that it is offered as a first-class operation. Once you have the stream records grouped by key (KGroupedStream), you can count the number of records for a specific key using this operation.

The aggregate way of doing things can be replaced by a single method call!

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        // results in a KTable<String, Long> of per-key counts
        stream.groupByKey().count();

reduce

You can use reduce to combine a stream of values. The aggregate operation covered earlier is a generalized form of reduce. You can implement functionality such as sum, min, max, etc. Here is an example of max:

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.Long()));

        stream.groupByKey()
                .reduce(new Reducer<Long>() {
                    // keeps the larger of the running maximum and the incoming value
                    @Override
                    public Long apply(Long currentMax, Long v) {
                        Long max = (currentMax > v) ? currentMax : v;
                        return max;
                    }
                }).toStream().to(OUTPUT_TOPIC);

        return builder.build();

Note that all the aggregation operations ignore records with a null key, which makes sense since the very goal of this set of functions is to operate on records of a specific key.

Aggregation and state stores

In the above examples, the aggregated values were pushed to an output topic - this is not mandatory though. It is possible to store the aggregation results in local state stores. Here is an example:

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        stream.groupByKey().count(Materialized.as("count-store"));

In the above example, the call to count also creates a local state store named count-store which can then be introspected using Interactive Queries.
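
As a rough illustration of an Interactive Query against that store (assuming streams is a started KafkaStreams instance built from the topology above; the exact store-lookup API has changed slightly across Kafka Streams versions):

        // "streams" is assumed to be a running KafkaStreams instance for this topology
        ReadOnlyKeyValueStore<String, Long> countStore =
                streams.store(StoreQueryParameters.fromNameAndType(
                        "count-store", QueryableStoreTypes.keyValueStore()));

        // point lookup for a single key
        Long count = countStore.get("some-key");

        // or iterate over this instance's local shard of the store
        try (KeyValueIterator<String, Long> all = countStore.all()) {
            all.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
        }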

These state stores can either be in-memory or persisted on disk using RocksDB. This allows for scalability since each state store lives locally within the specific Kafka Streams application instance that processes inputs from a subset of the topic's partitions - thus, the overall state is distributed across (potentially) multiple instances of your application (except in the case of GlobalKTables). Another key property is high availability: the contents of these state stores are backed up to Kafka as changelog (compacted) topics (although this can be disabled), so if an app instance crashes, its state store contents can be restored from Kafka itself.
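
As a hedged sketch of those options, the same count can be materialized as an in-memory store, with the changelog backup turned off via Materialized (the store name in-memory-count-store is just an example):

        // in-memory store instead of the default RocksDB-backed one;
        // withLoggingDisabled() turns off the changelog topic backup,
        // trading away the restore-from-Kafka safety net
        stream.groupByKey()
              .count(Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("in-memory-count-store"))
                                 .withLoggingDisabled());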

KGroupedTable

A KGroupedTable is obtained when groupBy* operations are invoked on a KTable. Just like a KGroupedStream, having a KGroupedTable is a prerequisite for applying aggregation on a KTable. aggregate, count, and reduce work the same way on a KGroupedTable as they do on a KGroupedStream. But there is an important difference that needs to be highlighted.

A KTable is conceptually different from a KStream in the sense that it represents a snapshot of the data at a point in time (very much like a database table). It is a mutable entity, as opposed to a KStream, which represents an immutable and infinite sequence of records. To account for this difference, the aggregate and reduce functions on a KGroupedTable accept an additional function known as the subtractor (an extra Aggregator in the case of aggregate, an extra Reducer in the case of reduce), which is invoked when the value for a key is updated or a null value is received.
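
Here is a rough sketch of an adder/subtractor pair in action, assuming a hypothetical USERS_TOPIC keyed by user id with the user's region as the value; the subtractor keeps the per-region count correct when a user's row is updated or deleted:

        StreamsBuilder builder = new StreamsBuilder();
        // table of userId -> region (USERS_TOPIC is a hypothetical topic name)
        KTable<String, String> users = builder.table(USERS_TOPIC);

        KTable<String, Long> regionCounts = users
                // re-key each row by its region before aggregating
                .groupBy((userId, region) -> KeyValue.pair(region, region))
                .aggregate(
                        () -> 0L,                              // initializer
                        (region, value, count) -> count + 1,   // adder: a row moved into this region
                        (region, value, count) -> count - 1,   // subtractor: a row left this region
                        Materialized.with(Serdes.String(), Serdes.Long()));

For this particular case, KGroupedTable.count() would produce the same result; the explicit form just makes the adder and subtractor visible.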

Windowing

Stateful Kafka Streams operations also support Windowing. This allows you to scope your stream processing pipelines to a specific time window/range, e.g. tracking the number of link clicks per minute or the number of unique page views per hour.

To perform windowed aggregations on a group of records, you will have to create a KGroupedStream (as explained above) using groupBy on a KStream and then apply the windowedBy operation (available in two overloaded forms). You can choose between traditional time windows (tumbling, hopping, or sliding) and session-based time windows.

Using windowedBy(Windows<W> windows) on a KGroupedStream returns a TimeWindowedKStream on top of which you can invoke the above-mentioned aggregate operations. For example, if you want the number of clicks over a specific time range (say 5 minutes), choose a tumbling time window. This ensures that records are clearly segregated across the given time boundaries, i.e. clicks from user1 between 10:00 AM and 10:05 AM are aggregated (counted) separately, and a new window starts at 10:05 AM in which the click counter is reset to zero and counting begins again.

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        // 5-minute tumbling windows; count() produces a KTable<Windowed<String>, Long>
        TimeWindowedKStream<String, String> windowed =
                stream.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5)));

        windowed.count().toStream().to(OUTPUT_TOPIC);

Here is how the different time windows compare:

  • Tumbling time windows never overlap, i.e. a record will be part of only one window...
  • ... in contrast to Hopping time windows, where a record can be present in one or more windows (a sketch follows this list)
  • Sliding time windows are meant for use with Joining operations
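
For instance, here is a minimal hopping-window sketch (same hypothetical topics as before): 5-minute windows that advance every minute, so each record lands in several overlapping windows:

        stream.groupByKey()
              .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
              .count()
              // flatten the windowed key into a plain String key for the output topic
              .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
              .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));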

There is another type of stateful operation, Joining. It is an extensive topic that deserves an entire post (or maybe another series?) by itself.

If you want to take into account the "session", i.e. a period of activity separated by a defined gap of inactivity, use windowedBy(SessionWindows windows), which returns a SessionWindowedKStream.

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        // sessions separated by a 5-minute inactivity gap; an aggregation
        // (here, count) is needed before the result can be streamed out
        stream.groupByKey()
              .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
              .count()
              .toStream()
              .to(OUTPUT_TOPIC);

        return builder.build();

That's all for this part of Kafka Streams blog series. Stay tuned for the next part which will demonstrate how to test Kafka Streams applications using the in-built test utilities.
