Welcome to this series of blog posts which covers Redis Streams with the help of a practical example. We will use a sample application to make Twitter data available for search and query in real-time. RediSearch and Redis Streams serve as the backbone of this solution, which consists of several co-operating components, each of which will be covered in a dedicated blog post.
The code is available in this GitHub repo - https://github.com/abhirockzz/redis-streams-in-action
This blog post will cover a Java-based tweets processor application whose role is to pick up tweets from Redis Streams and store them (as a HASH) so that they can be queried using RediSearch (the accurate term for this is "indexing documents" in RediSearch). You will deploy the application to Azure, validate it, and run a few RediSearch queries to search tweets. Finally, there is a section where we will walk through the code to understand "how things work".
Pre-requisites
Please make sure that you read part 2 of this series and have the Tweets consumer application up and running. This application will read tweets from the Twitter Streaming API and push them to Redis Streams. Our tweets processor app (the one described in this blog) will then take over.
You will need an Azure account, which you can get for free, and the Azure CLI. Like the previous application, this one will also be deployed to Azure Container Instances using regular Docker CLI commands. This capability is enabled by the integration between Docker and Azure. Just ensure that you have Docker Desktop version 2.3.0.5 or later (for Windows or macOS), or install the Docker ACI Integration CLI for Linux.
Deploy the app to Azure Container Instances
If you've been following along from the previous blog post, you should have set up the Enterprise tier of Azure Cache for Redis, using this quickstart. Once you finish this step, ensure that you save the following information: the Redis host name and access key.
The application is available as a Docker container - the easiest way is to simply re-use it. If you wish to build your own image, please use the Dockerfile available on the GitHub repo.
If you choose to build your own image, make sure to build the JAR file using Maven (mvn clean install) first.
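In that case, a build could look like this (run from the root of the repo; the image name is just a placeholder - tag it however you like, and note that docker build runs against your local Docker context, not the ACI context created below):
mvn clean install
docker build -t tweets-redis-streams-consumer-java .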
It's really convenient to deploy it to Azure Container Instances, which allows you to run Docker containers on-demand in a managed, serverless Azure environment.
Make sure you create an Azure context to associate Docker with an Azure subscription and resource group so you can create and manage container instances.
docker login azure
docker context create aci aci-context
docker context use aci-context
Set the environment variables - make sure to update Redis host and credentials as per your account:
export STREAM_NAME=tweets_stream # don't change
export STREAM_CONSUMER_GROUP_NAME=redisearch_app_group # don't change
export REDIS_HOST=<redis host name e.g. my-redis-host>
export REDIS_PORT=<redis port>
export REDIS_PASSWORD=<redis access key (password)>
export SSL=true
... and then use docker run to deploy the container to Azure:
docker run -d --name redis-streams-consumer \
-e STREAM_NAME=$STREAM_NAME \
-e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME \
-e REDIS_HOST=$REDIS_HOST \
-e REDIS_PORT=$REDIS_PORT \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e SSL=$SSL \
abhirockzz/tweets-redis-streams-consumer-java
As the container is being created, you should see an output similar to this:
[+] Running 2/2
⠿ Group redis-streams-consumer Created 5.2s
⠿ redis-streams-consumer Created 10.5s
Validate this using the Azure portal.
To check the container logs, you can use the usual docker logs command:
docker logs redis-streams-consumer
You should see an output similar to this:
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089239324282880
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089243539517441
Reading from stream tweets_stream with XREADGROUP
not processed - tweet:1393089247721132033
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089256105693184
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089260304179200
....
Notice the not processed logs? We will discuss them in the next section.
Once the app is up and running, it will start consuming from the tweets_stream Redis Stream and store the info for each tweet in a separate HASH, which in turn will be indexed by RediSearch. Before moving on, log in to the Redis instance using redis-cli:
redis-cli -h <hostname> -p 10000 -a <password> --tls
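Once connected, a quick sanity check (assuming the tweets consumer from part 2 is still pushing to tweets_stream) is to confirm that the stream exists and is accumulating entries:
redis-cli> TYPE tweets_stream
redis-cli> XLEN tweets_stream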
How are things looking?
If you look at the logs carefully, you should be able to find the name of the HASH (which is based on the tweet ID), e.g. tweet:<tweet id>. Just inspect its contents with HGETALL:
redis-cli> TYPE tweet:1393089163856056320
hash
redis-cli> HGETALL tweet:1393089163856056320
The result will look like any other HASH. For example:
1) "location"
2) "Nairobi, Kenya"
3) "text"
4) "RT @WanjaNjubi: #EidMubarak \xf0\x9f\x99\x8f\nMay peace be upon you now and always.\n#EidUlFitr https://t.co/MlL0DbM2aS"
5) "id"
6) "1393089163856056320"
7) "user"
8) "Hot_96Kenya"
9) "hashtags"
10) "EidMubarak,EidUlFitr"
Alright, it's time to query tweets with RediSearch! Let's use a few commands to search the tweets-index index:
- FT.SEARCH tweets-index hello - will return tweets which contain the term "hello"
- FT.SEARCH tweets-index hello|world - the same as above, except that it matches "hello" OR "world"
- Use FT.SEARCH tweets-index "@location:India" if you're interested in tweets from a specific location
- FT.SEARCH tweets-index "@user:jo* @location:India" - this combines location with a criterion that the username should start with jo
- FT.SEARCH tweets-index "@user:jo* | @location:India" - this is a subtle variant of the above. | signifies an OR criterion
- You can search using hashtags as well - FT.SEARCH tweets-index "@hashtags:{cov*}"
- Include multiple hashtags as such - FT.SEARCH tweets-index "@hashtags:{cov*|Med*}"
These are just a few examples. I would highly recommend that you refer to the RediSearch documentation and try other queries as well.
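For instance, if you want to page through results or fetch only specific fields, FT.SEARCH also supports the standard LIMIT and RETURN options:
redis-cli> FT.SEARCH tweets-index hello LIMIT 0 5
redis-cli> FT.SEARCH tweets-index hello RETURN 2 user location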
Let's scale out
One of the key benefits of using Redis Streams is its Consumer Groups feature. This means that you can simply add more instances of the application (horizontal scale-out) in order to improve the processing - the more instances, the faster the tweets get processed. Each instance will consume from a different part of the same Redis Stream (tweets_stream), so the workload is distributed (almost) evenly amongst all the instances - this gives you the ability to scale linearly.
Let's try this out. To start another instance, use docker run - make sure to use a different name:
docker run -d --name redis-streams-consumer_2 \
-e STREAM_NAME=$STREAM_NAME \
-e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME \
-e REDIS_HOST=$REDIS_HOST \
-e REDIS_PORT=$REDIS_PORT \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e SSL=$SSL \
abhirockzz/tweets-redis-streams-consumer-java
Notice that I used a different name (--name redis-streams-consumer_2).
Things will continue like before - just a little faster, since we have another helping hand. You can check the logs of the new instance as well - docker logs redis-streams-consumer_2.
You can continue to experiment further and try scaling out to more instances.
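While you do so, you can observe the consumer group with the standard XINFO commands from redis-cli - each instance you add should show up as another consumer in the group:
redis-cli> XINFO GROUPS tweets_stream
redis-cli> XINFO CONSUMERS tweets_stream redisearch_app_group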
Let's dig a little deeper
We can introspect Redis Streams using the XPENDING command:
XPENDING tweets_stream redisearch_app_group
You will see an output similar to this:
1) (integer) 25
2) "1618572598902-0"
3) "1618573768902-0"
4) 1) 1) "consumer-b6410cf9-8244-41ba-b0a5-d79b66d33d65"
      2) "20"
   2) 1) "consumer-e5a872d4-b488-416e-92ee-55d2902b338f"
      2) "5"
If you're new to Redis Streams, this output might not make a lot of sense. The call to XPENDING returns the number of messages that were received by our processing application, but have not been processed (and acknowledged) yet. In this case, we have two application instances (their consumer names contain randomly generated UUIDs) which have 20 and 5 unprocessed messages respectively (of course, the numbers will differ in your case).
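To drill down further, XPENDING also has an extended form that lists the individual pending entries (along with their idle time and delivery count) for a given consumer - substitute the consumer name from your own output:
redis-cli> XPENDING tweets_stream redisearch_app_group - + 10 consumer-b6410cf9-8244-41ba-b0a5-d79b66d33d65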
In a production scenario, application failures could happen due to multiple reasons. However, in our sample app, the code snippet below is used to simulate this situation - it randomly chooses (with about 20% probability) not to process the tweet received from Redis Streams:
if (!(random.nextInt(5) == 0)) {
conn.hset(hashName, entry.getFields());
conn.xack(streamName, consumerGroupName, entry.getID());
}
That's the reason you will see the XPENDING count increasing slowly but surely. In production, if one (or more) instances crash, the XPENDING count for those instance(s) will stop increasing but remain constant. This implies that these messages are now left unprocessed - in this specific example, it means that the tweet information will not be available in RediSearch for you to query.
Redis Streams to the rescue
Redis Streams provides reliable messaging. It stores the state for each consumer - that's exactly what you see with XPENDING! If you start another consumer instance with the same group and consumer name, you will be able to replay the same messages and re-process them to ensure that tweets are stored in Redis. This does not involve doing anything different/additional on your part.
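Here is what that could look like with Jedis - a minimal sketch that re-uses the variables from the code walkthrough below. The only difference from regular consumption is the ID: passing 0-0 instead of StreamEntryID.UNRECEIVED_ENTRY (i.e. >) makes XREADGROUP return the entries that were already delivered to this consumer but never acknowledged:
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;

// re-read this consumer's pending (delivered, but not yet ACKed) entries
List<Entry<String, List<StreamEntry>>> pending = conn.xreadGroup(consumerGroupName, consumerName, 50,
        15000, false, Map.entry(streamName, new StreamEntryID())); // new StreamEntryID() is 0-0

if (pending != null) {
    for (Entry<String, List<StreamEntry>> result : pending) {
        for (StreamEntry entry : result.getValue()) {
            conn.hset(INDEX_PREFIX + entry.getFields().get("id"), entry.getFields()); // save to HASH
            conn.xack(streamName, consumerGroupName, entry.getID()); // ACK it this time
        }
    }
}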
Another option is to have a dedicated application that can periodically check the consumer group state (XPENDING), claim messages that have been left abandoned, re-process them and (most importantly) acknowledge (XACK) them. In the next (final) part of this series, we will explore how you can build an application to do exactly this!
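To give you an early taste, here is the gist of that flow in raw Redis commands - list the pending entries, then XCLAIM an abandoned one (the entry ID below is from the earlier sample output, rescuer-app is a placeholder consumer name, and 60000 is the minimum idle time in milliseconds):
redis-cli> XPENDING tweets_stream redisearch_app_group - + 10
redis-cli> XCLAIM tweets_stream redisearch_app_group rescuer-app 60000 1618572598902-0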
So, how does it all work?
It's a good time to walk through the code real quick.
You can refer to the code in the GitHub repo
The app uses JRediSearch, which abstracts the API of the RediSearch module. The first thing we do is establish a connection to Redis:
GenericObjectPoolConfig<Jedis> jedisPoolConfig = new GenericObjectPoolConfig<>();
JedisPool pool = new JedisPool(jedisPoolConfig, redisHost, Integer.valueOf(redisPort), timeout, redisPassword, isSSL);
Client redisearch = new Client(INDEX_NAME, pool);
Then we create a Schema and the Index definition:
Schema sc = new Schema().addTextField(SCHEMA_FIELD_ID, 1.0).addTextField(SCHEMA_FIELD_USER, 1.0)
.addTextField(SCHEMA_FIELD_TWEET, 1.0).addTextField(SCHEMA_FIELD_LOCATION, 1.0)
.addTagField(SCHEMA_FIELD_HASHTAGS);
IndexDefinition def = new IndexDefinition().setPrefixes(new String[] { INDEX_PREFIX });
try {
boolean indexCreated = redisearch.createIndex(sc, Client.IndexOptions.defaultOptions().setDefinition(def));
if (indexCreated) {
System.out.println("Created RediSearch index ");
}
} catch (Exception e) {
System.out.println("Did not create RediSearch index - " + e.getMessage());
}
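If you want to double-check the index from redis-cli, FT.INFO reports the schema as well as the number of indexed documents:
redis-cli> FT.INFO tweets-index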
To explore the Redis Streams APIs (xgroupCreate, xreadGroup etc.) exposed by the Jedis library, take a look at its javadocs.
Before moving on, we create a Redis Streams consumer group (using xgroupCreate) - this is mandatory. A consumer group represents a set of applications that work "together" and co-operate with each other to share the processing load:
try {
    conn = pool.getResource();
    conn.xgroupCreate(streamName, consumerGroupName, StreamEntryID.LAST_ENTRY, true);
} catch (Exception e) {
    // the consumer group may already exist - safe to ignore in that case
    System.out.println("Did not create consumer group - " + e.getMessage());
}
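For clarity, here is the redis-cli equivalent of the above call - $ (StreamEntryID.LAST_ENTRY) means "only new entries", and MKSTREAM (the true flag) creates the stream if it does not already exist:
redis-cli> XGROUP CREATE tweets_stream redisearch_app_group $ MKSTREAM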
Each app in the consumer group needs to be uniquely identified. While it is possible to assign a name manually, we generate a random consumer name.
String consumerName = "consumer-" + UUID.randomUUID().toString();
The main part of the consumer app is a loop that uses xreadGroup to read from the Redis Stream. Notice the StreamEntryID.UNRECEIVED_ENTRY - this means that we are asking Redis to return stream entries which have not been received by any other consumer in the group. Also, our invocation blocks for 15 seconds and we opt to get a maximum of 50 messages per call to XREADGROUP (of course, you can change this as per your requirements).
while (true) {
List<Entry<String, List<StreamEntry>>> results = conn.xreadGroup(consumerGroupName, consumerName, 50,
15000, false, Map.entry(streamName, StreamEntryID.UNRECEIVED_ENTRY));
if (results == null) {
continue;
}
....
}
Each stream entry needs to be saved to a Redis HASH (using hset). The good thing is that reading a stream entry returns a HashMap, which is exactly what the HSET API expects as well. So we are able to re-use the HashMap!
That's not all though - notice the xack method. This is the way to call XACK and communicate that we have indeed processed the message successfully:
for (Entry<String, List<StreamEntry>> result : results) {
List<StreamEntry> entries = result.getValue();
for (StreamEntry entry : entries) {
String tweetid = entry.getFields().get("id");
String hashName = INDEX_PREFIX + tweetid;
try {
// simulate random failure/anomaly. ~ 20% will NOT be ACKed
if (!(random.nextInt(5) == 0)) {
conn.hset(hashName, entry.getFields());
conn.xack(streamName, consumerGroupName, entry.getID());
}
} catch (Exception e) {
continue;
}
}
}
There is a lot of scope for optimization here. For example, you can make this process multi-threaded by spawning a thread for each batch (say, 50 messages), as sketched below.
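As a rough illustration (a sketch, not how the sample app is actually written - and note that a Jedis connection is not thread-safe, so each worker must borrow its own from the pool):
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;

ExecutorService workers = Executors.newFixedThreadPool(4); // tune the pool size as needed

for (Entry<String, List<StreamEntry>> result : results) {
    List<StreamEntry> batch = result.getValue();
    workers.submit(() -> {
        // each worker gets a dedicated connection from the JedisPool
        try (Jedis jedis = pool.getResource()) {
            for (StreamEntry entry : batch) {
                jedis.hset(INDEX_PREFIX + entry.getFields().get("id"), entry.getFields());
                jedis.xack(streamName, consumerGroupName, entry.getID());
            }
        }
    });
}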
That's all for this blog!
Interested in the final part?
So far, we have covered a high-level overview in part 1, the tweets consumer Rust app in part 2, and (in this post) a Java app to process those tweets from Redis Streams. As promised, the final part of the series will cover an app to monitor the process and re-process abandoned messages in order to keep our overall system robust - this will be a serverless Go application deployed to Azure Functions. Stay tuned!