1
|
/**
|
2
|
* Copyright (C) 2009 EDIT
|
3
|
* European Distributed Institute of Taxonomy
|
4
|
* http://www.e-taxonomy.eu
|
5
|
*
|
6
|
* The contents of this file are subject to the Mozilla Public License Version 1.1
|
7
|
* See LICENSE.TXT at the top of this package for the full license terms.
|
8
|
*/
|
9
|
package eu.etaxonomy.cdm.persistence.validation;
|
10
|
|
11
|
import java.lang.ref.WeakReference;
|
12
|
import java.util.ArrayList;
|
13
|
import java.util.Iterator;
|
14
|
import java.util.concurrent.Executors;
|
15
|
import java.util.concurrent.RejectedExecutionHandler;
|
16
|
import java.util.concurrent.ThreadPoolExecutor;
|
17
|
import java.util.concurrent.TimeUnit;
|
18
|
|
19
|
import javax.validation.ConstraintValidator;
|
20
|
|
21
|
import org.apache.logging.log4j.LogManager;
|
22
|
import org.apache.logging.log4j.Logger;
|
23
|
|
24
|
/**
|
25
|
* A {@code ThreadPoolExecutor} specialized in dealing with
|
26
|
* {@link EntityValidationThread}s and validation tasks (see
|
27
|
* {@link EntityValidationTaskBase}). This implementation creates a thread pool
|
28
|
* containing just one thread, meaning all validation tasks are run one after
|
29
|
* another on that one thread. Especially for Level-3 validation tasks this is
|
30
|
* probably exactly what you want. These tasks are run upon CRUD events, and you
|
31
|
* don't want the database to be crawled to validate entire object graphs every
|
32
|
* time a CRUD event takes place, especially since one CRUD operation may be
|
33
|
* meant to cancel or correct a previous CRUD operation (e.g. a user of the
|
34
|
* taxonomic editor may realize he/she did something wrong and then quickly
|
35
|
* correct it).
|
36
|
*
|
37
|
* <p>
|
38
|
* Although a {@code ValidationExecutor} sets up a thread pool containing just a
|
39
|
* single thread, it does not logically or functionally <i>depend</i> on the
|
40
|
* thread pool containing at most one thread. Thus, should performance become an
|
41
|
* issue, and concurrency the solution, increasing the pool size is still an
|
42
|
* option. For example, Level-2 validation tasks might be quite amenable to
|
43
|
* being executed concurrently.
|
44
|
*
|
45
|
* <p>
|
46
|
* The reason we extend {@code ThreadPoolExecutor} rather than simply use
|
47
|
* {@link Executors#newSingleThreadExecutor()} is that we need access to the
|
48
|
* threads in the thread pool for the reason indicated above: if an entity
|
49
|
* annotated with Level-2 or Level-3 validation constraints is updated, it will
|
50
|
* be validated on the validation thread. However, if it is quickly thereafter
|
51
|
* updated again, you really would like to terminate the first validation if
|
52
|
* it's still running. After all, it doesn't make sense to validate an entity in
|
53
|
* a state that it no longer has. For Level-2 validations this may not be so
|
54
|
* important, because they are likely to run fast. But for Level-3 validations
|
55
|
* you want to prevent needless queueing and execution of long-running tasks.
|
56
|
* Thus, you really would like to know which entity is being validated on the
|
57
|
* validation thread. The {@code ThreadPoolExecutor} provides a
|
58
|
* {@link #beforeExecute(Thread, Runnable)} method, passing us the thread and
|
59
|
* the task that it is about to run. This allows us to track the threads in the
|
60
|
* thread pool.
|
61
|
* <p>
|
62
|
* If the {@code ValidationExecutor} detects that a validation task enters the
|
63
|
* task queue that will validate the same entity as the entity currently being
|
64
|
* validated on the validation thread, it will call
|
65
|
* {@link EntityValidationThread#setTerminationRequested(boolean)}. This gives
|
66
|
* the {@link ConstraintValidator} running in the validation thread a chance to
|
67
|
* terminate itself:<br>
|
68
|
* <code>
|
69
|
* if(Thread.currentThread() instanceof EntityValidationThread) {
|
70
|
* EntityValidationThread evt = (EntityValidationThread) Thread.currentThread();
|
71
|
* if(evt.isTerminationRequested()) {
|
72
|
* // Stop with what I am doing
|
73
|
* }
|
74
|
* }
|
75
|
* </code><br>
|
76
|
* Constraint validators are free to include this logic or not. If they know
|
77
|
* themselves to be short-lived it may not be worth it. But if they potentially
|
78
|
* take a lot of time to complete, they can and and probably should include this
|
79
|
* logic to prevent needless queueing and queue overruns. This would make them
|
80
|
* dependent, though, on at least the {@link EntityValidationThread} class, so
|
81
|
* there are some architectural issues here.
|
82
|
*
|
83
|
* @author ayco_holleman
|
84
|
*
|
85
|
*/
|
86
|
//maybe we want to make this a spring component ? #4663
|
87
|
public class ValidationExecutor
|
88
|
extends ThreadPoolExecutor
|
89
|
implements RejectedExecutionHandler {
|
90
|
|
91
|
private static final Logger logger = LogManager.getLogger();
|
92
|
|
93
|
// Number of threads to keep in the thread pool
|
94
|
static final int CORE_POOL_SIZE = 0;
|
95
|
// Maximum number of theads in the thread pool
|
96
|
static final int MAX_POOL_SIZE = 1;
|
97
|
// Number of seconds to wait for a new task before killing the validation
|
98
|
// thread
|
99
|
static final int KEEP_ALIFE_TIME = 5;
|
100
|
// Maximum number of tasks allowed to wait to be executed by the validation
|
101
|
// thread
|
102
|
static final int TASK_QUEUE_SIZE = 1000;
|
103
|
|
104
|
// Our basis for tracking the threads in the thread pool. We maintain
|
105
|
// a list of weak references to the thread in the real thread pool,
|
106
|
// maintained but totally hidden by the super class (ThreadPoolExecutor).
|
107
|
final ArrayList<WeakReference<EntityValidationThread>> threads = new ArrayList<WeakReference<EntityValidationThread>>(
|
108
|
MAX_POOL_SIZE);
|
109
|
|
110
|
/**
|
111
|
* Creates a {@code ValidationExecutor} with a task queue size of 1000. Thus
|
112
|
* there can be at most 1000 pending validations. Thereafter newly submitted
|
113
|
* validation tasks will simply be discarded. See
|
114
|
* {@link #rejectedExecution(Runnable, ThreadPoolExecutor)}.
|
115
|
*/
|
116
|
public ValidationExecutor(){
|
117
|
this(TASK_QUEUE_SIZE);
|
118
|
}
|
119
|
|
120
|
/**
|
121
|
* Creates a {@code ValidationExecutor} with a custom task queue size.
|
122
|
*
|
123
|
* @param taskQueueSize
|
124
|
*/
|
125
|
public ValidationExecutor(int taskQueueSize){
|
126
|
super(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIFE_TIME, TimeUnit.SECONDS,
|
127
|
new EntityValidationTaskQueue(taskQueueSize));
|
128
|
setThreadFactory(new ValidationThreadFactory());
|
129
|
setRejectedExecutionHandler(this);
|
130
|
}
|
131
|
|
132
|
/**
|
133
|
* Implements the one method from {@link RejectedExecutionHandler}, which is
|
134
|
* called in case of task queue overruns. Because Level-2 and Level-3
|
135
|
* validations may not obstruct the CRUD events that triggered them, or
|
136
|
* impair the stability of the system as a whole, this method only writes an
|
137
|
* error message to the log4j log file. Thus, task queue overruns may cause
|
138
|
* Level-2 and/or Level-3 constraint violations to creep into the database.
|
139
|
* And thus, some other, batch-like process needs to crawl the entire
|
140
|
* database in search of Level-2 and Level-3 constraint violations every
|
141
|
* once in a while.
|
142
|
*/
|
143
|
@Override
|
144
|
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
|
145
|
EntityValidationTaskBase task = (EntityValidationTaskBase) r;
|
146
|
logger.error(String.format(
|
147
|
"Validation of %s cancelled. Too many validation tasks waiting to be executed.",
|
148
|
task.getEntity().toString()));
|
149
|
}
|
150
|
|
151
|
/**
|
152
|
* Overrides method from {@link ThreadPoolExecutor} to prevent thread pool
|
153
|
* size from being altered. Will throw a RuntimeException. Future versions
|
154
|
* could abandon this restriction once it has become clear that concurrent
|
155
|
* execution of Level-2 and/or Level-3 validations constitutes no problem
|
156
|
* and may solve performance problems.
|
157
|
*/
|
158
|
@Override
|
159
|
public void setMaximumPoolSize(int maximumPoolSize){
|
160
|
throw new RuntimeException(
|
161
|
"Altering maximum pool size for ValidationExecutor instances currently not allowed");
|
162
|
}
|
163
|
|
164
|
@Override
|
165
|
protected void beforeExecute(Thread thread, Runnable runnable){
|
166
|
EntityValidationThread validationThread = (EntityValidationThread) thread;
|
167
|
EntityValidationTaskBase task = (EntityValidationTaskBase) runnable;
|
168
|
validationThread.setTerminationRequested(false);
|
169
|
task.setValidator(validationThread.getValidator());
|
170
|
checkPool(validationThread, task);
|
171
|
validationThread.setCurrentTask(task);
|
172
|
}
|
173
|
|
174
|
/*
|
175
|
* This method does 2 things. [A] It keeps track of the threads in the
|
176
|
* thread pool. If pendingThread is not yet in our "shadow pool" we add it
|
177
|
* to the shadow pool. [B] It searches for other threads in the trhead pool
|
178
|
* that are still busy validating an older version of the entity to be
|
179
|
* validated during pendingTask. If there is such a thread, we ask it to
|
180
|
* terminate itself. Whether or not this request is honored, we wait for the
|
181
|
* thread to complete. Otherwise the two threads might conflict with
|
182
|
* eachother when reading/writing from the error tables (i.e. the tables in
|
183
|
* which the outcome of a validation is stored). Note that, currently, this
|
184
|
* is all a bit theoretical because we only allow one thread in the thread
|
185
|
* pool. However, we want to be prepared for a future with truely concurrent
|
186
|
* validation.
|
187
|
*/
|
188
|
private void checkPool(EntityValidationThread pendingThread,
|
189
|
EntityValidationTaskBase pendingTask){
|
190
|
boolean found = false;
|
191
|
Iterator<WeakReference<EntityValidationThread>> iterator = threads.iterator();
|
192
|
while (iterator.hasNext()) {
|
193
|
EntityValidationThread pooledThread = iterator.next().get();
|
194
|
if (pooledThread == null) {
|
195
|
// Thread has been removed from the real thread pool
|
196
|
// and got garbage collected. Remove our weak reference
|
197
|
// to the thread
|
198
|
iterator.remove();
|
199
|
} else if (pooledThread == pendingThread) {
|
200
|
found = true;
|
201
|
} else if (pooledThread.isAlive()) {
|
202
|
if (pooledThread.getCurrentTask().equals(pendingTask)) {
|
203
|
pooledThread.setTerminationRequested(true);
|
204
|
pendingTask.waitFor(pooledThread);
|
205
|
}
|
206
|
}
|
207
|
}
|
208
|
if (!found) {
|
209
|
threads.add(new WeakReference<EntityValidationThread>(pendingThread));
|
210
|
}
|
211
|
threads.trimToSize();
|
212
|
}
|
213
|
|
214
|
}
|