Package uk.ac.ebi.utils.opt.runcontrol
Class ReactorUtils.ParallelBatchFluxBuilder<T,B extends Collection<T>>
java.lang.Object
uk.ac.ebi.utils.opt.runcontrol.ReactorUtils.ParallelBatchFluxBuilder<T,B>
- Enclosing class:
- ReactorUtils
Little helper to build a common
ParallelFlux
to process a source of items
in parallel batches.- Author:
- Marco Brandizi
- Date:
- 30 Jun 2024
-
Field Summary
Modifier and TypeFieldDescriptionstatic final int
This has been tested in tasks like saving data on a database.static final reactor.core.scheduler.Scheduler
Schedulers.newBoundedElastic(int, int, String)
with the number of processors as thread cap and that number * 50 as queue max size. -
Constructor Summary
ConstructorDescriptionParallelBatchFluxBuilder
(Collection<? extends T> collection) ParallelBatchFluxBuilder
(Stream<? extends T> stream) ParallelBatchFluxBuilder
(reactor.core.publisher.Flux<? extends T> flux) -
Method Summary
Modifier and TypeMethodDescriptionreactor.core.publisher.ParallelFlux<B>
build()
withBatchSize
(int batchSize) The parallel flux scheduler to use.withBatchSupplier
(Supplier<? extends Collection<? super T>> batchSupplier) Default is null, which falls back toFlux.buffer(int)
, usually aList
supplier.withParallelism
(int parallelism) The degree of parallelism of the resulting flux.withParallelismPreFetch
(int parallelismPreFetch) The prefetch parameter passed toFlux.parallel(int, int)
.withScheduler
(reactor.core.scheduler.Scheduler scheduler) The scheduler used to run the resulting flux.
-
Field Details
-
DEFAULT_FLUX_SCHEDULER
public static final reactor.core.scheduler.Scheduler DEFAULT_FLUX_SCHEDULERSchedulers.newBoundedElastic(int, int, String)
with the number of processors as thread cap and that number * 50 as queue max size. This seems suitable for batch processing, where we don't have much thread switching and we enqueue a flood of tasks. -
DEFAULT_BATCH_SIZE
public static final int DEFAULT_BATCH_SIZEThis has been tested in tasks like saving data on a database.- See Also:
-
-
Constructor Details
-
ParallelBatchFluxBuilder
-
ParallelBatchFluxBuilder
-
ParallelBatchFluxBuilder
-
-
Method Details
-
withParallelism
The degree of parallelism of the resulting flux. This is passed toFlux.parallel(int, int)
. Defaults toSchedulers.DEFAULT_POOL_SIZE
, as per Reactor default. -
withParallelismPreFetch
The prefetch parameter passed toFlux.parallel(int, int)
. Default isQueues.SMALL_BUFFER_SIZE
, as per Reactor default. -
withScheduler
public ReactorUtils.ParallelBatchFluxBuilder<T,B> withScheduler(reactor.core.scheduler.Scheduler scheduler) The scheduler used to run the resulting flux. This is passed toParallelFlux.runOn(Scheduler)
. Default isDEFAULT_FLUX_SCHEDULER
, as per Reactor default. -
withBatchSize
The parallel flux scheduler to use. This is passed toParallelFlux.runOn(Scheduler)
. Defaults itDEFAULT_BATCH_SIZE
, as per Reactor default. -
withBatchSupplier
public ReactorUtils.ParallelBatchFluxBuilder<T,B> withBatchSupplier(Supplier<? extends Collection<? super T>> batchSupplier) Default is null, which falls back toFlux.buffer(int)
, usually aList
supplier. -
build
-