The previous blog posts in the Kafka Streams series covered stateless and stateful operations in the DSL API. In this post, we will explore a few examples that demonstrate how to use the testing utilities to validate topologies built with the Kafka Streams DSL API.
Kafka Streams provides testing utilities to execute unit tests for your stream processing pipelines without having to rely on an external or embedded Kafka cluster. In addition to testing, these utilities also serve as a great learning tool to grok various API features.
Let's start with a high-level overview of the testing-related APIs.
The code is available on GitHub. To run the tests, clone the repo and execute mvn test
Key concepts
Initially, the org.apache.kafka.streams.test package contained a few helper classes (such as ConsumerRecordFactory and OutputVerifier). They are now deprecated in favor of the following classes:
TestInputTopic
An instance of TestInputTopic represents an input topic, and you can send records to it using the pipeInput method (and its overloaded versions). Create TestInputTopic instances using TopologyTestDriver (explained below), supplying custom serializers if needed. You can then send key-value pairs or just values, one at a time or in a batch (using a List).
TestOutputTopic
TestOutputTopic is the other half of the send-receive equation and complements TestInputTopic. You can use it to read records from the output topics that your topology writes to. Its methods include reading records (key-value pairs), reading only the value, and querying the size (the number of records that have not yet been consumed).
TopologyTestDriver
TopologyTestDriver contains a reference to the Topology as well as the configuration related to your Kafka Streams application. As mentioned earlier, it is used to create instances of TestInputTopic and TestOutputTopic, and it also provides access to state stores.
High level flow
If you're using Maven, you can include the testing utilities as a dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>2.4.0</version>
    <scope>test</scope>
</dependency>
You will most likely also use JUnit to run the tests and Hamcrest to write the matching rules:
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-core</artifactId>
    <version>1.3</version>
    <scope>test</scope>
</dependency>
Here is what a test class might look like (similar to how you would unit test any Java code with JUnit):
- Set up global state (if any) in a method annotated with @BeforeClass
- Set up state for each test run in a method annotated with @Before; this is where you would typically create the TopologyTestDriver, etc.
- Write @Test methods which validate the Topology
- Use @After (and/or @AfterClass) methods to tear down any state (be it global or otherwise)
Please ensure that you call TopologyTestDriver.close() to clean up the processors in the topology and their associated state. Failing to do so might result in test failures due to inconsistent state.
Now that you have an understanding of the concepts and basic setup, let's look at a few concrete examples. We'll start off with stateless operations.
Testing stateless operations
filter
Here is the Topology, which uses the filter method to only allow values with a length greater than five:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.filter((k, v) -> v.length() > 5).to(OUTPUT_TOPIC);
And here is the corresponding test:
@Test
public void shouldIncludeValueWithLengthGreaterThanFive() {
    // Build the topology under test and the driver that executes it
    topology = App.retainWordsLongerThan5Letters();
    td = new TopologyTestDriver(topology, config);

    inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
    outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());

    // Nothing has been sent yet, so the output topic is empty
    assertThat(outputTopic.isEmpty(), is(true));

    // A seven-character value passes the filter
    inputTopic.pipeInput("key1", "barrrrr");
    assertThat(outputTopic.readValue(), equalTo("barrrrr"));
    assertThat(outputTopic.isEmpty(), is(true));

    // A three-character value is dropped
    inputTopic.pipeInput("key2", "bar");
    assertThat(outputTopic.isEmpty(), is(true));
}
We start by choosing the Topology we want to test, then create the TopologyTestDriver instance along with the TestInputTopic and TestOutputTopic objects.
Next, we confirm that the output topic is empty before sending any data, using assertThat(outputTopic.isEmpty(), is(true));
Now records can be sent to the input topic using inputTopic.pipeInput("key1", "barrrrr");
This is a synchronous process and triggers the Topology, which in this case executes the filter operation and pushes the record to the output topic since the value length is more than five. We confirm this using assertThat(outputTopic.readValue(), equalTo("barrrrr")); and double check that the output topic is empty again.
Finally, we send the value bar and confirm that it was not sent to the output topic because its length is not greater than five.
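The test covers values of length seven and three but not the boundary itself. As a minimal sketch outside Kafka (the class LengthFilterSketch and the constant LONGER_THAN_FIVE are hypothetical names, not part of the sample repo), the same predicate can be checked in plain Java to see that a value with exactly five characters is dropped:

```java
import java.util.function.Predicate;

// Plain-Java stand-in for the filter predicate; no Kafka classes are involved.
final class LengthFilterSketch {

    // Same condition as the topology's filter: keep values longer than five characters.
    static final Predicate<String> LONGER_THAN_FIVE = v -> v.length() > 5;

    public static void main(String[] args) {
        // "hello" has exactly five characters and sits on the boundary.
        for (String value : new String[] {"barrrrr", "bar", "hello", "helloo"}) {
            System.out.println(value + " kept? " + LONGER_THAN_FIVE.test(value));
        }
    }
}
```

If the boundary matters for your pipeline, a five-character value could be piped into the input topic and checked with outputTopic.isEmpty() in the same way as the bar case.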
flatMap
As explained in part 1 of this series (stateless operations), here is a flatMap operation:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
        String[] values = csv.split(",");
        return Arrays.asList(values)
                .stream()
                .map(value -> new KeyValue<>(k, value))
                .collect(Collectors.toList());
    }
}).to(OUTPUT_TOPIC);
In the above example, each record in the stream gets flatMapped: each CSV (comma-separated) value is first split into its constituents, and a KeyValue pair is created for each part of the CSV string.
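To make the mapping concrete, here is a minimal plain-Java sketch of the same split-and-pair logic with no Kafka classes involved. CsvFlatMapSketch and splitCsv are hypothetical names, and java.util.Map.Entry stands in for Kafka's KeyValue:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the flatMap step; Map.Entry replaces Kafka's KeyValue.
final class CsvFlatMapSketch {

    // Splits a comma-separated value and pairs every part with the original key.
    static List<Map.Entry<String, String>> splitCsv(String key, String csv) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        for (String part : csv.split(",")) {
            pairs.add(new SimpleEntry<>(key, part));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // One input record with three CSV parts becomes three key-value pairs.
        for (Map.Entry<String, String> pair : splitCsv("random", "foo,bar,baz")) {
            System.out.println(pair.getKey() + " -> " + pair.getValue());
        }
    }
}
```

A single input record can therefore produce several output records, all carrying the key of the record they came from.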
To test this:
topology = App.flatMap();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
inputTopic.pipeInput("random", "foo,bar,baz");
inputTopic.pipeInput("hello", "world,universe");
inputTopic.pipeInput("hi", "there");
assertThat(outputTopic.getQueueSize(), equalTo(6L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "foo")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "bar")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "baz")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "world")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "universe")));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hi", "there")));
assertThat(outputTopic.isEmpty(), is(true));
As usual, we set up the required test utility classes and push records to the input topic. For example, the record with key random and comma-separated value foo,bar,baz is split into individual key-value pairs, which results in three records being pushed to the output topic. The same applies to the other input records.
We confirm the number of records in the output topic with assertThat(outputTopic.getQueueSize(), equalTo(6L)); and then validate each key-value pair to confirm the flatMap behavior.
Stateful operation without State store
Here is an example of a Topology which uses groupByKey followed by count and sends the results to an output topic:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey()
.count()
.toStream()
.to(OUTPUT_TOPIC);
Testing a stateful operation is not very different from testing a stateless one.
topology = App.count();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
TestOutputTopic<String, Long> ot = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.Long().deserializer());
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key1", "value2");
inputTopic.pipeInput("key2", "value3");
inputTopic.pipeInput("key3", "value4");
inputTopic.pipeInput("key2", "value5");
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 2L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key3", 1L)));
assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 2L)));
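Individual records are sent to the input topic, and the counts read from the output topic are then validated. Each input record produces an updated count for its key, so the output topic contains five records. As expected, the final counts for keys key1, key2 and key3 are 2, 2 and 1 respectively.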
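The output topic in this test holds five records rather than three because every input record leads to an updated count for its key. Here is a minimal plain-Java simulation of that behavior, assuming one update is emitted per input record (RunningCountSketch and countUpdates are hypothetical names; no Kafka classes are used):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of groupByKey().count() as observed in the test.
final class RunningCountSketch {

    // Returns one "key=count" update per input key, in arrival order.
    static List<String> countUpdates(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (String key : keys) {
            // Increment the running count for this key and record the new value.
            long updated = counts.merge(key, 1L, Long::sum);
            updates.add(key + "=" + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        // Same key order as the records piped into the input topic in the test.
        System.out.println(countUpdates(Arrays.asList("key1", "key1", "key2", "key3", "key2")));
    }
}
```

The sequence of updates lines up with the order of the readKeyValue assertions, and the last update seen for each key is its final count.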
Stateful operation with a State store
Things get interesting when the Topology includes a state store. In this example, instead of sending the counts to an output topic, a state store is used (this can be queried via Interactive Queries):
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.groupByKey().count(Materialized.as("count-store"));
TopologyTestDriver provides access to the state store (KeyValueStore) via getKeyValueStore. The count in the state store is validated after each record is sent to the input topic, e.g. assertThat(countStore.get("key1"), equalTo(1L));
topology = App.countWithStateStore();
td = new TopologyTestDriver(topology, config);
inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
KeyValueStore<String, Long> countStore = td.getKeyValueStore("count-store");
inputTopic.pipeInput("key1", "value1");
assertThat(countStore.get("key1"), equalTo(1L));
inputTopic.pipeInput("key1", "value2");
assertThat(countStore.get("key1"), equalTo(2L));
inputTopic.pipeInput("key2", "value3");
assertThat(countStore.get("key2"), equalTo(1L));
inputTopic.pipeInput("key3", "value4");
assertThat(countStore.get("key3"), equalTo(1L));
inputTopic.pipeInput("key2", "value5");
assertThat(countStore.get("key2"), equalTo(2L));
Note that in our tests, we created the Topology, TopologyTestDriver, TestInputTopic and TestOutputTopic in each test method. This was simply because we were testing different topologies. If you were testing a single Topology with several test cases in a single JUnit class, you could easily move this into a setup method annotated with @Before so that it runs automatically before each test case.
That's all for now! This was a short but hopefully useful introduction to testing your Kafka Streams based processing pipelines.