Introdução

Todo desenvolvedor .NET que trabalha com aplicações de médio ou grande porte já esbarrou em pelo menos um destes cenários: uma rotina noturna que precisa gravar 50 mil pedidos no banco, uma consulta de relatório que traz 300 mil registros de uma única vez e trava a aplicação, ou uma API que demora 8 segundos para responder porque o EF Core está executando 10 mil INSERT individuais em sequência.

O SQL Server e o Oracle são bancos robustos e maduros, mas nenhum deles foi projetado para receber milhares de gravações em rajada, nem para retornar centenas de milhares de linhas de uma só vez sem custo. O problema, na maioria dos casos, não é o banco — é a forma como a aplicação C# interage com ele.

Neste artigo vamos explorar as duas faces do gargalo: escrita em massa e leitura de grandes volumes. Para cada uma, você vai ver a causa raiz, as particularidades de SQL Server e Oracle com EF Core 8.0+, e a solução prática — mensageria para gravação e paginação eficiente para leitura. Todo o código é produção-ready e compatível com .NET 8/9.

Pré-requisitos: Conhecimento básico de C# e EF Core. Recomenda-se ter lido o artigo sobre programação assíncrona com C# antes de continuar.

📦 Código-fonte: A implementação completa deste artigo está no repositório blog-zocateli-sample no GitHub. Clone, explore e adapte ao seu contexto.


O Gargalo de Escrita: Por Que Gravar Milhares de Registros Dói

O Custo Real de Cada INSERT

Quando você salva uma lista de entidades com o EF Core da forma mais comum, algo assim acontece:

1
2
3
4
5
6
// ❌ Abordagem ingênua — InsertOne por vez
foreach (var pedido in listaDe50MilPedidos)
{
    await context.Pedidos.AddAsync(pedido);
    await context.SaveChangesAsync(); // PROBLEMA: 1 roundtrip por registro!
}

Cada SaveChangesAsync() dentro do loop representa:

  1. Uma transação aberta → commit → fechada no banco
  2. Um roundtrip de rede (latência de 1–5ms por chamada)
  3. Log do banco de dados sendo escrito para cada operação
  4. Locks de linha sendo alocados e liberados 50 mil vezes

Para 50.000 pedidos com 2ms de latência por roundtrip, isso representa 100 segundos de processamento puro de I/O. E isso é no melhor cenário, sem contenção.

Particularidades: SQL Server vs Oracle

Ainda que a solução seja similar nos dois bancos, há diferenças importantes:

AspectoSQL ServerOracle
Bulk Insert nativoBULK INSERT / SqlBulkCopyODP.NET BulkCopy / INSERT ALL
Tamanho máximo de lote padrão1.000 linhas por INSERTDepende do ArrayBindCount
Sequências / IdentityIDENTITY ou SEQUENCEApenas SEQUENCE (obrigatório)
Rollback de bulkPode ser minimamente logadoSempre logado (redo log)
EF Core providerMicrosoft.EntityFrameworkCore.SqlServerOracle.EntityFrameworkCore

No Oracle, um detalhe crítico é que o provider oficial (Oracle.EntityFrameworkCore 8.x) exige que você configure a geração de chaves via SEQUENCE + TRIGGER (ou GENERATED ALWAYS AS IDENTITY no Oracle 12c+). Ignorar isso em gravações em massa vai gerar N chamadas extras ao banco só para obter os IDs.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Oracle: configuração correta no OnModelCreating
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    modelBuilder.Entity<Pedido>(entity =>
    {
        entity.HasKey(e => e.Id);

        // Oracle 12c+: identity nativo, evita roundtrip extra por ID
        entity.Property(e => e.Id)
              .UseIdentityColumn();

        // Alternativa para Oracle 11g/legado
        // entity.Property(e => e.Id)
        //       .HasDefaultValueSql("SEQ_PEDIDOS.NEXTVAL");
    });
}

A Solução Imediata: SaveChanges em Lote com EF Core 8

Antes de introduzir filas, a primeira otimização é mover o SaveChangesAsync() para fora do loop e usar o chunk para não sobrecarregar o contexto:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// ✅ Melhor: batch por chunk + 1 SaveChanges por lote
public async Task GravarPedidosEmLoteAsync(
    IEnumerable<Pedido> pedidos,
    CancellationToken ct = default)
{
    const int tamanhoLote = 500;

    foreach (var chunk in pedidos.Chunk(tamanhoLote))
    {
        await context.Pedidos.AddRangeAsync(chunk, ct);
        await context.SaveChangesAsync(ct);

        // Limpar o ChangeTracker para não acumular entidades em memória
        context.ChangeTracker.Clear();
    }
}

💡 Dica: O método Chunk() foi introduzido no .NET 6. Combinado com ChangeTracker.Clear(), evita que o contexto EF Core cresça indefinidamente ao rastrear dezenas de milhares de entidades.

ExecuteInsertAsync e BulkInsert no EF Core 8

O EF Core 7 trouxe ExecuteUpdateAsync e ExecuteDeleteAsync. O EF Core 8 melhorou o suporte a operações em massa. Para cenários de altíssima performance, use a biblioteca EFCore.BulkExtensions:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// ✅ BulkInsert para SQL Server e Oracle (via EFCore.BulkExtensions)
// dotnet add package EFCore.BulkExtensions
public async Task BulkInsertPedidosAsync(
    List<Pedido> pedidos,
    CancellationToken ct = default)
{
    var bulkConfig = new BulkConfig
    {
        BatchSize = 1000,
        UseTempDB = true,              // SQL Server: tabela temporária para staging
        SetOutputIdentity = true,      // Preenche os IDs gerados pelo banco
        PreserveInsertOrder = true
    };

    await context.BulkInsertAsync(pedidos, bulkConfig, cancellationToken: ct);
}

Com BulkInsert, 50.000 registros que levavam 100 segundos com SaveChanges individual passam a ser gravados em 2–4 segundos no SQL Server, e em 3–6 segundos no Oracle.


Mensageria: A Solução Arquitetural para Gravação em Massa

Otimizar o próprio INSERT resolve o sintoma, mas não a causa raiz. O problema real é que a aplicação está tentando processar um volume enorme de forma síncrona, bloqueando a thread e a requisição enquanto grava. A solução arquitetural correta é desacoplar a recepção da gravação usando uma fila de mensagens.

Como a Mensageria Resolve o Problema

Em vez de gravar diretamente no banco ao receber os dados, a aplicação:

  1. Publica os registros em uma fila (RabbitMQ, Azure Service Bus, etc.) — operação rápida (~1ms por mensagem)
  2. Retorna imediatamente para o cliente com 202 Accepted
  3. Um Consumer separado lê a fila em lotes e grava no banco com bulk insert

Arquitetura de mensageria para gravação em massa no banco de dados

Essa separação traz benefícios além da performance:

  • Resiliência: se o banco cair temporariamente, as mensagens ficam na fila. Naão há perda de dados.
  • Controle de fluxo: o consumer pode processar na velocidade que o banco suporta
  • Backpressure natural: a fila amortece picos de carga
  • Observabilidade: você pode monitorar o tamanho da fila e saber exatamente o backlog pendente

Implementação com RabbitMQ.Client

A biblioteca oficial RabbitMQ.Client é 100% open-source (Apache 2.0) e fornece acesso direto ao protocolo AMQP. Combinada com o BackgroundService do .NET, implementamos um consumer que acumula mensagens em lote e grava tudo de uma vez com BulkInsert.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// dotnet add package RabbitMQ.Client
// dotnet add package EFCore.BulkExtensions

// --- Mensagem ---
public record PedidoCriadoMessage(
    Guid Id,
    string ClienteId,
    decimal Valor,
    DateTime DataCriacao);

// --- Producer ---
public class PedidoProducer(IConnection connection)
{
    private const string QueueName = "pedidos-queue";

    public async Task PublicarAsync(
        PedidoCriadoMessage mensagem,
        CancellationToken ct = default)
    {
        await using var channel = await connection.CreateChannelAsync(cancellationToken: ct);

        await channel.QueueDeclareAsync(
            queue: QueueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            cancellationToken: ct);

        var json = JsonSerializer.SerializeToUtf8Bytes(mensagem);
        var props = new BasicProperties { DeliveryMode = DeliveryModes.Persistent };

        await channel.BasicPublishAsync(
            exchange: string.Empty,
            routingKey: QueueName,
            mandatory: false,
            basicProperties: props,
            body: json,
            cancellationToken: ct);
    }
}

// --- Consumer em lote (BackgroundService) ---
public class PedidoConsumerWorker(
    IConnection connection,
    IServiceScopeFactory scopeFactory,
    ILogger<PedidoConsumerWorker> logger) : BackgroundService
{
    private const string QueueName = "pedidos-queue";
    private const int TamanhoLote = 500;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        await using var channel = await connection.CreateChannelAsync(cancellationToken: ct);

        await channel.QueueDeclareAsync(
            queue: QueueName, durable: true,
            exclusive: false, autoDelete: false,
            cancellationToken: ct);

        // Backpressure: limita mensagens em voo para não sobrecarregar a memória
        await channel.BasicQosAsync(
            prefetchSize: 0,
            prefetchCount: TamanhoLote + 100,
            global: false,
            cancellationToken: ct);

        var lote = new List<(ulong Tag, PedidoCriadoMessage Msg)>();

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (_, ea) =>
        {
            var msg = JsonSerializer.Deserialize<PedidoCriadoMessage>(ea.Body.Span)!;
            lote.Add((ea.DeliveryTag, msg));

            if (lote.Count >= TamanhoLote)
                await ProcessarLoteAsync(channel, lote, ct);
        };

        await channel.BasicConsumeAsync(
            queue: QueueName,
            autoAck: false,
            consumer: consumer,
            cancellationToken: ct);

        // Timer de flush: força o processamento mesmo que o lote não esteja cheio
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
        while (await timer.WaitForNextTickAsync(ct))
        {
            if (lote.Count > 0)
                await ProcessarLoteAsync(channel, lote, ct);
        }
    }

    private async Task ProcessarLoteAsync(
        IChannel channel,
        List<(ulong Tag, PedidoCriadoMessage Msg)> lote,
        CancellationToken ct)
    {
        var snapshot = lote.ToList();
        lote.Clear();

        var pedidos = snapshot.Select(x => new Pedido
        {
            Id           = x.Msg.Id,
            ClienteId    = x.Msg.ClienteId,
            Valor        = x.Msg.Valor,
            DataCriacao  = x.Msg.DataCriacao
        }).ToList();

        using var scope    = scopeFactory.CreateScope();
        var dbContext       = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var bulkConfig      = new BulkConfig { BatchSize = 500 };

        await dbContext.BulkInsertAsync(pedidos, bulkConfig, cancellationToken: ct);

        // ACK múltiplo: confirma todo o lote de uma só vez
        await channel.BasicAckAsync(
            deliveryTag: snapshot[^1].Tag,
            multiple: true,
            cancellationToken: ct);

        logger.LogInformation("Lote de {Count} pedidos gravado", pedidos.Count);
    }
}

Registrando no Program.cs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Program.cs
// dotnet add package RabbitMQ.Client
builder.Services.AddSingleton<IConnection>(_ =>
{
    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };
    // CreateConnectionAsync retorna Task — resolvemos aqui para o DI container
    return factory.CreateConnectionAsync().GetAwaiter().GetResult();
});

builder.Services.AddScoped<PedidoProducer>();
builder.Services.AddHostedService<PedidoConsumerWorker>();

⚠️ Atenção: No Oracle, o BulkInsert do EFCore.BulkExtensions requer o Oracle Data Provider for .NET (ODP.NET). Certifique-se de registrar o provider correto e de que as sequences foram configuradas corretamente no modelo, ou a inserção em massa vai falhar com erro de constraint de chave primária.


RabbitMQ vs Azure Service Bus: Qual Escolher?

As duas opções resolvem o mesmo problema — desacoplar produção e consumo de mensagens — mas com modelos operacionais e econômicos bem distintos. A escolha depende do seu contexto: infraestrutura self-hosted vs cloud managed, custo variável vs fixo, e grau de controle desejado.

CritérioRabbitMQAzure Service Bus
TipoOpen-source (MPL 2.0), self-hostedPaaS gerenciado pela Microsoft
ProtocoloAMQP 0-9-1 (nativo), AMQP 1.0, MQTT, STOMPAMQP 1.0
HospedagemContainer próprio, VM, KubernetesAzure (sem infra para gerenciar)
CustoInfraestrutura + operação (seu time)Pay-per-use (~$0,10/milhão de ops)
FilasQueues, Exchanges, BindingsQueues e Topics/Subscriptions
Tamanho máximo de mensagem128 MB (padrão)256 KB (Standard) / 100 MB (Premium)
Retenção de mensagensAté o disco encher / TTL configurável14 dias (máximo)
Dead-letter queue✅ Configurável✅ Nativo
Sessions (ordenação garantida)✅ Via x-single-active-consumer✅ Service Bus Sessions
Retry automáticoManual (Dead-letter + requeue)Nativo (MaxDeliveryCount)
Escalabilidade horizontalManual (cluster Erlang)Automática
Biblioteca .NETRabbitMQ.Client (Apache 2.0)Azure.Messaging.ServiceBus (MIT)
Melhor paraOn-premise, multi-cloud, custo controladoEcossistema Azure, equipe pequena, SLA garantido

Implementação Equivalente com Azure Service Bus

Para migrar do RabbitMQ para o Azure Service Bus em ambiente Azure, o padrão de producer/consumer é similar, usando Azure.Messaging.ServiceBus:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
// dotnet add package Azure.Messaging.ServiceBus

// --- Producer com envio em lote nativo ---
public class PedidoServiceBusProducer(ServiceBusSender sender)
{
    public async Task PublicarLoteAsync(
        IEnumerable<PedidoCriadoMessage> mensagens,
        CancellationToken ct = default)
    {
        // O ServiceBusMessageBatch respeita o limite de tamanho automaticamente
        using var batch = await sender.CreateMessageBatchAsync(ct);

        foreach (var msg in mensagens)
        {
            var sbMsg = new ServiceBusMessage(
                BinaryData.FromObjectAsJson(msg))
            {
                ContentType = "application/json",
                MessageId   = msg.Id.ToString()
            };

            if (!batch.TryAddMessage(sbMsg))
                throw new InvalidOperationException(
                    $"Mensagem {msg.Id} excede o limite do lote");
        }

        await sender.SendMessagesAsync(batch, ct);
    }
}

// --- Consumer com BackgroundService ---
public class PedidoServiceBusWorker(
    ServiceBusProcessor processor,
    IServiceScopeFactory scopeFactory,
    ILogger<PedidoServiceBusWorker> logger) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        processor.ProcessMessageAsync += async args =>
        {
            var msg = args.Message.Body
                          .ToObjectFromJson<PedidoCriadoMessage>();

            using var scope   = scopeFactory.CreateScope();
            var dbContext      = scope.ServiceProvider
                                      .GetRequiredService<AppDbContext>();
            var pedido         = new Pedido
            {
                Id          = msg.Id,
                ClienteId   = msg.ClienteId,
                Valor       = msg.Valor,
                DataCriacao = msg.DataCriacao
            };

            await dbContext.BulkInsertAsync(
                new List<Pedido> { pedido },
                cancellationToken: ct);

            // Confirma o processamento — remove da fila
            await args.CompleteMessageAsync(args.Message, ct);
        };

        processor.ProcessErrorAsync += args =>
        {
            logger.LogError(args.Exception,
                "Erro ao processar mensagem do Service Bus");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync(ct);
        await Task.Delay(Timeout.Infinite, ct);
    }

    public override async Task StopAsync(CancellationToken ct)
    {
        await processor.StopProcessingAsync(ct);
        await base.StopAsync(ct);
    }
}

// Program.cs
builder.Services.AddSingleton(provider =>
{
    var client = new ServiceBusClient(
        "Endpoint=sb://meu-namespace.servicebus.windows.net/;...");
    return client.CreateSender("pedidos-queue");
});

builder.Services.AddSingleton(provider =>
{
    var client = new ServiceBusClient(
        "Endpoint=sb://meu-namespace.servicebus.windows.net/;...");
    return client.CreateProcessor("pedidos-queue", new ServiceBusProcessorOptions
    {
        MaxConcurrentCalls   = 4,   // Paralelismo no consumer
        AutoCompleteMessages = false // Controle manual de ACK
    });
});

builder.Services.AddHostedService<PedidoServiceBusWorker>();

💡 Dica: Em produção no Azure, prefira autenticação via Managed Identity em vez de connection string, usando new ServiceBusClient("meu-namespace.servicebus.windows.net", new DefaultAzureCredential()). Isso elimina segredos na configuração.


O Gargalo de Leitura: Por Que Trazer Tudo de Uma Vez é Perigoso

O Problema da Query Sem Limite

1
2
3
4
5
// ❌ NUNCA faça isso em produção com tabelas grandes
var todosPedidos = await context.Pedidos
    .Include(p => p.Itens)
    .Include(p => p.Cliente)
    .ToListAsync(); // Pode trazer 500 mil registros para a memória!

Essa query aparentemente inocente pode:

  • Consumir gigabytes de RAM no servidor da aplicação
  • Bloquear o banco com um table scan de longa duração
  • Gerar timeouts em ambientes com alta concorrência
  • Saturar a rede entre aplicação e banco com tráfego desnecessário
  • Travar o GC do .NET com objetos grandes que precisam ir para o LOH (Large Object Heap)

No Oracle, um aspecto adicional é o uso do ROWNUM (Oracle 11g) vs FETCH FIRST … ROWS ONLY (Oracle 12c+), que afeta como a paginação é expressa em SQL. O provider Oracle.EntityFrameworkCore abstrai isso, mas é importante entender o que está sendo gerado.

Tipos de Paginação: Offset vs Keyset (Cursor)

📖 Artigo dedicado: Para um guia completo sobre todas as estratégias de paginação (Offset, Keyset, Cursor Server-Side, Time-based e Token Opaco) com SQL Server, Oracle e PostgreSQL — incluindo código EF Core 8+ para cada combinação —, veja: Paginação em APIs REST com C# e EF Core 8: Todos os Tipos, Todos os Bancos. Esta seção apresenta os conceitos essenciais; o artigo linked aprofunda quando e por que usar cada abordagem.

Existem duas estratégias principais de paginação, e cada uma tem casos de uso distintos:

CaracterísticaOffset PaginationKeyset (Cursor) Pagination
SQL geradoOFFSET N ROWS FETCH NEXT MWHERE id > :lastId
Performance em páginas iniciais✅ Rápida✅ Rápida
Performance em páginas tardias❌ Lenta (scan cresce)✅ Constante
Suporte a salto de página✅ Direto❌ Apenas sequencial
Registros novos entre páginas❌ Pode duplicar/omitir✅ Consistente
Caso de uso idealUI com números de páginaScroll infinito / APIs

Paginação por Offset: Simples e Adequada para UIs

A paginação por offset é a mais familiar, e o EF Core a gera corretamente para SQL Server e Oracle:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Modelo de request padronizado
public record PaginacaoRequest(int Pagina = 1, int TamanhoPagina = 20)
{
    // Garante limites seguros
    public int TamanhoSeguro => Math.Min(Math.Max(TamanhoPagina, 1), 100);
    public int OffsetSeguro  => (Math.Max(Pagina, 1) - 1) * TamanhoSeguro;
}

// Modelo de resposta padronizado
public record PaginaResultado<T>(
    IReadOnlyList<T> Dados,
    int PaginaAtual,
    int TamanhoPagina,
    long TotalRegistros,
    int TotalPaginas);

// Serviço de consulta paginada
public class PedidoQueryService(AppDbContext context)
{
    public async Task<PaginaResultado<PedidoResumoDto>> ListarPedidosAsync(
        PaginacaoRequest paginacao,
        string? clienteId = null,
        CancellationToken ct = default)
    {
        // Construir query base com filtros opcionais
        var query = context.Pedidos
            .AsNoTracking()           // Leitura: sempre AsNoTracking!
            .Where(p => clienteId == null || p.ClienteId == clienteId);

        // COUNT separado para o total (EF Core 8 otimiza isso)
        var total = await query.LongCountAsync(ct);

        // Dados da página
        var dados = await query
            .OrderBy(p => p.DataCriacao)   // Orderby obrigatório para paginação consistente
            .Skip(paginacao.OffsetSeguro)
            .Take(paginacao.TamanhoSeguro)
            .Select(p => new PedidoResumoDto(   // Projection: só os campos necessários
                p.Id,
                p.ClienteId,
                p.Valor,
                p.DataCriacao,
                p.Status))
            .ToListAsync(ct);

        return new PaginaResultado<PedidoResumoDto>(
            Dados: dados,
            PaginaAtual: paginacao.Pagina,
            TamanhoPagina: paginacao.TamanhoSeguro,
            TotalRegistros: total,
            TotalPaginas: (int)Math.Ceiling(total / (double)paginacao.TamanhoSeguro));
    }
}

💡 Dica: Sempre use .Select() com uma projection (DTO ou record), nunca retorne a entidade completa em queries de listagem. Isso reduz o volume de dados transferidos e evita o carregamento de navegações desnecessárias.

SQL Gerado: SQL Server vs Oracle

O EF Core 8 gera SQL correto e otimizado para ambos os bancos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
-- SQL Server (gerado pelo EF Core 8)
SELECT p.Id, p.ClienteId, p.Valor, p.DataCriacao, p.Status
FROM Pedidos AS p
WHERE p.ClienteId = N'cliente-123'
ORDER BY p.DataCriacao
OFFSET 40 ROWS FETCH NEXT 20 ROWS ONLY;

-- Oracle 12c+ (gerado pelo Oracle.EntityFrameworkCore 8)
SELECT p.Id, p.ClienteId, p.Valor, p.DataCriacao, p.Status
FROM Pedidos p
WHERE p.ClienteId = :clienteId_0
ORDER BY p.DataCriacao
OFFSET 40 ROWS FETCH NEXT 20 ROWS ONLY;

⚠️ Atenção: Para Oracle 11g (sem FETCH FIRST), o provider gera uma query com ROWNUM aninhado, que pode ter custo adicional em tabelas muito grandes. Considere migrar para Oracle 12c+ ou usar views materializadas para relatórios pesados.


Keyset Pagination: Alta Performance para Scroll Infinito e APIs

Quando o usuário navega para a página 500 de um resultado com 10 mil itens, o banco precisa fazer um scan de 10.000 linhas só para pular as primeiras 9.980. Com keyset pagination, você usa o valor da última linha buscada como ponto de partida da próxima consulta:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Request com cursor (keyset)
public record KeysetRequest(
    Guid? UltimoId = null,
    DateTime? UltimaDataCriacao = null,
    int Limite = 20);

// Resultado com cursor para a próxima página
public record KeysetResultado<T>(
    IReadOnlyList<T> Dados,
    bool TemProximaPagina,
    Guid? ProximoCursorId,
    DateTime? ProximoCursorData);

public class PedidoKeysetService(AppDbContext context)
{
    public async Task<KeysetResultado<PedidoResumoDto>> ListarComCursorAsync(
        KeysetRequest request,
        CancellationToken ct = default)
    {
        // Filtro de keyset: busca registros "após" o cursor
        // Usando (DataCriacao, Id) como chave composta para estabilidade
        var query = context.Pedidos
            .AsNoTracking()
            .Where(p =>
                request.UltimaDataCriacao == null ||
                p.DataCriacao > request.UltimaDataCriacao ||
                (p.DataCriacao == request.UltimaDataCriacao &&
                 p.Id.CompareTo(request.UltimoId!.Value) > 0))
            .OrderBy(p => p.DataCriacao)
            .ThenBy(p => p.Id);

        // Busca Limite + 1 para saber se há próxima página
        var dados = await query
            .Take(request.Limite + 1)
            .Select(p => new PedidoResumoDto(p.Id, p.ClienteId, p.Valor, p.DataCriacao, p.Status))
            .ToListAsync(ct);

        var temProxima = dados.Count > request.Limite;
        if (temProxima) dados.RemoveAt(dados.Count - 1); // Remove o item extra

        var ultimo = dados.LastOrDefault();

        return new KeysetResultado<PedidoResumoDto>(
            Dados: dados,
            TemProximaPagina: temProxima,
            ProximoCursorId: ultimo?.Id,
            ProximoCursorData: ultimo?.DataCriacao);
    }
}

Este padrão mantém performance constante independente do número da página, pois o SQL gerado usa um simples WHERE com índice em vez de OFFSET.

1
2
3
4
5
6
-- SQL gerado (SQL Server / Oracle)
SELECT TOP(21) p.Id, p.ClienteId, p.Valor, p.DataCriacao, p.Status
FROM Pedidos AS p
WHERE p.DataCriacao > '2025-06-01'
   OR (p.DataCriacao = '2025-06-01' AND p.Id > 'abc-123')
ORDER BY p.DataCriacao, p.Id;

💡 Dica: Crie um índice composto nas colunas de keyset para garantir que a query use index seek em vez de table scan:

1
2
3
4
5
-- SQL Server
CREATE INDEX IX_Pedidos_Keyset ON Pedidos (DataCriacao, Id) INCLUDE (ClienteId, Valor, Status);

-- Oracle
CREATE INDEX IX_Pedidos_Keyset ON Pedidos (DataCriacao, Id);

Juntando Tudo: A Arquitetura Completa

Com as duas soluções combinadas, a arquitetura da aplicação fica assim:

  • Escrita: API recebe → publica na fila → retorna 202 Accepted → Consumer processa em lote com BulkInsert
  • Leitura: API recebe request paginado → executa query com Skip/Take (offset) ou WHERE (keyset) → retorna somente os dados necessários
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Endpoint ASP.NET Core — escrita assíncrona
app.MapPost("/pedidos/lote", async (
    [FromBody] List<CriarPedidoRequest> requests,
    PedidoService service,
    CancellationToken ct) =>
{
    var ids = await Task.WhenAll(
        requests.Select(r => service.CriarPedidoAsync(r, ct)));

    // 202 Accepted: os pedidos foram enfileirados, não necessariamente gravados
    return Results.Accepted("/pedidos/status", new { Ids = ids });
});

// Endpoint ASP.NET Core — leitura paginada
app.MapGet("/pedidos", async (
    [AsParameters] PaginacaoRequest paginacao,
    [FromQuery] string? clienteId,
    PedidoQueryService queryService,
    CancellationToken ct) =>
{
    var resultado = await queryService.ListarPedidosAsync(paginacao, clienteId, ct);
    return Results.Ok(resultado);
});

Dicas e Boas Práticas

  • Índices adequados: Toda coluna usada em OrderBy, Where ou keyset cursor deve ter índice. No Oracle, use índices funcionais para colunas com transformações (ex.: UPPER(nome)).
  • AsNoTracking sempre em leituras: Desativa o Change Tracker para queries de leitura, reduzindo uso de memória e CPU em até 30%.
  • Evite Count() em tabelas grandes: Para keyset pagination, você não precisa do total de registros. Para offset, considere cachear o total por alguns segundos.
  • Defina timeouts explícitos: Configure context.Database.SetCommandTimeout(30) ou use WithTimeout por consulta para evitar queries longas bloqueando a aplicação indefinidamente.
  • Monitore o tamanho da fila: Configure alertas para quando a fila ultrapassar N mensagens — é o sinal de que o consumer não está dando conta do volume.
  • Use IAsyncEnumerable para streaming: Para exportações de CSV ou processamento de grandes volumes que não precisam de paginação, use AsAsyncEnumerable() para processar linha por linha sem carregar tudo na memória:
1
2
3
4
5
6
7
8
// Streaming com IAsyncEnumerable — sem carregar tudo na memória
await foreach (var pedido in context.Pedidos
    .AsNoTracking()
    .AsAsyncEnumerable()
    .WithCancellation(ct))
{
    await exportador.EscreverLinhaAsync(pedido, ct);
}
  • Transações explícitas para consistência: Ao usar BulkInsert com múltiplas tabelas (ex.: gravar Pedido e ItensPedido em conjunto), envolva a operação em uma transação explícita para garantir atomicidade.

Conclusão

Os gargalos de banco de dados com EF Core raramente são culpa do banco em si. Na esmagadora maioria dos casos, o problema está na forma como a aplicação interage com ele: inserindo um registro por vez em vez de em lote, e trazendo todo o resultado em vez de paginar.

A combinação de mensageria (RabbitMQ ou Azure Service Bus) para desacoplar gravação em massa e paginação eficiente (offset para UIs, keyset para APIs e scroll infinito) resolve os dois gargalos de forma elegante, escalável e resiliente. Com EF Core 8.0+ e as técnicas apresentadas aqui, você consegue suportar volumes muito maiores sem mudar a estrutura do banco, apenas ajustando a forma como a aplicação se comunica com ele.

O próximo passo natural é observabilidade: instrumentar as queries lentas com Application Performance Monitoring (APM), criar alertas para queries acima de N segundos, e mapear os índices faltantes via Query Store (SQL Server) ou AWR (Oracle).


Leia Também


Referências