Class BatchProcessor<B,BC extends BatchCollector<B>,BJ extends Consumer<B>>

java.lang.Object
uk.ac.ebi.utils.threading.batchproc.BatchProcessor<B,BC,BJ>
Type Parameters:
B - the type of batch to be handled
BC - the type of BatchCollector to use for batch operations
BJ - the type of job that is able to process a batch of type `B`
Direct Known Subclasses:
ItemizedBatchProcessor

@Deprecated public abstract class BatchProcessor<B,BC extends BatchCollector<B>,BJ extends Consumer<B>> extends Object
Deprecated.
The functionality available in this package is provided by Project Reactor, and we recommend switching to that. See ReactorUtils.

Batch Processor Skeleton

A simple skeleton to manage the processing of data in multi-thread mode. The idea that this class and the batch collectors provide scaffolding for is that a data source is used to build `B` batches and, when a batch is ready to be processed, it is submitted to a `BJ` job and run in parallel by the getExecutor().

This class is just a skeleton: for a concrete implementation, you need to realise the above source processing loop on your own. To facilitate that, handleNewBatch(Object, boolean) is provided. You should call it periodically: it checks whether the current batch (the one you're filling) is ready for submission and, if so, it submits the current batch to a new parallel job, creates a new batch and returns it; otherwise, it returns the current batch. At the end of such a loop, you should call waitExecutor(String), so that the final batch you were building is forced into processing and other close-up operations are performed.

As you can see above, BatchCollector is another facility to manage batch creation and decide if a batch is ready to be processed. Specific implementations are provided for both BatchCollector and this processor class. For instance, we have an item-based processor, which implements the source dispatching loop described above as fetching from a stream of items, which are sent to batches. This specific processor uses item-based collectors, whose default implementation decides whether a batch is ready to go on the basis of the number of items it has. Other specific processors are those based on Java collections, which make use of the corresponding CollectionBatchCollector.

Note that, while we consider synchronisation issues for the batch processing jobs, we assume that the initialisation and submission operations are run by a single thread (eg, `main`), so the code for those operations isn't thread-safe. For instance, waitExecutor(String) won't synchronise over the current executor and setBatchJob(Consumer) won't synchronise over the current job.
Instantiating multiple processors is a safe way to deal with a scenario where multiple threads each drive their own multi-threaded batch processing. In such a case, you'll likely want to share the executor among the processors.
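
The source processing loop described above can be illustrated with a minimal, self-contained sketch. It uses only `java.util.concurrent` and does not call this class: `BatchLoopSketch`, `process` and the fixed pool of 4 threads are hypothetical names and choices made for illustration, not the library's implementation. The comments mark which step plays the role of handleNewBatch(Object, boolean) and waitExecutor(String).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for the pattern described above, NOT the library's code
class BatchLoopSketch
{
  /**
   * Splits items into batches of at most batchSize, runs batchJob on each batch
   * in parallel and returns the number of batches submitted.
   */
  static <T> long process ( Iterable<T> items, int batchSize, Consumer<List<T>> batchJob )
  {
    ExecutorService executor = Executors.newFixedThreadPool ( 4 ); // assumed pool size
    long submitted = 0;
    List<T> batch = new ArrayList<> ();

    for ( T item: items )
    {
      batch.add ( item );
      // Plays the role of handleNewBatch(): submit if ready, then start a fresh batch
      if ( batch.size () >= batchSize )
      {
        final List<T> ready = batch;
        executor.execute ( () -> batchJob.accept ( ready ) );
        submitted++;
        batch = new ArrayList<> ();
      }
    }

    // Plays the role of the forced flush: the last batch may be partially filled
    if ( !batch.isEmpty () )
    {
      final List<T> last = batch;
      executor.execute ( () -> batchJob.accept ( last ) );
      submitted++;
    }

    // Plays the role of waitExecutor(): wait until all submitted jobs are done
    executor.shutdown ();
    try {
      while ( !executor.awaitTermination ( 1, TimeUnit.SECONDS ) ) { /* keep waiting */ }
    }
    catch ( InterruptedException ex ) {
      Thread.currentThread ().interrupt ();
      throw new IllegalStateException ( "Interrupted while waiting for batch jobs", ex );
    }
    return submitted;
  }

  public static void main ( String[] args )
  {
    AtomicLong sum = new AtomicLong ();
    long nbatches = process (
      List.of ( 1, 2, 3, 4, 5, 6, 7 ), 3,
      batch -> { for ( int i: batch ) sum.addAndGet ( i ); }
    );
    System.out.println ( nbatches + " batches, sum = " + sum.get () );
  }
}
```

Note that only the job runs in parallel: the loop that fills and submits batches stays on a single thread, which matches the single-thread assumption stated above.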
Author:
brandizi
Date:
1 Dec 2017
  • Field Details

    • jobLogPeriod

      protected long jobLogPeriod
      Deprecated.
    • log

      protected org.slf4j.Logger log
      Deprecated.
  • Constructor Details

    • BatchProcessor

      public BatchProcessor(BJ batchJob, BC batchCollector)
      Deprecated.
    • BatchProcessor

      public BatchProcessor(BJ batchJob)
      Deprecated.
    • BatchProcessor

      public BatchProcessor()
      Deprecated.
  • Method Details

    • handleNewBatch

      protected B handleNewBatch(B currentBatch)
      Deprecated.
    • handleNewBatch

      protected B handleNewBatch(B currentBatch, boolean forceFlush)
      Deprecated.
      This is the method that possibly issues a new task, via getExecutor(), which runs getBatchJob() against the current batch. Note that the batch job is executed under the default wrapper (see wrapBatchJob(Runnable)). This method also resets the internal ExecutorService, which will be recreated (once) upon the first invocation of getExecutor(). This behaviour ensures that a processor can be invoked multiple times, reusing the same batchJob instance (normally that's not possible for an ExecutorService after its ExecutorService.awaitTermination(long, TimeUnit) method is called).
      Parameters:
      forceFlush - if true, it flushes the data independently of BatchCollector.batchReadyFlag(). This is typically needed when you've exhausted a stream of data and you have a last, partially-filled batch to process.
    • getBatchCollector

      public BC getBatchCollector()
      Deprecated.
      As explained in BatchCollector, this is used to create a new batch and decide if it's ready for submission to a new job.
      Returns:
    • setBatchCollector

      public void setBatchCollector(BC batchCollector)
      Deprecated.
    • getBatchJob

      public BJ getBatchJob()
      Deprecated.
      handleNewBatch(Object, boolean) launches this job every time the current batch being considered is ready for processing. Note that your job is wrapped into wrapBatchJob(Runnable).
    • setBatchJob

      public void setBatchJob(BJ batchJob)
      Deprecated.
    • getExecutor

      public ExecutorService getExecutor()
      Deprecated.
      The executor service used by handleNewBatch(Object) to submit batch jobs and run them in parallel. By default this is HackedBlockingQueue.createExecutor(), ie, a fixed-size executor pool, which is able to block and wait when it's full. Moreover, this executor is equipped with a convenient naming thread factory, which names the threads based on the processor class (ie, this class or one of its extensions). Normally you shouldn't need to change this parameter, unless you want some particular execution policy. One situation where you may want to use the setter for this property is when you hook multiple batch processors to the same executor, to avoid recreating threads too many times and/or to decide an overall thread management policy.
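
To give an idea of what "a fixed-size pool that blocks and waits when it's full" can look like, here is a minimal sketch built on the standard `ThreadPoolExecutor`. It is not how HackedBlockingQueue.createExecutor() is implemented: `BlockingExecutorSketch`, `create` and `runTasks` are hypothetical names, and the blocking behaviour is obtained through a rejection handler that puts the task into the queue, which is only one of several ways to get this effect.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, NOT the library's HackedBlockingQueue
class BlockingExecutorSketch
{
  static ExecutorService create ( int poolSize, int queueSize, String threadNamePrefix )
  {
    // Names threads after a prefix, eg, the processor class name
    AtomicInteger threadNo = new AtomicInteger ();
    ThreadFactory namingFactory =
      r -> new Thread ( r, threadNamePrefix + "-" + threadNo.incrementAndGet () );

    return new ThreadPoolExecutor (
      poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<> ( queueSize ),
      namingFactory,
      // When pool and queue are full, block the submitting thread until there is room
      ( task, pool ) -> {
        if ( pool.isShutdown () ) throw new RejectedExecutionException ( "Executor is shut down" );
        try {
          pool.getQueue ().put ( task );
        }
        catch ( InterruptedException ex ) {
          Thread.currentThread ().interrupt ();
          throw new RejectedExecutionException ( "Interrupted while waiting to submit", ex );
        }
      }
    );
  }

  /** Pushes n trivial tasks through a small pool and returns how many completed. */
  static int runTasks ( int n )
  {
    ExecutorService executor = create ( 2, 2, "BatchProcessorSketch" );
    AtomicInteger done = new AtomicInteger ();
    for ( int i = 0; i < n; i++ ) executor.execute ( () -> done.incrementAndGet () );
    executor.shutdown ();
    try {
      executor.awaitTermination ( 1, TimeUnit.MINUTES );
    }
    catch ( InterruptedException ex ) {
      Thread.currentThread ().interrupt ();
    }
    return done.get ();
  }

  public static void main ( String[] args )
  {
    System.out.println ( "Completed tasks: " + runTasks ( 20 ) );
  }
}
```

An executor built this way could be passed to setExecutor(ExecutorService) on several processors, which is the sharing scenario mentioned above.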
    • setExecutor

      public void setExecutor(ExecutorService executor)
      Deprecated.
    • waitExecutor

      protected void waitExecutor(String pleaseWaitMessage)
      Deprecated.

      Waits until all the parallel batch jobs that were submitted have finished. It keeps polling ExecutorService.isTerminated() and invoking ExecutorService.awaitTermination(long, TimeUnit).

      As explained above, this resets the ExecutorService that is returned by getExecutor(), so that the next time that method is invoked, it will get a new executor from getExecutorFactory().

      Parameters:
      pleaseWaitMessage - the message to be reported (via logger/INFO level) while waiting.
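
The polling described above can be sketched as follows. This is an illustration under assumptions, not the actual method body: `WaitExecutorSketch`, `waitFor` and the `pollMillis` parameter are hypothetical, the message goes to standard output rather than to a logger, and the executor reset performed by the real method is not modelled.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, NOT the library's implementation
class WaitExecutorSketch
{
  /**
   * Stops accepting new jobs, then polls until the submitted ones are done.
   * Returns the number of polling rounds.
   */
  static int waitFor ( ExecutorService executor, String pleaseWaitMessage, long pollMillis )
  {
    executor.shutdown (); // already-submitted jobs keep running
    int polls = 0;
    try
    {
      while ( !executor.isTerminated () )
      {
        System.out.println ( pleaseWaitMessage );
        executor.awaitTermination ( pollMillis, TimeUnit.MILLISECONDS );
        polls++;
      }
    }
    catch ( InterruptedException ex )
    {
      Thread.currentThread ().interrupt (); // preserve the interruption for the caller
      throw new IllegalStateException ( "Interrupted while waiting for batch jobs", ex );
    }
    return polls;
  }

  public static void main ( String[] args )
  {
    ExecutorService executor = Executors.newFixedThreadPool ( 2 );
    AtomicInteger done = new AtomicInteger ();
    for ( int i = 0; i < 10; i++ ) executor.execute ( () -> done.incrementAndGet () );
    waitFor ( executor, "Waiting for batch jobs to finish", 100 );
    System.out.println ( "Completed jobs: " + done.get () );
  }
}
```

Once an executor has been shut down like this it can't accept new jobs, which is why a processor that is meant to be reused needs a fresh executor for the next run.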
    • wrapBatchJob

      protected Runnable wrapBatchJob(Runnable batchJob)
      Deprecated.
      Wraps the task into some common operations. At the moment, it wraps exceptions and logs the progress of completed tasks every jobLogPeriod completed tasks.
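
A wrapper of this kind might look like the sketch below. The details are assumptions: `WrapJobSketch` is a hypothetical class, exceptions are rethrown as `RuntimeException` with the original as cause, failed jobs are counted as completed, and progress goes to standard output instead of the `log` field. The real method may differ on each of these points.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, NOT the library's implementation
class WrapJobSketch
{
  private final AtomicLong completed = new AtomicLong ();
  private final long logPeriod;

  WrapJobSketch ( long logPeriod ) {
    this.logPeriod = logPeriod;
  }

  /** Returns a job that runs the original one, wraps its exceptions and tracks progress. */
  Runnable wrap ( Runnable job )
  {
    return () ->
    {
      try {
        job.run ();
      }
      catch ( Exception ex ) {
        // Assumed policy: report the failure with the original exception as cause
        throw new RuntimeException ( "Batch job failed: " + ex.getMessage (), ex );
      }
      finally {
        // Assumed policy: a job counts as completed whether it succeeded or failed
        long n = completed.incrementAndGet ();
        if ( logPeriod > 0 && n % logPeriod == 0 )
          System.out.println ( n + " batch jobs completed" );
      }
    };
  }

  long getCompleted () {
    return completed.get ();
  }

  public static void main ( String[] args )
  {
    WrapJobSketch wrapper = new WrapJobSketch ( 2 );
    for ( int i = 0; i < 4; i++ ) wrapper.wrap ( () -> {} ).run ();
    System.out.println ( "Total completed: " + wrapper.getCompleted () );
  }
}
```

The counter is an `AtomicLong` because wrapped jobs run on several executor threads at once.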
    • getSubmittedBatches

      public long getSubmittedBatches()
      Deprecated.
    • getCompletedBatches

      public long getCompletedBatches()
      Deprecated.
    • setJobLogPeriod

      public void setJobLogPeriod(long jobLogPeriod)
      Deprecated.
      If > 0, methods like handleNewBatch(Object, boolean) and waitExecutor(String) log messages about how many submitted and completed jobs the processor is dealing with, and they do so once every number of submitted jobs equal to this property. Additionally, waitExecutor(String) logs the parameter it receives. If it's 0, only the latter happens. If it's -1, none of this logging happens. It might be useful to disable these messages with 0 or -1 if the caller wants to do its own logging of similar events. In particular, this might be useful when you use multiple processors in parallel, or the same processor multiple times. **WARNING**: implementors of subclasses of this class should comply with the above semantics for this parameter, at least if those classes are general purpose and meant to be reused by third parties.
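
For subclass implementors, the three cases above can be summarised as two small predicates. This is only a reading of the description: `JobLogPeriodSketch` and its method names are hypothetical, and treating every negative value like -1 is an assumption, since only -1 is documented.

```java
// Hypothetical helper that encodes the jobLogPeriod semantics described above
class JobLogPeriodSketch
{
  /** True when a progress message is due after the given number of submitted jobs. */
  static boolean logsProgress ( long jobLogPeriod, long submittedJobs )
  {
    return jobLogPeriod > 0 && submittedJobs % jobLogPeriod == 0;
  }

  /** True when waitExecutor(String) should log the message it receives. */
  static boolean logsWaitMessage ( long jobLogPeriod )
  {
    // Assumption: any negative value behaves like -1
    return jobLogPeriod >= 0;
  }

  public static void main ( String[] args )
  {
    for ( long period: new long[] { 100, 0, -1 } )
      System.out.println (
        "period " + period
        + ": progress at 200 jobs = " + logsProgress ( period, 200 )
        + ", wait message = " + logsWaitMessage ( period )
      );
  }
}
```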