Multiple Consumers

Run multiple Kafka consumers in a single host by pointing each registration to a different configuration section.

Table of contents

  1. Registration
  2. Configuration
  3. DLQ Registration with Custom Sections
  4. Duplicate Registration Guard
  5. Mixing Serialization Formats

Registration

Each consumer needs a distinct TMessage type and its own config section:

var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddKafkaWorker<OrderMessage, OrderMessageHandler>(
    builder.Configuration,
    configSection: "KafkaWorker:OrderConsumer");

builder.Services.AddKafkaWorker<PaymentMessage, PaymentMessageHandler>(
    builder.Configuration,
    configSection: "KafkaWorker:PaymentConsumer");

builder.Build().Run();

Configuration

Each consumer gets its own section. The Connection section is shared:

{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "localhost:9092"
    },
    "OrderConsumer": {
      "GroupId": "order-processor",
      "Topic": "orders.v1",
      "MaxRetries": 3,
      "DeadLetterTopic": "orders.v1.dlq"
    },
    "PaymentConsumer": {
      "GroupId": "payment-processor",
      "Topic": "payments.v1",
      "MaxRetries": 5
    }
  }
}

The default configSection is KafkaWorker:Consumer — if you only have one consumer, you don’t need to specify it.


DLQ Registration with Custom Sections

When using custom config sections, pass the same configSection to AddKafkaWorkerDeadLetter:

builder.Services.AddKafkaWorkerDeadLetter<OrderMessage>(
    builder.Configuration,
    configSection: "KafkaWorker:OrderConsumer");

Duplicate Registration Guard

Each TMessage type can only be registered once per host. Calling AddKafkaWorker<OrderMessage, ...>() twice throws an InvalidOperationException at startup.

If you need two consumers for the same data shape, create distinct message types:

public record OrderMessageV1 { /* ... */ }
public record OrderMessageV2 { /* ... */ }

Mixing Serialization Formats

You can mix serialization formats in the same host:

// Plain JSON consumer
builder.Services.AddKafkaWorker<OrderMessage, OrderHandler>(
    builder.Configuration,
    configSection: "KafkaWorker:OrderConsumer");

// Avro consumer
builder.Services.AddKafkaWorkerAvro<InventoryEvent, InventoryHandler>(
    builder.Configuration,
    configSection: "KafkaWorker:InventoryConsumer");

// Protobuf consumer
builder.Services.AddKafkaWorkerProtobuf<ShipmentEvent, ShipmentHandler>(
    builder.Configuration,
    configSection: "KafkaWorker:ShipmentConsumer");

Each consumer runs as an independent hosted service with its own Kafka consumer instance, configuration, and lifecycle.