Class ReactorUtils.ParallelBatchFluxBuilder<T,B extends Collection<T>>

java.lang.Object
uk.ac.ebi.utils.opt.runcontrol.ReactorUtils.ParallelBatchFluxBuilder<T,B>
Enclosing class:
ReactorUtils

public static class ReactorUtils.ParallelBatchFluxBuilder<T,B extends Collection<T>> extends Object
Little helper to build a common ParallelFlux to process a source of items in parallel batches.
Author:
Marco Brandizi
Date:
30 Jun 2024
  • Field Details

    • DEFAULT_FLUX_SCHEDULER

      public static final reactor.core.scheduler.Scheduler DEFAULT_FLUX_SCHEDULER
      Schedulers.newBoundedElastic(int, int, String) with the number of processors as thread cap and that number * 50 as queue max size. This seems suitable for batch processing, where we don't have much thread switching and we enqueue a flood of tasks.
    • DEFAULT_BATCH_SIZE

      public static final int DEFAULT_BATCH_SIZE
      This has been tested in tasks like saving data on a database.
  • Constructor Details

    • ParallelBatchFluxBuilder

      public ParallelBatchFluxBuilder(reactor.core.publisher.Flux<? extends T> flux)
    • ParallelBatchFluxBuilder

      public ParallelBatchFluxBuilder(Stream<? extends T> stream)
    • ParallelBatchFluxBuilder

      public ParallelBatchFluxBuilder(Collection<? extends T> collection)
  • Method Details

    • withParallelism

      public ReactorUtils.ParallelBatchFluxBuilder<T,B> withParallelism(int parallelism)
      The degree of parallelism of the resulting flux. This is passed to Flux.parallel(int, int). Defaults to Schedulers.DEFAULT_POOL_SIZE, as per Reactor default.
    • withParallelismPreFetch

      public ReactorUtils.ParallelBatchFluxBuilder<T,B> withParallelismPreFetch(int parallelismPreFetch)
      The prefetch parameter passed to Flux.parallel(int, int). Default is Queues.SMALL_BUFFER_SIZE, as per Reactor default.
    • withScheduler

      public ReactorUtils.ParallelBatchFluxBuilder<T,B> withScheduler(reactor.core.scheduler.Scheduler scheduler)
      The scheduler used to run the resulting flux. This is passed to ParallelFlux.runOn(Scheduler). Default is DEFAULT_FLUX_SCHEDULER.
    • withBatchSize

      public ReactorUtils.ParallelBatchFluxBuilder<T,B> withBatchSize(int batchSize)
      The size of the batches into which the source items are grouped. This is the size passed to Flux.buffer(int). Default is DEFAULT_BATCH_SIZE.
    • withBatchSupplier

      public ReactorUtils.ParallelBatchFluxBuilder<T,B> withBatchSupplier(Supplier<? extends Collection<? super T>> batchSupplier)
      The supplier of the collections used to hold each batch. Default is null, which falls back to Flux.buffer(int), where each batch is collected into a List.
    • build

      public reactor.core.publisher.ParallelFlux<B> build()
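
The options above map onto a small chain of Reactor calls. The following is a minimal sketch of such a chain written directly against Reactor, assuming that build() combines Flux.buffer(int), Flux.parallel(int, int) and ParallelFlux.runOn(Scheduler) in this order; the actual implementation may differ. The class name ParallelBatchSketch, the method processInBatches and the scheduler name are hypothetical, used for illustration only.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;

public class ParallelBatchSketch
{
  /**
   * Splits the integers 1..itemCount into batches, processes the batches in parallel
   * and returns the total number of items seen. Hypothetical helper, not part of ReactorUtils.
   */
  public static int processInBatches ( int itemCount, int batchSize )
  {
    int nProcessors = Runtime.getRuntime ().availableProcessors ();

    // Same sizing as described for DEFAULT_FLUX_SCHEDULER: the processor count as thread cap,
    // 50 times that as queue size. The scheduler name is made up for this sketch.
    Scheduler scheduler = Schedulers.newBoundedElastic (
      nProcessors, nProcessors * 50, "sketchBatchScheduler"
    );

    try
    {
      // Assumed equivalent of build(): batch, split into parallel rails, run on the scheduler.
      // The parallelism and prefetch values are the Reactor defaults named in the docs above.
      ParallelFlux<List<Integer>> batches = Flux.range ( 1, itemCount )
        .buffer ( batchSize )
        .parallel ( Schedulers.DEFAULT_POOL_SIZE, Queues.SMALL_BUFFER_SIZE )
        .runOn ( scheduler );

      // Stand-in for real per-batch work, such as saving a batch to a database
      AtomicInteger processed = new AtomicInteger ();
      batches
        .doOnNext ( batch -> processed.addAndGet ( batch.size () ) )
        .sequential ()
        .blockLast (); // wait until all batches are done

      return processed.get ();
    }
    finally
    {
      // Release the scheduler's threads, so that they don't keep the JVM alive
      scheduler.dispose ();
    }
  }

  public static void main ( String[] args )
  {
    System.out.println ( processInBatches ( 10, 3 ) );
  }
}
```

With the builder, the same settings would presumably be supplied through withBatchSize(int), withParallelism(int), withParallelismPreFetch(int) and withScheduler(Scheduler) before calling build(), leaving only the per-batch work to the caller.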