Em sistemas que utilizam processamento assíncrono, como chamadas gRPC não bloqueantes, é comum encontrar cenários onde o gerenciamento inadequado de pools de threads pode levar a problemas de desempenho e até mesmo a um OutOfMemoryError (OOM). Um caso típico ocorre quando o mesmo pool de threads é utilizado tanto para enviar requisições quanto para processar as respostas. Isso pode resultar em um acúmulo de tarefas pendentes que consomem toda a memória disponível, causando um OOM.
Esse artigo descreve o problema que eu passei a semana investigando e como podemos atacá-lo! Vamos lá 👊!
Simulando o problema
Vamos considerar um exemplo onde um FixedThreadPool é utilizado para enviar milhões de requisições a um "fake" cliente gRPC e, simultaneamente, processar as respostas dessas requisições. O código abaixo demonstra como esse cenário pode levar a um OOM:
publicclassThreadPoolsOOMExample{privatestaticfinalintTHREAD_POOL_SIZE=10;privatestaticfinalintTOTAL_TASKS=1_000_000;privatefinalExecutorServicemainExecutor=Executors.newFixedThreadPool(THREAD_POOL_SIZE);publicstaticvoidmain(String[]args){ThreadPoolsOOMExampleexample=newThreadPoolsOOMExample();example.run();}publicvoidrun(){for(inti=0;i<TOTAL_TASKS;i++){mainExecutor.submit(this::submitRequest);}}privatevoidsubmitRequest(){// Simula o envio de uma requisição para o cliente gRPCCompletableFuture<Response>future=asyncGrpcCall();// Processa a resposta usando o mesmo executorfuture.thenApplyAsync(this::processResponse,mainExecutor);}privateCompletableFuture<Response>asyncGrpcCall(){// Simula uma chamada gRPC assíncronaCompletableFuture<Response>future=newCompletableFuture<>();newThread(()->{try{Thread.sleep(100);// Simula o atraso da redefuture.complete(newResponse(512));}catch(InterruptedExceptione){future.completeExceptionally(e);}}).start();intqueueSize=((ThreadPoolExecutor)mainExecutor).getQueue().size();System.out.println("Current queue size: "+queueSize);printHeapSize();returnfuture;}privateResponseprocessResponse(Responseresponse){// Processa a resposta do cliente gRPCSystem.out.println("Processando resposta");returnresponse;}privatevoidprintHeapSize(){Runtimeruntime=Runtime.getRuntime();longtotalMemory=runtime.totalMemory();longfreeMemory=runtime.freeMemory();longusedMemory=totalMemory-freeMemory;longmaxMemory=runtime.maxMemory();System.out.println("Heap size (total): "+totalMemory/(1024*1024)+" MB");System.out.println("Heap size (used): "+usedMemory/(1024*1024)+" MB");System.out.println("Heap size (max): "+maxMemory/(1024*1024)+" MB");}publicstaticclassResponse{privatebyte[]data;publicResponse(intsizeInKB){this.data=newbyte[sizeInKB*1024];// 1 KB = 1024 bytes}publicbyte[]getData(){returndata;}}}
Neste exemplo, o mainExecutor é utilizado tanto para enviar requisições quanto para processar as respostas. Como o número de tarefas é muito grande (TOTAL_TASKS = 1000000), as respostas ficam acumuladas no final da fila do mainExecutor, esperando que todas as requisições sejam enviadas primeiro. Isso leva a um acúmulo de respostas pendentes, consumindo muita memória e eventualmente causando um OutOfMemoryError.
Possíveis soluções
1. Use Pools de Threads Separados
Uma solução eficaz é utilizar pools de threads separados para a produção de requisições e o processamento de respostas. Isso garante que as respostas possam ser processadas independentemente do envio de novas requisições, evitando o acúmulo de tarefas na fila.
packagecom.hugodesmarques.threads;importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.ThreadPoolExecutor;publicclassSeparateThreadPoolsExample{privatestaticfinalintPRODUCER_THREAD_POOL_SIZE=10;privatestaticfinalintCONSUMER_THREAD_POOL_SIZE=10;privatestaticfinalintTOTAL_TASKS=1_000_000;privatefinalExecutorServiceproducerThreadPool=Executors.newFixedThreadPool(PRODUCER_THREAD_POOL_SIZE);privatefinalExecutorServiceconsumerThreadPool=Executors.newFixedThreadPool(CONSUMER_THREAD_POOL_SIZE);publicstaticvoidmain(String[]args){SeparateThreadPoolsExampleexample=newSeparateThreadPoolsExample();example.run();}publicvoidrun(){for(inti=0;i<TOTAL_TASKS;i++){producerThreadPool.submit(this::submitRequest);intproducerQueueSize=((ThreadPoolExecutor)producerThreadPool).getQueue().size();System.out.println("Producer queue size: "+producerQueueSize);}}privatevoidsubmitRequest(){// Simula o envio de uma requisição para o cliente gRPCCompletableFuture<Response>future=asyncGrpcCall();// Processa a resposta usando o mesmo executorfuture.thenApplyAsync(this::processResponse,consumerThreadPool);}privateCompletableFuture<Response>asyncGrpcCall(){// Simula uma chamada gRPC assíncronaCompletableFuture<Response>future=newCompletableFuture<>();newThread(()->{try{Thread.sleep(100);// Simula o atraso da redefuture.complete(newResponse(512));}catch(InterruptedExceptione){future.completeExceptionally(e);}}).start();intconsumerQueueSize=((ThreadPoolExecutor)consumerThreadPool).getQueue().size();System.out.println("Consumer queue size: "+consumerQueueSize);printHeapSize();returnfuture;}privateResponseprocessResponse(Responseresponse){// Processa a resposta do cliente gRPCSystem.out.println("Processando resposta");returnresponse;}privatevoidprintHeapSize(){Runtimeruntime=Runtime.getRuntime();longtotalMemory=runtime.totalMemory();longfreeMemory=runtime.freeMemory();longusedMemory=totalMemory-freeMemory;longmaxMemory=runtime.maxMemory();System.out.println("Heap size (total): "+totalMemory/(1024*1024)+" MB");System.out.println("Heap size (used): "+usedMemory/(1024*1024)+" MB");System.out.println("Heap size (max): "+maxMemory/(1024*1024)+" MB");}publicstaticclassResponse{privatebyte[]data;publicResponse(intsizeInKB){this.data=newbyte[sizeInKB*1024];// 1 KB = 1024 bytes}publicbyte[]getData(){returndata;}}}
2. Use um Executor Com Capacidade Limitada
Outra abordagem é limitar a capacidade do ThreadPoolExecutor para evitar que ele aceite mais tarefas do que pode processar. Isso pode ser feito usando um BlockingQueue com capacidade limitada e uma política de rejeição adequada.
packagecom.hugodesmarques.threads;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.RejectedExecutionHandler;importjava.util.concurrent.ThreadFactory;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicInteger;publicclassThreadPoolWithLimitsExample{privatestaticfinalintTOTAL_TASKS=1_000_000;privatestaticfinalintTHREAD_POOL_SIZE=10;privatestaticfinalintQUEUE_CAPACITY=100;privatefinalExecutorServicemainExecutor=newThreadPoolExecutor(THREAD_POOL_SIZE,THREAD_POOL_SIZE,0L,TimeUnit.MILLISECONDS,newArrayBlockingQueue<>(QUEUE_CAPACITY),newNamedThreadFactory("Producer"),// ThreadFactory com expressão lambda)newRejectionLoggingPolicy(newThreadPoolExecutor.CallerRunsPolicy())// Política de rejeição com log);publicstaticvoidmain(String[]args){ThreadPoolWithLimitsExampleexample=newThreadPoolWithLimitsExample();example.run();}publicvoidrun(){for(inti=0;i<TOTAL_TASKS;i++){mainExecutor.submit(this::submitRequest);}}privatevoidsubmitRequest(){// Simula o envio de uma requisição para o cliente gRPCCompletableFuture<Response>future=asyncGrpcCall();// Processa a resposta usando o mesmo executorfuture.thenApplyAsync(this::processResponse,mainExecutor);}privateCompletableFuture<Response>asyncGrpcCall(){// Simula uma chamada gRPC assíncronaCompletableFuture<Response>future=newCompletableFuture<>();newThread(()->{try{Thread.sleep(100);// Simula o atraso da redefuture.complete(newResponse(512));}catch(InterruptedExceptione){future.completeExceptionally(e);}}).start();intqueueSize=((ThreadPoolExecutor)mainExecutor).getQueue().size();System.out.println("Current queue size: "+queueSize);printHeapSize();returnfuture;}privateResponseprocessResponse(Responseresponse){// Processa a resposta do cliente gRPCSystem.out.println("Processando resposta... thread: "+Thread.currentThread().getName());returnresponse;}privatevoidprintHeapSize(){Runtimeruntime=Runtime.getRuntime();longtotalMemory=runtime.totalMemory();longfreeMemory=runtime.freeMemory();longusedMemory=totalMemory-freeMemory;longmaxMemory=runtime.maxMemory();System.out.println("Heap size (total): "+totalMemory/(1024*1024)+" MB");System.out.println("Heap size (used): "+usedMemory/(1024*1024)+" MB");System.out.println("Heap size (max): "+maxMemory/(1024*1024)+" MB");}// Política de rejeição personalizada que registra um log quando uma tarefa é rejeitadastaticclassRejectionLoggingPolicyimplementsRejectedExecutionHandler{privatefinalRejectedExecutionHandlerhandler;publicRejectionLoggingPolicy(RejectedExecutionHandlerhandler){this.handler=handler;}@OverridepublicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){System.out.println("Tarefa rejeitada: "+r.toString()+" thread: "+Thread.currentThread().getName());handler.rejectedExecution(r,executor);}}// ThreadFactory personalizado para nomear threadsstaticclassNamedThreadFactoryimplementsThreadFactory{privatefinalAtomicIntegerthreadNumber=newAtomicInteger(1);privatefinalStringnamePrefix;publicNamedThreadFactory(StringnamePrefix){this.namePrefix=namePrefix;}@OverridepublicThreadnewThread(Runnabler){returnnewThread(r,namePrefix+"-thread-"+threadNumber.getAndIncrement());}}publicstaticclassResponse{privatebyte[]data;publicResponse(intsizeInKB){this.data=newbyte[sizeInKB*1024];// 1 KB = 1024 bytes}publicbyte[]getData(){returndata;}}}
3. Controle o fluxo de produção com o uso de semáforos
Implementar controle de fluxo é uma abordagem eficaz para garantir que a produção de novas requisições não ultrapasse a capacidade do sistema de processar respostas. Isso pode ser feito usando semáforos (Semaphore) ou outras técnicas de controle de fluxo. A ideia é limitar o número de requisições simultâneas para evitar sobrecarregar o sistema.
packagecom.hugodesmarques.threads;importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.Semaphore;publicclassThreadPoolsWithSemaphores{privatestaticfinalintTHREAD_POOL_SIZE=10;privatestaticfinalintTOTAL_TASKS=1_000_000;privatestaticfinalintMAX_CONCURRENT_REQUESTS=100;privatefinalSemaphoresemaphore=newSemaphore(MAX_CONCURRENT_REQUESTS);privatefinalExecutorServicemainExecutor=Executors.newFixedThreadPool(THREAD_POOL_SIZE);publicstaticvoidmain(String[]args){ThreadPoolsWithSemaphoresexample=newThreadPoolsWithSemaphores();example.run();}publicvoidrun(){for(inti=0;i<TOTAL_TASKS;i++){try{System.out.println("Submetendo request: "+i);semaphore.acquire();mainExecutor.submit(this::submitRequest);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}}privatevoidsubmitRequest(){try{// Simula o envio de uma requisição para o cliente gRPCCompletableFuture<Response>future=asyncGrpcCall();// Processa a resposta usando o mesmo executorfuture.thenApplyAsync(this::processResponse,mainExecutor).whenComplete((result,ex)->semaphore.release());}catch(Exceptione){semaphore.release();}}privateCompletableFuture<Response>asyncGrpcCall(){// Simula uma chamada gRPC assíncronaCompletableFuture<Response>future=newCompletableFuture<>();newThread(()->{try{Thread.sleep(100);// Simula o atraso da redefuture.complete(newResponse(512));}catch(InterruptedExceptione){future.completeExceptionally(e);}}).start();printHeapSize();returnfuture;}privateResponseprocessResponse(Responseresponse){// Processa a resposta do cliente gRPCSystem.out.println("Processando resposta");returnresponse;}privatevoidprintHeapSize(){Runtimeruntime=Runtime.getRuntime();longtotalMemory=runtime.totalMemory();longfreeMemory=runtime.freeMemory();longusedMemory=totalMemory-freeMemory;longmaxMemory=runtime.maxMemory();System.out.println("Heap size (total): "+totalMemory/(1024*1024)+" MB");System.out.println("Heap size (used): "+usedMemory/(1024*1024)+" MB");System.out.println("Heap size (max): "+maxMemory/(1024*1024)+" MB");}publicstaticclassResponse{privatebyte[]data;publicResponse(intsizeInKB){this.data=newbyte[sizeInKB*1024];// 1 KB = 1024 bytes}publicbyte[]getData(){returndata;}}}
Conclusão
O gerenciamento adequado de pools de threads em sistemas assíncronos é crucial para evitar problemas de desempenho e OutOfMemoryError (OOM). Utilizar pools de threads separados, limitar a capacidade do ThreadPoolExecutor e implementar controle de fluxo são abordagens eficazes para garantir que a produção de novas requisições não ultrapasse a capacidade do sistema de processar respostas.
Ao aplicar essas soluções, você pode garantir que seu sistema permaneça eficiente e responsivo, mesmo sob alta carga, evitando o acúmulo excessivo de tarefas na fila e prevenindo OOM.
Todos os exemplos deste post estão disponíveis no meu repositório: