Project

General

Profile

Download (9.46 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
 * Copyright (C) 2009 EDIT
3
 * European Distributed Institute of Taxonomy
4
 * http://www.e-taxonomy.eu
5
 *
6
 * The contents of this file are subject to the Mozilla Public License Version 1.1
7
 * See LICENSE.TXT at the top of this package for the full license terms.
8
 */
9
package eu.etaxonomy.cdm.persistence.validation;
10

    
11
import java.lang.ref.WeakReference;
12
import java.util.ArrayList;
13
import java.util.Iterator;
14
import java.util.concurrent.Executors;
15
import java.util.concurrent.RejectedExecutionHandler;
16
import java.util.concurrent.ThreadPoolExecutor;
17
import java.util.concurrent.TimeUnit;
18

    
19
import javax.validation.ConstraintValidator;
20

    
21
import org.apache.log4j.Logger;
22

    
23
/**
24
 * A {@code ThreadPoolExecutor} specialized in dealing with
25
 * {@link EntityValidationThread}s and validation tasks (see
26
 * {@link EntityValidationTaskBase}). This implementation creates a thread pool
27
 * containing just one thread, meaning all validation tasks are run one after
28
 * another on that one thread. Especially for Level-3 validation tasks this is
29
 * probably exactly what you want. These tasks are run upon CRUD events, and you
30
 * don't want the database to be crawled to validate entire object graphs every
31
 * time a CRUD event takes place, especially since one CRUD operation may be
32
 * meant to cancel or correct a previous CRUD operation (e.g. a user of the
33
 * taxonomic editor may realize he/she did something wrong and then quickly
34
 * correct it).
35
 *
36
 * <p>
37
 * Although a {@code ValidationExecutor} sets up a thread pool containing just a
38
 * single thread, it does not logically or functionally <i>depend</i> on the
39
 * thread pool containing at most one thread. Thus, should performance become an
40
 * issue, and concurrency the solution, increasing the pool size is still an
41
 * option. For example, Level-2 validation tasks might be quite amenable to
42
 * being executed concurrently.
43
 *
44
 * <p>
45
 * The reason we extend {@code ThreadPoolExecutor} rather than simply use
46
 * {@link Executors#newSingleThreadExecutor()} is that we need access to the
47
 * threads in the thread pool for the reason indicated above: if an entity
48
 * annotated with Level-2 or Level-3 validation constraints is updated, it will
49
 * be validated on the validation thread. However, if it is quickly thereafter
50
 * updated again, you really would like to terminate the first validation if
51
 * it's still running. After all, it doesn't make sense to validate an entity in
52
 * a state that it no longer has. For Level-2 validations this may not be so
53
 * important, because they are likely to run fast. But for Level-3 validations
54
 * you want to prevent needless queueing and execution of long-running tasks.
55
 * Thus, you really would like to know which entity is being validated on the
56
 * validation thread. The {@code ThreadPoolExecutor} provides a
57
 * {@link #beforeExecute(Thread, Runnable)} method, passing us the thread and
58
 * the task that it is about to run. This allows us to track the threads in the
59
 * thread pool.
60
 * <p>
61
 * If the {@code ValidationExecutor} detects that a validation task enters the
62
 * task queue that will validate the same entity as the entity currently being
63
 * validated on the validation thread, it will call
64
 * {@link EntityValidationThread#setTerminationRequested(boolean)}. This gives
65
 * the {@link ConstraintValidator} running in the validation thread a chance to
66
 * terminate itself:<br>
67
 * <code>
68
 * if(Thread.currentThread() instanceof EntityValidationThread) {
69
 * 	EntityValidationThread evt = (EntityValidationThread) Thread.currentThread();
70
 * 	if(evt.isTerminationRequested()) {
71
 * 		// Stop with what I am doing
72
 * 	}
73
 * }
74
 * </code><br>
75
 * Constraint validators are free to include this logic or not. If they know
76
 * themselves to be short-lived it may not be worth it. But if they potentially
77
 * take a lot of time to complete, they can and and probably should include this
78
 * logic to prevent needless queueing and queue overruns. This would make them
79
 * dependent, though, on at least the {@link EntityValidationThread} class, so
80
 * there are some architectural issues here.
81
 *
82
 * @author ayco_holleman
83
 *
84
 */
85
//maybe we want to make this a spring component ?  #4663
86
public class ValidationExecutor extends ThreadPoolExecutor implements RejectedExecutionHandler {
87

    
88
	private static final Logger logger = Logger.getLogger(ValidationExecutor.class);
89

    
90
	// Number of threads to keep in the thread pool
91
	static final int CORE_POOL_SIZE = 0;
92
	// Maximum number of theads in the thread pool
93
	static final int MAX_POOL_SIZE = 1;
94
	// Number of seconds to wait for a new task before killing the validation
95
	// thread
96
	static final int KEEP_ALIFE_TIME = 5;
97
	// Maximum number of tasks allowed to wait to be executed by the validation
98
	// thread
99
	static final int TASK_QUEUE_SIZE = 1000;
100

    
101
	// Our basis for tracking the threads in the thread pool. We maintain
102
	// a list of weak references to the thread in the real thread pool,
103
	// maintained but totally hidden by the super class (ThreadPoolExecutor).
104
	final ArrayList<WeakReference<EntityValidationThread>> threads = new ArrayList<WeakReference<EntityValidationThread>>(
105
			MAX_POOL_SIZE);
106

    
107
	/**
108
	 * Creates a {@code ValidationExecutor} with a task queue size of 1000. Thus
109
	 * there can be at most 1000 pending validations. Thereafter newly submitted
110
	 * validation tasks will simply be discarded. See
111
	 * {@link #rejectedExecution(Runnable, ThreadPoolExecutor)}.
112
	 */
113
	public ValidationExecutor(){
114
	    this(TASK_QUEUE_SIZE);
115
	}
116

    
117
	/**
118
	 * Creates a {@code ValidationExecutor} with a custom task queue size.
119
	 *
120
	 * @param taskQueueSize
121
	 */
122
	public ValidationExecutor(int taskQueueSize){
123
		super(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIFE_TIME, TimeUnit.SECONDS,
124
				new EntityValidationTaskQueue(taskQueueSize));
125
		setThreadFactory(new ValidationThreadFactory());
126
		setRejectedExecutionHandler(this);
127
	}
128

    
129
	/**
130
	 * Implements the one method from {@link RejectedExecutionHandler}, which is
131
	 * called in case of task queue overruns. Because Level-2 and Level-3
132
	 * validations may not obstruct the CRUD events that triggered them, or
133
	 * impair the stability of the system as a whole, this method only writes an
134
	 * error message to the log4j log file. Thus, task queue overruns may cause
135
	 * Level-2 and/or Level-3 constraint violations to creep into the database.
136
	 * And thus, some other, batch-like process needs to crawl the entire
137
	 * database in search of Level-2 and Level-3 constraint violations every
138
	 * once in a while.
139
	 */
140
	@Override
141
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
142
		EntityValidationTaskBase task = (EntityValidationTaskBase) r;
143
		logger.error(String.format(
144
				"Validation of %s cancelled. Too many validation tasks waiting to be executed.",
145
				task.getEntity().toString()));
146
	}
147

    
148
	/**
149
	 * Overrides method from {@link ThreadPoolExecutor} to prevent thread pool
150
	 * size from being altered. Will throw a RuntimeException. Future versions
151
	 * could abandon this restriction once it has become clear that concurrent
152
	 * execution of Level-2 and/or Level-3 validations constitutes no problem
153
	 * and may solve performance problems.
154
	 */
155
	@Override
156
	public void setMaximumPoolSize(int maximumPoolSize){
157
		throw new RuntimeException(
158
				"Altering maximum pool size for ValidationExecutor instances currently not allowed");
159
	}
160

    
161
	@Override
162
	protected void beforeExecute(Thread thread, Runnable runnable){
163
		EntityValidationThread validationThread = (EntityValidationThread) thread;
164
		EntityValidationTaskBase task = (EntityValidationTaskBase) runnable;
165
		validationThread.setTerminationRequested(false);
166
		task.setValidator(validationThread.getValidator());
167
		checkPool(validationThread, task);
168
		validationThread.setCurrentTask(task);
169
	}
170

    
171
	/*
172
	 * This method does 2 things. [A] It keeps track of the threads in the
173
	 * thread pool. If pendingThread is not yet in our "shadow pool" we add it
174
	 * to the shadow pool. [B] It searches for other threads in the trhead pool
175
	 * that are still busy validating an older version of the entity to be
176
	 * validated during pendingTask. If there is such a thread, we ask it to
177
	 * terminate itself. Whether or not this request is honored, we wait for the
178
	 * thread to complete. Otherwise the two threads might conflict with
179
	 * eachother when reading/writing from the error tables (i.e. the tables in
180
	 * which the outcome of a validation is stored). Note that, currently, this
181
	 * is all a bit theoretical because we only allow one thread in the thread
182
	 * pool. However, we want to be prepared for a future with truely concurrent
183
	 * validation.
184
	 */
185
	private void checkPool(EntityValidationThread pendingThread,
186
			EntityValidationTaskBase pendingTask){
187
		boolean found = false;
188
		Iterator<WeakReference<EntityValidationThread>> iterator = threads.iterator();
189
		while (iterator.hasNext()) {
190
			EntityValidationThread pooledThread = iterator.next().get();
191
			if (pooledThread == null) {
192
				// Thread has been removed from the real thread pool
193
				// and got garbage collected. Remove our weak reference
194
				// to the thread
195
				iterator.remove();
196
			} else if (pooledThread == pendingThread) {
197
				found = true;
198
			} else if (pooledThread.isAlive()) {
199
				if (pooledThread.getCurrentTask().equals(pendingTask)) {
200
					pooledThread.setTerminationRequested(true);
201
					pendingTask.waitFor(pooledThread);
202
				}
203
			}
204
		}
205
		if (!found) {
206
			threads.add(new WeakReference<EntityValidationThread>(pendingThread));
207
		}
208
		threads.trimToSize();
209
	}
210

    
211
}
(6-6/7)