Package uk.ac.ebi.utils.opt.runcontrol
Class ReactorUtils.ParallelBatchFluxBuilder<T,B extends Collection<T>>
java.lang.Object
uk.ac.ebi.utils.opt.runcontrol.ReactorUtils.ParallelBatchFluxBuilder<T,B>
- Enclosing class:
ReactorUtils
A small helper to build a common ParallelFlux that processes a source of items in parallel batches.
- Author:
- Marco Brandizi
- Date:
- 30 Jun 2024
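The idea of "parallel batches" can be illustrated without Reactor. The sketch below is not this class's implementation: `ParallelBatchSketch`, `toBatches` and `processInParallel` are hypothetical names, and a plain JDK thread pool stands in for the ParallelFlux and its scheduler. It only shows the general pattern of grouping items into fixed-size batches and handing each batch to one of a limited number of workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical stand-in, NOT part of ReactorUtils: it mimics the idea with JDK classes only.
final class ParallelBatchSketch {

  // Splits the items into consecutive batches of at most batchSize elements.
  static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
    if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < items.size(); i += batchSize) {
      int end = Math.min(i + batchSize, items.size());
      batches.add(new ArrayList<>(items.subList(i, end)));
    }
    return batches;
  }

  // Applies batchTask to every batch on a pool of `parallelism` threads and
  // returns the per-batch results in batch order.
  static <T, R> List<R> processInParallel(
      List<T> items, int batchSize, int parallelism, Function<List<T>, R> batchTask) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Future<R>> futures = new ArrayList<>();
      for (List<T> batch : toBatches(items, batchSize)) {
        Callable<R> task = () -> batchTask.apply(batch);
        futures.add(pool.submit(task));
      }
      List<R> results = new ArrayList<>();
      for (Future<R> future : futures) results.add(future.get());
      return results;
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a batch", ex);
    } catch (ExecutionException ex) {
      throw new IllegalStateException("A batch task failed", ex.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<Integer> items = new ArrayList<>();
    for (int i = 1; i <= 10; i++) items.add(i);
    // 10 items in batches of 4, processed by 2 threads; each task reports its batch size.
    List<Integer> sizes = processInParallel(items, 4, 2, List::size);
    System.out.println(sizes); // prints [4, 4, 2]
  }
}
```

In this analogy, the batch size and the thread count play roughly the roles of withBatchSize(int) and withParallelism(int). The real builder delegates these concerns to Reactor instead of managing a pool itself.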
-
Field Summary
Fields
Modifier and Type | Field | Description
static final int | DEFAULT_BATCH_SIZE | This has been tested in tasks like saving data on a database.
static final reactor.core.scheduler.Scheduler | DEFAULT_FLUX_SCHEDULER | Schedulers.newBoundedElastic(int, int, String) with the number of processors as thread cap and that number * 50 as queue max size.
-
Constructor Summary
Constructors
Constructor | Description
ParallelBatchFluxBuilder(Collection<? extends T> collection) |
ParallelBatchFluxBuilder(Stream<? extends T> stream) |
ParallelBatchFluxBuilder(reactor.core.publisher.Flux<? extends T> flux) |
-
Method Summary
Modifier and Type | Method | Description
reactor.core.publisher.ParallelFlux<B> | build() |
reactor.core.publisher.ParallelFlux<B> | build(Consumer<ReactorUtils.ParallelBatchFluxBuilder<? super T, ? super B>> visitor) |
int | getBatchSize() |
 | getBatchSupplier() |
int | getParallelism() |
int | getParallelismPreFetch() |
reactor.core.scheduler.Scheduler | getScheduler() |
void | setParallelism(int parallelism) |
 | withBatchSize(int batchSize) | The size of each batch.
ReactorUtils.ParallelBatchFluxBuilder<T,B> | withBatchSupplier(Supplier<? extends Collection<? super T>> batchSupplier) | Default is null, which falls back to Flux.buffer(int), usually a List supplier.
 | withParallelism(int parallelism) | The degree of parallelism of the resulting flux.
 | withParallelismPreFetch(int parallelismPreFetch) | The prefetch parameter passed to Flux.parallel(int, int).
ReactorUtils.ParallelBatchFluxBuilder<T,B> | withScheduler(reactor.core.scheduler.Scheduler scheduler) | The scheduler used to run the resulting flux.
-
Field Details
-
DEFAULT_FLUX_SCHEDULER
public static final reactor.core.scheduler.Scheduler DEFAULT_FLUX_SCHEDULER
A scheduler created with Schedulers.newBoundedElastic(int, int, String), using the number of processors as the thread cap and 50 times that number as the maximum queue size. This seems suitable for batch processing, where there is little thread switching and a flood of tasks is enqueued.
-
DEFAULT_BATCH_SIZE
public static final int DEFAULT_BATCH_SIZE
The default batch size. This value has been tested in tasks such as saving data to a database.
- See Also:
-
-
Constructor Details
-
ParallelBatchFluxBuilder
-
ParallelBatchFluxBuilder
-
ParallelBatchFluxBuilder
-
-
Method Details
-
withParallelism
The degree of parallelism of the resulting flux. This is passed to Flux.parallel(int, int). Defaults to Schedulers.DEFAULT_POOL_SIZE, as per Reactor default.
-
withParallelismPreFetch
The prefetch parameter passed to Flux.parallel(int, int). Defaults to Queues.SMALL_BUFFER_SIZE, as per Reactor default.
-
withScheduler
public ReactorUtils.ParallelBatchFluxBuilder<T,B> withScheduler(reactor.core.scheduler.Scheduler scheduler)
The scheduler used to run the resulting flux. This is passed to ParallelFlux.runOn(Scheduler). Defaults to DEFAULT_FLUX_SCHEDULER.
-
withBatchSize
The size of each batch the source items are grouped into (see Flux.buffer(int)). Defaults to DEFAULT_BATCH_SIZE.
-
withBatchSupplier
public ReactorUtils.ParallelBatchFluxBuilder<T,B> withBatchSupplier(Supplier<? extends Collection<? super T>> batchSupplier)
The supplier that creates the collection for each new batch. Default is null, which falls back to Flux.buffer(int), which usually collects batches into a List.
-
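To make the role of a batch supplier concrete, here is a minimal plain-JDK sketch. `BatchSupplierSketch` and `toBatches` are hypothetical names introduced only for illustration and are not part of this class or of Reactor. The supplier is called once for each new batch, so it decides which collection type the batches have.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration, NOT the library's code: shows what a batch supplier controls.
final class BatchSupplierSketch {

  // Groups items into batches of at most batchSize elements; every batch
  // collection is created by calling batchSupplier.get().
  static <T, B extends Collection<T>> List<B> toBatches(
      Iterable<? extends T> items, int batchSize, Supplier<B> batchSupplier) {
    if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
    List<B> batches = new ArrayList<>();
    B current = null;
    for (T item : items) {
      if (current == null) current = batchSupplier.get(); // start a new batch lazily
      current.add(item);
      if (current.size() >= batchSize) { // batch is full, emit it
        batches.add(current);
        current = null;
      }
    }
    if (current != null) batches.add(current); // emit the last, partial batch
    return batches;
  }

  public static void main(String[] args) {
    List<String> items = List.of("a", "b", "c", "d", "e");
    // Here the supplier makes every batch a LinkedHashSet instead of a List.
    List<LinkedHashSet<String>> batches =
        BatchSupplierSketch.<String, LinkedHashSet<String>>toBatches(items, 2, LinkedHashSet::new);
    System.out.println(batches); // prints [[a, b], [c, d], [e]]
  }
}
```

A custom supplier is mainly useful when downstream code needs a specific collection type, such as a set or a pre-sized list. Otherwise, the default List batches are usually enough.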
getParallelism
public int getParallelism() -
setParallelism
public void setParallelism(int parallelism) -
getParallelismPreFetch
public int getParallelismPreFetch() -
getScheduler
public reactor.core.scheduler.Scheduler getScheduler() -
getBatchSize
public int getBatchSize() -
getBatchSupplier
-
build
public reactor.core.publisher.ParallelFlux<B> build(Consumer<ReactorUtils.ParallelBatchFluxBuilder<? super T, ? super B>> visitor)
- Parameters:
visitor - if non-null, it is called with this builder before the result is created. This can be used to inspect the builder during the build process, for example to set values derived from defaults (e.g., from getParallelism()).
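The visitor hook can be pictured with a toy builder. This is a sketch under assumptions, not the real class: `VisitableBuilderSketch` is hypothetical, it returns a String rather than a ParallelFlux, its default values are arbitrary, and having `build()` delegate to `build(null)` is assumed rather than documented here.

```java
import java.util.function.Consumer;

// Hypothetical toy builder, NOT ParallelBatchFluxBuilder: it only demonstrates the visitor hook.
final class VisitableBuilderSketch {
  private int parallelism = Runtime.getRuntime().availableProcessors(); // illustrative default
  private int batchSize = 1000; // arbitrary illustrative default

  VisitableBuilderSketch withParallelism(int parallelism) {
    this.parallelism = parallelism;
    return this;
  }

  VisitableBuilderSketch withBatchSize(int batchSize) {
    this.batchSize = batchSize;
    return this;
  }

  int getParallelism() { return parallelism; }

  int getBatchSize() { return batchSize; }

  // Calls the visitor (if any) with this builder, then creates the result.
  String build(Consumer<? super VisitableBuilderSketch> visitor) {
    if (visitor != null) visitor.accept(this);
    return "parallelism=" + parallelism + ", batchSize=" + batchSize;
  }

  // Assumption: the no-arg variant simply skips the visitor.
  String build() { return build(null); }

  public static void main(String[] args) {
    int[] seenParallelism = new int[1];
    String result = new VisitableBuilderSketch()
        .withParallelism(4)
        .withBatchSize(10)
        // The visitor reads the effective parallelism just before the result is built.
        .build(b -> seenParallelism[0] = b.getParallelism());
    System.out.println(seenParallelism[0]); // prints 4
    System.out.println(result); // prints parallelism=4, batchSize=10
  }
}
```

A caller could use such a hook to size a progress tracker or a connection pool from the builder's effective settings, without repeating the defaults elsewhere.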
-
build
-