# Configuration

All settings are loaded from `IConfiguration` (typically `appsettings.json`). The library validates configuration on startup and fails fast if required values are missing.
## Consumer Options

Configure under `KafkaWorker:Consumer` (or a custom section — see Multiple Consumers).
| Setting | Type | Default | Description |
|---|---|---|---|
| `GroupId` | string | (required) | Kafka consumer group ID |
| `Topic` | string | (required) | Topic to consume from |
| `MaxRetries` | int | 3 | Retry attempts before sending to the DLQ. Set to 0 to disable retries entirely. Range: 0–5 |
| `DeadLetterTopic` | string? | null | DLQ topic. Leave null to disable the DLQ — failed messages are logged and skipped |
| `DeadLetterMaxReprocessAttempts` | int | 3 | Maximum times the DLQ consumer retries a message (1–5). Only applies when `DeadLetterTopic` is set |
| `DeadLetterProcessingIntervalMinutes` | int | 60 | Minutes between DLQ reprocessing batches. Only applies when `DeadLetterTopic` is set |
| `DeadLetterStartFrom` | DateTimeOffset? | null | UTC timestamp from which the DLQ consumer should start processing when no committed offsets exist, e.g. `"2025-06-01T00:00:00Z"` |
### Minimal Configuration

```json
{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "localhost:9092"
    },
    "Consumer": {
      "GroupId": "my-order-processor",
      "Topic": "orders.v1",
      "MaxRetries": 3
    }
  }
}
```
### With Dead Letter Queue

```json
{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "localhost:9092"
    },
    "Consumer": {
      "GroupId": "my-order-processor",
      "Topic": "orders.v1",
      "MaxRetries": 3,
      "DeadLetterTopic": "orders.v1.dlq",
      "DeadLetterMaxReprocessAttempts": 3,
      "DeadLetterProcessingIntervalMinutes": 60
    }
  }
}
```
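Per the options table above, `DeadLetterStartFrom` can be added alongside the other DLQ settings to control where the DLQ consumer starts when no offsets have been committed yet (the surrounding values here are illustrative):

```json
"Consumer": {
  "GroupId": "my-order-processor",
  "Topic": "orders.v1",
  "MaxRetries": 3,
  "DeadLetterTopic": "orders.v1.dlq",
  "DeadLetterStartFrom": "2025-06-01T00:00:00Z"
}
```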
### No Retry, No DLQ

Set `MaxRetries` to 0 and omit `DeadLetterTopic` for a simple consumer that logs failures and moves on:

```json
"Consumer": {
  "GroupId": "my-order-processor",
  "Topic": "orders.v1",
  "MaxRetries": 0
}
```
## Connection Settings

Configure under `KafkaWorker:Connection`. These settings are shared by all consumers and producers in the host.
| Setting | Type | Default | Description |
|---|---|---|---|
| `BootstrapServers` | string | (required) | Comma-separated list of Kafka broker addresses, e.g. `"broker1:9092,broker2:9092"` |
| `SchemaRegistryUrls` | string? | null | Comma-separated Schema Registry URLs. Required when using the Avro, Protobuf, or Registry JSON packages |
| `IsSecuredCluster` | bool | false | Whether the cluster requires SASL/SSL authentication. When true, `Username` and `Password` are required |
| `Username` | string? | null | SASL username (required when `IsSecuredCluster` is true) |
| `Password` | string? | null | SASL password (required when `IsSecuredCluster` is true) |
### Secured Cluster Example

```json
{
  "KafkaWorker": {
    "Connection": {
      "BootstrapServers": "broker1:9092,broker2:9092",
      "IsSecuredCluster": true,
      "Username": "<username>",
      "Password": "<password>",
      "SchemaRegistryUrls": "http://schema-registry:8081"
    }
  }
}
```
When `IsSecuredCluster` is true, the library configures SASL/SSL automatically:

- `SecurityProtocol = SaslSsl`
- `SaslMechanism = Plain`
- `SslEndpointIdentificationAlgorithm = Https`
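For reference, the same settings expressed directly on a Confluent.Kafka `ConsumerConfig` would look like the sketch below. This is illustrative only — the library applies these values for you, so you should not set them yourself:

```csharp
using Confluent.Kafka;

// Equivalent manual configuration (applied automatically by the library
// when IsSecuredCluster is true):
var config = new ConsumerConfig
{
    BootstrapServers = "broker1:9092,broker2:9092",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.Https,
    SaslUsername = "<username>",
    SaslPassword = "<password>"
};
```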
## ConsumerConfig Overrides

All registration methods accept an optional `Action<ConsumerConfig>` callback to customize the underlying Confluent consumer configuration:

```csharp
builder.Services.AddKafkaWorker<OrderMessage, OrderMessageHandler>(
    builder.Configuration,
    configureConsumer: config =>
    {
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
        config.SessionTimeoutMs = 45_000;
        config.MaxPollIntervalMs = 600_000;
    });
```
The callback runs before the library enforces its invariants — `EnableAutoCommit` and `EnableAutoOffsetStore` are always set to false after your callback, since the library manages offsets manually.
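That ordering can be sketched as follows. This is an illustration of the behavior described above, not the library's actual source:

```csharp
using Confluent.Kafka;

// Illustrative: user customization runs first, invariants are applied last.
static ConsumerConfig BuildConfig(Action<ConsumerConfig>? configureConsumer)
{
    var config = new ConsumerConfig();
    configureConsumer?.Invoke(config);    // your callback runs first...
    config.EnableAutoCommit = false;      // ...then the invariants are applied,
    config.EnableAutoOffsetStore = false; // so they always win
    return config;
}
```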
The consumer defaults to `AutoOffsetReset.Latest`, meaning a brand-new consumer group (or one with expired offsets) will skip all existing messages and only process new ones. Override to `Earliest` if you need to process historical messages on first deploy.
## Configuration Validation

The library validates configuration at startup using .NET's `ValidateDataAnnotations()` and `ValidateOnStart()`:

- Required fields — `GroupId`, `Topic`, and `BootstrapServers` must be present
- Range constraints — `MaxRetries` must be 0–5, `DeadLetterMaxReprocessAttempts` must be 1–5
- Conditional validation — When `IsSecuredCluster` is `true`, `Username` and `Password` are required
- DLQ topic validation — Calling `AddKafkaWorkerDeadLetter` without a configured `DeadLetterTopic` throws at startup
If any validation fails, the host throws an exception during startup before consuming any messages.
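This is standard Microsoft.Extensions.Options wiring; a minimal sketch of what such registration typically looks like is shown below. The options class and property set here are assumptions mirroring the Consumer Options table, not the library's actual source:

```csharp
using System;
using System.ComponentModel.DataAnnotations;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical options class mirroring a subset of the Consumer Options table.
public sealed class ConsumerOptions
{
    [Required] public string GroupId { get; set; } = "";
    [Required] public string Topic { get; set; } = "";
    [Range(0, 5)] public int MaxRetries { get; set; } = 3;
}

// Registration: binds the section and runs data-annotation checks
// eagerly at host startup rather than on first resolve.
services.AddOptions<ConsumerOptions>()
    .BindConfiguration("KafkaWorker:Consumer")
    .ValidateDataAnnotations()
    .ValidateOnStart();
```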