An Event Stream Processing Micro-Framework for Apache Kafka

Apache Kafka, originally developed at LinkedIn and open sourced in 2011, has become the de facto industry standard for real-time data feeds, reliably handling large volumes of data with high throughput and low latency. Companies such as Uber, Netflix and Slack use Kafka to process trillions of messages per day. Unlike a traditional queue or message broker, Kafka functions as a unified, durable log of append-only, ordered events that can be replayed or archived.

You can download the code for the event stream processing micro-framework and sample application here: https://github.com/event-streams-dotnet/event-stream-processing

Kafka and .NET

Kafka is written in Java, and most of its libraries and tools are available only in Java. This makes the experience of developing for Kafka in C# somewhat limited. The Kafka Streams and Kafka Connect APIs, for example, are only available in Java, and Confluent’s ksqlDB product is only accessible via a REST interface.

All is not lost, however, for C# developers wishing to use Kafka. Confluent, a company founded by the creators of Kafka, offers confluent-kafka-dotnet, a .NET Kafka client that provides high-level Consumer, Producer and AdminClient APIs. This is useful for building single event stream processing applications, which can be used for scenarios such as event sourcing and real-time ETL (extract-transform-load) data pipelines. (There is also an open source kafka-streams-dotnet project that aims to provide the same functionality as Kafka Streams on .NET for multiple event stream processing applications.)
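
To get a feel for the client, here is a minimal sketch of producing and consuming a single message with confluent-kafka-dotnet (the broker address and “greetings” topic are assumptions for illustration):

using System;
using Confluent.Kafka;

public static class QuickStart
{
    public static void Main()
    {
        // Produce one message (fire-and-forget, then flush to await delivery)
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
        using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
        {
            producer.Produce("greetings", new Message<Null, string> { Value = "Hello Kafka" });
            producer.Flush(TimeSpan.FromSeconds(10));
        }

        // Consume it back from the same topic
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "quickstart-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
        {
            consumer.Subscribe("greetings");
            var result = consumer.Consume(TimeSpan.FromSeconds(10));
            Console.WriteLine($"Received: {result?.Message.Value}");
            consumer.Close();
        }
    }
}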

Single Event Stream Processing

As the name implies, single event stream processing entails consuming and processing one event at a time, rather than capturing and processing multiple events at the same time (for example, to aggregate results for a specific timeframe). This is a very powerful paradigm for both event-driven microservice architectures and transforming data as it flows from one data source to another.

A good example is sending an event through a chain of message handlers which apply validation, enrichment and filtering, before writing processed events back to Kafka as a new event stream.

Event Stream Processing Micro-Framework

Because this is the kind of thing you might want to do all the time, it makes sense to create a reusable framework for processing event streams. The framework abstractions should provide a standard approach that is generic, type-safe and extensible, without being coupled to Kafka or any other streaming platform. This is the purpose of the EventStreamProcessing.Abstractions package. There you will find an abstract EventProcessor class that implements the IEventProcessor interface. Notice the generic TSourceEvent and TSinkEvent type arguments, which allow you to specify any message type.

public abstract class EventProcessor<TSourceEvent, TSinkEvent> : EventProcessorBase, IEventProcessor
{
    protected readonly IEventConsumer<TSourceEvent> consumer;
    protected readonly IEventProducer<TSinkEvent> producer;

    public EventProcessor(
        IEventConsumer<TSourceEvent> consumer,
        IEventProducer<TSinkEvent> producer,
        params IMessageHandler[] handlers)
        : base(handlers)
    {
        this.consumer = consumer;
        this.producer = producer;
    }

    public abstract Task Process(CancellationToken cancellationToken = default);
}

Next there is the abstract MessageHandler class that implements IMessageHandler, which is used to build a chain of message handlers.

public abstract class MessageHandler : IMessageHandler
{
    private IMessageHandler nextHandler;

    public void SetNextHandler(IMessageHandler nextHandler)
    {
        this.nextHandler = nextHandler;
    }

    public virtual async Task<Message> HandleMessage(Message sourceMessage)
    {
        return nextHandler != null
            ? await nextHandler.HandleMessage(sourceMessage)
            : sourceMessage;
    }
}
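
Handlers are linked by calling SetNextHandler on each one in turn, so that base.HandleMessage forwards the message down the chain. A minimal sketch of how the base class might wire the chain, assuming the handlers array passed to the EventProcessor constructor (the actual implementation in EventStreamProcessing.Abstractions may differ):

// Sketch: link each handler to its successor so a message
// entering handlers[0] flows through the whole chain.
protected void BuildHandlerChain()
{
    for (int i = 0; i < handlers.Length - 1; i++)
    {
        handlers[i].SetNextHandler(handlers[i + 1]);
    }
}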

Lastly, the Message class encapsulates an event as a key-value pair.

public class Message<TKey, TValue> : Message
{
    public TKey Key { get; set; }
    public TValue Value { get; set; }

    public Message(TKey key, TValue value)
    {
        Key = key;
        Value = value;
    }
}

In addition to a platform-agnostic set of abstractions, there is an EventStreamProcessing.Kafka package that references Confluent.Kafka and has Kafka-specific implementations of the IEventConsumer, IEventProducer and IEventProcessor interfaces. The KafkaEventProcessor class overrides the Process method with code that consumes a raw event, builds the chain of handlers, and produces a processed event.

public override async Task Process(CancellationToken cancellationToken = default)
{
    // Build chain of handlers
    BuildHandlerChain();

    // Consume event
    var sourceEvent = consumer.ConsumeEvent(cancellationToken);

    // Return if EOF
    if (sourceEvent == null) return;

    // Invoke handler chain
    var sourceMessage = new Message<TSourceKey, TSourceValue>(sourceEvent.Key, sourceEvent.Value);
    var sinkMessage = await handlers[0].HandleMessage(sourceMessage) as Message<TSinkKey, TSinkValue>;

    // Return if message filtered out
    if (sinkMessage == null) return;

    // Produce event
    var sinkEvent = new Confluent.Kafka.Message<TSinkKey, TSinkValue>
    {
        Key = sinkMessage.Key,
        Value = sinkMessage.Value
    };
    producer.ProduceEvent(sinkEvent);
}
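
The KafkaEventConsumer referenced here wraps the underlying Confluent.Kafka consumer. A plausible sketch of its ConsumeEvent method, assuming the consumer is configured with EnablePartitionEof so that end-of-partition can surface as a null event (the actual implementation in EventStreamProcessing.Kafka may differ):

public Confluent.Kafka.Message<TKey, TValue> ConsumeEvent(CancellationToken cancellationToken = default)
{
    // Consume blocks until a message, an EOF event or cancellation
    var result = consumer.Consume(cancellationToken);

    // Treat end-of-partition (or no result) as "no event"
    if (result == null || result.IsPartitionEOF) return null;
    return result.Message;
}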

Build Your Own Event Stream Processing Service

To build your own event stream processing service it’s best to start by creating a new .NET Core Worker Service.

dotnet new worker --name MyWorker

Add the EventStreamProcessing.Kafka package. This will also bring in the EventStreamProcessing.Abstractions and Confluent.Kafka packages.

dotnet add package EventStreamProcessing.Kafka

Inject IEventProcessor into the Worker class constructor, then call eventProcessor.Process inside the while loop in the ExecuteAsync method.

public class Worker : BackgroundService
{
    private readonly IEventProcessor _eventProcessor;
    private readonly ILogger<Worker> _logger;

    public Worker(IEventProcessor eventProcessor, ILogger<Worker> logger)
    {
        _eventProcessor = eventProcessor;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
            await _eventProcessor.Process(stoppingToken);
        }
    }
}

To set up the event processor you will need some helper methods for creating Kafka consumers and producers.

public static class KafkaUtils
{
    public static IConsumer<int, string> CreateConsumer(string brokerList, List<string> topics)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = brokerList,
            GroupId = "sample-consumer"
        };
        var consumer = new ConsumerBuilder<int, string>(config).Build();
        consumer.Subscribe(topics);
        return consumer;
    }

    public static IProducer<int, string> CreateProducer(string brokerList)
    {
        var config = new ProducerConfig { BootstrapServers = brokerList };
        var producer = new ProducerBuilder<int, string>(config).Build();
        return producer;
    }
}
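
Note that this consumer config is minimal. Depending on your scenario you may want to extend it, for example (these settings are optional assumptions, not requirements of the framework):

var config = new ConsumerConfig
{
    BootstrapServers = brokerList,
    GroupId = "sample-consumer",
    AutoOffsetReset = AutoOffsetReset.Earliest, // read from the beginning if no committed offset
    EnablePartitionEof = true                   // surface end-of-partition to the caller
};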

Next create some classes that extend MessageHandler in which you override the HandleMessage method to process the message. Here is an example of a handler that transforms the message.

public class TransformHandler : MessageHandler
{
    public override async Task<Message> HandleMessage(Message sourceMessage)
    {
        var message = (Message<int, string>)sourceMessage;
        var sinkMessage = new Message<int, string>(message.Key, message.Value.ToUpper());
        return await base.HandleMessage(sinkMessage);
    }
}
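
Returning null from a handler short-circuits the pipeline: as the Process method above shows, a null sink message is treated as filtered out. Here is a minimal filtering handler sketch along those lines (the predicate-based design is an assumption for illustration):

// Sketch: a handler that drops messages failing a predicate.
// Returning null tells the event processor the message was filtered out.
public class FilterHandler : MessageHandler
{
    private readonly Func<Message<int, string>, bool> predicate;

    public FilterHandler(Func<Message<int, string>, bool> predicate)
    {
        this.predicate = predicate;
    }

    public override async Task<Message> HandleMessage(Message sourceMessage)
    {
        var message = (Message<int, string>)sourceMessage;
        if (!predicate(message)) return null;
        return await base.HandleMessage(message);
    }
}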

Then add code to the CreateHostBuilder method in the Program class where you set up dependency injection for IEventProcessor.

public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                // Add event processor
                services.AddSingleton<IEventProcessor>(sp =>
                {
                    // Create Kafka consumer and producer
                    var kafkaConsumer = KafkaUtils.CreateConsumer("localhost:9092", new List<string> { "raw-events" });
                    var kafkaProducer = KafkaUtils.CreateProducer("localhost:9092");

                    // Create handlers
                    var handlers = new List<MessageHandler> { new TransformHandler() };

                    // Create event processor
                    return new KafkaEventProcessor<int, string, int, string>(
                        new KafkaEventConsumer<int, string>(kafkaConsumer),
                        new KafkaEventProducer<int, string>(kafkaProducer, "processed-events"),
                        handlers.ToArray());
                });
                services.AddHostedService<Worker>();
            });
}

Run Kafka Locally with Docker

Note: To run Kafka you will need to allocate 8 GB of memory to Docker Desktop.

Confluent has a convenient repository with a docker-compose.yml file for running Kafka locally with Docker. Simply clone the repo and run docker-compose.

git clone https://github.com/confluentinc/cp-all-in-one
cd cp-all-in-one
git checkout 5.5.0-post
cd cp-all-in-one/
docker-compose up -d --build
docker-compose ps

Open the Kafka control center: http://localhost:9021/. Then select the main cluster, go to Topics and create the “raw-events” and “processed-events” topics.
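
If you prefer the command line, the standard kafka-topics tool inside the broker container can create the topics as well (the partition and replication settings here are minimal assumptions for local development):

docker exec -it broker kafka-topics --create --topic raw-events --partitions 1 --replication-factor 1 --bootstrap-server broker:29092
docker exec -it broker kafka-topics --create --topic processed-events --partitions 1 --replication-factor 1 --bootstrap-server broker:29092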

Use the console consumer to show the processed events.

docker exec -it broker bash
cd /usr/bin
./kafka-console-consumer --bootstrap-server broker:29092 --topic "processed-events"

Use the console producer to create raw events.

docker exec -it broker bash
cd /usr/bin
./kafka-console-producer --broker-list broker:29092 --topic "raw-events"
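
Because the events are keyed, you can have the console producer parse a key from each line by adding the standard parse.key and key.separator properties:

./kafka-console-producer --broker-list broker:29092 --topic "raw-events" --property parse.key=true --property key.separator=:

Then enter messages such as 1:Hello World, where 1 is the key and the text after the colon is the value.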

Check Out the Sample

The event-stream-processing repository has a samples folder that contains a working example of an event processing service based on the Event Stream Processing Micro-Framework. Here is a diagram showing the data pipeline used by the Sample Worker.

[Diagram: event-stream-processing-sample — Sample Producer → raw-events → Sample Worker (validate, enrich, filter) → final-events / validation-errors → Sample Consumer]

  1. The Sample Producer console app lets the user write a stream of events to the Kafka broker using the “raw-events” topic. The numeral represents the event key, and the text “Hello World” represents the event value.
  2. The Sample Worker service injects an IEventProcessor into the KafkaWorker class constructor. The ExecuteAsync method then calls eventProcessor.Process in a while loop until the operation is cancelled.
  3. The Program.CreateHostBuilder method registers an IEventProcessor for dependency injection, using a KafkaEventProcessor that combines a KafkaEventConsumer, a KafkaEventProducer and an array of MessageHandler consisting of ValidationHandler, EnrichmentHandler and FilterHandler.
  4. The KafkaEventConsumer in Sample Worker subscribes to the “raw-events” topic of the Kafka broker running on localhost:9092. The message handlers validate, enrich and filter the events one at a time. If a message key does not correlate to a key in the language store, the ValidationHandler writes a validation error back to Kafka on the “validation-errors” topic. The EnrichmentHandler looks up a translation for “Hello” in the language store and transforms the message with the selected translation (see the sketch after this list). The FilterHandler accepts a lambda expression for filtering messages; in this case the English phrase “Hello” is filtered out. Lastly, the KafkaEventProducer writes processed events back to Kafka using the “final-events” topic.
  5. The Sample Consumer console app reads the “validation-errors” and “final-events” topics, displaying them in the console.
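
As a rough illustration of step 4, here is a sketch of what an enrichment handler along those lines might look like (the language store dictionary is an assumption; see the sample repository for the actual ValidationHandler, EnrichmentHandler and FilterHandler):

// Sketch: replace "Hello" with a translation looked up by message key.
// Assumes validation has already ensured the key exists in the store.
public class EnrichmentHandler : MessageHandler
{
    private readonly IDictionary<int, string> languageStore;

    public EnrichmentHandler(IDictionary<int, string> languageStore)
    {
        this.languageStore = languageStore;
    }

    public override async Task<Message> HandleMessage(Message sourceMessage)
    {
        var message = (Message<int, string>)sourceMessage;
        var enriched = message.Value.Replace("Hello", languageStore[message.Key]);
        return await base.HandleMessage(new Message<int, string>(message.Key, enriched));
    }
}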

Follow the instructions in the project ReadMe file to run the sample. If you wish to run the Sample Worker in a Docker container, you will need to place it on the same network as the Kafka broker, which can be accomplished with a separate docker-compose.yml file for the Sample Worker.
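
For example, a compose file along these lines would place the Sample Worker on the broker's network (the image name and external network name are assumptions; check the actual network created by the Confluent compose file with docker network ls, and point the worker at broker:29092 rather than localhost:9092):

# Sketch: run the Sample Worker on the broker's Docker network.
version: '3'
services:
  sample-worker:
    image: sample-worker:latest        # assumed image name for the Sample Worker
    networks:
      - cp-all-in-one_default          # assumed name of the broker's network
networks:
  cp-all-in-one_default:
    external: true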

Happy coding!
