Class BatchService<TK extends BatchServiceTask>

java.lang.Object
uk.ac.ebi.utils.threading.BatchService<TK>

public class BatchService<TK extends BatchServiceTask> extends Object

A pool-based thread execution service that dynamically optimises its size.

This is a base class that allows you to manage the execution of a number of tasks in parallel. This is achieved by means of a thread pool whose size is dynamically adjusted via PoolSizeTuner in order to optimise task throughput (i.e., the number of tasks that complete their execution per unit of time).

You should initialise this class with a moderate initial pool size (the default is the number of available processors), because the tuning algorithm works well (i.e., converges) when it approaches the best value from the left, that is, starting below it and growing.
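To make the idea of converging on the best size more concrete, here is a minimal hill-climbing sketch. It is not PoolSizeTuner's actual algorithm: the class HillClimbingTuner, its step/min/max parameters and the "reverse direction when throughput drops" rule are assumptions for illustration, and it ignores the measurement noise a real tuner has to smooth out. It starts by growing the pool, i.e., it approaches the best size from the left.

```java
/**
 * Hypothetical hill-climbing pool-size tuner (NOT the library's PoolSizeTuner).
 * After each measurement period it moves the pool size one step in the current
 * direction, and reverses that direction whenever throughput has dropped.
 */
final class HillClimbingTuner {
    private final int step;
    private final int minSize;
    private final int maxSize;
    private int direction = +1;                // start by growing: approach from the left
    private double lastThroughput = Double.NaN;

    HillClimbingTuner(int step, int minSize, int maxSize) {
        if (step < 1 || minSize < 1 || maxSize < minSize)
            throw new IllegalArgumentException("invalid tuner bounds");
        this.step = step;
        this.minSize = minSize;
        this.maxSize = maxSize;
    }

    /**
     * @param currentSize the pool size used during the last period
     * @param throughput  tasks completed per unit of time during that period
     * @return the pool size to use for the next period, clamped to [minSize, maxSize]
     */
    int nextSize(int currentSize, double throughput) {
        // The last move made things worse: go back the other way (skipped on the first call).
        if (!Double.isNaN(lastThroughput) && throughput < lastThroughput) direction = -direction;
        lastThroughput = throughput;
        int next = currentSize + direction * step;
        return Math.max(minSize, Math.min(maxSize, next));
    }
}
```

With a synthetic, noise-free throughput curve that peaks at 8 threads, a tuner started at size 2 climbs to 8 and then keeps oscillating between 7 and 9.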

In theory, the optimal number of threads is determined by the number of available processors and by the ratio between the time a task spends waiting and the time it actually spends using the CPU. However, things can be more complicated when task latencies depend on how much the tasks communicate or interfere with each other, for instance by hitting transactions on the same database. This is the rationale for basing thread optimisation on live performance measurements.
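The theoretical rule mentioned above is usually written as N_threads = N_cpu × U_cpu × (1 + W/C), where U_cpu is the target CPU utilisation, W the time a task spends waiting and C the CPU time it consumes (a formulation popularised by Java Concurrency in Practice). The helper below is a hypothetical sketch of that static estimate; it is not part of BatchService, which measures throughput instead precisely because W and C are hard to know when tasks interfere with each other.

```java
/** Hypothetical helper for the classic sizing rule N = N_cpu * U_cpu * (1 + W / C). */
final class ThreadCountEstimate {
    private ThreadCountEstimate() {}

    /**
     * @param nCpu              number of available processors
     * @param targetUtilisation desired CPU utilisation, in (0, 1]
     * @param waitTime          average time a task spends waiting (I/O, locks, remote calls)
     * @param computeTime       average CPU time a task consumes, same unit as waitTime
     * @return the estimated pool size, at least 1
     */
    static int estimate(int nCpu, double targetUtilisation, double waitTime, double computeTime) {
        if (nCpu < 1 || targetUtilisation <= 0 || targetUtilisation > 1
            || waitTime < 0 || computeTime <= 0)
            throw new IllegalArgumentException("invalid sizing inputs");
        double n = nCpu * targetUtilisation * (1 + waitTime / computeTime);
        return (int) Math.max(1, Math.round(n));
    }
}
```

For example, on 4 processors with tasks that wait 50 ms for every 10 ms of CPU time, estimate(4, 1.0, 50, 10) gives 4 × (1 + 5) = 24 threads. In practice nCpu would come from Runtime.getRuntime().availableProcessors().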

Date:
8 Oct 2013
Author:
Marco Brandizi
  • Field Details

    • lastExitCode

      protected int lastExitCode
      This should be 1 if the submitted tasks returned multiple, different exit codes (as reported by BatchServiceTask.getExitCode()). If all the submitted tasks returned the same non-zero value, it should be that value. It should be 0 in all other cases.
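The lastExitCode rule above can be read as the following aggregation over the exit codes of a finished batch. This is a sketch under the assumption that all codes are available at once; ExitCodes.aggregate is a hypothetical name, and the real class may well update the field incrementally as tasks complete. Note that, under this rule, a batch in which every task returned 1 is indistinguishable from a batch with mixed codes.

```java
/** Hypothetical helper applying the documented lastExitCode rule to a finished batch. */
final class ExitCodes {
    private ExitCodes() {}

    static int aggregate(int... codes) {
        if (codes.length == 0) return 0;     // nothing submitted: 0
        int first = codes[0];
        for (int code : codes)
            if (code != first) return 1;     // multiple, different codes: 1
        return first;                        // all equal: that value (0 if all succeeded)
    }
}
```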
    • poolSizeTuner

      protected PoolSizeTuner poolSizeTuner
    • log

      protected uk.org.lidalia.slf4jext.Logger log
  • Constructor Details

    • BatchService

      public BatchService()
    • BatchService

      public BatchService(int initialThreadPoolSize)
      Initialises a pool service with this number of initial threads.
  • Method Details

    • newPoolSizeTuner

      protected BatchService<TK>.BatchServiceTuner newPoolSizeTuner()
      Allows you to initialise the service with a custom BatchService<TK extends BatchServiceTask>.BatchServiceTuner. In most cases the default will be fine. If you only need to change its tuning parameters, use the poolSizeTuner field instead. This method is called by the BatchService(int) constructor (and by all the others).
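Since newPoolSizeTuner() is invoked from the constructor, an override runs before the subclass's own instance fields have been initialised. The sketch below uses hypothetical stand-in classes (Tuner, BaseService, FragileService, SafeService), not the library's types, to show this standard Java pitfall and a safe alternative based on constants.

```java
/** Hypothetical stand-in for a tuner with one parameter. */
class Tuner {
    final int step;
    Tuner(int step) { this.step = step; }
}

/** Hypothetical base class whose constructor calls an overridable factory method. */
class BaseService {
    protected final Tuner tuner;

    BaseService() {
        this.tuner = newTuner(); // runs BEFORE any subclass field initialisers
    }

    protected Tuner newTuner() { return new Tuner(1); }
}

/** Pitfall: the override reads a subclass field that is not initialised yet. */
class FragileService extends BaseService {
    private int customStep = 4;  // still 0 while BaseService() is running

    @Override protected Tuner newTuner() { return new Tuner(customStep); }
}

/** Safe: the override relies only on a constant, which is available during construction. */
class SafeService extends BaseService {
    private static final int CUSTOM_STEP = 4;

    @Override protected Tuner newTuner() { return new Tuner(CUSTOM_STEP); }
}
```

Here new FragileService().tuner.step is 0, not 4, while SafeService gets the intended value.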
    • newThreadPoolExecutor

      protected ExecutorService newThreadPoolExecutor(int initialThreadPoolSize)
      Allows you to initialise the service with a custom ExecutorService. WARNING: this class was designed and tested with fixed-size pools in mind. This method is meant to instantiate variants of fixed thread pools in order to accommodate specific needs. One example is when you need to give different priorities to the tasks in a pool. BatchServiceTask.TaskComparator exists for that purpose, and it can be used from this method as follows (method inspired by this):
       return new ThreadPoolExecutor ( 
         initialThreadPoolSize, initialThreadPoolSize, 0L, TimeUnit.MILLISECONDS,
         new PriorityBlockingQueue ( initialThreadPoolSize, new BatchServiceTask.TaskComparator () )
       );
       
      Use executors other than fixed pool size at your own risk!
      Parameters:
      initialThreadPoolSize - the initial thread pool size.
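The following is a self-contained, runnable variant of the snippet above. Because TaskComparator's exact ordering is not documented here, it swaps in a hypothetical PrioritisedTask (lower value runs first) and a comparator written for it; the pool stays fixed-size, as the warning recommends. One detail worth knowing with this kind of queue: ThreadPoolExecutor.submit() wraps tasks in a FutureTask before queueing them, so a comparator that inspects the task type only sees the original object when tasks are passed to execute().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical task type carrying a priority (lower value = runs first). */
final class PrioritisedTask implements Runnable {
    final int priority;
    private final Runnable body;

    PrioritisedTask(int priority, Runnable body) {
        this.priority = priority;
        this.body = body;
    }

    @Override public void run() { body.run(); }
}

final class PriorityPoolDemo {
    /** Orders queued tasks by priority; tasks of any other type go last. */
    static final Comparator<Runnable> BY_PRIORITY = Comparator.comparingInt(
        r -> r instanceof PrioritisedTask ? ((PrioritisedTask) r).priority : Integer.MAX_VALUE);

    static ThreadPoolExecutor newPriorityPool(int size) {
        // Fixed size (core == max); the priority queue is unbounded, as a fixed pool expects.
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<>(11, BY_PRIORITY));
    }

    /** Queues three tasks behind a blocker and returns the order in which they ran. */
    static List<String> runDemo() {
        ThreadPoolExecutor pool = newPriorityPool(1);
        CountDownLatch gate = new CountDownLatch(1);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        // Occupies the only worker, so the next three tasks wait in the priority queue.
        pool.execute(() -> {
            try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        // execute(), not submit(): submit() would hide the PrioritisedTask inside a FutureTask.
        pool.execute(new PrioritisedTask(3, () -> order.add("low")));
        pool.execute(new PrioritisedTask(1, () -> order.add("high")));
        pool.execute(new PrioritisedTask(2, () -> order.add("medium")));
        gate.countDown();
        pool.shutdown(); // already-queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }
}
```

runDemo() returns [high, medium, low]: the tasks leave the queue by priority, not in submission order.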
    • submit

      public void submit(TK batchServiceTask)
      Submits a task to the pool, synchronising the submission with the updates requested by poolSizeTuner and with other internal state information.
    • waitAllFinished

      public void waitAllFinished()
      This can be used after you have submitted all the tasks that you need to run, when you want to wait until all of them have completed their execution. While waiting, the method starts a timer that reports the current state via INFO log messages.
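A minimal sketch of this waiting logic, assuming a design where the service counts its pending tasks itself rather than shutting the pool down, so that the pool remains usable afterwards. TrackingPool and its methods are hypothetical, and System.out stands in for the INFO logger.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical pool wrapper that can wait for all submitted tasks without shutting down. */
final class TrackingPool {
    private final ExecutorService pool;
    private final Object lock = new Object();
    private int pending = 0;     // submitted but not yet finished; guarded by lock
    private long completed = 0;  // guarded by lock

    TrackingPool(int size) { pool = Executors.newFixedThreadPool(size); }

    void submit(Runnable task) {
        synchronized (lock) { pending++; } // counted before execution, so no task is missed
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    synchronized (lock) {
                        pending--;
                        completed++;
                        lock.notifyAll(); // wake up waitAllFinished()
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            synchronized (lock) { pending--; lock.notifyAll(); } // undo the count, then rethrow
            throw e;
        }
    }

    long completedTasks() {
        synchronized (lock) { return completed; }
    }

    /** Blocks until no task is pending, printing a progress line every periodMs. */
    long waitAllFinished(long periodMs) {
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        reporter.scheduleAtFixedRate(
            () -> System.out.println("INFO: " + completedTasks() + " task(s) completed so far"),
            periodMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            synchronized (lock) {
                while (pending > 0) lock.wait(); // releases the lock while waiting
                return completed;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop waiting, keep the interrupt visible
            return completedTasks();
        } finally {
            reporter.shutdownNow(); // stop the progress timer
        }
    }

    void shutdown() { pool.shutdown(); }
}
```

Submitting 100 tasks and then calling waitAllFinished(50) returns 100 once all of them have run; further tasks can still be submitted afterwards, until shutdown() is called.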
    • getCompletedTasks

      public long getCompletedTasks()
      The number of completed tasks. This method is not synchronised, so it may return a number slightly lower than the real one.
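One way to maintain counters like the completed-task and busy-task counts is through ThreadPoolExecutor's beforeExecute/afterExecute hooks. The sketch below (CountingPool is a hypothetical name) is an assumption, not BatchService's implementation; its lock-free reads illustrate why such a getter can return a slightly stale value while tasks are still finishing.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical fixed pool that keeps busy/completed counters via ThreadPoolExecutor hooks. */
final class CountingPool extends ThreadPoolExecutor {
    private final AtomicInteger busy = new AtomicInteger();
    private final AtomicLong completed = new AtomicLong();

    CountingPool(int size) {
        super(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        busy.incrementAndGet();
    }

    @Override protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        busy.decrementAndGet();       // called even if the task threw
        completed.incrementAndGet();
    }

    /** Lock-free snapshot: may already be stale when the caller reads it. */
    int getBusyTasks() { return busy.get(); }

    /** Lock-free snapshot: may lag behind tasks that are finishing right now. */
    long getCompletedTasks() { return completed.get(); }
}
```

Once the pool has terminated, both counters are exact: 50 executed tasks give getCompletedTasks() == 50 and getBusyTasks() == 0.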
    • getPoolSizeTuner

      public PoolSizeTuner getPoolSizeTuner()
      Used to dynamically adjust the number of threads that the service runs in parallel. In most cases you will be fine with the default implementation. If not, you should set this field in a constructor.
    • getThreadPoolSize

      public int getThreadPoolSize()
    • setThreadPoolSize

      public void setThreadPoolSize(int threadPoolSize)
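If the underlying pool is a ThreadPoolExecutor, a fixed size can be changed at runtime through setCorePoolSize() and setMaximumPoolSize(), but the order of the two calls matters: since JDK 9, setCorePoolSize() rejects a core size above the maximum, and setMaximumPoolSize() rejects a maximum below the core size. The sketch below (ResizablePool is a hypothetical name) raises the maximum first when growing and lowers the core first when shrinking. It also makes resizing and submission mutually exclusive through synchronized methods, which is one assumed reading of how submit() synchronises with the updates requested by poolSizeTuner.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical fixed-size pool whose size can be changed at runtime. */
final class ResizablePool {
    private final ThreadPoolExecutor pool;

    ResizablePool(int initialSize) {
        pool = new ThreadPoolExecutor(initialSize, initialSize, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>());
    }

    synchronized int getThreadPoolSize() { return pool.getCorePoolSize(); }

    synchronized void setThreadPoolSize(int newSize) {
        if (newSize < 1) throw new IllegalArgumentException("pool size must be >= 1, got " + newSize);
        if (newSize > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newSize); // growing: raise the ceiling first...
            pool.setCorePoolSize(newSize);    // ...then the core, so core <= max always holds
        } else {
            pool.setCorePoolSize(newSize);    // shrinking: lower the core first; excess threads
            pool.setMaximumPoolSize(newSize); // are terminated when they next become idle
        }
    }

    /** Shares the monitor with setThreadPoolSize(), so a resize never interleaves with a submission. */
    synchronized void submit(Runnable task) { pool.execute(task); }

    void shutdown() { pool.shutdown(); }
}
```

Reversing either pair of calls would throw IllegalArgumentException on current JDKs whenever the size actually changes.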
    • getLastExitCode

      public int getLastExitCode()
    • getBusyTasks

      public int getBusyTasks()
    • getSubmissionMsgLogLevel

      public uk.org.lidalia.slf4jext.Level getSubmissionMsgLogLevel()
      The submission of a new task is reported to the logging system at this level (Level.INFO by default). If your application has a very large number of tasks and you don't want to be bothered by these messages, you can change the log level here. If the level you set is disabled, this class will instead log a notification from time to time.
    • setSubmissionMsgLogLevel

      public void setSubmissionMsgLogLevel(uk.org.lidalia.slf4jext.Level submissionMsgLogLevel)
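The fallback described for getSubmissionMsgLogLevel() can be sketched as follows. To stay self-contained, the sketch swaps slf4j-ext for java.util.logging, and it assumes a count-based "from time to time" policy (one summary every N submissions); SubmissionLogger and summaryEvery are hypothetical, and the real class may use a different policy, e.g. a time-based one.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: per-task messages at a configurable level, periodic summaries otherwise. */
final class SubmissionLogger {
    private final Logger log;
    private final Level submissionLevel;
    private final long summaryEvery; // assumed policy: one summary every N submissions
    private long submitted = 0;

    SubmissionLogger(Logger log, Level submissionLevel, long summaryEvery) {
        if (summaryEvery < 1) throw new IllegalArgumentException("summaryEvery must be >= 1");
        this.log = log;
        this.submissionLevel = submissionLevel;
        this.summaryEvery = summaryEvery;
    }

    /** Call on each submission; returns the message passed to the logger, or null if none was. */
    synchronized String onSubmit(String taskName) {
        submitted++;
        if (log.isLoggable(submissionLevel)) {
            String msg = "Submitting task " + taskName;
            log.log(submissionLevel, msg);
            return msg;
        }
        if (submitted % summaryEvery == 0) { // per-task level disabled: summarise periodically
            String msg = submitted + " tasks submitted so far";
            log.info(msg);
            return msg;
        }
        return null;
    }
}
```

With the logger set to INFO and submissions logged at FINE, ten submissions with summaryEvery = 5 produce two summary lines instead of ten per-task messages.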
    • setThreadFactory

      public void setThreadFactory(ThreadFactory threadFactory)
      Works like ThreadPoolExecutor.setThreadFactory(ThreadFactory), and may be useful here as well.
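A typical reason to set a thread factory is to give worker threads recognisable names, or to make them daemon threads. The sketch below applies a hypothetical NamedThreadFactory to a plain ThreadPoolExecutor; with BatchService, the same factory object could be passed to setThreadFactory() instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory producing named daemon threads: prefix-1, prefix-2, ... */
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) { this.prefix = prefix; }

    @Override public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true); // idle workers won't keep the JVM alive
        return t;
    }

    /** Runs one task on a pool using this factory and returns the worker's thread name. */
    static String workerNameDemo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.setThreadFactory(new NamedThreadFactory("batch-worker")); // before any thread exists
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.submit(whoAmI).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Since worker threads are created lazily, the first task runs on a thread named batch-worker-1, which makes it easy to spot in thread dumps and log lines.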
    • finalize

      protected void finalize() throws Throwable
      Overrides:
      finalize in class Object
      Throws:
      Throwable