Project

General

Profile

Download (9.3 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
 * Copyright (C) 2009 EDIT
3
 * European Distributed Institute of Taxonomy
4
 * http://www.e-taxonomy.eu
5
 *
6
 * The contents of this file are subject to the Mozilla Public License Version 1.1
7
 * See LICENSE.TXT at the top of this package for the full license terms.
8
 */
9
package eu.etaxonomy.cdm.persistence.validation;
10

    
11
import java.lang.ref.WeakReference;
12
import java.util.ArrayList;
13
import java.util.Iterator;
14
import java.util.concurrent.Executors;
15
import java.util.concurrent.RejectedExecutionHandler;
16
import java.util.concurrent.ThreadPoolExecutor;
17
import java.util.concurrent.TimeUnit;
18

    
19
import javax.validation.ConstraintValidator;
20

    
21
import org.apache.logging.log4j.LogManager;
22
import org.apache.logging.log4j.Logger;
23

    
24
/**
25
 * A {@code ThreadPoolExecutor} specialized in dealing with
26
 * {@link EntityValidationThread}s and validation tasks (see
27
 * {@link EntityValidationTaskBase}). This implementation creates a thread pool
28
 * containing just one thread, meaning all validation tasks are run one after
29
 * another on that one thread. Especially for Level-3 validation tasks this is
30
 * probably exactly what you want. These tasks are run upon CRUD events, and you
31
 * don't want the database to be crawled to validate entire object graphs every
32
 * time a CRUD event takes place, especially since one CRUD operation may be
33
 * meant to cancel or correct a previous CRUD operation (e.g. a user of the
34
 * taxonomic editor may realize he/she did something wrong and then quickly
35
 * correct it).
36
 *
37
 * <p>
38
 * Although a {@code ValidationExecutor} sets up a thread pool containing just a
39
 * single thread, it does not logically or functionally <i>depend</i> on the
40
 * thread pool containing at most one thread. Thus, should performance become an
41
 * issue, and concurrency the solution, increasing the pool size is still an
42
 * option. For example, Level-2 validation tasks might be quite amenable to
43
 * being executed concurrently.
44
 *
45
 * <p>
46
 * The reason we extend {@code ThreadPoolExecutor} rather than simply use
47
 * {@link Executors#newSingleThreadExecutor()} is that we need access to the
48
 * threads in the thread pool for the reason indicated above: if an entity
49
 * annotated with Level-2 or Level-3 validation constraints is updated, it will
50
 * be validated on the validation thread. However, if it is quickly thereafter
51
 * updated again, you really would like to terminate the first validation if
52
 * it's still running. After all, it doesn't make sense to validate an entity in
53
 * a state that it no longer has. For Level-2 validations this may not be so
54
 * important, because they are likely to run fast. But for Level-3 validations
55
 * you want to prevent needless queueing and execution of long-running tasks.
56
 * Thus, you really would like to know which entity is being validated on the
57
 * validation thread. The {@code ThreadPoolExecutor} provides a
58
 * {@link #beforeExecute(Thread, Runnable)} method, passing us the thread and
59
 * the task that it is about to run. This allows us to track the threads in the
60
 * thread pool.
61
 * <p>
62
 * If the {@code ValidationExecutor} detects that a validation task enters the
63
 * task queue that will validate the same entity as the entity currently being
64
 * validated on the validation thread, it will call
65
 * {@link EntityValidationThread#setTerminationRequested(boolean)}. This gives
66
 * the {@link ConstraintValidator} running in the validation thread a chance to
67
 * terminate itself:<br>
68
 * <code>
69
 * if(Thread.currentThread() instanceof EntityValidationThread) {
70
 * 	EntityValidationThread evt = (EntityValidationThread) Thread.currentThread();
71
 * 	if(evt.isTerminationRequested()) {
72
 * 		// Stop with what I am doing
73
 * 	}
74
 * }
75
 * </code><br>
76
 * Constraint validators are free to include this logic or not. If they know
77
 * themselves to be short-lived it may not be worth it. But if they potentially
78
 * take a lot of time to complete, they can and and probably should include this
79
 * logic to prevent needless queueing and queue overruns. This would make them
80
 * dependent, though, on at least the {@link EntityValidationThread} class, so
81
 * there are some architectural issues here.
82
 *
83
 * @author ayco_holleman
84
 *
85
 */
86
//maybe we want to make this a spring component ?  #4663
87
public class ValidationExecutor
88
        extends ThreadPoolExecutor
89
        implements RejectedExecutionHandler {
90

    
91
    private static final Logger logger = LogManager.getLogger();
92

    
93
	// Number of threads to keep in the thread pool
94
	static final int CORE_POOL_SIZE = 0;
95
	// Maximum number of theads in the thread pool
96
	static final int MAX_POOL_SIZE = 1;
97
	// Number of seconds to wait for a new task before killing the validation
98
	// thread
99
	static final int KEEP_ALIFE_TIME = 5;
100
	// Maximum number of tasks allowed to wait to be executed by the validation
101
	// thread
102
	static final int TASK_QUEUE_SIZE = 1000;
103

    
104
	// Our basis for tracking the threads in the thread pool. We maintain
105
	// a list of weak references to the thread in the real thread pool,
106
	// maintained but totally hidden by the super class (ThreadPoolExecutor).
107
	final ArrayList<WeakReference<EntityValidationThread>> threads = new ArrayList<WeakReference<EntityValidationThread>>(
108
			MAX_POOL_SIZE);
109

    
110
	/**
111
	 * Creates a {@code ValidationExecutor} with a task queue size of 1000. Thus
112
	 * there can be at most 1000 pending validations. Thereafter newly submitted
113
	 * validation tasks will simply be discarded. See
114
	 * {@link #rejectedExecution(Runnable, ThreadPoolExecutor)}.
115
	 */
116
	public ValidationExecutor(){
117
	    this(TASK_QUEUE_SIZE);
118
	}
119

    
120
	/**
121
	 * Creates a {@code ValidationExecutor} with a custom task queue size.
122
	 *
123
	 * @param taskQueueSize
124
	 */
125
	public ValidationExecutor(int taskQueueSize){
126
		super(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIFE_TIME, TimeUnit.SECONDS,
127
				new EntityValidationTaskQueue(taskQueueSize));
128
		setThreadFactory(new ValidationThreadFactory());
129
		setRejectedExecutionHandler(this);
130
	}
131

    
132
	/**
133
	 * Implements the one method from {@link RejectedExecutionHandler}, which is
134
	 * called in case of task queue overruns. Because Level-2 and Level-3
135
	 * validations may not obstruct the CRUD events that triggered them, or
136
	 * impair the stability of the system as a whole, this method only writes an
137
	 * error message to the log4j log file. Thus, task queue overruns may cause
138
	 * Level-2 and/or Level-3 constraint violations to creep into the database.
139
	 * And thus, some other, batch-like process needs to crawl the entire
140
	 * database in search of Level-2 and Level-3 constraint violations every
141
	 * once in a while.
142
	 */
143
	@Override
144
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
145
		EntityValidationTaskBase task = (EntityValidationTaskBase) r;
146
		logger.error(String.format(
147
				"Validation of %s cancelled. Too many validation tasks waiting to be executed.",
148
				task.getEntity().toString()));
149
	}
150

    
151
	/**
152
	 * Overrides method from {@link ThreadPoolExecutor} to prevent thread pool
153
	 * size from being altered. Will throw a RuntimeException. Future versions
154
	 * could abandon this restriction once it has become clear that concurrent
155
	 * execution of Level-2 and/or Level-3 validations constitutes no problem
156
	 * and may solve performance problems.
157
	 */
158
	@Override
159
	public void setMaximumPoolSize(int maximumPoolSize){
160
		throw new RuntimeException(
161
				"Altering maximum pool size for ValidationExecutor instances currently not allowed");
162
	}
163

    
164
	@Override
165
	protected void beforeExecute(Thread thread, Runnable runnable){
166
		EntityValidationThread validationThread = (EntityValidationThread) thread;
167
		EntityValidationTaskBase task = (EntityValidationTaskBase) runnable;
168
		validationThread.setTerminationRequested(false);
169
		task.setValidator(validationThread.getValidator());
170
		checkPool(validationThread, task);
171
		validationThread.setCurrentTask(task);
172
	}
173

    
174
	/*
175
	 * This method does 2 things. [A] It keeps track of the threads in the
176
	 * thread pool. If pendingThread is not yet in our "shadow pool" we add it
177
	 * to the shadow pool. [B] It searches for other threads in the trhead pool
178
	 * that are still busy validating an older version of the entity to be
179
	 * validated during pendingTask. If there is such a thread, we ask it to
180
	 * terminate itself. Whether or not this request is honored, we wait for the
181
	 * thread to complete. Otherwise the two threads might conflict with
182
	 * eachother when reading/writing from the error tables (i.e. the tables in
183
	 * which the outcome of a validation is stored). Note that, currently, this
184
	 * is all a bit theoretical because we only allow one thread in the thread
185
	 * pool. However, we want to be prepared for a future with truely concurrent
186
	 * validation.
187
	 */
188
	private void checkPool(EntityValidationThread pendingThread,
189
			EntityValidationTaskBase pendingTask){
190
		boolean found = false;
191
		Iterator<WeakReference<EntityValidationThread>> iterator = threads.iterator();
192
		while (iterator.hasNext()) {
193
			EntityValidationThread pooledThread = iterator.next().get();
194
			if (pooledThread == null) {
195
				// Thread has been removed from the real thread pool
196
				// and got garbage collected. Remove our weak reference
197
				// to the thread
198
				iterator.remove();
199
			} else if (pooledThread == pendingThread) {
200
				found = true;
201
			} else if (pooledThread.isAlive()) {
202
				if (pooledThread.getCurrentTask().equals(pendingTask)) {
203
					pooledThread.setTerminationRequested(true);
204
					pendingTask.waitFor(pooledThread);
205
				}
206
			}
207
		}
208
		if (!found) {
209
			threads.add(new WeakReference<EntityValidationThread>(pendingThread));
210
		}
211
		threads.trimToSize();
212
	}
213

    
214
}
(6-6/7)