Declarative, step-based pipelines for ETL jobs, data migrations, and scheduled tasks.

NBatch NuGet Version NBatch.EntityFrameworkCore NuGet Version NuGet Downloads License


Why NBatch?

Wire up readers, processors, and writers — NBatch handles chunking, error skipping, progress tracking, and restart-from-failure so you can focus on your business logic.

Highlights

Feature Description
Chunk-oriented processing Read, transform, and write data in configurable batches
Skip policies Keep the job running when a record is malformed; skip it and move on
Restart from failure SQL-backed job store tracks progress so a crashed job resumes where it left off
Tasklet steps Fire-and-forget units of work (send an email, call an API, run a stored proc)
Lambda-friendly Processors and writers can be plain lambdas; no extra classes required
DI & hosted service First-class IServiceCollection integration with AddNBatch(), RunOnce(), and RunEvery()
Multi-target Supports .NET 8, .NET 9, and .NET 10
Provider-agnostic SQL Server, PostgreSQL, SQLite, or MySQL for the job store; any EF Core provider for your data

Packages

Package Description
NBatch Core framework — job builder, chunking, skip policies, readers, writers, DI, hosted service
NBatch.EntityFrameworkCore EF Core job store for restart-from-failure (SQL Server, PostgreSQL, SQLite, MySQL)

Quick Start

Install

dotnet add package NBatch
dotnet add package NBatch.EntityFrameworkCore   # optional — only if you need persistent job tracking

Your First Job

var job = Job.CreateBuilder("csv-to-db")
    .AddStep("import", step => step
        .ReadFrom(new CsvReader<Product>(filePath, row => new Product
        {
            Name        = row.GetString("Name"),
            Description = row.GetString("Description"),
            Price       = row.GetDecimal("Price")
        }))
        .WriteTo(new DbWriter<Product>(dbContext))
        .WithSkipPolicy(SkipPolicy.For<FlatFileParseException>(maxSkips: 3))
        .WithChunkSize(100))
    .Build();

await job.RunAsync();

With SQL-Backed Job Store (Restart-from-Failure)

dotnet add package NBatch.EntityFrameworkCore
var job = Job.CreateBuilder("csv-to-db")
    .UseJobStore(connStr, DatabaseProvider.SqlServer)   // from NBatch.EntityFrameworkCore
    .AddStep("import", step => step
        .ReadFrom(new CsvReader<Product>(filePath, mapFn))
        .WriteTo(new DbWriter<Product>(dbContext))
        .WithChunkSize(100))
    .Build();

await job.RunAsync();

With Dependency Injection & Hosted Service

builder.Services.AddNBatch(nbatch =>
{
    nbatch.AddJob("csv-import", (sp, job) => job
        .AddStep("import", step => step
            .ReadFrom(new CsvReader<Product>("data.csv", mapFn))
            .WriteTo(new DbWriter<Product>(sp.GetRequiredService<AppDbContext>()))
            .WithChunkSize(100)))
        .RunEvery(TimeSpan.FromHours(6));
});

Documentation

Page Description
Core Concepts Jobs, steps, chunk processing, and the pipeline model
Readers & Writers Built-in components for CSV, database, and flat-file I/O
Skip Policies Error tolerance and skip-limit configuration
Job Store SQL-backed progress tracking and restart-from-failure
DI & Hosted Service AddNBatch(), IJobRunner, RunOnce(), RunEvery()
Listeners Job and step lifecycle hooks for logging and monitoring
API Reference Interfaces, builders, and result types
Examples Real-world usage patterns and recipes

More Examples

Database to File

var job = Job.CreateBuilder("db-to-file")
    .AddStep("export", step => step
        .ReadFrom(new DbReader<Product>(dbContext, q => q.OrderBy(p => p.Id)))
        .WriteTo(new FlatFileItemWriter<Product>("output.csv").WithToken(','))
        .WithChunkSize(50))
    .Build();

await job.RunAsync();

Multi-step with Tasklet

var job = Job.CreateBuilder("ETL")
    .AddStep("extract-transform", step => step
        .ReadFrom(new DbReader<Order>(sourceDb, q => q.OrderBy(o => o.Id)))
        .ProcessWith(o => new OrderDto { Id = o.Id, Total = o.Total })
        .WriteTo(new FlatFileItemWriter<OrderDto>("orders.csv"))
        .WithChunkSize(100))
    .AddStep("notify", step => step
        .Execute(() => SendEmailAsync()))
    .Build();

await job.RunAsync();

Lambda-Only (No Extra Classes)

var job = Job.CreateBuilder("quick-job")
    .AddStep("transform", step => step
        .ReadFrom(new CsvReader<Product>("data.csv", row => new Product
        {
            Name  = row.GetString("Name"),
            Price = row.GetDecimal("Price")
        }))
        .ProcessWith(p => new Product { Name = p.Name.ToUpper(), Price = p.Price })
        .WriteTo(async items =>
        {
            foreach (var item in items)
                Console.WriteLine(item);
        }))
    .Build();

await job.RunAsync();

Background Job with DI

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDbContext<AppDbContext>(o => o.UseSqlServer(connStr));

builder.Services.AddNBatch(nbatch =>
{
    // Run once on startup
    nbatch.AddJob("seed-data", job => job
        .AddStep("seed", step => step
            .Execute(() => SeedDatabaseAsync())))
        .RunOnce();

    // Run every hour
    nbatch.AddJob("hourly-sync", (sp, job) => job
        .AddStep("sync", step => step
            .ReadFrom(new DbReader<Order>(sp.GetRequiredService<AppDbContext>(), q => q.OrderBy(o => o.Id)))
            .WriteTo(new FlatFileItemWriter<Order>("orders.csv"))
            .WithChunkSize(200)))
        .RunEvery(TimeSpan.FromHours(1));
});

var app = builder.Build();
app.Run();

Contributing

  1. Fork the repo
  2. Create a feature branch: git checkout -b my-feature
  3. Commit your changes: git commit -m "Add my feature"
  4. Push: git push origin my-feature
  5. Open a pull request

License

MIT – Copyright – Tenzin Kabsang


Back to top

NBatch — lightweight batch processing for .NET.