Class BatchService<TK extends BatchServiceTask>
A pool-based thread execution service, that dynamically optimises its size.
This is a base class that allow you to manage the execution of a number of tasks
in parallel.
This is achieved by means of a thread pool, which of size is dynamically adjusted via PoolSizeTuner
,
in order to optimise the task throughput (i.e., the number of tasks that complete their execution in the unit of time).
You should initialise this class with a moderate initial pool size (default is the number of available processors), cause the tuning algorithm works well (i.e. converges) when it approaches the best value from the left.
The theory says that the optimal number of threads is given by the number of available processors and the ratio between task waiting time and real CPU consumption time. However things can be more complicated when task latencies depend on how much they communicate or interfere each other, for instance by hitting transactions on the same database. This is the rationale to base thread optimisation on live performance measurement.
- date
- 8 Oct 2013
- Author:
- Marco Brandizi
-
Nested Class Summary
Modifier and TypeClassDescriptionprotected class
The customPoolSizeTuner
that is used to optimise this service. -
Field Summary
Modifier and TypeFieldDescriptionprotected int
This should be 1 if there are multiple and different exit codes returned by submitted tasksBatchServiceTask.getExitCode()
.protected uk.org.lidalia.slf4jext.Logger
protected PoolSizeTuner
-
Constructor Summary
ConstructorDescriptionDefaults toRuntime.getRuntime().availableProcessors()
BatchService
(int initialThreadPoolSize) Initialises a pool service with this number of initial threads. -
Method Summary
Modifier and TypeMethodDescriptionprotected void
finalize()
StopspoolSizeTuner
.int
long
The no of completed tasks.int
Used to dynamically adjust the no. threads that the service runs in parallel.uk.org.lidalia.slf4jext.Level
The submission of a new task is notified to the logging system via this level (Level.INFO
by default).int
protected BatchService<TK>.BatchServiceTuner
Allows you to initialise with a customBatchService<TK extends BatchServiceTask>.BatchServiceTuner
.protected ExecutorService
newThreadPoolExecutor
(int initialThreadPoolSize) Allows you to initialise with a customExecutorService
.void
setSubmissionMsgLogLevel
(uk.org.lidalia.slf4jext.Level submissionMsgLogLevel) void
setThreadFactory
(ThreadFactory threadFactory) It's like theThreadPoolExecutor.setThreadFactory(ThreadFactory)
and might be useful here as well.void
setThreadPoolSize
(int threadPoolSize) void
Submits a task into the pool, synchronising updates requested frompoolSizeTuner
and other internal state information.void
This can be used after you have submitted all the tasks that you have to run, when you want to wait that all of them complete their execution.
-
Field Details
-
lastExitCode
protected int lastExitCodeThis should be 1 if there are multiple and different exit codes returned by submitted tasksBatchServiceTask.getExitCode()
. It should be a given value if all the submitted tasks returned that same value and it is non-zero. Should be 0 in all other cases. -
poolSizeTuner
-
log
protected uk.org.lidalia.slf4jext.Logger log
-
-
Constructor Details
-
BatchService
public BatchService()Defaults toRuntime.getRuntime().availableProcessors()
-
BatchService
public BatchService(int initialThreadPoolSize) Initialises a pool service with this number of initial threads.
-
-
Method Details
-
newPoolSizeTuner
Allows you to initialise with a customBatchService<TK extends BatchServiceTask>.BatchServiceTuner
. In most cases you will be fine with the default. If you need to change its tuning parameters, use thepoolSizeTuner
field. This is called by theBatchService(int)
constructor (and all the others). -
newThreadPoolExecutor
Allows you to initialise with a customExecutorService
. WARNING: this class was designed and tested withfixed pools
in mind. This method is supposed to instantiate variants of fixed thread pools, in other to accommodate specific needs. One example is when you need to give different priorities to the tasks in a pool. We haveBatchServiceTask.TaskComparator
for that, which can be instantiated from this method this way (method inspired to this)):return new ThreadPoolExecutor ( initialThreadPoolSize, initialThreadPoolSize, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue
Use executors other than fixed pool size at your own risk!( initialThreadPoolSize, new BatchServiceTask.TaskComparator () ) ); - Parameters:
initialThreadPoolSize
- the initial thread pool size.
-
submit
Submits a task into the pool, synchronising updates requested frompoolSizeTuner
and other internal state information. -
waitAllFinished
public void waitAllFinished()This can be used after you have submitted all the tasks that you have to run, when you want to wait that all of them complete their execution. The method starts atimer
that reports the current state (with INFO log messages). -
getCompletedTasks
public long getCompletedTasks()The no of completed tasks. This method is not synchronised, so you might get a number slightly lower than the real one. -
getPoolSizeTuner
Used to dynamically adjust the no. threads that the service runs in parallel. Most cases you will be fine with thedefault implementation of this
. If not, you should set this field in aconstructor
. -
getThreadPoolSize
public int getThreadPoolSize() -
setThreadPoolSize
public void setThreadPoolSize(int threadPoolSize) -
getLastExitCode
public int getLastExitCode() -
getBusyTasks
public int getBusyTasks() -
getSubmissionMsgLogLevel
public uk.org.lidalia.slf4jext.Level getSubmissionMsgLogLevel()The submission of a new task is notified to the logging system via this level (Level.INFO
by default). If your application has very many tasks and you don't want to get bothered with these messages, you can change the log granularity here. If the level you set is disabled, this class will instead log anotification
from time to time. -
setSubmissionMsgLogLevel
public void setSubmissionMsgLogLevel(uk.org.lidalia.slf4jext.Level submissionMsgLogLevel) -
setThreadFactory
It's like theThreadPoolExecutor.setThreadFactory(ThreadFactory)
and might be useful here as well. -
finalize
StopspoolSizeTuner
.
-