Configuration

All settings are loaded from IConfiguration (typically appsettings.json). The library validates configuration on startup and fails fast if required values are missing or out of range.

Table of contents

  1. Consumer Options
    1. Minimal Configuration
    2. With Dead Letter Queue
    3. No Retry, No DLQ
  2. Connection Settings
    1. Secured Cluster Example
  3. ConsumerConfig Overrides
  4. Configuration Validation

Consumer Options

Configure under KafkaWorker:Consumer (or a custom section — see Multiple Consumers).

| Setting | Type | Default | Description |
| --- | --- | --- | --- |
| GroupId | string | (required) | Kafka consumer group ID |
| Topic | string | (required) | Topic to consume from |
| MaxRetries | int | 3 | Retry attempts before sending to DLQ. Set to 0 to disable retries entirely. Range: 0–5 |
| DeadLetterTopic | string? | null | DLQ topic. Leave null to disable the DLQ; failed messages are then logged and skipped |
| DeadLetterMaxReprocessAttempts | int | 3 | Max times the DLQ consumer retries a message. Range: 1–5. Only applies when DeadLetterTopic is set |
| DeadLetterProcessingIntervalMinutes | int | 60 | Minutes between DLQ reprocessing batches. Only applies when DeadLetterTopic is set |
| DeadLetterStartFrom | DateTimeOffset? | null | UTC timestamp from which the DLQ consumer should start processing when no committed offsets exist. E.g. "2025-06-01T00:00:00Z" |

Minimal Configuration

{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "localhost:9092"
    },
    "Consumer": {
      "GroupId": "my-order-processor",
      "Topic": "orders.v1",
      "MaxRetries": 3
    }
  }
}

With Dead Letter Queue

{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "localhost:9092"
    },
    "Consumer": {
      "GroupId": "my-order-processor",
      "Topic": "orders.v1",
      "MaxRetries": 3,
      "DeadLetterTopic": "orders.v1.dlq",
      "DeadLetterMaxReprocessAttempts": 3,
      "DeadLetterProcessingIntervalMinutes": 60
    }
  }
}
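
The example above leaves DeadLetterStartFrom unset. As a sketch of how it might be supplied, the fragment below adds it to the same Consumer section, using the sample timestamp from the settings table. The topic and group names are the same placeholders used elsewhere on this page, and the value only matters when the DLQ consumer has no committed offsets.

```json
"Consumer": {
  "GroupId": "my-order-processor",
  "Topic": "orders.v1",
  "MaxRetries": 3,
  "DeadLetterTopic": "orders.v1.dlq",
  "DeadLetterMaxReprocessAttempts": 3,
  "DeadLetterProcessingIntervalMinutes": 60,
  "DeadLetterStartFrom": "2025-06-01T00:00:00Z"
}
```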

No Retry, No DLQ

Set MaxRetries to 0 and omit DeadLetterTopic for a simple consumer that logs failures and moves on:

"Consumer": {
  "GroupId": "my-order-processor",
  "Topic": "orders.v1",
  "MaxRetries": 0
}

Connection Settings

Configure under KafkaWorker:Connection. Shared by all consumers and producers in the host.

| Setting | Type | Default | Description |
| --- | --- | --- | --- |
| BootstrapServers | string | (required) | Comma-separated list of Kafka broker addresses (e.g. "broker1:9092,broker2:9092") |
| SchemaRegistryUrls | string? | null | Comma-separated Schema Registry URLs. Required when using Avro, Protobuf, or Registry JSON packages |
| IsSecuredCluster | bool | false | Whether the cluster requires SASL/SSL authentication. When true, Username and Password are required |
| Username | string? | null | SASL username (required when IsSecuredCluster is true) |
| Password | string? | null | SASL password (required when IsSecuredCluster is true) |
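
For an unsecured cluster that uses Schema Registry, only the broker list and registry URL should be needed. The following is a minimal sketch, reusing the placeholder hosts from this page and assuming IsSecuredCluster is left at its default of false, so Username and Password are omitted:

```json
{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "broker1:9092,broker2:9092",
      "SchemaRegistryUrls": "http://schema-registry:8081"
    }
  }
}
```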

Secured Cluster Example

{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "broker1:9092,broker2:9092",
      "IsSecuredCluster": true,
      "Username": "<username>",
      "Password": "<password>",
      "SchemaRegistryUrls": "http://schema-registry:8081"
    }
  }
}

When IsSecuredCluster is true, the library configures SASL/SSL automatically:

  • SecurityProtocol = SaslSsl
  • SaslMechanism = Plain
  • SslEndpointIdentificationAlgorithm = Https

ConsumerConfig Overrides

All registration methods accept an optional Action<ConsumerConfig> callback to customize the underlying Confluent consumer configuration:

builder.Services.AddKafkaWorker<OrderMessage, OrderMessageHandler>(
    builder.Configuration,
    configureConsumer: config =>
    {
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
        config.SessionTimeoutMs = 45_000;
        config.MaxPollIntervalMs = 600_000;
    });

The callback runs before the library enforces its invariants: EnableAutoCommit and EnableAutoOffsetStore are always set to false after your callback, so any value you assign to them there is overwritten. The library manages offsets manually.

The consumer defaults to AutoOffsetReset.Latest, meaning a brand-new consumer group (or one with expired offsets) will skip all existing messages and only process new ones. Override to Earliest if you need to process historical messages on first deploy.


Configuration Validation

The library validates configuration at startup using .NET’s ValidateDataAnnotations() and ValidateOnStart():

  • Required fields: GroupId, Topic, and BootstrapServers must be present
  • Range constraints: MaxRetries must be 0–5, DeadLetterMaxReprocessAttempts must be 1–5
  • Conditional validation: when IsSecuredCluster is true, Username and Password are required
  • DLQ topic validation: calling AddKafkaWorkerDeadLetter without a configured DeadLetterTopic throws at startup

If any validation fails, the host throws an exception during startup before consuming any messages.
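
To make the rules above concrete, here is a deliberately invalid configuration, written as an illustration rather than taken from the library's own tests. Going by the rules listed, it would be expected to fail startup validation for three reasons: Topic is missing, MaxRetries is outside the 0–5 range, and IsSecuredCluster is true without a Username or Password.

```json
{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "broker1:9092",
      "IsSecuredCluster": true
    },
    "Consumer": {
      "GroupId": "my-order-processor",
      "MaxRetries": 7
    }
  }
}
```

Adding a Topic, lowering MaxRetries to a value between 0 and 5, and either supplying credentials or setting IsSecuredCluster to false should bring this configuration back within the documented constraints.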