# NBatch

Declarative, step-based pipelines for ETL jobs, data migrations, and scheduled tasks.
## Why NBatch?
Wire up readers, processors, and writers — NBatch handles chunking, error skipping, progress tracking, and restart-from-failure so you can focus on your business logic.
## Highlights
| Feature | Description |
|---|---|
| Chunk-oriented processing | Read, transform, and write data in configurable batches |
| Skip policies | Keep the job running when a record is malformed; skip it and move on |
| Restart from failure | SQL-backed job store tracks progress so a crashed job resumes where it left off |
| Tasklet steps | Fire-and-forget units of work (send an email, call an API, run a stored proc) |
| Lambda-friendly | Processors and writers can be plain lambdas; no extra classes required |
| DI & hosted service | First-class `IServiceCollection` integration with `AddNBatch()`, `RunOnce()`, and `RunEvery()` |
| Multi-target | Supports .NET 8, .NET 9, and .NET 10 |
| Provider-agnostic | SQL Server, PostgreSQL, SQLite, or MySQL for the job store; any EF Core provider for your data |
## Packages
| Package | Description |
|---|---|
| `NBatch` | Core framework — job builder, chunking, skip policies, readers, writers, DI, hosted service |
| `NBatch.EntityFrameworkCore` | EF Core job store for restart-from-failure (SQL Server, PostgreSQL, SQLite, MySQL) |
## Quick Start
### Install
```bash
dotnet add package NBatch
dotnet add package NBatch.EntityFrameworkCore   # optional — only if you need persistent job tracking
```
### Your First Job
var job = Job.CreateBuilder("csv-to-db")
.AddStep("import", step => step
.ReadFrom(new CsvReader<Product>(filePath, row => new Product
{
Name = row.GetString("Name"),
Description = row.GetString("Description"),
Price = row.GetDecimal("Price")
}))
.WriteTo(new DbWriter<Product>(dbContext))
.WithSkipPolicy(SkipPolicy.For<FlatFileParseException>(maxSkips: 3))
.WithChunkSize(100))
.Build();
await job.RunAsync();
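The examples assume a simple `Product` entity. A minimal sketch (any POCO that matches your data works):

```csharp
// Minimal Product entity assumed by the examples above; adjust to your schema.
public class Product
{
    public int Id { get; set; }
    public string Name { get; set; } = "";
    public string? Description { get; set; }
    public decimal Price { get; set; }
}
```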
### With SQL-Backed Job Store (Restart-from-Failure)
```bash
dotnet add package NBatch.EntityFrameworkCore
```
var job = Job.CreateBuilder("csv-to-db")
.UseJobStore(connStr, DatabaseProvider.SqlServer) // from NBatch.EntityFrameworkCore
.AddStep("import", step => step
.ReadFrom(new CsvReader<Product>(filePath, mapFn))
.WriteTo(new DbWriter<Product>(dbContext))
.WithChunkSize(100))
.Build();
await job.RunAsync();
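With the job store in place, a crashed job does not start from scratch: re-running it picks up where it left off rather than reprocessing work that already completed (see the Job Store page for details).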
### With Dependency Injection & Hosted Service
```csharp
builder.Services.AddNBatch(nbatch =>
{
    // The (sp, job) overload exposes the service provider for resolving dependencies.
    nbatch.AddJob("csv-import", (sp, job) => job
            .AddStep("import", step => step
                .ReadFrom(new CsvReader<Product>("data.csv", mapFn))
                .WriteTo(new DbWriter<Product>(sp.GetRequiredService<AppDbContext>()))
                .WithChunkSize(100)))
          .RunEvery(TimeSpan.FromHours(6)); // re-run on a fixed interval
});
```
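Registered jobs can also be triggered on demand via `IJobRunner` (listed under DI & Hosted Service in the docs). A hypothetical sketch; the exact method name on `IJobRunner` is an assumption here, so verify it against the docs:

```csharp
// Hypothetical on-demand trigger. Assumes IJobRunner exposes a method that
// runs a registered job by name; check the actual interface before using.
app.MapPost("/jobs/csv-import/run", async (IJobRunner runner) =>
{
    await runner.RunAsync("csv-import");
    return Results.Accepted();
});
```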
## Documentation
| Page | Description |
|---|---|
| Core Concepts | Jobs, steps, chunk processing, and the pipeline model |
| Readers & Writers | Built-in components for CSV, database, and flat-file I/O |
| Skip Policies | Error tolerance and skip-limit configuration |
| Job Store | SQL-backed progress tracking and restart-from-failure |
| DI & Hosted Service | `AddNBatch()`, `IJobRunner`, `RunOnce()`, `RunEvery()` |
| Listeners | Job and step lifecycle hooks for logging and monitoring |
| API Reference | Interfaces, builders, and result types |
| Examples | Real-world usage patterns and recipes |
## More Examples
### Database to File
var job = Job.CreateBuilder("db-to-file")
.AddStep("export", step => step
.ReadFrom(new DbReader<Product>(dbContext, q => q.OrderBy(p => p.Id)))
.WriteTo(new FlatFileItemWriter<Product>("output.csv").WithToken(','))
.WithChunkSize(50))
.Build();
await job.RunAsync();
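Note the explicit `OrderBy(p => p.Id)`: reading in chunks relies on a stable sort order, and without an `ORDER BY` the database is free to return rows in a different order between reads.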
### Multi-step with Tasklet
var job = Job.CreateBuilder("ETL")
.AddStep("extract-transform", step => step
.ReadFrom(new DbReader<Order>(sourceDb, q => q.OrderBy(o => o.Id)))
.ProcessWith(o => new OrderDto { Id = o.Id, Total = o.Total })
.WriteTo(new FlatFileItemWriter<OrderDto>("orders.csv"))
.WithChunkSize(100))
.AddStep("notify", step => step
.Execute(() => SendEmailAsync()))
.Build();
await job.RunAsync();
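`SendEmailAsync` is your own code. A hypothetical placeholder to make the example self-contained:

```csharp
// Hypothetical notification helper: swap in your real mail client.
static async Task SendEmailAsync()
{
    // e.g., call your email provider's API here
    await Task.CompletedTask;
}
```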
### Lambda-Only (No Extra Classes)
var job = Job.CreateBuilder("quick-job")
.AddStep("transform", step => step
.ReadFrom(new CsvReader<Product>("data.csv", row => new Product
{
Name = row.GetString("Name"),
Price = row.GetDecimal("Price")
}))
.ProcessWith(p => new Product { Name = p.Name.ToUpper(), Price = p.Price })
.WriteTo(async items =>
{
foreach (var item in items)
Console.WriteLine(item);
}))
.Build();
await job.RunAsync();
### Background Job with DI
```csharp
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDbContext<AppDbContext>(o => o.UseSqlServer(connStr));

builder.Services.AddNBatch(nbatch =>
{
    // Run once on startup
    nbatch.AddJob("seed-data", job => job
            .AddStep("seed", step => step
                .Execute(() => SeedDatabaseAsync())))
          .RunOnce();

    // Run every hour
    nbatch.AddJob("hourly-sync", (sp, job) => job
            .AddStep("sync", step => step
                .ReadFrom(new DbReader<Order>(sp.GetRequiredService<AppDbContext>(), q => q.OrderBy(o => o.Id)))
                .WriteTo(new FlatFileItemWriter<Order>("orders.csv"))
                .WithChunkSize(200)))
          .RunEvery(TimeSpan.FromHours(1));
});

var app = builder.Build();
app.Run();
```
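`RunOnce()` executes the job a single time at host startup; `RunEvery(interval)` re-runs it on a fixed schedule for the lifetime of the host.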
## Contributing
- Fork the repo
- Create a feature branch: `git checkout -b my-feature`
- Commit your changes: `git commit -m "Add my feature"`
- Push: `git push origin my-feature`
- Open a pull request
## License
MIT – Copyright Tenzin Kabsang