Class BatchProcessor<B,BC extends BatchCollector<B>,BJ extends Consumer<B>>
java.lang.Object
uk.ac.ebi.utils.threading.batchproc.BatchProcessor<B,BC,BJ>
- Type Parameters:
B - the type of batch to be handled
BC - the type of BatchCollector to use for batch operations
BJ - the type of job that is able to process a batch of type `B`
- Direct Known Subclasses:
ItemizedBatchProcessor
@Deprecated
public abstract class BatchProcessor<B,BC extends BatchCollector<B>,BJ extends Consumer<B>>
extends Object
Deprecated.
The functionality available in this package is provided by Project Reactor, and we recommend switching to it. See ReactorUtils.
Batch Processor Skeleton
A simple skeleton to manage the processing of data in multi-thread mode. The idea for which this class and batch collectors provide scaffolding is that a data source is used to build `B` batches and, when a batch is ready to be processed, it is submitted to a `BJ` job, which is run in parallel by the executor returned by getExecutor().
This class is just a skeleton: for a concrete implementation, you need to realise the above source-processing loop on your own. To facilitate that, handleNewBatch(Object, boolean) is provided. You should call it periodically: it checks whether the current batch (the one you're filling) is ready for submission and, if so, submits it to a new parallel job, creates a new batch and returns it; otherwise, it returns the current batch.
At the end of such a loop, you should call waitExecutor(String), so that the final batch you were building is forced into processing and other close-up operations are performed.
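The loop described above can be sketched in plain Java as follows. This is not the library's code: `MiniBatchProcessor`, `BatchLoopDemo` and their members are hypothetical names, a `Supplier` and a `Predicate` stand in for the BatchCollector, and a fixed pool of 4 threads is an arbitrary stand-in for the default executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for BatchProcessor (not the library's implementation)
class MiniBatchProcessor<B> {
  private final Supplier<B> batchFactory;  // "create a new batch" role of a BatchCollector
  private final Predicate<B> batchReady;   // "is this batch ready?" role of a BatchCollector
  private final Consumer<B> batchJob;      // the BJ job, run in parallel on each batch
  private ExecutorService executor;        // created lazily, dropped by waitExecutor()

  MiniBatchProcessor(Supplier<B> batchFactory, Predicate<B> batchReady, Consumer<B> batchJob) {
    this.batchFactory = batchFactory;
    this.batchReady = batchReady;
    this.batchJob = batchJob;
  }

  ExecutorService getExecutor() {
    if (executor == null) executor = Executors.newFixedThreadPool(4);
    return executor;
  }

  // Submits currentBatch and returns a fresh batch if it's ready (or forceFlush is set);
  // otherwise returns currentBatch unchanged, so that the caller keeps filling it
  B handleNewBatch(B currentBatch, boolean forceFlush) {
    if (!forceFlush && !batchReady.test(currentBatch)) return currentBatch;
    getExecutor().submit(() -> batchJob.accept(currentBatch));
    return batchFactory.get();
  }

  // Stops accepting jobs and blocks until all the submitted batch jobs have finished
  void waitExecutor() {
    if (executor == null) return;
    executor.shutdown();
    try {
      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
        // jobs still running: keep waiting
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for batch jobs", e);
    }
    executor = null; // the next getExecutor() call creates a new pool
  }
}

class BatchLoopDemo {
  // The source-processing loop: fill batches item by item, let the processor decide when to submit
  static long sumInBatches(List<Integer> items, int batchSize) {
    AtomicLong sum = new AtomicLong();
    MiniBatchProcessor<List<Integer>> processor = new MiniBatchProcessor<>(
        ArrayList::new, b -> b.size() >= batchSize, b -> b.forEach(x -> sum.addAndGet(x)));
    List<Integer> batch = new ArrayList<>();
    for (Integer item : items) {
      batch.add(item);
      batch = processor.handleNewBatch(batch, false);
    }
    processor.handleNewBatch(batch, true); // force out the last, partially filled batch
    processor.waitExecutor();
    return sum.get();
  }
}
```

For instance, `BatchLoopDemo.sumInBatches(Arrays.asList(1, 2, 3, 4, 5), 2)` submits three batch jobs ([1, 2], [3, 4] and the forced [5]) and returns 15.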
As mentioned above, BatchCollector is another facility, used to manage batch creation and to decide whether a batch is ready to be processed.
Specific implementations are provided for both BatchCollector and this processor class.
For instance, we have an item-based processor, which implements the source-dispatching loop described above by fetching items from a stream and sending them to batches. This specific processor uses item-based collectors, whose default implementation decides whether a batch is ready on the basis of the number of items it contains. Other specific processors are those based on Java collections, which make use of the corresponding CollectionBatchCollector.
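To make the item-based readiness decision concrete, here is a minimal sketch of a collector that declares a batch ready once it holds a fixed number of items. `SimpleBatchCollector`, `SizedItemCollector` and `ItemBatching` are hypothetical names: the real BatchCollector interface and its method signatures may differ. For clarity, batches are cut synchronously here, with no executor involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface covering the two roles described for a BatchCollector
interface SimpleBatchCollector<B> {
  B newBatch();               // creates an empty batch
  boolean isReady(B batch);   // decides whether the batch should be submitted
}

// Item-based collector: a batch is ready once it holds maxSize items
class SizedItemCollector<T> implements SimpleBatchCollector<List<T>> {
  private final int maxSize;

  SizedItemCollector(int maxSize) {
    if (maxSize <= 0) throw new IllegalArgumentException("maxSize must be positive");
    this.maxSize = maxSize;
  }

  @Override
  public List<T> newBatch() { return new ArrayList<>(maxSize); }

  @Override
  public boolean isReady(List<T> batch) { return batch.size() >= maxSize; }
}

class ItemBatching {
  // Cuts a stream of items into batches, asking the collector when each batch is complete
  static <T> List<List<T>> split(Iterable<T> items, SimpleBatchCollector<List<T>> collector) {
    List<List<T>> batches = new ArrayList<>();
    List<T> current = collector.newBatch();
    for (T item : items) {
      current.add(item);
      if (collector.isReady(current)) {
        batches.add(current);
        current = collector.newBatch();
      }
    }
    if (!current.isEmpty()) batches.add(current); // last, partially filled batch
    return batches;
  }
}
```

With a maximum size of 3, seven items are cut into batches of 3, 3 and 1 items.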
Note that, while we consider synchronisation issues for the batch processing jobs, we assume that the initialisation and submission operations are run by a single thread (e.g., `main`), so the code for those operations isn't thread-safe. For instance, waitExecutor(String) won't synchronise over the current executor, and setBatchJob(Consumer) won't synchronise over the current job. Instantiating multiple processors is a safe way to deal with a scenario where multiple threads submit batches. Likely, you'll want to share the executor among those processors in such a case.
- Author:
- brandizi
- Date:
- 1 Dec 2017
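The multi-processor arrangement suggested in the thread-safety note above can be sketched as follows: each submitting thread owns its own (non-thread-safe) submitter instance, playing the role of a separate processor, while all of them hand batches over to one shared ExecutorService. `PerThreadSubmitter` and `SharedExecutorDemo` are hypothetical names; the pool size of 2 and the batch size of 10 are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-thread submitter: like a processor instance, it is NOT thread-safe,
// so each submitting thread creates its own; only the executor is shared
class PerThreadSubmitter {
  private final ExecutorService sharedExecutor;
  private final AtomicLong total;   // thread-safe result shared by all the batch jobs
  private final int batchSize;
  private List<Integer> batch = new ArrayList<>();

  PerThreadSubmitter(ExecutorService sharedExecutor, AtomicLong total, int batchSize) {
    this.sharedExecutor = sharedExecutor;
    this.total = total;
    this.batchSize = batchSize;
  }

  void add(int item) {
    batch.add(item);
    if (batch.size() >= batchSize) flush();
  }

  // Hands the current batch over to a parallel job and starts a new one
  void flush() {
    if (batch.isEmpty()) return;
    List<Integer> toProcess = batch;
    batch = new ArrayList<>();
    sharedExecutor.submit(() -> toProcess.forEach(x -> total.addAndGet(x)));
  }
}

class SharedExecutorDemo {
  static long run(int submitterThreads, int itemsPerThread) {
    ExecutorService shared = Executors.newFixedThreadPool(2);
    AtomicLong total = new AtomicLong();
    List<Thread> submitters = new ArrayList<>();
    for (int t = 0; t < submitterThreads; t++) {
      Thread submitter = new Thread(() -> {
        PerThreadSubmitter s = new PerThreadSubmitter(shared, total, 10);
        for (int i = 1; i <= itemsPerThread; i++) s.add(i);
        s.flush(); // force out the last, partially filled batch
      });
      submitters.add(submitter);
      submitter.start();
    }
    try {
      for (Thread submitter : submitters) submitter.join();
      shared.shutdown(); // only once every submitter has finished
      if (!shared.awaitTermination(1, TimeUnit.MINUTES))
        throw new IllegalStateException("Batch jobs did not finish in time");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return total.get();
  }
}
```

The shared executor is shut down only after all the submitter threads have been joined, since a shut-down executor would reject the batches still being submitted.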
Field Summary
- protected long jobLogPeriod: Deprecated.
- protected org.slf4j.Logger log: Deprecated.
Constructor Summary
- BatchProcessor(): Deprecated.
- BatchProcessor(BJ batchJob): Deprecated.
- BatchProcessor(BJ batchJob, BC batchCollector): Deprecated.
Method Summary
- BC getBatchCollector(): Deprecated. As explained in BatchCollector, this is used to create a new batch and to decide whether it's ready for submission to a new job.
- BJ getBatchJob(): Deprecated. handleNewBatch(Object, boolean) launches this job every time the current batch being considered is ready for processing.
- long getCompletedBatches(): Deprecated.
- ExecutorService getExecutor(): Deprecated. The executor service used by handleNewBatch(Object) to submit batch jobs and run them in parallel.
- long getSubmittedBatches(): Deprecated.
- protected B handleNewBatch(B currentBatch): Deprecated.
- protected B handleNewBatch(B currentBatch, boolean forceFlush): Deprecated. This is the method that possibly issues a new task, via the getExecutor() executor, which runs getBatchJob() against the current batch.
- void setBatchCollector(BC batchCollector): Deprecated.
- void setBatchJob(BJ batchJob): Deprecated.
- void setExecutor(ExecutorService executor): Deprecated.
- void setJobLogPeriod(long jobLogPeriod): Deprecated. If > 0, methods like handleNewBatch(Object, boolean) and waitExecutor(String) log messages about how many jobs the processor has submitted and completed, and they do so every time this number of jobs has been submitted.
- protected void waitExecutor(String pleaseWaitMessage): Deprecated. Waits until all the parallel jobs submitted to the batch job have finished.
- protected Runnable wrapBatchJob(Runnable batchJob): Deprecated. Wraps the task into some common operations.
Field Details
- jobLogPeriod
protected long jobLogPeriod
Deprecated.
- log
protected org.slf4j.Logger log
Deprecated.
Constructor Details
- BatchProcessor
Deprecated.
- BatchProcessor
Deprecated.
- BatchProcessor
public BatchProcessor()
Deprecated.
Method Details
- handleNewBatch
Deprecated.
- handleNewBatch
Deprecated. This is the method that possibly issues a new task, via the getExecutor() executor, which runs getBatchJob() against the current batch. Note that the batch job will be executed under the default wrapper. This method also resets the internal ExecutorService, which will be recreated (once) upon the first invocation of getExecutor(). This behaviour ensures that a processor can be invoked multiple times reusing the same batchJob instance (normally that's not possible for an ExecutorService after its ExecutorService.awaitTermination(long, TimeUnit) method is called).
- Parameters:
forceFlush - if true, flushes the current batch regardless of BatchCollector.batchReadyFlag(). This is typically needed when you've exhausted a stream of data and have a last, partially filled batch to process.
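The executor reset described for this method relies on the fact that an ExecutorService can't be reused once it has been shut down: it rejects any new task. The following minimal sketch shows the lazy re-creation pattern, where a new executor is built on the first getExecutor() call after each reset. `LazyExecutorHolder`, `finishAndReset()` and `LazyExecutorDemo` are hypothetical names, not the library's API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical holder showing lazy (re)creation of the executor
class LazyExecutorHolder {
  private ExecutorService executor;
  private int created = 0; // how many executors have been built so far

  // Creates a new executor on first use, both after construction and after a reset
  ExecutorService getExecutor() {
    if (executor == null) {
      executor = Executors.newFixedThreadPool(2);
      created++;
    }
    return executor;
  }

  // Shuts the current executor down, waits for its jobs, then forgets it
  void finishAndReset() {
    if (executor == null) return;
    executor.shutdown();
    try {
      executor.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    executor = null;
  }

  int createdCount() { return created; }
}

class LazyExecutorDemo {
  // Shows why a reset is needed: a shut-down executor rejects new tasks
  static boolean shutDownExecutorRejects() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.shutdown();
    try {
      executor.submit(() -> { });
      return false;
    } catch (RejectedExecutionException e) {
      return true;
    }
  }
}
```

Two rounds of work separated by a `finishAndReset()` call build exactly two executors, however many times getExecutor() is called within each round.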
- getBatchCollector
Deprecated. As explained in BatchCollector, this is used to create a new batch and to decide whether it's ready for submission to a new job.
- setBatchCollector
Deprecated.
- getBatchJob
Deprecated. handleNewBatch(Object, boolean) launches this job every time the current batch being considered is ready for processing. Note that your job is wrapped into wrapBatchJob(Runnable).
- setBatchJob
Deprecated.
- getExecutor
Deprecated. The executor service used by handleNewBatch(Object) to submit batch jobs and run them in parallel. By default, this is HackedBlockingQueue.createExecutor(), i.e., a fixed-size executor pool that blocks and waits when it's full. Moreover, this executor is equipped with a convenient naming thread factory, which names the threads after the processor class (i.e., this class or one of its subclasses). Normally, you shouldn't need to change this parameter, unless you want a particular execution policy. A situation where you do want to use the setter for this property is when you want to hook multiple batch processors to the same executor, to avoid recreating threads too many times, and/or to set an overall thread management policy.
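HackedBlockingQueue.createExecutor() is only described here, not shown. A common way to obtain a fixed-size pool that blocks submitters when it's full is a work queue whose offer() delegates to the blocking put(), since ThreadPoolExecutor.execute() calls offer() and would otherwise reject the task. The following sketch assumes that technique (HackedBlockingQueue may work differently) and adds a thread factory that names threads after a class. `BlockingOfferQueue`, `ClassNamingThreadFactory` and `BlockingExecutors` are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical queue whose offer() blocks (via put) when full, so that
// ThreadPoolExecutor.execute() waits for free space instead of rejecting the task
class BlockingOfferQueue<E> extends LinkedBlockingQueue<E> {
  BlockingOfferQueue(int capacity) { super(capacity); }

  @Override
  public boolean offer(E e) {
    try {
      put(e); // blocks until a worker takes a task from the queue
      return true;
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false; // the executor will then reject the task
    }
  }
}

// Names threads after a class: "Owner-1", "Owner-2", ...
class ClassNamingThreadFactory implements ThreadFactory {
  private final String prefix;
  private final AtomicInteger counter = new AtomicInteger();

  ClassNamingThreadFactory(Class<?> owner) { this.prefix = owner.getSimpleName(); }

  @Override
  public Thread newThread(Runnable r) {
    return new Thread(r, prefix + "-" + counter.incrementAndGet());
  }
}

class BlockingExecutors {
  // Fixed-size pool: poolSize threads and at most queueSize waiting tasks; submitters block when full
  static ThreadPoolExecutor create(int poolSize, int queueSize, Class<?> owner) {
    return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
        new BlockingOfferQueue<>(queueSize), new ClassNamingThreadFactory(owner));
  }
}
```

Blocking in offer() gives simple back-pressure: the thread filling batches pauses instead of piling up pending batches in memory. The trade-off is that a submitter may wait indefinitely if the workers stall.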
- setExecutor
Deprecated.
- waitExecutor
Deprecated. Waits until all the parallel jobs submitted to the batch job have finished. It keeps polling ExecutorService.isTerminated() and invoking ExecutorService.awaitTermination(long, TimeUnit). As explained above, this resets the ExecutorService returned by getExecutor(), so that the next time that method is invoked, it will get a new executor from getExecutorFactory().
- Parameters:
pleaseWaitMessage - the message to be reported (via the logger, at INFO level) while waiting.
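The polling described above can be sketched as follows. It assumes the executor is shut down first, since awaitTermination() only returns true after a shutdown request. `ExecutorWaiter` is a hypothetical name, the poll interval is arbitrary, and a `Consumer<String>` stands in for the INFO-level logger.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ExecutorWaiter {
  // Hypothetical helper: shuts the executor down, then polls until every submitted job
  // has finished, reporting pleaseWaitMessage at each poll. Returns the number of reports.
  static int waitExecutor(ExecutorService executor, String pleaseWaitMessage,
                          long pollMillis, Consumer<String> infoLog) {
    executor.shutdown(); // no new jobs; already submitted ones keep running
    int reports = 0;
    try {
      while (!executor.isTerminated()) {
        infoLog.accept(pleaseWaitMessage);
        reports++;
        executor.awaitTermination(pollMillis, TimeUnit.MILLISECONDS);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for batch jobs", e);
    }
    return reports;
  }
}
```

Reporting at every poll, rather than once, lets a long wait show that the process is still alive.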
- wrapBatchJob
Deprecated. Wraps the task into some common operations. At the moment, it:
* wraps exceptions;
* logs the progress of completed tasks every jobLogPeriod completed tasks.
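A minimal sketch of such a wrapper, under stated assumptions: `JobWrapper` is a hypothetical name, runtime exceptions are rewrapped into an IllegalStateException, only successfully completed jobs are counted, and a `Consumer<String>` stands in for the logger (it must be thread-safe when jobs run in parallel).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical wrapper reproducing the two operations listed above
class JobWrapper {
  private final AtomicLong completed = new AtomicLong(); // shared by all the wrapped jobs
  private final long jobLogPeriod;
  private final Consumer<String> infoLog; // stand-in for the processor's logger

  JobWrapper(long jobLogPeriod, Consumer<String> infoLog) {
    this.jobLogPeriod = jobLogPeriod;
    this.infoLog = infoLog;
  }

  Runnable wrap(Runnable batchJob) {
    return () -> {
      try {
        batchJob.run();
      } catch (RuntimeException e) {
        // Rewrap the failure, keeping the original exception as the cause
        throw new IllegalStateException("Batch job failed: " + e.getMessage(), e);
      }
      // Reached only on success: failed jobs are not counted in this sketch
      long done = completed.incrementAndGet();
      if (jobLogPeriod > 0 && done % jobLogPeriod == 0) {
        infoLog.accept(done + " batch jobs completed");
      }
    };
  }

  long getCompleted() { return completed.get(); }
}
```

With a period of 2, running five wrapped jobs produces two progress messages, after the 2nd and the 4th job.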
- getSubmittedBatches
public long getSubmittedBatches()
Deprecated.
- getCompletedBatches
public long getCompletedBatches()
Deprecated.
- setJobLogPeriod
public void setJobLogPeriod(long jobLogPeriod)
Deprecated. If > 0, methods like handleNewBatch(Object, boolean) and waitExecutor(String) log messages about how many jobs the processor has submitted and completed, and they do so every time this number of jobs has been submitted. Additionally, waitExecutor(String) logs the message it receives as a parameter. If jobLogPeriod is 0, only the latter happens. If it's -1, none of this logging happens. Setting 0 or -1 can be useful to disable these messages when the caller wants to use its own logging for similar events; in particular, when you use multiple processors in parallel, or the same processor multiple times. **WARNING**: implementors of subclasses of this class should comply with the above semantics for this parameter, at least if those classes are general purpose and meant to be reused by third parties.
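The three cases above can be encoded as two small predicates. This is a hypothetical sketch (`JobLogPolicy` isn't part of the library), and it assumes that any negative value behaves like -1, which the text above doesn't state.

```java
// Hypothetical encoding of the jobLogPeriod semantics described above
final class JobLogPolicy {
  private JobLogPolicy() {}

  // Periodic "submitted/completed" progress messages: only when jobLogPeriod > 0,
  // each time the number of submitted jobs reaches a multiple of jobLogPeriod
  static boolean logProgress(long jobLogPeriod, long submittedJobs) {
    return jobLogPeriod > 0 && submittedJobs > 0 && submittedJobs % jobLogPeriod == 0;
  }

  // The pleaseWaitMessage passed to waitExecutor(): logged for 0 or more,
  // suppressed for -1 (and, by assumption, for any other negative value)
  static boolean logWaitMessage(long jobLogPeriod) {
    return jobLogPeriod >= 0;
  }
}
```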