# Examples
Real-world usage patterns for common batch processing scenarios.
## CSV to Database
Import a CSV file into a database with error tolerance and progress tracking.
```bash
dotnet add package NBatch
dotnet add package NBatch.EntityFrameworkCore
```
var job = Job.CreateBuilder("csv-to-db")
.UseJobStore(connStr)
.AddStep("import", step => step
.ReadFrom(new CsvReader<Product>("products.csv", row => new Product
{
Name = row.GetString("Name"),
Description = row.GetString("Description"),
Price = row.GetDecimal("Price")
}))
.WriteTo(new DbWriter<Product>(dbContext))
.WithSkipPolicy(SkipPolicy.For<FlatFileParseException>(maxSkips: 3))
.WithChunkSize(100))
.Build();
await job.RunAsync();
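The snippets in this section share a minimal `Product` entity. Its definition isn't part of the examples, but a sketch consistent with the mappings used here would be:

```csharp
// Assumed entity for the examples in this section; only the mapped
// properties (and the Id used for ordering) are required.
public class Product
{
    public int Id { get; set; }
    public string Name { get; set; } = "";
    public string? Description { get; set; }
    public decimal Price { get; set; }
}
```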
## Database to File
Export database records to a flat file.
```csharp
var job = Job.CreateBuilder("db-to-file")
    .AddStep("export", step => step
        .ReadFrom(new DbReader<Product>(dbContext, q => q.OrderBy(p => p.Id)))
        .WriteTo(new FlatFileItemWriter<Product>("output.csv").WithToken(','))
        .WithChunkSize(50))
    .Build();

await job.RunAsync();
```
## ETL with Transformation
Read from one source, transform the data, and write to another.
```csharp
var job = Job.CreateBuilder("etl-orders")
    .AddStep("extract-transform", step => step
        .ReadFrom(new DbReader<Order>(sourceDb, q => q.OrderBy(o => o.Id)))
        .ProcessWith(o => new OrderDto
        {
            Id = o.Id,
            Total = o.Total,
            Date = o.CreatedAt.ToString("yyyy-MM-dd")
        })
        .WriteTo(new FlatFileItemWriter<OrderDto>("orders.csv"))
        .WithChunkSize(100))
    .Build();

await job.RunAsync();
```
## Async Processor with CancellationToken
Use an async processor when the transformation needs I/O (API calls, lookups, etc.):
```csharp
var job = Job.CreateBuilder("enrich-products")
    .AddStep("enrich", step => step
        .ReadFrom(new CsvReader<Product>("products.csv", mapFn))
        .ProcessWith(async (product, ct) =>
        {
            var rate = await exchangeRateService.GetRateAsync("USD", ct);
            return new ProductDto
            {
                Name = product.Name,
                PriceUsd = product.Price * rate
            };
        })
        .WriteTo(new DbWriter<ProductDto>(dbContext))
        .WithChunkSize(50))
    .Build();

await job.RunAsync();
```
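Here `mapFn` stands in for the same row-mapping delegate written inline in the first example. Spelled out, it might look like this (the row type name, `CsvRow`, is an assumption; use whatever type the reader's mapping callback actually receives):

```csharp
// Shared row-mapping delegate referenced by several examples.
// CsvRow is an assumed name for the reader's row type.
Func<CsvRow, Product> mapFn = row => new Product
{
    Name = row.GetString("Name"),
    Price = row.GetDecimal("Price")
};
```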
## Multi-Step with Tasklet
Combine chunk-oriented steps with simple tasklet steps that run a single action.
```csharp
var job = Job.CreateBuilder("full-pipeline")
    .UseJobStore(connStr)
    .AddStep("extract", step => step
        .ReadFrom(new CsvReader<Product>("data.csv", mapFn))
        .WriteTo(new DbWriter<Product>(dbContext))
        .WithChunkSize(200))
    .AddStep("validate", step => step
        .Execute(async () =>
        {
            var count = await dbContext.Products.CountAsync();
            if (count == 0) throw new Exception("No products imported!");
        }))
    .AddStep("notify", step => step
        .Execute(() => emailService.SendAsync("Import complete!")))
    .Build();

await job.RunAsync();
```
## Lambda-Only Pipeline
No custom reader, processor, or writer classes needed – everything is inline.
```csharp
var job = Job.CreateBuilder("quick-job")
    .AddStep("transform", step => step
        .ReadFrom(new CsvReader<Product>("data.csv", row => new Product
        {
            Name = row.GetString("Name"),
            Price = row.GetDecimal("Price")
        }))
        .ProcessWith(p => new Product
        {
            Name = p.Name.ToUpper(),
            Price = p.Price
        })
        .WriteTo(async items =>
        {
            foreach (var item in items)
                Console.WriteLine($"{item.Name}: {item.Price:C}");
        }))
    .Build();

await job.RunAsync();
```
## Dependency Injection with IJobRunner
Register jobs via `AddNBatch()` and trigger them on demand from a controller, endpoint, or any service.
```csharp
// Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDbContext<AppDbContext>(o => o.UseSqlServer(connStr));
builder.Services.AddNBatch(nbatch =>
{
    nbatch.AddJob("csv-import", (sp, job) => job
        .UseJobStore(connStr)
        .AddStep("import", step => step
            .ReadFrom(new CsvReader<Product>("products.csv", mapFn))
            .WriteTo(new DbWriter<Product>(sp.GetRequiredService<AppDbContext>()))
            .WithChunkSize(100)));
});

var app = builder.Build();

// Trigger from a minimal API endpoint
app.MapPost("/jobs/csv-import", async (IJobRunner runner, CancellationToken ct) =>
{
    var result = await runner.RunAsync("csv-import", ct);
    return result.Success ? Results.Ok(result) : Results.StatusCode(500);
});

app.Run();
```
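Once the app is running, any HTTP client can trigger the job. A quick smoke test (the port below is an assumption; use whatever address your app binds):

```csharp
// Hypothetical smoke test for the endpoint above.
using var client = new HttpClient { BaseAddress = new Uri("http://localhost:5000") };
var response = await client.PostAsync("/jobs/csv-import", content: null);
Console.WriteLine(response.StatusCode); // 200 on success, 500 on failure
```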
## Background Job with RunOnce()
Run a job once at application startup, then stop the worker.
```csharp
builder.Services.AddNBatch(nbatch =>
{
    nbatch.AddJob("seed-database", job => job
        .AddStep("seed", step => step
            .Execute(async () =>
            {
                await dbContext.Database.MigrateAsync();
                await SeedDefaultDataAsync(dbContext);
            })))
        .RunOnce();
});
```
## Recurring Background Job with RunEvery()
Run a job on a repeating interval. The interval is measured from the completion of each run, so runs never overlap.
```csharp
builder.Services.AddNBatch(nbatch =>
{
    nbatch.AddJob("hourly-sync", (sp, job) => job
        .UseJobStore(connStr, DatabaseProvider.PostgreSql)
        .WithLogger(sp.GetRequiredService<ILoggerFactory>().CreateLogger("HourlySync"))
        .AddStep("sync", step => step
            .ReadFrom(new DbReader<Order>(
                sp.GetRequiredService<AppDbContext>(),
                q => q.Where(o => o.Status == "new").OrderBy(o => o.Id)))
            .ProcessWith(o => new OrderExport { Id = o.Id, Total = o.Total })
            .WriteTo(new FlatFileItemWriter<OrderExport>("orders.csv"))
            .WithChunkSize(200)))
        .RunEvery(TimeSpan.FromHours(1));
});
```
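Because the delay starts only after a run finishes, a slow run pushes the next one back instead of stacking up. For intuition, the behavior is equivalent to a loop like this (a sketch, not NBatch's actual worker code):

```csharp
// Illustrative only: completion-based scheduling as a simple loop.
// runner and stoppingToken come from the hosting environment in this sketch.
while (!stoppingToken.IsCancellationRequested)
{
    await runner.RunAsync("hourly-sync", stoppingToken);    // run to completion
    await Task.Delay(TimeSpan.FromHours(1), stoppingToken); // interval starts here
}
```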
## Multiple Jobs in One Application
Register multiple jobs; each gets its own background worker (if scheduled) and can be triggered independently via `IJobRunner`.
```csharp
builder.Services.AddNBatch(nbatch =>
{
    // Recurring import
    nbatch.AddJob("import-products", (sp, job) => job
        .AddStep("import", step => step
            .ReadFrom(new CsvReader<Product>("products.csv", mapFn))
            .WriteTo(new DbWriter<Product>(sp.GetRequiredService<AppDbContext>()))
            .WithChunkSize(100)))
        .RunEvery(TimeSpan.FromMinutes(30));

    // On-demand export (no schedule — trigger via IJobRunner)
    nbatch.AddJob("export-orders", (sp, job) => job
        .AddStep("export", step => step
            .ReadFrom(new DbReader<Order>(
                sp.GetRequiredService<AppDbContext>(),
                q => q.OrderBy(o => o.Id)))
            .WriteTo(new FlatFileItemWriter<Order>("orders.csv"))
            .WithChunkSize(200)));
});
```
## TSV File with Custom Delimiter
Read tab-separated values.
```csharp
var reader = new CsvReader<LogEntry>("access.tsv", row => new LogEntry
{
    Url = row.GetString("url"),
    Status = row.GetInt("status")
}).WithDelimiter('\t');
```
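The configured reader then drops into a job like any other reader, using the same builder API as the earlier examples (the `DbWriter` destination here is just for illustration):

```csharp
// Sketch: wire the TSV reader into a chunk step.
var job = Job.CreateBuilder("import-logs")
    .AddStep("import", step => step
        .ReadFrom(reader)
        .WriteTo(new DbWriter<LogEntry>(dbContext)) // dbContext assumed, as above
        .WithChunkSize(100))
    .Build();

await job.RunAsync();
```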
## With Logging
Attach an `ILogger` for diagnostic output.
```csharp
using var loggerFactory = LoggerFactory.Create(b => b
    .AddConsole()
    .SetMinimumLevel(LogLevel.Information));
var logger = loggerFactory.CreateLogger<Program>();

var job = Job.CreateBuilder("logged-job")
    .WithLogger(logger)
    .AddStep("work", step => step
        .ReadFrom(reader)
        .WriteTo(writer)
        .WithChunkSize(50))
    .Build();

await job.RunAsync();
```
## With Listeners
Monitor job and step lifecycle events.
```csharp
var job = Job.CreateBuilder("monitored-job")
    .WithListener(new TimingListener())
    .AddStep("import", step => step
        .ReadFrom(reader)
        .WriteTo(writer)
        .WithListener(new StepMetricsListener()))
    .Build();

var result = await job.RunAsync();

// Inspect results
Console.WriteLine($"Job: {result.Name}, Success: {result.Success}");
foreach (var step in result.Steps)
{
    Console.WriteLine($" {step.Name}: Read={step.ItemsRead}, " +
        $"Processed={step.ItemsProcessed}, Skipped={step.ErrorsSkipped}");
}
```
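`TimingListener` and `StepMetricsListener` are user-defined. The listener interface itself isn't shown in this section, so the following is only an illustration of the idea; the `IJobListener` name and its hooks are assumptions, not NBatch's confirmed API:

```csharp
using System.Diagnostics;

// Illustrative only: a timing listener shaped around assumed before/after hooks.
public class TimingListener : IJobListener // interface name is an assumption
{
    private readonly Stopwatch _watch = new();

    public void BeforeJob(string jobName) => _watch.Restart();

    public void AfterJob(string jobName) =>
        Console.WriteLine($"{jobName} finished in {_watch.Elapsed}");
}
```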
## PostgreSQL Job Store
Use PostgreSQL instead of SQL Server for progress tracking.
```csharp
var job = Job.CreateBuilder("pg-job")
    .UseJobStore(pgConnStr, DatabaseProvider.PostgreSql)
    .AddStep("work", step => step
        .ReadFrom(reader)
        .WriteTo(writer))
    .Build();
```
## MySQL / MariaDB Job Store
```csharp
var job = Job.CreateBuilder("mysql-job")
    .UseJobStore(mysqlConnStr, DatabaseProvider.MySql) // .NET 8 & 9 only
    .AddStep("work", step => step
        .ReadFrom(reader)
        .WriteTo(writer))
    .Build();
```
## Running Locally with Docker
```bash
# Start the database
docker compose up -d

# Build and run
dotnet build
dotnet run --project NBatch.ConsoleApp

# Run tests
dotnet test
```
> **Tip:** The job store tracks progress across runs. To reprocess data, reset the `nbatch.jobs` and `nbatch.steps` tables or recreate the Docker container.
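For example, with EF Core you could clear both tracking tables before a fresh run (table names taken from the tip above; deleting steps before jobs assumes a foreign key between them):

```csharp
// Resets NBatch's tracking tables so the next run starts from scratch.
await dbContext.Database.ExecuteSqlRawAsync("DELETE FROM nbatch.steps;");
await dbContext.Database.ExecuteSqlRawAsync("DELETE FROM nbatch.jobs;");
```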