Dynamic rule processing for data streams using Flink & Serverless on AWS

Mohammed - Aug 7 - - Dev Community

The write-up below requires some working knowledge of Apache Flink along with a basic understanding of AWS. Apache Flink is a distributed stream processing framework, with support for Complex Event Processing (CEP), primarily used for working with continuous data streams at very large scale across a wide variety of use cases.
If you’re new to Flink, I’d recommend reading this piece first.

The what & why

Pattern detection is one of the areas where Flink excels at scale. When working with data streams, there are use cases in which each incoming event must be checked against a set of rules to determine whether it needs to be processed further by downstream systems. These rules are usually simple JSON payloads.

Alternatively, let’s say there is some kind of data enrichment required for specific events within the source stream. The data used for the enrichment is static and may be altered dynamically over time by pushing a new dataset.

Flink supports both of these use cases with the Broadcast pattern.

How Broadcast works

Broadcasting in Flink is the idea of pushing a payload (a set of rules/items) and making it available to all parallel tasks that process the data streams within your Flink application.

Flink maintains internal state for a Broadcast stream. Every time an event is pushed into the source topic that feeds the Broadcast stream, the job updates this state (overwriting or appending, depending on the business logic).

If new “rules” are to be appended to the existing Broadcast message payload, there are two ways to achieve this:

  1. The first is to push a combined message payload to the source topic containing all the relevant attributes within the single payload. This requires no code adjustment within Flink and only requires the source to handle the payload that’s being pushed.
  2. The second approach is to update the Broadcast state manually at the code level within Flink by appending the state from the incoming payload to the existing state.

Both approaches work well with their own merits & demerits. It should be noted that whenever the state is updated, all operators consuming the Broadcast stream will have access to the same state.
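
To make the difference between the two approaches concrete, here is a minimal sketch in plain Java, with an ordinary Map standing in for Flink's broadcast state. The class name RuleStateSketch, the method names, and the idea of keying rules by an id are assumptions made for illustration and are not taken from the original application. In a real job, the same logic would sit inside processBroadcastElement.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for Flink's broadcast MapState.
final class RuleStateSketch {

    // Approach 1: the producer sends the complete rule set,
    // so the existing state is simply replaced by the incoming one.
    static Map<String, String> overwrite(Map<String, String> current,
                                         Map<String, String> incoming) {
        return new HashMap<>(incoming);
    }

    // Approach 2: the job merges the incoming rules into the existing state.
    // Rules with a new id are added; rules with an existing id are replaced.
    static Map<String, String> append(Map<String, String> current,
                                      Map<String, String> incoming) {
        Map<String, String> merged = new HashMap<>(current);
        merged.putAll(incoming);
        return merged;
    }
}
```

With the first approach a rule disappears as soon as the producer stops including it in the payload. With the second, a rule stays in the state until the job explicitly removes it, so some form of delete handling is usually needed as well.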

Flow setup on AWS

The diagram above depicts a simple flow using AWS resources. Kafka (Amazon MSK) acts as the message broker. Kinesis Data Streams can also be used as the source and sink with the appropriate connector configuration within Flink. The Flink job is deployed to Amazon Managed Service for Apache Flink (MSF).

The rules are ingested via an upload to the S3 bucket. This bucket acts as a source for a Lambda function which transforms the CSV file(s) into a JSON payload before pushing it to a Kafka topic. This is the source topic for Flink to receive the message/rules for Broadcasting. Of course, the delivery of the message to the Kafka topic can be done in multiple ways. For instance, an API Gateway + Lambda could also work well.
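
The CSV-to-JSON step could look roughly like the sketch below. It is written in Java to match the rest of the article, although the original Lambda may well use a different runtime. The class name CsvRulesToJson, the column layout, and the choice to emit every value as a JSON string are assumptions. The sketch also assumes simple CSV without quoted fields or embedded commas, and it leaves out reading from S3 and producing to Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the transformation step only (no S3 or Kafka calls).
final class CsvRulesToJson {

    // Converts CSV text (first line = header) into a JSON array of objects.
    // Every value is emitted as a JSON string; missing cells become "".
    static String toJson(String csv) {
        String[] lines = csv.strip().split("\\R");
        String[] header = lines[0].split(",", -1);
        List<String> objects = new ArrayList<>();
        for (int i = 1; i < lines.length; i++) {
            if (lines[i].isBlank()) {
                continue; // skip empty rows
            }
            String[] cells = lines[i].split(",", -1);
            List<String> fields = new ArrayList<>();
            for (int c = 0; c < header.length; c++) {
                String cell = c < cells.length ? cells[c].trim() : "";
                fields.add(quote(header[c].trim()) + ":" + quote(cell));
            }
            objects.add("{" + String.join(",", fields) + "}");
        }
        return "[" + String.join(",", objects) + "]";
    }

    // Minimal JSON string escaping: backslashes and double quotes only.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

For anything beyond a demo, a proper CSV parser and JSON library would be the safer choice, since hand-rolled escaping like this does not cover control characters.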

Once the message (rule) gets ingested, it will be applied to every event that comes through Flink from the source event topic, processed based on the business logic, and pushed out to the sink (Kafka topic).

Under the hood

At a high level, the data stream setup in the Flink job looks something like this:

  // source stream
  DataStream<String> sourceStream = env.fromSource(eventSource, WatermarkStrategy.noWatermarks(),
    "event-source");

  // ETD rules stream
  DataStream<String> ruleStream = env.fromSource(ruleEventSource, WatermarkStrategy.noWatermarks(),
    "rule-source");

  // Broadcast state descriptor
  MapStateDescriptor<String, String> broadcastStateDescriptor =
    new MapStateDescriptor<>("broadcastState", String.class, String.class);

  // Broadcast stream
  BroadcastStream<String> broadcastStream = ruleStream.broadcast(broadcastStateDescriptor);

  // Connect the event stream to the broadcast rules and process both
  SingleOutputStreamOperator<String> flightDelayCompensationStream = sourceStream.connect(broadcastStream)
    .process(new EventBroadcastProcessFunction(broadcastStateDescriptor)).uid("rule-processor");

Depending on the source event stream being keyed or not, the appropriate Broadcast processor class needs to be extended, which is either the KeyedBroadcastProcessFunction or BroadcastProcessFunction.

The setup below has a non-keyed stream, therefore the methods belonging to the regular BroadcastProcessFunction class are overridden.

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception
    {
        try
        {
            // Read-only view of the state written by processBroadcastElement
            ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);
            // null until the first rule payload has been broadcast
            String rules = broadcastState.get("rules");

            // business logic

            out.collect(value);
        }
        catch (Exception e)
        {
            logger.error("Something went wrong", e);
        }
    }

    @Override
    public void processBroadcastElement(String rule, Context ctx, Collector<String> collector) throws Exception
    {
        // Overwrite the stored rules with the latest payload
        ctx.getBroadcastState(broadcastStateDescriptor).put("rules", rule);
    }

The processBroadcastElement function is where the Broadcast state can be manipulated. Each time a payload comes through the Broadcast message source, we can append the values to the existing state or overwrite it as needed.

The Broadcast state can be accessed within the processElement function and that’s where the actual source event comes through. After reading from the state, the rules can then be applied to the business logic for every incoming event.
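
As an illustration of what the "business logic" placeholder might do, the sketch below checks one event against a list of threshold rules. The rule shape (a field name plus a numeric threshold), the class name RuleMatcher, and the field name delayMinutes used in the usage note are all hypothetical. The sketch assumes the JSON rule payload and the event have already been parsed into Java objects, which the original code does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of rule evaluation inside processElement.
final class RuleMatcher {

    // Assumed rule shape: the event matches when the value of the
    // named numeric field is strictly greater than the threshold.
    static final class Rule {
        final String id;
        final String field;
        final double threshold;

        Rule(String id, String field, double threshold) {
            this.id = id;
            this.field = field;
            this.threshold = threshold;
        }
    }

    // Returns the ids of all rules the event satisfies, in rule order.
    // Rules that refer to a field the event does not have are skipped.
    static List<String> matchingRuleIds(Map<String, Double> event, List<Rule> rules) {
        List<String> matched = new ArrayList<>();
        for (Rule rule : rules) {
            Double value = event.get(rule.field);
            if (value != null && value > rule.threshold) {
                matched.add(rule.id);
            }
        }
        return matched;
    }
}
```

For example, with two rules on delayMinutes (thresholds 180 and 240), an event carrying delayMinutes = 200 matches only the first rule. The job could then emit the event only when the returned list is non-empty.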

Running it locally

You can get this up and running in your local environment with Docker. This is the GitHub link to the Flink application including the docker-compose file. To make it easy, the Docker setup already creates the Kafka topics used by the Flink job.

These are the steps you would need to follow:

  1. Build the JAR file.
  2. Install Docker.
  3. Run the docker command: docker-compose up -d
  4. Go to http://localhost:8081 and submit a job with the JAR file created in step 1.
  5. Connect to the Kafka container: docker container exec -it <cluster-name> /bin/bash
  6. Push messages to the appropriate Kafka topic to monitor the flow: kafka-console-producer --broker-list localhost:9092 --topic <name>

Conclusion

The above solution works great when there is a finite number of rules, although there is no direct restriction on the amount of data Flink can hold in the internal state. It’s the performance and code management that need to be taken into consideration.

There could be an argument about using a Lambda function instead of Flink to achieve the same result. While that makes complete sense in terms of simplicity, when real-time processing at a heavy scale is paramount, Flink is the better choice.

It is also possible to use two different sources and then subject the combined stream to the Broadcast rules. Flink can likely handle more complex scenarios than other forms of compute for data streams. It all boils down to what works best in the long run and where application management is simpler.
