merged trunk to cdm-3.3
[cdmlib.git] / cdmlib-services / src / main / java / eu / etaxonomy / cdm / api / service / search / CdmMassIndexer.java
1 // $Id$
2 /**
3 * Copyright (C) 2011 EDIT
4 * European Distributed Institute of Taxonomy
5 * http://www.e-taxonomy.eu
6 *
7 * The contents of this file are subject to the Mozilla Public License Version 1.1
8 * See LICENSE.TXT at the top of this package for the full license terms.
9 */
10 package eu.etaxonomy.cdm.api.service.search;
11
12 import java.io.IOException;
13 import java.lang.reflect.Field;
14 import java.util.ArrayList;
15 import java.util.Iterator;
16 import java.util.List;
17
18 import org.apache.log4j.Logger;
19 import org.apache.lucene.index.IndexReader;
20 import org.apache.lucene.index.IndexWriterConfig;
21 import org.apache.lucene.search.spell.Dictionary;
22 import org.apache.lucene.search.spell.LuceneDictionary;
23 import org.apache.lucene.search.spell.SpellChecker;
24 import org.apache.lucene.store.Directory;
25 import org.hibernate.CacheMode;
26 import org.hibernate.FlushMode;
27 import org.hibernate.ScrollMode;
28 import org.hibernate.ScrollableResults;
29 import org.hibernate.Session;
30 import org.hibernate.Transaction;
31 import org.hibernate.search.FullTextSession;
32 import org.hibernate.search.Search;
33 import org.hibernate.search.engine.spi.SearchFactoryImplementor;
34 import org.hibernate.search.indexes.impl.DirectoryBasedIndexManager;
35 import org.hibernate.search.indexes.spi.IndexManager;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.orm.hibernate4.HibernateTransactionManager;
38 import org.springframework.stereotype.Component;
39 import org.springframework.transaction.PlatformTransactionManager;
40 import org.springframework.transaction.annotation.Transactional;
41
42 import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
43 import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
44 import eu.etaxonomy.cdm.common.monitor.RestServiceProgressMonitor;
45 import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
46 import eu.etaxonomy.cdm.config.Configuration;
47 import eu.etaxonomy.cdm.model.common.CdmBase;
48 import eu.etaxonomy.cdm.model.description.DescriptionElementBase;
49 import eu.etaxonomy.cdm.model.name.NonViralName;
50 import eu.etaxonomy.cdm.model.name.TaxonNameBase;
51 import eu.etaxonomy.cdm.model.occurrence.SpecimenOrObservationBase;
52 import eu.etaxonomy.cdm.model.taxon.Classification;
53 import eu.etaxonomy.cdm.model.taxon.TaxonBase;
54
55 /**
56 * @author Andreas Kohlbecker
57 * @date Dec 15, 2011
58 *
59 */
60 @Component
61 @Transactional
62 public class CdmMassIndexer implements ICdmMassIndexer {
63
64 public static final Logger logger = Logger.getLogger(CdmMassIndexer.class);
65
66 private static final int BATCH_SIZE = 2000;
67
68 public HibernateTransactionManager transactionManager;
69
70 @Autowired
71 public void setTransactionManager(PlatformTransactionManager transactionManager) {
72 this.transactionManager = (HibernateTransactionManager)transactionManager;
73 }
74
75 protected Session getSession(){
76 Session session = transactionManager.getSessionFactory().getCurrentSession();
77 return session;
78 }
79
80 protected <T extends CdmBase>void reindex(Class<T> type, IProgressMonitor monitor) {
81
82 FullTextSession fullTextSession = Search.getFullTextSession(getSession());
83
84 fullTextSession.setFlushMode(FlushMode.MANUAL);
85 fullTextSession.setCacheMode(CacheMode.IGNORE);
86
87 logger.info("start indexing " + type.getName());
88 monitor.subTask("indexing " + type.getSimpleName());
89
90 Long countResult = countEntities(type);
91 int numOfBatches = calculateNumOfBatches(countResult);
92
93 SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
94 subMonitor.beginTask("Indexing " + type.getSimpleName(), numOfBatches);
95
96 // Scrollable results will avoid loading too many objects in memory
97 ScrollableResults results = fullTextSession.createCriteria(type).setFetchSize(BATCH_SIZE).scroll(ScrollMode.FORWARD_ONLY);
98 long index = 0;
99 int batchesWorked = 0;
100
101 try {
102 while (results.next()) {
103 index++;
104 fullTextSession.index(results.get(0)); // index each element
105 if (index % BATCH_SIZE == 0 || index == countResult) {
106 batchesWorked++;
107 fullTextSession.flushToIndexes(); // apply changes to indexes
108 fullTextSession.clear(); // clear since the queue is processed
109 subMonitor.worked(1);
110 logger.info("\tbatch " + batchesWorked + "/" + numOfBatches + " processed");
111 //if(index / BATCH_SIZE > 10 ) break;
112 }
113 }
114 } catch (RuntimeException e) {
115 //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
116 monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
117 monitor.done();
118 throw e;
119 }
120 logger.info("end indexing " + type.getName());
121 subMonitor.done();
122 }
123
124 /**
125 *
126 *
127 * @param type
128 * @param monitor
129 */
130 protected <T extends CdmBase> void createDictionary(Class<T> type, IProgressMonitor monitor) {
131 String indexName = null;
132 if(type.isAnnotationPresent(org.hibernate.search.annotations.Indexed.class)) {
133 indexName = type.getAnnotation(org.hibernate.search.annotations.Indexed.class).index();
134 } else {
135 //TODO:give some indication that this class is infact not indexed
136 return;
137 }
138 SearchFactoryImplementor searchFactory = (SearchFactoryImplementor)Search.getFullTextSession(getSession()).getSearchFactory();
139 IndexManager indexManager = searchFactory.getAllIndexesManager().getIndexManager(indexName);
140 IndexReader indexReader = searchFactory.getIndexReaderAccessor().open(type);
141 List<String> idFields = getIndexedDeclaredFields(type);
142
143 monitor.subTask("creating dictionary " + type.getSimpleName());
144
145 SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
146 subMonitor.beginTask("Creating dictionary " + type.getSimpleName(), 1);
147
148 Directory directory = ((DirectoryBasedIndexManager) indexManager).getDirectoryProvider().getDirectory();
149 SpellChecker spellChecker = null;
150 try {
151 spellChecker = new SpellChecker(directory);
152 Iterator<String> itr = idFields.iterator();
153 while(itr.hasNext()) {
154 String indexedField = itr.next();
155 logger.info("creating dictionary for field " + indexedField);
156 Dictionary dictionary = new LuceneDictionary(indexReader, indexedField);
157 IndexWriterConfig iwc = new IndexWriterConfig(Configuration.luceneVersion, searchFactory.getAnalyzer(type));
158 spellChecker.indexDictionary(dictionary, iwc, true);
159 }
160 subMonitor.internalWorked(1);
161 } catch (IOException e) {
162 logger.error("IOException when creating dictionary", e);
163 //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
164 monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
165 monitor.done();
166 } catch (RuntimeException e) {
167 logger.error("RuntimeException when creating dictionary", e);
168 //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
169 monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
170 monitor.done();
171 } finally {
172 searchFactory.getIndexReaderAccessor().close(indexReader);
173 }
174 if (spellChecker != null) {
175 try {
176 logger.info("closing spellchecker ");
177 spellChecker.close();
178 } catch (IOException e) {
179 logger.error("IOException when closing spellchecker", e);
180 }
181 }
182
183 logger.info("end creating dictionary " + type.getName());
184 subMonitor.done();
185 }
186
187 /**
188 * @param countResult
189 * @return
190 */
191 private int calculateNumOfBatches(Long countResult) {
192 Long numOfBatches = countResult > 0 ? ((countResult-1)/BATCH_SIZE)+1 : 0;
193 return numOfBatches.intValue();
194 }
195
196 /**
197 * @param type
198 * @return
199 */
200 private <T> Long countEntities(Class<T> type) {
201 Object countResultObj = getSession().createQuery("select count(*) from " + type.getName()).uniqueResult();
202 Long countResult = (Long)countResultObj;
203 return countResult;
204 }
205
206 protected <T extends CdmBase>void purge(Class<T> type, IProgressMonitor monitor) {
207
208 FullTextSession fullTextSession = Search.getFullTextSession(getSession());
209 logger.info("purging " + type.getName());
210 fullTextSession.purgeAll(type);
211
212
213 SearchFactoryImplementor searchFactory = (SearchFactoryImplementor)Search.getFullTextSession(getSession()).getSearchFactory();
214 IndexManager indexManager = searchFactory.getAllIndexesManager().getIndexManager(type.getName());
215 Directory directory = ((DirectoryBasedIndexManager) indexManager).getDirectoryProvider().getDirectory();
216 SpellChecker spellChecker = null;
217 try {
218 spellChecker = new SpellChecker(directory);
219 spellChecker.clearIndex();
220 } catch (IOException e) {
221 logger.error("IOException when creating dictionary", e);
222 //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
223 //monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
224 //monitor.done();
225 }
226
227 if (spellChecker != null) {
228 try {
229 logger.info("closing spellchecker ");
230 spellChecker.close();
231 } catch (IOException e) {
232 logger.error("IOException when closing spellchecker", e);
233 }
234 }
235 }
236
237
238 /* (non-Javadoc)
239 * @see eu.etaxonomy.cdm.database.IMassIndexer#reindex()
240 */
241 @Override
242 public void reindex(IProgressMonitor monitor){
243
244 if(monitor == null){
245 monitor = new NullProgressMonitor();
246 }
247
248 monitor.setTaskName("CdmMassIndexer");
249 int steps = indexedClasses().length + 1; // +1 for optimize
250 monitor.beginTask("Reindexing " + indexedClasses().length + " classes", steps);
251
252 for(Class<? extends CdmBase> type : indexedClasses()){
253 reindex(type, monitor);
254 // clear the session after each class to free memory
255 getSession().clear();
256 }
257
258 optimize();
259 monitor.worked(1);
260
261 monitor.done();
262 }
263
264 @Override
265 public void createDictionary(IProgressMonitor monitor) {
266 if(monitor == null){
267 monitor = new NullProgressMonitor();
268 }
269
270 monitor.setTaskName("CdmMassIndexer_Dictionary");
271 int steps = dictionaryClasses().length; // +1 for optimize
272 monitor.beginTask("Creating Dictionary " + dictionaryClasses().length + " classes", steps);
273
274 for(Class type : dictionaryClasses()){
275 createDictionary(type, monitor);
276 }
277
278 monitor.done();
279
280 }
281 protected void optimize() {
282
283 FullTextSession fullTextSession = Search.getFullTextSession(getSession());
284 fullTextSession.getSearchFactory().optimize();
285 fullTextSession.flushToIndexes();
286 fullTextSession.clear();
287 }
288
289 /**
290 * @return
291 */
292 private int totalBatchCount() {
293 int totalNumOfBatches = 0;
294 for(Class type : indexedClasses()){
295 totalNumOfBatches += calculateNumOfBatches(countEntities(type));
296 }
297 return totalNumOfBatches;
298 }
299
300 /* (non-Javadoc)
301 * @see eu.etaxonomy.cdm.database.IMassIndexer#purge()
302 */
303 @Override
304 public void purge(IProgressMonitor monitor){
305
306 if(monitor == null){
307 monitor = new NullProgressMonitor();
308 }
309
310 monitor.setTaskName("CdmMassIndexer");
311 int steps = indexedClasses().length + 1; // +1 for optimize
312 monitor.beginTask("Purging " + indexedClasses().length + " classes", steps);
313
314 for(Class<? extends CdmBase> type : indexedClasses()){
315 purge(type, monitor);
316 monitor.worked(1);
317 }
318
319 // // need to commit and start new transaction before optimizing
320 FullTextSession fullTextSession = Search.getFullTextSession(getSession());
321 Transaction tx = fullTextSession.getTransaction();
322 tx.commit();
323 fullTextSession.beginTransaction(); // will be committed automatically at the end of this method since this class is transactional
324
325 optimize();
326 monitor.worked(1);
327
328 monitor.done();
329 }
330
331
332 /**
333 * Returns a list of declared indexable fields within a class through reflection.
334 *
335 * @param clazz
336 * @return
337 */
338 private List<String> getIndexedDeclaredFields(Class clazz) {
339 List<String> idFields = new ArrayList<String>();
340 if(clazz.isAnnotationPresent(org.hibernate.search.annotations.Indexed.class)) {
341 Field[] declaredFields = clazz.getDeclaredFields();
342 for(int i=0;i<declaredFields.length;i++ ) {
343 logger.info("checking field " + declaredFields[i].getName());
344 if(declaredFields[i].isAnnotationPresent(org.hibernate.search.annotations.Field.class) ||
345 declaredFields[i].isAnnotationPresent(org.hibernate.search.annotations.Fields.class)) {
346 idFields.add(declaredFields[i].getName());
347 logger.info("adding field " + declaredFields[i].getName());
348 }
349 }
350 }
351 return idFields;
352 }
353 /**
354 * @return
355 */
356 @SuppressWarnings("unchecked")
357 @Override
358 public Class<? extends CdmBase>[] indexedClasses() {
359 return new Class[] {
360 DescriptionElementBase.class,
361 Classification.class,
362 TaxonBase.class,
363 TaxonNameBase.class,
364 SpecimenOrObservationBase.class
365 };
366 }
367
368 /**
369 * @return
370 */
371 @Override
372 public Class[] dictionaryClasses() {
373 return new Class[] {
374 NonViralName.class
375 };
376 }
377
378
379
380
381 }