Project

General

Profile

Download (19.7 KB) Statistics
| Branch: | Tag: | Revision:
/**
* Copyright (C) 2011 EDIT
* European Distributed Institute of Taxonomy
* http://www.e-taxonomy.eu
*
* The contents of this file are subject to the Mozilla Public License Version 1.1
* See LICENSE.TXT at the top of this package for the full license terms.
*/
9
package eu.etaxonomy.cdm.api.service.search;
10

    
11
import java.io.IOException;
12
import java.lang.management.ManagementFactory;
13
import java.lang.management.MemoryMXBean;
14
import java.lang.management.MemoryUsage;
15
import java.lang.reflect.Field;
16
import java.util.ArrayList;
17
import java.util.Collection;
18
import java.util.HashSet;
19
import java.util.Iterator;
20
import java.util.List;
21
import java.util.Set;
22

    
23
import org.apache.log4j.Logger;
24
import org.apache.lucene.index.IndexReader;
25
import org.apache.lucene.index.IndexWriterConfig;
26
import org.apache.lucene.search.spell.Dictionary;
27
import org.apache.lucene.search.spell.LuceneDictionary;
28
import org.apache.lucene.search.spell.SpellChecker;
29
import org.apache.lucene.store.Directory;
30
import org.hibernate.CacheMode;
31
import org.hibernate.FlushMode;
32
import org.hibernate.ObjectNotFoundException;
33
import org.hibernate.ScrollMode;
34
import org.hibernate.ScrollableResults;
35
import org.hibernate.Session;
36
import org.hibernate.search.FullTextSession;
37
import org.hibernate.search.Search;
38
import org.hibernate.search.SearchFactory;
39
import org.hibernate.search.batchindexing.MassIndexerProgressMonitor;
40
import org.hibernate.search.indexes.spi.DirectoryBasedIndexManager;
41
import org.hibernate.search.indexes.spi.IndexManager;
42
import org.hibernate.search.spi.SearchIntegrator;
43
import org.springframework.beans.factory.annotation.Autowired;
44
import org.springframework.orm.hibernate5.HibernateTransactionManager;
45
import org.springframework.stereotype.Component;
46
import org.springframework.transaction.PlatformTransactionManager;
47
import org.springframework.transaction.annotation.Transactional;
48

    
49
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
50
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
51
import eu.etaxonomy.cdm.common.monitor.RestServiceProgressMonitor;
52
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
53
import eu.etaxonomy.cdm.model.common.CdmBase;
54
import eu.etaxonomy.cdm.model.description.DescriptionElementBase;
55
import eu.etaxonomy.cdm.model.name.TaxonNameBase;
56
import eu.etaxonomy.cdm.model.occurrence.SpecimenOrObservationBase;
57
import eu.etaxonomy.cdm.model.taxon.Classification;
58
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
59
import eu.etaxonomy.cdm.model.taxon.TaxonRelationship;
60

    
61
/**
62
 * @author Andreas Kohlbecker
63
 * @date Dec 15, 2011
64
 *
65
 */
66
@Component
67
@Transactional
68
public class CdmMassIndexer implements ICdmMassIndexer {
69

    
70
    private final Set<Class<? extends CdmBase>> indexedClasses = new HashSet<Class<? extends CdmBase>>();
71
    public static final Logger logger = Logger.getLogger(CdmMassIndexer.class);
72

    
73
    /*
74
     * flag to enable old hibernate search 3.1 mode
75
     */
76
    private static final boolean HS_31_MODE = false;
77

    
78
    public HibernateTransactionManager transactionManager;
79

    
80
    @Autowired
81
    public void setTransactionManager(PlatformTransactionManager transactionManager) {
82
        this.transactionManager = (HibernateTransactionManager)transactionManager;
83
    }
84

    
85
    protected Session getSession(){
86
        Session session = transactionManager.getSessionFactory().getCurrentSession();
87
        return session;
88
    }
89

    
90
    /**
91
     * reindex method based on hibernate search  3.1
92
     *
93
     * @param type
94
     * @param monitor
95
     */
96
    protected <T extends CdmBase>void reindex_31(Class<T> type, IProgressMonitor monitor) {
97

    
98
        //TODO set the application in maintenance mode: making
99
        // queries to the index is not recommended when a MassIndexer is busy.
100
        // fullTextSession.createIndexer().startAndWait();
101

    
102
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
103

    
104
        fullTextSession.setFlushMode(FlushMode.MANUAL);
105
        fullTextSession.setCacheMode(CacheMode.IGNORE);
106

    
107
        logger.info("start indexing " + type.getName());
108
        monitor.subTask("indexing " + type.getSimpleName());
109

    
110
        Long countResult = countEntities(type);
111
        int batchSize = sweetestBatchSize(type);
112
        int numOfBatches = calculateNumOfBatches(countResult, batchSize);
113

    
114
        SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
115
        subMonitor.beginTask("Indexing " + type.getSimpleName(), numOfBatches);
116

    
117

    
118
        // Scrollable results will avoid loading too many objects in memory
119
        ScrollableResults results = fullTextSession.createCriteria(type).setFetchSize(batchSize).scroll(ScrollMode.FORWARD_ONLY);
120
        long index = 0;
121
        int batchesWorked = 0;
122

    
123

    
124
        try {
125
            while (results.next()) {
126
                index++;
127
                fullTextSession.index(results.get(0)); // index each element
128
                if (index % batchSize == 0 || index == countResult) {
129
                    batchesWorked++;
130
                    try {
131
                        fullTextSession.flushToIndexes(); // apply changes to indexes
132
                    } catch(ObjectNotFoundException e){
133
                        // TODO report this issue to progress monitor once it can report on errors
134
                        logger.error("possibly invalid data, thus skipping this batch and continuing with next one", e);
135
                    } finally {
136
                        fullTextSession.clear(); // clear since the queue is processed
137
                        getSession().clear(); // clear session to free memory
138
                        subMonitor.worked(1);
139
                        logger.info("\tbatch " + batchesWorked + "/" + numOfBatches + " processed");
140
                    }
141
                }
142
            }
143
        } catch (RuntimeException e) {
144
            //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
145
            monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
146
            monitor.done();
147
            throw	e;
148
        }
149
        logger.info("end indexing " + type.getName());
150
        subMonitor.done();
151
    }
152

    
153
    /**
154
     *
155
     *
156
     * @param type
157
     * @param monitor
158
     */
159
    protected <T extends CdmBase> void createDictionary(Class<T> type, IProgressMonitor monitor)  {
160
        String indexName = null;
161
        if(type.isAnnotationPresent(org.hibernate.search.annotations.Indexed.class)) {
162
            indexName = type.getAnnotation(org.hibernate.search.annotations.Indexed.class).index();
163
        } else {
164
            //TODO:give some indication that this class is infact not indexed
165
            return;
166
        }
167
        SearchFactory searchFactory = Search.getFullTextSession(getSession()).getSearchFactory();
168
        IndexManager indexManager = obtainIndexManager(searchFactory, indexName);
169

    
170
        IndexReader indexReader = searchFactory.getIndexReaderAccessor().open(type);
171
        List<String> idFields = getIndexedDeclaredFields(type);
172

    
173
        monitor.subTask("creating dictionary " + type.getSimpleName());
174

    
175
        SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
176
        subMonitor.beginTask("Creating dictionary " + type.getSimpleName(), 1);
177

    
178
        Directory directory = ((DirectoryBasedIndexManager) indexManager).getDirectoryProvider().getDirectory();
179
        SpellChecker spellChecker = null;
180
        try {
181
            spellChecker = new SpellChecker(directory);
182
            Iterator<String> itr = idFields.iterator();
183
            while(itr.hasNext()) {
184
                String indexedField = itr.next();
185
                logger.info("creating dictionary for field " + indexedField);
186
                Dictionary dictionary = new LuceneDictionary(indexReader, indexedField);
187
                IndexWriterConfig iwc = new IndexWriterConfig(searchFactory.getAnalyzer(type));
188
                spellChecker.indexDictionary(dictionary, iwc, true);
189
            }
190
            subMonitor.internalWorked(1);
191
        } catch (IOException e) {
192
            logger.error("IOException when creating dictionary", e);
193
            //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
194
            monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
195
            monitor.done();
196
        } catch (RuntimeException e) {
197
            logger.error("RuntimeException when creating dictionary", e);
198
            //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
199
            monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
200
            monitor.done();
201
        } finally {
202
            searchFactory.getIndexReaderAccessor().close(indexReader);
203
        }
204
        if (spellChecker != null) {
205
            try {
206
                logger.info("closing spellchecker ");
207
                spellChecker.close();
208
            } catch (IOException e) {
209
                logger.error("IOException when closing spellchecker", e);
210
            }
211
        }
212

    
213
        logger.info("end creating dictionary " + type.getName());
214
        subMonitor.done();
215
    }
216

    
217
    private IndexManager obtainIndexManager(SearchFactory searchFactory, String indexName){
218
        SearchIntegrator searchIntegrator = searchFactory.unwrap(SearchIntegrator.class );
219
        IndexManager indexManager = searchIntegrator.getIndexManager(indexName);
220
        return indexManager;
221
    }
222

    
223
    private int sweetestBatchSize(Class<? extends CdmBase> type){
224

    
225
        Runtime.getRuntime().gc();
226
        long freeMemoryMB;
227
        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
228
        if(memoryMXBean != null){
229
            logger.debug("NonHeapMemoryUsage: "+memoryMXBean.getHeapMemoryUsage());
230
             MemoryUsage memusage = memoryMXBean.getHeapMemoryUsage();
231
             freeMemoryMB =( memusage.getMax() - memusage.getUsed()) / (1024 * 1024);
232
        } else {
233
            // will be smaller than the actual free mem since Runtime does not
234
            // know about Committed heap mem
235
            freeMemoryMB = Runtime.getRuntime().freeMemory() / (1024 * 1024);
236
        }
237

    
238
        // TODO check for min free:
239
        // < 600MB => ERROR may fail with out of memory
240
        // < 750MB => WARNING may be slow
241
        if(freeMemoryMB < 600) {
242
            logger.error("The available free heap space appears to be too small (<600MB), the mass indexer may run out of memory!");
243
        }
244
        if(freeMemoryMB < 750) {
245
            logger.warn("The available free heap space appears to be small (<750MB), the mass indexer could be slow!");
246
        }
247

    
248
        double factor = 0.769; // default
249
        if(DescriptionElementBase.class.isAssignableFrom(type)) {
250
            factor = 0.025;
251
        }
252

    
253
        int batchSize = (int) Math.floor( factor * freeMemoryMB);
254
        logger.info("calculated batch size sweet spot for indexing " + type.getSimpleName()
255
                + " with " +  freeMemoryMB +  "MB free mem is " + batchSize);
256
        return batchSize;
257
    }
258

    
259
    /**
260
     * @param countResult
261
     * @return
262
     */
263
    private int calculateNumOfBatches(Long countResult, int batchSize) {
264
        Long numOfBatches =  countResult > 0 ? ((countResult-1)/batchSize)+1 : 0;
265
        return numOfBatches.intValue();
266
    }
267

    
268
    /**
269
     * @param type
270
     * @return
271
     */
272
    private <T> Long countEntities(Class<T> type) {
273
        Object countResultObj = getSession().createQuery("select count(*) from " + type.getName()).uniqueResult();
274
        Long countResult = (Long)countResultObj;
275
        return countResult;
276
    }
277

    
278
    protected <T extends CdmBase>void purge(Class<T> type, IProgressMonitor monitor) {
279

    
280
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
281
        logger.info("purging " + type.getName());
282
        fullTextSession.purgeAll(type);
283

    
284
        // TODO
285
        // toggle on/off flag doSpellIndex introduced for debugging, see ticket:
286
        //  #3721 (CdmMassIndexer.purge throwing errors due to LockObtainFailedException)
287
        // remove once this is fixed
288
        boolean doSpellIndex = false;
289

    
290
        if(doSpellIndex){
291
            SearchFactory searchFactory = fullTextSession.getSearchFactory();
292
            IndexManager indexManager = obtainIndexManager(searchFactory, type.getName());
293
            if(indexManager == null){
294
                logger.info("No IndexManager found for " + type.getName() + ", thus nothing to purge");
295
                return;
296
            }
297

    
298
            Directory directory = ((DirectoryBasedIndexManager) indexManager).getDirectoryProvider().getDirectory();
299
            SpellChecker spellChecker = null;
300
            try {
301
                spellChecker = new SpellChecker(directory);
302
                spellChecker.clearIndex();
303
            } catch (IOException e) {
304
                logger.error("IOException when creating dictionary", e);
305
                //TODO better means to notify that the process has been stopped, using the STOPPED_WORK_INDICATOR is only a hack
306
                monitor.worked(RestServiceProgressMonitor.STOPPED_WORK_INDICATOR);
307
                monitor.done();
308
            }
309

    
310
            if (spellChecker != null) {
311
                try {
312
                    logger.info("closing spellchecker ");
313
                    spellChecker.close();
314
                } catch (IOException e) {
315
                    logger.error("IOException when closing spellchecker", e);
316
                }
317
            }
318
        }
319
    }
320

    
321
    @Override
322
    public void reindex(Collection<Class<? extends CdmBase>> types, IProgressMonitor monitor){
323

    
324
        if(monitor == null){
325
            monitor = new NullProgressMonitor();
326
        }
327
        if(types == null){
328
            types = indexedClasses();
329
        }
330

    
331
        if(HS_31_MODE) {
332

    
333
        }
334

    
335
        monitor.setTaskName("CdmMassIndexer");
336
        int steps = types.size() + (HS_31_MODE ? 1 /* +1 for optimize */ : 0);
337
        monitor.beginTask("Reindexing " + types.size() + " classes", steps);
338

    
339
        boolean optimize = true;
340

    
341
        long start = System.currentTimeMillis();
342
        for(Class<? extends CdmBase> type : types){
343
            long perTypeStart = System.currentTimeMillis();
344

    
345
            if(HS_31_MODE) {
346
                // TODO remove this mode and all related code once the old reindex method is vanished
347
                reindex_31(type, monitor);
348
            } else {
349
                reindex_55(type, monitor);
350
                optimize = false;
351
            }
352

    
353

    
354
            logger.info("Indexing of " + type.getSimpleName() + " in " + ((System.currentTimeMillis() - perTypeStart) / 1000) + "s");
355
        }
356

    
357
        if(HS_31_MODE) {
358
            monitor.subTask("Optimizing Index");
359
            SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
360
            subMonitor.beginTask("Optimizing Index",1);
361
            optimize();
362
            logger.info("end index optimization");
363
            subMonitor.worked(1);
364
            subMonitor.done();
365
        }
366
        logger.info("reindexing completed in " + ((System.currentTimeMillis() - start) / 1000) + "s");
367

    
368
        //monitor.worked(1);
369
        monitor.done();
370

    
371
    }
372

    
373
    /**
374
     * new reindex method which benefits from
375
     * the mass indexer available in hibernate search 5.5
376
     *
377
     * @param type
378
     * @param monitor
379
     * @throws InterruptedException
380
     */
381
    protected void reindex_55(Class<? extends CdmBase> type, IProgressMonitor monitor) {
382

    
383
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
384

    
385

    
386
        logger.info("start indexing " + type.getName());
387
        monitor.subTask("indexing " + type.getSimpleName());
388

    
389
        Long countResult = countEntities(type);
390
        int batchSize = sweetestBatchSize(type);
391
        int numOfBatches = calculateNumOfBatches(countResult * 2, batchSize); // each entity is worked two times 1. document added, 2. document build
392

    
393
        SubProgressMonitor subMonitor = new SubProgressMonitor(monitor, 1);
394
        subMonitor.beginTask("Indexing " + type.getSimpleName(), numOfBatches);
395

    
396

    
397
        MassIndexerProgressMonitor indexerMonitorWrapper = new MassIndexerProgressMonitorWrapper(subMonitor, batchSize);
398

    
399
        try {
400
            fullTextSession
401
            .createIndexer(type)
402
            .batchSizeToLoadObjects(batchSize)
403
            .cacheMode(CacheMode.IGNORE)
404
            .threadsToLoadObjects(4) // optimize http://docs.jboss.org/hibernate/stable/search/reference/en-US/html_single/#search-batchindexing-threadsandconnections
405
            .idFetchSize(150) //TODO optimize
406
            .progressMonitor(indexerMonitorWrapper)
407
            .startAndWait();
408
        } catch (InterruptedException ie) {
409
            logger.info("Mass indexer has been interrupted");
410
            subMonitor.isCanceled();
411
        }
412
    }
413

    
414
    @Override
415
    public void createDictionary(IProgressMonitor monitor) {
416
        if(monitor == null){
417
            monitor = new NullProgressMonitor();
418
        }
419

    
420
        monitor.setTaskName("CdmMassIndexer_Dictionary");
421
        int steps = dictionaryClasses().length; // +1 for optimize
422
        monitor.beginTask("Creating Dictionary " + dictionaryClasses().length + " classes", steps);
423

    
424
        for(Class type : dictionaryClasses()){
425
            createDictionary(type, monitor);
426
        }
427

    
428
        monitor.done();
429

    
430
    }
431
    protected void optimize() {
432

    
433
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
434
        fullTextSession.getSearchFactory().optimize();
435
        fullTextSession.flushToIndexes();
436
        fullTextSession.clear();
437
    }
438

    
439
    @Override
440
    public void purge(IProgressMonitor monitor){
441

    
442
        if(monitor == null){
443
            monitor = new NullProgressMonitor();
444
        }
445

    
446
        monitor.setTaskName("CdmMassIndexer");
447
        int steps = indexedClasses().size() + 1; // +1 for optimize
448
        monitor.beginTask("Purging " + indexedClasses().size() + " classes", steps);
449

    
450
        for(Class<? extends CdmBase> type : indexedClasses()){
451
            purge(type, monitor);
452
            monitor.worked(1);
453
        }
454
        // need to flush to the index before optimizing
455
        // the purge method is not doing the flushing by itself
456
        FullTextSession fullTextSession = Search.getFullTextSession(getSession());
457
        fullTextSession.flushToIndexes();
458

    
459
        // optimize
460
        optimize();
461
        monitor.worked(1);
462

    
463
        // done
464
        monitor.done();
465
    }
466

    
467

    
468
    /**
469
     * Returns a list of declared indexable fields within a class through reflection.
470
     *
471
     * @param clazz
472
     * @return
473
     */
474
    private List<String> getIndexedDeclaredFields(Class clazz) {
475
        List<String> idFields = new ArrayList<String>();
476
        if(clazz.isAnnotationPresent(org.hibernate.search.annotations.Indexed.class)) {
477
            Field[] declaredFields = clazz.getDeclaredFields();
478
            for(int i=0;i<declaredFields.length;i++ ) {
479
                logger.info("checking field " + declaredFields[i].getName());
480
                if(declaredFields[i].isAnnotationPresent(org.hibernate.search.annotations.Field.class) ||
481
                        declaredFields[i].isAnnotationPresent(org.hibernate.search.annotations.Fields.class)) {
482
                    idFields.add(declaredFields[i].getName());
483
                    logger.info("adding field " + declaredFields[i].getName());
484
                }
485
            }
486
        }
487
        return idFields;
488
    }
489
    /**
490
     * @return
491
     */
492
    @Override
493
    public Set<Class<? extends CdmBase>> indexedClasses() {
494
        // if no indexed classes have been 'manually' set then
495
        // the default is the full list
496
        if(indexedClasses.size() == 0) {
497
            indexedClasses.add(DescriptionElementBase.class);
498
            indexedClasses.add(TaxonBase.class);
499
            indexedClasses.add(Classification.class);
500
            indexedClasses.add(TaxonNameBase.class);
501
            indexedClasses.add(SpecimenOrObservationBase.class);
502
            indexedClasses.add(TaxonRelationship.class);
503
        }
504
        return indexedClasses;
505
    }
506

    
507
    /**
508
     * @return
509
     */
510
    @Override
511
    public Class[] dictionaryClasses() {
512
        return new Class[] {
513
                TaxonNameBase.class
514
                };
515
    }
516

    
517
}
(1-1/16)