Project

General

Profile

Download (15.1 KB) Statistics
| Branch: | Tag: | Revision:
1
// $Id$
2
/**
3
* Copyright (C) 2011 EDIT
4
* European Distributed Institute of Taxonomy
5
* http://www.e-taxonomy.eu
6
*
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10
package eu.etaxonomy.cdm.api.service.search;
11

    
12
import java.io.IOException;
13
import java.lang.reflect.Field;
14
import java.util.ArrayList;
15
import java.util.HashSet;
16
import java.util.Iterator;
17
import java.util.List;
18
import java.util.Set;
19

    
20
import org.apache.log4j.Logger;
21
import org.apache.lucene.index.IndexReader;
22
import org.apache.lucene.index.IndexWriterConfig;
23
import org.apache.lucene.search.spell.Dictionary;
24
import org.apache.lucene.search.spell.LuceneDictionary;
25
import org.apache.lucene.search.spell.SpellChecker;
26
import org.apache.lucene.store.Directory;
27
import org.hibernate.CacheMode;
28
import org.hibernate.FlushMode;
29
import org.hibernate.ObjectNotFoundException;
30
import org.hibernate.ScrollMode;
31
import org.hibernate.ScrollableResults;
32
import org.hibernate.Session;
33
import org.hibernate.search.FullTextSession;
34
import org.hibernate.search.Search;
35
import org.hibernate.search.engine.spi.SearchFactoryImplementor;
36
import org.hibernate.search.indexes.impl.DirectoryBasedIndexManager;
37
import org.hibernate.search.indexes.spi.IndexManager;
38
import org.springframework.beans.factory.annotation.Autowired;
39
import org.springframework.orm.hibernate4.HibernateTransactionManager;
40
import org.springframework.stereotype.Component;
41
import org.springframework.transaction.PlatformTransactionManager;
42
import org.springframework.transaction.annotation.Transactional;
43

    
44
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
45
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
46
import eu.etaxonomy.cdm.common.monitor.RestServiceProgressMonitor;
47
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
48
import eu.etaxonomy.cdm.config.Configuration;
49
import eu.etaxonomy.cdm.model.common.CdmBase;
50
import eu.etaxonomy.cdm.model.description.DescriptionElementBase;
51
import eu.etaxonomy.cdm.model.name.NonViralName;
52
import eu.etaxonomy.cdm.model.name.TaxonNameBase;
53
import eu.etaxonomy.cdm.model.occurrence.SpecimenOrObservationBase;
54
import eu.etaxonomy.cdm.model.taxon.Classification;
55
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
56

    
57
/**
58
 * @author Andreas Kohlbecker
59
 * @date Dec 15, 2011
60
 *
61
 */
62
@Component
63
@Transactional
64
public class CdmMassIndexer implements ICdmMassIndexer {
65

    
66
	private Set<Class<? extends CdmBase>> indexedClasses = new HashSet<Class<? extends CdmBase>>();
67
    public static final Logger logger = Logger.getLogger(CdmMassIndexer.class);
68

    
69
    /*
70
     *      !!! DO NOT CHANGE THIS !!!
71
     *
72
     * batch_size optimized for 200MB of heap memory
73
     */
74
    private static final int BATCH_SIZE = 200;
75

    
76
    public HibernateTransactionManager transactionManager;
77

    
78
    @Autowired
79
    public void setTransactionManager(PlatformTransactionManager transactionManager) {
80
        this.transactionManager = (HibernateTransactionManager)transactionManager;
81
    }
82

    
83
    protected Session getSession(){
84
        Session session = transactionManager.getSessionFactory().getCurrentSession();
85
        return session;
86
    }
87

    
88
    protected <T extends CdmBase>void reindex(Class<T> type, IProgressMonitor monitor) {
89

    
90
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
91

    
92
        fullTextSession.setFlushMode(FlushMode.MANUAL);
93
        fullTextSession.setCacheMode(CacheMode.IGNORE);
94

    
95
        logger.info("start indexing " + type.getName());
96
        monitor.subTask("indexing " + type.getSimpleName());
97

    
98
        Long countResult = countEntities(type);
99
        int numOfBatches = calculateNumOfBatches(countResult);
100

    
101
        SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
102
        subMonitor.beginTask("Indexing " + type.getSimpleName(), numOfBatches);
103

    
104
        // Scrollable results will avoid loading too many objects in memory
105
        ScrollableResults results = fullTextSession.createCriteria(type).setFetchSize(BATCH_SIZE).scroll(ScrollMode.FORWARD_ONLY);
106
        long index = 0;
107
        int batchesWorked = 0;
108

    
109
        try {
110
            while (results.next()) {
111
                index++;
112
                fullTextSession.index(results.get(0)); // index each element
113
                if (index % BATCH_SIZE == 0 || index == countResult) {
114
                    batchesWorked++;
115
                    try {
116
                        fullTextSession.flushToIndexes(); // apply changes to indexes
117
                    } catch(ObjectNotFoundException e){
118
                        // TODO report this issue to progress monitor once it can report on errors
119
                        logger.error("possibly invalid data, thus skipping this batch and continuing with next one", e);
120
                    } finally {
121
                        fullTextSession.clear(); // clear since the queue is processed
122
                        getSession().clear(); // clear session to free memory
123
                        subMonitor.worked(1);
124
                        logger.info("\tbatch " + batchesWorked + "/" + numOfBatches + " processed");
125
                    }
126
                }
127
            }
128
        } catch (RuntimeException e) {
129
            //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
130
            monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
131
            monitor.done();
132
            throw	e;
133
        }
134
        logger.info("end indexing " + type.getName());
135
        subMonitor.done();
136
    }
137

    
138
    /**
139
     *
140
     *
141
     * @param type
142
     * @param monitor
143
     */
144
    protected <T extends CdmBase> void createDictionary(Class<T> type, IProgressMonitor monitor)  {
145
        String indexName = null;
146
        if(type.isAnnotationPresent(org.hibernate.search.annotations.Indexed.class)) {
147
            indexName = type.getAnnotation(org.hibernate.search.annotations.Indexed.class).index();
148
        } else {
149
            //TODO:give some indication that this class is infact not indexed
150
            return;
151
        }
152
        SearchFactoryImplementor searchFactory = (SearchFactoryImplementor)Search.getFullTextSession(getSession()).getSearchFactory();
153
        IndexManager indexManager = searchFactory.getAllIndexesManager().getIndexManager(indexName);
154
        IndexReader indexReader = searchFactory.getIndexReaderAccessor().open(type);
155
        List<String> idFields = getIndexedDeclaredFields(type);
156

    
157
        monitor.subTask("creating dictionary " + type.getSimpleName());
158

    
159
        SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
160
        subMonitor.beginTask("Creating dictionary " + type.getSimpleName(), 1);
161

    
162
        Directory directory = ((DirectoryBasedIndexManager) indexManager).getDirectoryProvider().getDirectory();
163
        SpellChecker spellChecker = null;
164
        try {
165
            spellChecker = new SpellChecker(directory);
166
            Iterator<String> itr = idFields.iterator();
167
            while(itr.hasNext()) {
168
                String indexedField = itr.next();
169
                logger.info("creating dictionary for field " + indexedField);
170
                Dictionary dictionary = new LuceneDictionary(indexReader, indexedField);
171
                IndexWriterConfig iwc = new IndexWriterConfig(Configuration.luceneVersion, searchFactory.getAnalyzer(type));
172
                spellChecker.indexDictionary(dictionary, iwc, true);
173
            }
174
            subMonitor.internalWorked(1);
175
        } catch (IOException e) {
176
            logger.error("IOException when creating dictionary", e);
177
            //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
178
            monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
179
            monitor.done();
180
        } catch (RuntimeException e) {
181
            logger.error("RuntimeException when creating dictionary", e);
182
            //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
183
            monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
184
            monitor.done();
185
        } finally {
186
            searchFactory.getIndexReaderAccessor().close(indexReader);
187
        }
188
        if (spellChecker != null) {
189
            try {
190
                logger.info("closing spellchecker ");
191
                spellChecker.close();
192
            } catch (IOException e) {
193
                logger.error("IOException when closing spellchecker", e);
194
            }
195
        }
196

    
197
        logger.info("end creating dictionary " + type.getName());
198
        subMonitor.done();
199
    }
200

    
201
    /**
202
     * @param countResult
203
     * @return
204
     */
205
    private int calculateNumOfBatches(Long countResult) {
206
        Long numOfBatches =  countResult > 0 ? ((countResult-1)/BATCH_SIZE)+1 : 0;
207
        return numOfBatches.intValue();
208
    }
209

    
210
    /**
211
     * @param type
212
     * @return
213
     */
214
    private <T> Long countEntities(Class<T> type) {
215
        Object countResultObj = getSession().createQuery("select count(*) from " + type.getName()).uniqueResult();
216
        Long countResult = (Long)countResultObj;
217
        return countResult;
218
    }
219

    
220
    protected <T extends CdmBase>void purge(Class<T> type, IProgressMonitor monitor) {
221

    
222
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
223
        logger.info("purging " + type.getName());
224
        fullTextSession.purgeAll(type);
225

    
226

    
227
        SearchFactoryImplementor searchFactory = (SearchFactoryImplementor)fullTextSession.getSearchFactory();
228
        IndexManager indexManager = searchFactory.getAllIndexesManager().getIndexManager(type.getName());
229
        Directory directory = ((DirectoryBasedIndexManager) indexManager).getDirectoryProvider().getDirectory();
230
        SpellChecker spellChecker = null;
231
        try {
232
            spellChecker = new SpellChecker(directory);
233
            spellChecker.clearIndex();
234
        } catch (IOException e) {
235
            logger.error("IOException when creating dictionary", e);
236
            //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
237
            monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
238
            monitor.done();
239
        }
240

    
241
        if (spellChecker != null) {
242
            try {
243
                logger.info("closing spellchecker ");
244
                spellChecker.close();
245
            } catch (IOException e) {
246
                logger.error("IOException when closing spellchecker", e);
247
            }
248
        }
249
    }
250

    
251

    
252
    /* (non-Javadoc)
253
     * @see eu.etaxonomy.cdm.database.IMassIndexer#reindex()
254
     */
255
    @Override
256
    public void reindex(IProgressMonitor monitor){
257

    
258
        if(monitor == null){
259
            monitor = new NullProgressMonitor();
260
        }
261

    
262
        monitor.setTaskName("CdmMassIndexer");
263
        int steps = indexedClasses().size() + 1; // +1 for optimize
264
        monitor.beginTask("Reindexing " + indexedClasses().size() + " classes", steps);
265

    
266
        for(Class<? extends CdmBase> type : indexedClasses()){
267
            reindex(type, monitor);
268
        }
269

    
270
        monitor.subTask("Optimizing Index");
271
        SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
272
        subMonitor.beginTask("Optimizing Index",1);
273
        optimize();        
274
        subMonitor.worked(1);
275
        logger.info("end index optimization");
276
        subMonitor.done();
277
        
278
        //monitor.worked(1);
279
        monitor.done();
280
    }
281

    
282
    @Override
283
    public void createDictionary(IProgressMonitor monitor) {
284
        if(monitor == null){
285
            monitor = new NullProgressMonitor();
286
        }
287

    
288
        monitor.setTaskName("CdmMassIndexer_Dictionary");
289
        int steps = dictionaryClasses().length; // +1 for optimize
290
        monitor.beginTask("Creating Dictionary " + dictionaryClasses().length + " classes", steps);
291

    
292
        for(Class type : dictionaryClasses()){
293
            createDictionary(type, monitor);
294
        }
295

    
296
        monitor.done();
297

    
298
    }
299
    protected void optimize() {
300

    
301
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
302
        fullTextSession.getSearchFactory().optimize();
303
        fullTextSession.flushToIndexes();
304
        fullTextSession.clear();
305
    }
306

    
307
    /**
308
     * @return
309
     */
310
    private int totalBatchCount() {
311
        int totalNumOfBatches = 0;
312
        for(Class type : indexedClasses()){
313
            totalNumOfBatches += calculateNumOfBatches(countEntities(type));
314
        }
315
        return totalNumOfBatches;
316
    }
317

    
318
    /* (non-Javadoc)
319
     * @see eu.etaxonomy.cdm.database.IMassIndexer#purge()
320
     */
321
    @Override
322
    public void purge(IProgressMonitor monitor){
323

    
324
        if(monitor == null){
325
            monitor = new NullProgressMonitor();
326
        }
327

    
328
        monitor.setTaskName("CdmMassIndexer");
329
        int steps = indexedClasses().size() + 1; // +1 for optimize
330
        monitor.beginTask("Purging " + indexedClasses().size() + " classes", steps);
331

    
332
        for(Class<? extends CdmBase> type : indexedClasses()){
333
            purge(type, monitor);
334
            monitor.worked(1);
335
        }
336
        // need to flush to the index before optimizing
337
        // the purge method is not doing the flushing by itself
338
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
339
        fullTextSession.flushToIndexes();
340

    
341
        // optimize
342
        optimize();
343
        monitor.worked(1);
344

    
345
        // done
346
        monitor.done();
347
    }
348

    
349

    
350
    /**
351
     * Returns a list of declared indexable fields within a class through reflection.
352
     *
353
     * @param clazz
354
     * @return
355
     */
356
    private List<String> getIndexedDeclaredFields(Class clazz) {
357
        List<String> idFields = new ArrayList<String>();
358
        if(clazz.isAnnotationPresent(org.hibernate.search.annotations.Indexed.class)) {
359
            Field[] declaredFields = clazz.getDeclaredFields();
360
            for(int i=0;i<declaredFields.length;i++ ) {
361
                logger.info("checking field " + declaredFields[i].getName());
362
                if(declaredFields[i].isAnnotationPresent(org.hibernate.search.annotations.Field.class) ||
363
                        declaredFields[i].isAnnotationPresent(org.hibernate.search.annotations.Fields.class)) {
364
                    idFields.add(declaredFields[i].getName());
365
                    logger.info("adding field " + declaredFields[i].getName());
366
                }
367
            }
368
        }
369
        return idFields;
370
    }
371
    /**
372
     * @return
373
     */
374
    @SuppressWarnings("unchecked")
375
    @Override
376
    public Set<Class<? extends CdmBase>> indexedClasses() {
377
    	// if no indexed classes have been 'manually' set then
378
    	// the default is the full list
379
    	if(indexedClasses.size() == 0) {
380
    		indexedClasses.add(DescriptionElementBase.class);
381
    		indexedClasses.add(TaxonBase.class);    		
382
    		indexedClasses.add(Classification.class);
383
    		indexedClasses.add(TaxonNameBase.class);
384
    		indexedClasses.add(SpecimenOrObservationBase.class);
385
    	}
386
        return indexedClasses;
387
    }
388

    
389
    /**
390
     * @return
391
     */
392
    @Override
393
    public Class[] dictionaryClasses() {
394
        return new Class[] {
395
                NonViralName.class
396
                };
397
    }
398

    
399
	@Override
400
	public void addToIndexedClasses(Class<? extends CdmBase> cdmBaseClass) {
401
		indexedClasses.add(cdmBaseClass);
402
		
403
	}
404

    
405
	@Override
406
	public void clearIndexedClasses() {
407
		indexedClasses.clear();		
408
	}
409

    
410

    
411
}
(1-1/10)