Apache Kafka, originally developed at LinkedIn and open sourced in 2011, is the de facto industry standard for real-time data feeds that can reliably handle large volumes of data with high throughput and low latency. Companies like Uber, Netflix and Slack use Kafka to process trillions of messages per day. Unlike a traditional queue or message broker, Kafka functions as a unified, durable log of append-only events, ordered within each partition, that can be replayed or archived.
You can download the code for the event stream processing micro-framework and sample application here: https://github.com/event-streams-dotnet/event-stream-processing
Kafka and .NET
Kafka is written in Java and Scala, and most of its libraries and tools are available only in Java. This makes developing for Kafka in C# somewhat limiting. The Kafka Streams and Kafka Connect APIs, for example, are available only in Java, and .NET applications can reach Confluent’s ksqlDB only through its REST interface.
All is not lost, however, for C# developers wishing to use Kafka. Confluent, a company founded by the creators of Kafka, offers confluent-kafka-dotnet, a .NET Kafka client that provides high-level Consumer, Producer and AdminClient APIs. This is useful for building single event stream processing applications, which can be used for scenarios like event sourcing and real-time ETL (extract-transform-load) data pipelines. (There is also an open source kafka-streams-dotnet project that aims to provide the same functionality as Kafka Streams on .NET for multiple event stream processing applications.)
Single Event Stream Processing
As the name implies, single event stream processing entails consuming and processing one event at a time, rather than capturing and processing multiple events at the same time (for example, to aggregate results for a specific timeframe). This is a very powerful paradigm for both event-driven microservice architectures and transforming data as it flows from one data source to another.
A good example is sending an event through a chain of message handlers which apply validation, enrichment and filtering, before writing processed events back to Kafka as a new event stream.
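To make the contrast between single event processing and windowed aggregation concrete, here is a minimal, broker-free sketch in plain C#. `ProcessEach` handles every event independently, while `CountPerWindow` has to buffer events and group them into fixed-size (tumbling) time windows before it can produce a result. The `Event` record and both method names are hypothetical, chosen only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event type used only for this illustration.
public record Event(int Key, string Value, DateTime Timestamp);

public static class ProcessingStyles
{
    // Single event processing: each event is transformed on its own,
    // with no state carried over from earlier events.
    public static IEnumerable<Event> ProcessEach(IEnumerable<Event> events) =>
        events.Select(e => e with { Value = e.Value.ToUpperInvariant() });

    // Windowed aggregation (for contrast): events are grouped by the start of
    // their tumbling time window, and a count is produced per window.
    public static Dictionary<DateTime, int> CountPerWindow(IEnumerable<Event> events, TimeSpan window) =>
        events
            .GroupBy(e => new DateTime(e.Timestamp.Ticks - e.Timestamp.Ticks % window.Ticks))
            .ToDictionary(g => g.Key, g => g.Count());
}
```

The per-event style needs nothing but the current event, which is what makes it a good fit for a chain of validation, enrichment and filtering handlers.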
Event Stream Processing Micro-Framework
Because this is the kind of thing you might want to do all the time, it makes sense to create a reusable framework for processing event streams. The framework abstractions should provide a standard approach that is generic, type-safe and extensible, without being coupled to Kafka or any other streaming platform. This is the purpose of the EventStreamProcessing.Abstractions package. There you will find an abstract EventProcessor class that implements the IEventProcessor interface. Notice the generic TSourceEvent and TSinkEvent type arguments, which allow you to specify any message type.
```csharp
using System.Threading;
using System.Threading.Tasks;

public abstract class EventProcessor<TSourceEvent, TSinkEvent> : EventProcessorBase, IEventProcessor
{
    protected readonly IEventConsumer<TSourceEvent> consumer;
    protected readonly IEventProducer<TSinkEvent> producer;

    public EventProcessor(
        IEventConsumer<TSourceEvent> consumer,
        IEventProducer<TSinkEvent> producer,
        params IMessageHandler[] handlers)
        : base(handlers)
    {
        this.consumer = consumer;
        this.producer = producer;
    }

    public abstract Task Process(CancellationToken cancellationToken = default);
}
```
Next there is the abstract MessageHandler class that implements IMessageHandler, which is used to build a chain of message handlers.
```csharp
using System.Threading.Tasks;

public abstract class MessageHandler : IMessageHandler
{
    private IMessageHandler nextHandler;

    // Link this handler to the next handler in the chain
    public void SetNextHandler(IMessageHandler nextHandler)
    {
        this.nextHandler = nextHandler;
    }

    // Pass the message to the next handler, or return it if this is the last handler
    public virtual async Task<Message> HandleMessage(Message sourceMessage)
    {
        return nextHandler != null ? await nextHandler.HandleMessage(sourceMessage) : sourceMessage;
    }
}
```
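Lastly, the generic Message<TKey, TValue> class encapsulates an event as a key-value pair. It derives from a non-generic Message base class, which is the type that handlers accept and return in HandleMessage.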
```csharp
public class Message<TKey, TValue> : Message
{
    public TKey Key { get; set; }
    public TValue Value { get; set; }

    public Message(TKey key, TValue value)
    {
        Key = key;
        Value = value;
    }
}
```
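To see how these pieces fit together without a Kafka broker, here is a self-contained sketch. It re-declares minimal stand-ins for Message, IMessageHandler and MessageHandler modeled on the snippets above; the real package's definitions may differ, and in particular the members of IMessageHandler are assumed. It also adds a hypothetical `HandlerChain.Link` helper that wires handlers together with SetNextHandler, similar in spirit to the BuildHandlerChain call in KafkaEventProcessor, plus two illustrative handlers.

```csharp
using System.Threading.Tasks;

// Minimal stand-ins modeled on the framework snippets (assumed shapes).
public abstract class Message { }

public class Message<TKey, TValue> : Message
{
    public TKey Key { get; set; }
    public TValue Value { get; set; }
    public Message(TKey key, TValue value) { Key = key; Value = value; }
}

public interface IMessageHandler
{
    void SetNextHandler(IMessageHandler nextHandler);
    Task<Message> HandleMessage(Message sourceMessage);
}

public abstract class MessageHandler : IMessageHandler
{
    private IMessageHandler nextHandler;

    public void SetNextHandler(IMessageHandler nextHandler) => this.nextHandler = nextHandler;

    // Forward to the next handler, or return the message if this is the last link.
    public virtual async Task<Message> HandleMessage(Message sourceMessage) =>
        nextHandler != null ? await nextHandler.HandleMessage(sourceMessage) : sourceMessage;
}

// Hypothetical helper: link each handler to the one after it and return the head.
public static class HandlerChain
{
    public static IMessageHandler Link(params IMessageHandler[] handlers)
    {
        for (int i = 0; i < handlers.Length - 1; i++)
            handlers[i].SetNextHandler(handlers[i + 1]);
        return handlers[0];
    }
}

// Illustrative handler: upper-case the value, then continue down the chain.
public class UpperCaseHandler : MessageHandler
{
    public override Task<Message> HandleMessage(Message sourceMessage)
    {
        var m = (Message<int, string>)sourceMessage;
        return base.HandleMessage(new Message<int, string>(m.Key, m.Value.ToUpperInvariant()));
    }
}

// Illustrative handler: returning null stops the chain (the message is filtered out).
public class DropEmptyHandler : MessageHandler
{
    public override Task<Message> HandleMessage(Message sourceMessage)
    {
        var m = (Message<int, string>)sourceMessage;
        return string.IsNullOrEmpty(m.Value) ? Task.FromResult<Message>(null) : base.HandleMessage(m);
    }
}
```

Calling HandleMessage on the head of a chain built from `new UpperCaseHandler()` and `new DropEmptyHandler()` turns (1, "hello") into (1, "HELLO"), while an empty value comes back as null because each handler returns whatever the rest of the chain returns.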
In addition to a platform-agnostic set of abstractions, there is an EventStreamProcessing.Kafka package that references Confluent.Kafka and has Kafka-specific implementations of the IEventConsumer, IEventProducer and IEventProcessor interfaces. The KafkaEventProcessor class overrides the Process method with code that builds the chain of handlers, consumes a raw event, passes it through the handler chain, and produces the processed event. Process returns early if there is no event to consume or if a handler filters the message out.
```csharp
public override async Task Process(CancellationToken cancellationToken = default)
{
    // Build chain of handlers
    BuildHandlerChain();

    // Consume event
    var sourceEvent = consumer.ConsumeEvent(cancellationToken);

    // Return if EOF
    if (sourceEvent == null) return;

    // Invoke handler chain
    var sourceMessage = new Message<TSourceKey, TSourceValue>(sourceEvent.Key, sourceEvent.Value);
    var sinkMessage = await handlers[0].HandleMessage(sourceMessage) as Message<TSinkKey, TSinkValue>;

    // Return if message filtered out
    if (sinkMessage == null) return;

    // Produce event
    var sinkEvent = new Confluent.Kafka.Message<TSinkKey, TSinkValue>
    {
        Key = sinkMessage.Key,
        Value = sinkMessage.Value
    };
    producer.ProduceEvent(sinkEvent);
}
```
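Because the control flow is the interesting part, here is a broker-free sketch that mirrors the same steps with in-memory collections: a Queue stands in for the consumer (an empty queue plays the role of EOF), a List stands in for the producer, and a delegate stands in for the head of the handler chain, returning null when a message is filtered out. `KeyedEvent` and `InMemoryEventProcessor` are hypothetical names for illustration, not types from EventStreamProcessing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event type for this sketch.
public record KeyedEvent(int Key, string Value);

// Hypothetical in-memory analogue of KafkaEventProcessor.Process.
public class InMemoryEventProcessor
{
    private readonly Queue<KeyedEvent> source;                          // stands in for the consumer
    private readonly List<KeyedEvent> sink;                             // stands in for the producer
    private readonly Func<KeyedEvent, Task<KeyedEvent>> handleMessage;  // head of the handler chain

    public InMemoryEventProcessor(
        Queue<KeyedEvent> source,
        List<KeyedEvent> sink,
        Func<KeyedEvent, Task<KeyedEvent>> handleMessage)
    {
        this.source = source;
        this.sink = sink;
        this.handleMessage = handleMessage;
    }

    public async Task Process(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Consume event; return if nothing is available (the EOF case)
        if (!source.TryDequeue(out var sourceEvent)) return;

        // Invoke handler chain
        var sinkEvent = await handleMessage(sourceEvent);

        // Return if message filtered out
        if (sinkEvent == null) return;

        // Produce event
        sink.Add(sinkEvent);
    }
}
```

Calling Process three times on a queue holding one ordinary event and one event the delegate filters out leaves exactly one event in the sink; the third call simply returns because the queue is empty.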
Build Your Own Event Stream Processing Service
To build your own event stream processing service, it’s best to start by creating a new .NET Core Worker Service.
```shell
dotnet new worker --name MyWorker
```
Add the EventStreamProcessing.Kafka package. This will also bring in the EventStreamProcessing.Abstractions and Confluent.Kafka packages.
```shell
dotnet add package EventStreamProcessing.Kafka
```
Inject IEventProcessor into the Worker class constructor, then call eventProcessor.Process inside the while loop in the ExecuteAsync method.
```csharp
public class Worker : BackgroundService
{
    private readonly IEventProcessor _eventProcessor;
    private readonly ILogger<Worker> _logger;

    public Worker(IEventProcessor eventProcessor, ILogger<Worker> logger)
    {
        _eventProcessor = eventProcessor;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
            await _eventProcessor.Process(stoppingToken);
        }
    }
}
```
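The loop relies on cooperative cancellation: when the host shuts down it signals stoppingToken, and the while condition ends the loop after the current call to Process returns. Passing the same token into Process also lets a blocking consume call observe it. The sketch below reproduces just the loop pattern with a CancellationTokenSource and a hypothetical `CountingProcessor` that requests cancellation after a fixed number of events, so it runs without Microsoft.Extensions.Hosting. It is an illustration of the pattern, not the BackgroundService implementation.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical processor that cancels the token after a fixed number of events,
// standing in for a host that requests shutdown.
public class CountingProcessor
{
    private readonly CancellationTokenSource cts;
    private readonly int stopAfter;
    public int Processed { get; private set; }

    public CountingProcessor(CancellationTokenSource cts, int stopAfter)
    {
        this.cts = cts;
        this.stopAfter = stopAfter;
    }

    public Task Process(CancellationToken cancellationToken = default)
    {
        Processed++;
        if (Processed == stopAfter) cts.Cancel();
        return Task.CompletedTask;
    }
}

public static class WorkerLoop
{
    // Same shape as Worker.ExecuteAsync: keep processing until cancellation is requested.
    public static async Task ExecuteAsync(CountingProcessor processor, CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await processor.Process(stoppingToken);
        }
    }
}
```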
To set up the event processor you will need some helper methods for creating Kafka consumers and producers.
```csharp
using System.Collections.Generic;
using Confluent.Kafka;

public static class KafkaUtils
{
    // Create a consumer with int keys and string values, subscribed to the given topics
    public static IConsumer<int, string> CreateConsumer(string brokerList, List<string> topics)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = brokerList,
            GroupId = "sample-consumer"
        };
        var consumer = new ConsumerBuilder<int, string>(config).Build();
        consumer.Subscribe(topics);
        return consumer;
    }

    // Create a producer with int keys and string values
    public static IProducer<int, string> CreateProducer(string brokerList)
    {
        var config = new ProducerConfig { BootstrapServers = brokerList };
        var producer = new ProducerBuilder<int, string>(config).Build();
        return producer;
    }
}
```
Next, create classes that extend MessageHandler and override the HandleMessage method to process the message. Here is an example of a handler that transforms the message by converting its value to upper case.
```csharp
public class TransformHandler : MessageHandler
{
    public override async Task<Message> HandleMessage(Message sourceMessage)
    {
        var message = (Message<int, string>)sourceMessage;
        var sinkMessage = new Message<int, string>(message.Key, message.Value.ToUpper());
        return await base.HandleMessage(sinkMessage);
    }
}
```
Then add code to the CreateHostBuilder method in the Program class to set up dependency injection for IEventProcessor.
```csharp
public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                // Add event processor
                services.AddSingleton<IEventProcessor>(sp =>
                {
                    // Create Kafka consumer and producer
                    var kafkaConsumer = KafkaUtils.CreateConsumer("localhost:9092", new List<string> { "raw-events" });
                    var kafkaProducer = KafkaUtils.CreateProducer("localhost:9092");

                    // Create handlers
                    var handlers = new List<MessageHandler> { new TransformHandler() };

                    // Create event processor
                    return new KafkaEventProcessor<int, string, int, string>(
                        new KafkaEventConsumer<int, string>(kafkaConsumer),
                        new KafkaEventProducer<int, string>(kafkaProducer, "processed-events"),
                        handlers.ToArray());
                });
                services.AddHostedService<Worker>();
            });
}
```
Run Kafka Locally with Docker
Note: To run Kafka you will need to allocate 8 GB of memory to Docker Desktop.
Confluent has a convenient repository with a docker-compose.yml file for running Kafka locally with Docker. Simply clone the repo and run docker-compose.
```shell
git clone https://github.com/confluentinc/cp-all-in-one
cd cp-all-in-one
git checkout 5.5.0-post
cd cp-all-in-one/
docker-compose up -d --build
docker-compose ps
```
Open Confluent Control Center at http://localhost:9021/, select the main cluster, go to Topics, and create the “raw-events” and “processed-events” topics.
Use the console consumer to show the processed events.
```shell
docker exec -it broker bash
cd /usr/bin
./kafka-console-consumer --bootstrap-server broker:29092 --topic "processed-events"
```
Use the console producer to create raw events.
```shell
docker exec -it broker bash
cd /usr/bin
./kafka-console-producer --broker-list broker:29092 --topic "raw-events"
```
Check Out the Sample
The event-stream-processing repository has a samples folder that contains a working example of an event processing service based on the Event Stream Processing Micro-Framework. Here is a diagram showing the data pipeline used by the Sample Worker.
- The Sample Producer console app lets the user write a stream of events to the Kafka broker using the “raw-events” topic. The numeral represents the event key, and the text “Hello World” represents the event value.
- The Sample Worker service injects an IEventProcessor into the KafkaWorker class constructor. The ExecuteAsync method calls eventProcessor.Process in a while loop until the operation is cancelled.
- The Program.CreateHostBuilder method registers an IEventProcessor for dependency injection with a KafkaEventProcessor that uses KafkaEventConsumer, KafkaEventProducer and an array of MessageHandler containing ValidationHandler, EnrichmentHandler and FilterHandler.
- The KafkaEventConsumer in Sample Worker subscribes to the “raw-events” topic of the Kafka broker running on localhost:9092. The message handlers validate, enrich and filter the events one at a time. If the message key does not correspond to a key in the language store, the validation error is written back to Kafka on a “validation-errors” topic. The EnrichmentHandler looks up a translation for “Hello” in the language store and transforms the message with the selected translation. The FilterHandler accepts a lambda expression for filtering messages; in this case the English phrase “Hello” is filtered out. Lastly, the KafkaEventProducer writes processed events back to Kafka using the “final-events” topic.
- The Sample Consumer console app reads the “validation-errors” and “final-events” topics and displays them in the console.
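The validate, enrich and filter steps described for the Sample Worker can be approximated in a few lines. This is a hypothetical, broker-free sketch, not the sample's code: the language store is assumed to be a dictionary from event key to a translation of “Hello”, validation errors are collected in a list instead of being produced to “validation-errors”, and the filter is a `Func<string, bool>` standing in for the lambda passed to FilterHandler.

```csharp
using System;
using System.Collections.Generic;

public static class SamplePipeline
{
    // Hypothetical language store: event key -> translation of "Hello".
    public static readonly Dictionary<int, string> LanguageStore = new Dictionary<int, string>
    {
        [1] = "Hello",
        [2] = "Bonjour",
        [3] = "Hola",
    };

    // Validate, enrich and filter a single event. Returns the processed value,
    // or null if the event failed validation or was filtered out.
    public static string Process(int key, string value, Func<string, bool> filter, List<string> validationErrors)
    {
        // Validation: the key must correspond to an entry in the language store.
        if (!LanguageStore.TryGetValue(key, out var translation))
        {
            validationErrors.Add($"No language found for key {key}");
            return null;
        }

        // Enrichment: replace "Hello" with the selected translation.
        var enriched = value.Replace("Hello", translation);

        // Filtering: keep only values the lambda accepts.
        return filter(enriched) ? enriched : null;
    }
}
```

With a filter such as `v => !v.StartsWith("Hello", StringComparison.Ordinal)`, key 2 turns “Hello World” into “Bonjour World”, key 1 is filtered out as English, and an unknown key produces a validation error.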
Follow the instructions in the project’s README file to run the sample. If you want to run the Sample Worker in a Docker container, you will need to place it on the same network as the Kafka broker, which you can do with a separate docker-compose.yml file for the Sample Worker.
Happy coding!