Project

General

Profile

Download (45 KB) Statistics
| Branch: | Tag: | Revision:
1
// $Id$
2
/**
3
* Copyright (C) 2013 EDIT
4
* European Distributed Institute of Taxonomy
5
* http://www.e-taxonomy.eu
6
*
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10
package eu.etaxonomy.cdm.api.service.description;
11

    
12
import java.util.ArrayList;
13
import java.util.Arrays;
14
import java.util.HashMap;
15
import java.util.HashSet;
16
import java.util.Iterator;
17
import java.util.LinkedList;
18
import java.util.List;
19
import java.util.Map;
20
import java.util.Set;
21
import java.util.UUID;
22

    
23
import org.apache.log4j.Logger;
24
import org.hibernate.FlushMode;
25
import org.hibernate.HibernateException;
26
import org.hibernate.Session;
27
import org.hibernate.engine.spi.SessionFactoryImplementor;
28
import org.hibernate.search.Search;
29
import org.springframework.beans.factory.annotation.Autowired;
30
import org.springframework.orm.hibernate5.HibernateTransactionManager;
31
import org.springframework.stereotype.Service;
32
import org.springframework.transaction.TransactionDefinition;
33
import org.springframework.transaction.TransactionStatus;
34
import org.springframework.transaction.support.DefaultTransactionDefinition;
35

    
36
import eu.etaxonomy.cdm.api.service.IClassificationService;
37
import eu.etaxonomy.cdm.api.service.IDescriptionService;
38
import eu.etaxonomy.cdm.api.service.INameService;
39
import eu.etaxonomy.cdm.api.service.ITaxonService;
40
import eu.etaxonomy.cdm.api.service.ITermService;
41
import eu.etaxonomy.cdm.common.DynamicBatch;
42
import eu.etaxonomy.cdm.common.JvmLimitsException;
43
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
44
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
45
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
46
import eu.etaxonomy.cdm.model.common.CdmBase;
47
import eu.etaxonomy.cdm.model.common.DefinedTermBase;
48
import eu.etaxonomy.cdm.model.common.Extension;
49
import eu.etaxonomy.cdm.model.common.ExtensionType;
50
import eu.etaxonomy.cdm.model.common.Marker;
51
import eu.etaxonomy.cdm.model.common.MarkerType;
52
import eu.etaxonomy.cdm.model.common.OrderedTermBase;
53
import eu.etaxonomy.cdm.model.description.DescriptionElementBase;
54
import eu.etaxonomy.cdm.model.description.DescriptionElementSource;
55
import eu.etaxonomy.cdm.model.description.Distribution;
56
import eu.etaxonomy.cdm.model.description.PresenceAbsenceTerm;
57
import eu.etaxonomy.cdm.model.description.TaxonDescription;
58
import eu.etaxonomy.cdm.model.location.NamedArea;
59
import eu.etaxonomy.cdm.model.name.Rank;
60
import eu.etaxonomy.cdm.model.taxon.Classification;
61
import eu.etaxonomy.cdm.model.taxon.Taxon;
62
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
63
import eu.etaxonomy.cdm.persistence.dto.ClassificationLookupDTO;
64

    
65
/**
66
 *
67
 * <h2>GENERAL NOTES </h2>
68
 * <em>TODO: These notes are directly taken from original Transmission Engine Occurrence
69
 * version 14 written in Visual Basic and still need to be
70
 * adapted to the java version of the transmission engine!</em>
71
 *
72
 * <h3>summaryStatus</h3>
73
 *
74
 *   Each distribution information item has a summaryStatus; this is a summary of the status codes
75
 *   as stored in the fields of emOccurrence native, introduced, cultivated, ...
76
 *   The summaryStatus seems to be equivalent to  the CDM DistributionStatus
77
 *
78
 * <h3>map generation</h3>
79
 *
80
 *   When generating maps from the accumulated distribution information some special cases have to be handled:
81
 * <ol>
82
 *   <li>if entered or imported status information exists for the same area for which calculated (accumulated)
83
 *       data is available, the calculated data has to be given preference over other data.
84
 *   </li>
85
 *   <li>If there is an area with a sub area and both areas have the same calculated status only the subarea
86
 *       status should be shown in the map, whereas the super area should be ignored.
87
 *   </li>
88
 * </ol>
89
 *
90
 * @author Anton Güntsch (author of original Transmission Engine Occurrence version 14 written in Visual Basic)
91
 * @author Andreas Kohlbecker (2013, porting Transmission Engine Occurrence to Java)
92
 * @date Feb 22, 2013
93
 */
94
@Service
public class TransmissionEngineDistribution { //TODO extends IoBase?

    /** Prefix of the {@link Extension} value that encodes the priority of a status term. */
    public static final String EXTENSION_VALUE_PREFIX = "transmissionEngineDistribution.priority:";

    public static final Logger logger = Logger.getLogger(TransmissionEngineDistribution.class);

    /**
     * only used for performance testing
     */
    // NOTE(review): name contains a typo ("FISRT" for "FIRST"); kept as-is since it is
    // referenced elsewhere in this class.
    final boolean ONLY_FISRT_BATCH = false;

    // Property paths to eagerly initialize when loading taxa, so the accumulation
    // loops can read descriptions, areas, status and sources without lazy-loading.
    protected static final List<String> TAXONDESCRIPTION_INIT_STRATEGY = Arrays.asList(new String [] {
            "description.markers.markerType",
            "description.elements.markers.markerType",
            "description.elements.area",
            "description.elements.status",
            "description.elements.sources.citation.authorship",
//            "description.elements.sources.nameUsedInSource",
//            "description.elements.multilanguageText",
//            "name.status.type",
    });

    /**
     * A map which contains the status terms as key and the priority as value
     * The map will contain both, the PresenceTerms and the AbsenceTerms
     */
    // Lazily initialized by initializeStatusPriorityMap() on first use in choosePreferred().
    private Map<PresenceAbsenceTerm, Integer> statusPriorityMap = null;

    @Autowired
    private IDescriptionService descriptionService;

    @Autowired
    private ITermService termService;

    @Autowired
    private ITaxonService taxonService;

    @Autowired
    private IClassificationService classificationService;

    // NOTE(review): field name looks like a typo for "nameService"; not renamed here
    // because it may be referenced in parts of this file outside the current view.
    @Autowired
    private INameService mameService;

    @Autowired
    private HibernateTransactionManager transactionManager;

    // Status terms ignored during by-area accumulation; defaults built lazily
    // in getByAreaIgnoreStatusList().
    private List<PresenceAbsenceTerm> byAreaIgnoreStatusList = null;

    // Status terms ignored during by-rank accumulation; defaults built lazily
    // in getByRankIgnoreStatusList().
    private List<PresenceAbsenceTerm> byRankIgnoreStatusList = null;

    // Cache of super area -> sub areas, filled by getSubAreasFor() (defined elsewhere in this file).
    private final Map<NamedArea, Set<NamedArea>> subAreaMap = new HashMap<NamedArea, Set<NamedArea>>();

    // Progress-monitor work units attributed to each accumulation step in accumulate().
    int byRankTicks = 300;
    int byAreasTicks = 100;

    // Minimum free heap (bytes) required before a new batch is started: 800 MiB.
    private static final long BATCH_MIN_FREE_HEAP = 800  * 1024 * 1024;
    /**
     * ratio of the initially free heap which should not be used
     * during the batch processing. This amount of the heap is reserved
     * for the flushing of the session and to the index
     */
    private static final double BATCH_FREE_HEAP_RATIO = 0.9;
    private static final int BATCH_SIZE_BY_AREA = 1000;
    private static final int BATCH_SIZE_BY_RANK = 500;

    // Instance-level override point for BATCH_MIN_FREE_HEAP (e.g. for tests).
    private long batchMinFreeHeap = BATCH_MIN_FREE_HEAP;
    /**
169
     * byAreaIgnoreStatusList contains by default:
170
     *  <ul>
171
     *    <li>AbsenceTerm.CULTIVATED_REPORTED_IN_ERROR()</li>
172
     *    <li>AbsenceTerm.INTRODUCED_REPORTED_IN_ERROR()</li>
173
     *    <li>AbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED()</li>
174
     *    <li>AbsenceTerm.NATIVE_REPORTED_IN_ERROR()</li>
175
     *    <li>AbsenceTerm.NATIVE_FORMERLY_NATIVE()</li>
176
     *  </ul>
177
     *
178
     * @return the byAreaIgnoreStatusList
179
     */
180
    public List<PresenceAbsenceTerm> getByAreaIgnoreStatusList() {
181
        if(byAreaIgnoreStatusList == null ){
182
            byAreaIgnoreStatusList = Arrays.asList(
183
                    new PresenceAbsenceTerm[] {
184
                    		PresenceAbsenceTerm.CULTIVATED_REPORTED_IN_ERROR(),
185
                    		PresenceAbsenceTerm.INTRODUCED_REPORTED_IN_ERROR(),
186
                    		PresenceAbsenceTerm.NATIVE_REPORTED_IN_ERROR(),
187
                    		PresenceAbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED(),
188
                    		PresenceAbsenceTerm.NATIVE_FORMERLY_NATIVE()
189
                            // TODO what about PresenceAbsenceTerm.ABSENT() also ignore?
190
                    });
191
        }
192
        return byAreaIgnoreStatusList;
193
    }
194

    
195
    /**
     * Overrides the default status terms that are skipped during the by-area
     * accumulation step (see {@link #getByAreaIgnoreStatusList()} for the defaults).
     *
     * @param byAreaIgnoreStatusList the byAreaIgnoreStatusList to set
     */
    public void setByAreaIgnoreStatusList(List<PresenceAbsenceTerm> byAreaIgnoreStatusList) {
        this.byAreaIgnoreStatusList = byAreaIgnoreStatusList;
    }
201

    
202
    /**
203
     * byRankIgnoreStatusList contains by default
204
     *  <ul>
205
     *    <li>PresenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA()</li>
206
     *  </ul>
207
     *
208
     * @return the byRankIgnoreStatusList
209
     */
210
    public List<PresenceAbsenceTerm> getByRankIgnoreStatusList() {
211

    
212
        if (byRankIgnoreStatusList == null) {
213
            byRankIgnoreStatusList = Arrays.asList(
214
                    new PresenceAbsenceTerm[] {
215
                    		PresenceAbsenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA()
216
                    });
217
        }
218
        return byRankIgnoreStatusList;
219
    }
220

    
221
    /**
     * Overrides the default status terms that are skipped during the by-rank
     * accumulation step (see {@link #getByRankIgnoreStatusList()} for the defaults).
     *
     * @param byRankIgnoreStatusList the byRankIgnoreStatusList to set
     */
    public void setByRankIgnoreStatusList(List<PresenceAbsenceTerm> byRankIgnoreStatusList) {
        this.byRankIgnoreStatusList = byRankIgnoreStatusList;
    }
227

    
228
    /**
     * Default constructor. All collaborators (term, taxon, description and
     * classification services, transaction manager) are injected by Spring
     * via the {@code @Autowired} fields, so no constructor arguments are needed.
     */
    public TransmissionEngineDistribution() {
    }
234

    
235
    /**
236
     * initializes the map which contains the status terms as key and the priority as value
237
     * The map will contain both, the PresenceTerms and the AbsenceTerms
238
     */
239
    private void initializeStatusPriorityMap() {
240

    
241
        statusPriorityMap = new HashMap<PresenceAbsenceTerm, Integer>();
242
        Integer priority;
243

    
244
        // PresenceTerms
245
        for(PresenceAbsenceTerm term : termService.list(PresenceAbsenceTerm.class, null, null, null, null)){
246
            priority = getPriorityFor(term);
247
            if(priority != null){
248
                statusPriorityMap.put(term, priority);
249
            }
250
        }
251
    }
252

    
253
    /**
254
     * Compares the PresenceAbsenceTermBase terms contained in <code>a.status</code> and <code>b.status</code> after
255
     * the priority as stored in the statusPriorityMap. The StatusAndSources object with
256
     * the higher priority is returned. In the case of <code>a == b</code> the sources of b will be added to the sources
257
     * of a.
258
     *
259
     * If either a or b or the status are null b or a is returned.
260
     *
261
     * @see initializeStatusPriorityMap()
262
     *
263
     * @param a
264
     * @param b
265
     * @param sourcesForWinnerB
266
     *  In the case when <code>b</code> is preferred over <code>a</code> these Set of sources will be added to the sources of <code>b</code>
267
     * @return
268
     */
269
    private StatusAndSources choosePreferred(StatusAndSources a, StatusAndSources b, Set<DescriptionElementSource> sourcesForWinnerB){
270

    
271
        if (statusPriorityMap == null) {
272
            initializeStatusPriorityMap();
273
        }
274

    
275
        if (b == null || b.status == null) {
276
            return a;
277
        }
278
        if (a == null || a.status == null) {
279
            return b;
280
        }
281

    
282
        if (statusPriorityMap.get(a.status) == null) {
283
            logger.warn("No priority found in map for " + a.status.getLabel());
284
            return b;
285
        }
286
        if (statusPriorityMap.get(b.status) == null) {
287
            logger.warn("No priority found in map for " + b.status.getLabel());
288
            return a;
289
        }
290
        if(statusPriorityMap.get(a.status) < statusPriorityMap.get(b.status)){
291
            if(sourcesForWinnerB != null) {
292
                b.addSources(sourcesForWinnerB);
293
            }
294
            return b;
295
        } else if (statusPriorityMap.get(a.status) == statusPriorityMap.get(b.status)){
296
            a.addSources(b.sources);
297
            return a;
298
        } else {
299
            return a;
300
        }
301
    }
302

    
303
    /**
304
     * reads the priority for the given status term from the extensions.
305
     *
306
     * @param term
307
     * @return the priority value
308
     */
309
    private Integer getPriorityFor(DefinedTermBase<?> term) {
310
        Set<Extension> extensions = term.getExtensions();
311
        for(Extension extension : extensions){
312
            if(!extension.getType().equals(ExtensionType.ORDER())) {
313
                continue;
314
            }
315
            int pos = extension.getValue().indexOf(EXTENSION_VALUE_PREFIX);
316
            if(pos == 0){ // if starts with EXTENSION_VALUE_PREFIX
317
                try {
318
                    Integer priority = Integer.valueOf(extension.getValue().substring(EXTENSION_VALUE_PREFIX.length()));
319
                    return priority;
320
                } catch (NumberFormatException e) {
321
                    logger.warn("Invalid number format in Extension:" + extension.getValue());
322
                }
323
            }
324
        }
325
        logger.warn("no priority defined for '" + term.getLabel() + "'");
326
        return null;
327
    }
328

    
329
    /**
     * runs both steps
     * <ul>
     * <li>Step 1: Accumulate occurrence records by area</li>
     * <li>Step 2: Accumulate by ranks starting from lower rank to upper rank,
     * the status of all children are accumulated on each rank starting from
     * lower rank to upper rank.</li>
     * </ul>
     *
     * @param mode
     *            selects which of the two steps are executed (by area, by rank, or both)
     * @param superAreas
     *            the areas to which the subordinate areas should be projected.
     * @param lowerRank
     * @param upperRank
     * @param classification
     *            limit the accumulation process to a specific classification
     *            (not yet implemented); when null all classifications are processed
     * @param monitor
     *            the progress monitor to use for reporting progress to the
     *            user. It is the caller's responsibility to call done() on the
     *            given monitor. Accepts null, indicating that no progress
     *            should be reported and that the operation cannot be cancelled.
     */
    public void accumulate(AggregationMode mode, List<NamedArea> superAreas, Rank lowerRank, Rank upperRank,
            Classification classification, IProgressMonitor monitor) throws JvmLimitsException {

        // null monitor is allowed per contract; substitute a no-op implementation
        if (monitor == null) {
            monitor = new NullProgressMonitor();
        }

        // only for debugging:
        //logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations
        //Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG);

        logger.info("Hibernate JDBC Batch size: "
                + ((SessionFactoryImplementor) getSession().getSessionFactory()).getSettings().getJdbcBatchSize());

        // determine the set of classifications to process: all, or just the given one
        Set<Classification> classifications = new HashSet<Classification>();
        if(classification == null) {
            classifications.addAll(classificationService.listClassifications(null, null, null, null));
        } else {
            classifications.add(classification);
        }

        // total monitor ticks per classification depend on which steps will run
        int aggregationWorkTicks;
        switch(mode){
        case byAreasAndRanks:
            aggregationWorkTicks = byAreasTicks + byRankTicks;
            break;
        case byAreas:
            aggregationWorkTicks = byAreasTicks;
            break;
        case byRanks:
            aggregationWorkTicks = byRankTicks;
            break;
        default:
            aggregationWorkTicks = 0;
            break;
        }

        // take start time for performance testing
        // NOTE: use ONLY_FISRT_BATCH = true to measure only one batch
        // (double so the later division by 1000 yields fractional seconds)
        double start = System.currentTimeMillis();

        // +1 tick accounts for the preparation work (priorities, rank interval)
        monitor.beginTask("Accumulating distributions", (classifications.size() * aggregationWorkTicks) + 1 );

        // refresh the status priority extensions; defined elsewhere in this file
        updatePriorities();

        // expand the [lowerRank, upperRank] interval into the concrete rank list;
        // rankInterval() is defined elsewhere in this file
        List<Rank> ranks = rankInterval(lowerRank, upperRank);

        monitor.worked(1);


        for(Classification _classification : classifications) {

            ClassificationLookupDTO classificationLookupDao = classificationService.classificationLookup(_classification);
            classificationLookupDao.filterInclude(ranks);

            double end1 = System.currentTimeMillis();
            logger.info("Time elapsed for classificationLookup() : " + (end1 - start) / (1000) + "s");
            double start2 = System.currentTimeMillis();

            monitor.subTask("Accumulating distributions to super areas for " + _classification.getTitleCache());
            // Step 1: project sub-area distributions onto the super areas
            if (mode.equals(AggregationMode.byAreas) || mode.equals(AggregationMode.byAreasAndRanks)) {
                accumulateByArea(superAreas, classificationLookupDao, new SubProgressMonitor(monitor, byAreasTicks), true);
            }
            monitor.subTask("Accumulating distributions to higher ranks for " + _classification.getTitleCache());

            double end2 = System.currentTimeMillis();
            logger.info("Time elapsed for accumulateByArea() : " + (end2 - start2) / (1000) + "s");

            double start3 = System.currentTimeMillis();
            // Step 2: accumulate child distributions up the rank hierarchy;
            // descriptions are cleared first only when step 1 did not already run
            if (mode.equals(AggregationMode.byRanks) || mode.equals(AggregationMode.byAreasAndRanks)) {
                accumulateByRank(ranks, classificationLookupDao, new SubProgressMonitor(monitor, byRankTicks), mode.equals(AggregationMode.byRanks));
            }

            double end3 = System.currentTimeMillis();
            logger.info("Time elapsed for accumulateByRank() : " + (end3 - start3) / (1000) + "s");
            logger.info("Time elapsed for accumulate(): " + (end3 - start) / (1000) + "s");

            // performance-testing shortcut: stop after the first classification
            if(ONLY_FISRT_BATCH) {
                monitor.done();
                break;
            }
        }
        monitor.done();
    }
436

    
437

    
438
    /**
     * Step 1: Accumulate occurrence records by area
     * <ul>
     * <li>areas are projected to super areas e.g.:  HS <-- HS(A), HS(G), HS(S)</li>
     * <li>super areas do initially not have a status set ==> Prerequisite to check in CDM</li>
     * <li>areas having a summary status of summary value different from {@link #getByAreaIgnoreStatusList()} are ignored</li>
     * <li>areas have a priority value, the status of the area with highest priority determines the status of the super area</li>
     * <li>the source references of the accumulated distributions are also accumulated into the new distribution;
     * this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
     * </ul>
     *
     * @param superAreas
     *      the areas to which the subordinate areas should be projected
     * @param classificationLookupDao
     *      lookup of taxon ids in the classification being processed
     * @param subMonitor
     *      progress monitor for this step; done() is called before returning
     * @param doClearDescriptions
     *      when true, previously computed description elements are cleared first
     *      (handled inside findComputedDescription(), defined elsewhere in this file)
     * @throws JvmLimitsException
     *
     */
    protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {

        // batch size adapts dynamically to the available heap
        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_AREA, batchMinFreeHeap);
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);

        TransactionStatus txStatus = startTransaction(false);

        // reload superAreas TODO is it faster to getSession().merge(object) ??
        Set<UUID> superAreaUuids = new HashSet<UUID>(superAreas.size());
        for (NamedArea superArea : superAreas){
            superAreaUuids.add(superArea.getUuid());
        }

        // visit all accepted taxa
        subMonitor.beginTask("Accumulating by area ",  classificationLookupDao.getTaxonIds().size());
        Iterator<Integer> taxonIdIterator = classificationLookupDao.getTaxonIds().iterator();

        // batch.hasUnprocessedItems() covers items left over when a batch was
        // aborted early due to JVM limits (see isWithinJvmLimits() below)
        while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {

            if(txStatus == null) {
                // transaction has been committed at the end of this batch, start a new one
                txStatus = startTransaction(false);
            }

            // the session is cleared after each batch, so load the superAreaList for each batch
            List<NamedArea> superAreaList = (List)termService.find(superAreaUuids);

            // load taxa for this batch
            List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
//            logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
            List<TaxonBase> taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);

            // iterate over the taxa and accumulate areas
            // start processing the new batch

            for(TaxonBase taxonBase : taxa) {
                if(logger.isDebugEnabled()){
                    logger.debug("accumulateByArea() - taxon :" + taxonToString(taxonBase));
                }

                batch.incementCounter();

                Taxon taxon = (Taxon)taxonBase;
                TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
                List<Distribution> distributions = distributionsFor(taxon);

                // Step through superAreas for accumulation of subAreas
                for (NamedArea superArea : superAreaList){

                    // accumulate all sub area status
                    StatusAndSources accumulatedStatusAndSources = null;
                    // TODO consider using the TermHierarchyLookup (only in local branch a.kohlbecker)
                    Set<NamedArea> subAreas = getSubAreasFor(superArea);
                    for(NamedArea subArea : subAreas){
                        if(logger.isTraceEnabled()){
                            logger.trace("accumulateByArea() - \t\t" + termToString(subArea));
                        }
                        // step through all distributions for the given subArea
                        for(Distribution distribution : distributions){
                            if(distribution.getArea() != null && distribution.getArea().equals(subArea) && distribution.getStatus() != null) {
                                PresenceAbsenceTerm status = distribution.getStatus();
                                if(logger.isTraceEnabled()){
                                    logger.trace("accumulateByArea() - \t\t" + termToString(subArea) + ": " + termToString(status));
                                }
                                // skip all having a status value different of those in byAreaIgnoreStatusList
                                if (getByAreaIgnoreStatusList().contains(status)){
                                    continue;
                                }
                                // choosePreferred() keeps the status with the higher priority
                                StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
                                accumulatedStatusAndSources = choosePreferred(accumulatedStatusAndSources, subStatusAndSources, null);
                            }
                        }
                    } // next sub area
                    if (accumulatedStatusAndSources != null) {
                        if(logger.isDebugEnabled()){
                            logger.debug("accumulateByArea() - \t >> " + termToString(superArea) + ": " + termToString(accumulatedStatusAndSources.status));
                        }
                        // store new distribution element for superArea in taxon description,
                        // marked COMPUTED so it can be distinguished from entered data
                        Distribution newDistribitionElement = Distribution.NewInstance(superArea, accumulatedStatusAndSources.status);
                        newDistribitionElement.getSources().addAll(accumulatedStatusAndSources.sources);
                        newDistribitionElement.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
                        description.addElement(newDistribitionElement);
                    }

                } // next super area ....

                descriptionService.saveOrUpdate(description);
                taxonService.saveOrUpdate(taxon);
                subMonitor.worked(1);
                if(!batch.isWithinJvmLimits()) {
                    break; // flushAndClear and start with new batch
                }

            } // next taxon

            flushAndClear();

            // commit for every batch, otherwise the persistent context
            // may grow too much and eats up all the heap
            commitTransaction(txStatus);
            txStatus = null;

            // performance-testing shortcut: process only the first batch
            if(ONLY_FISRT_BATCH) {
                break;
            }

        } // next batch of taxa

        subMonitor.done();
    }
565

    
566
   /**
567
    * Step 2: Accumulate by ranks starting from lower rank to upper rank, the status of all children
568
    * are accumulated on each rank starting from lower rank to upper rank.
569
    * <ul>
570
    * <li>aggregate distribution of included taxa of the next lower rank for any rank level starting from the lower rank (e.g. sub species)
571
    *    up to upper rank (e.g. Genus)</li>
572
    *  <li>the accumulation id done for each distribution area found in the included taxa</li>
573
    *  <li>areas of subtaxa with status endemic are ignored</li>
574
    *  <li>the status with the highest priority determines the value for the accumulated distribution</li>
575
    *  <li>the source reference of the accumulated distributions are also accumulated into the new distribution,
576
    *    this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
577
    *</ul>
578
 * @throws JvmLimitsException
579
    */
580
    protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
581

    
582
        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_RANK, batchMinFreeHeap);
583
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
584
        batch.setMaxAllowedGcIncreases(10);
585

    
586
        int ticksPerRank = 100;
587

    
588
        TransactionStatus txStatus = startTransaction(false);
589

    
590
        // the loadRankSpecificRootNodes() method not only finds
591
        // taxa of the specified rank but also taxa of lower ranks
592
        // if no taxon of the specified rank exists, so we need to
593
        // remember which taxa have been processed already
594
        Set<Integer> taxaProcessedIds = new HashSet<Integer>();
595
        List<TaxonBase> taxa = null;
596
        List<TaxonBase> childTaxa = null;
597

    
598
        List<Rank> ranks = rankInterval;
599

    
600
        subMonitor.beginTask("Accumulating by rank", ranks.size() * ticksPerRank);
601

    
602
        for (Rank rank : ranks) {
603

    
604
            if(logger.isDebugEnabled()){
605
                logger.debug("accumulateByRank() - at Rank '" + termToString(rank) + "'");
606
            }
607

    
608
            Set<Integer> taxonIdsPerRank = classificationLookupDao.getTaxonIdByRank().get(rank);
609

    
610
            int taxonCountperRank = taxonIdsPerRank != null ? taxonIdsPerRank.size() : 0;
611

    
612
            SubProgressMonitor taxonSubMonitor = new SubProgressMonitor(subMonitor, ticksPerRank);
613
            taxonSubMonitor.beginTask("Accumulating by rank " + termToString(rank), taxonCountperRank);
614

    
615
            if(taxonCountperRank == 0) {
616
                taxonSubMonitor.done();
617
                continue;
618
            }
619

    
620

    
621
            Iterator<Integer> taxonIdIterator = taxonIdsPerRank.iterator();
622
            while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
623

    
624
                if(txStatus == null) {
625
                    // transaction has been committed at the end of this batch, start a new one
626
                    txStatus = startTransaction(false);
627
                }
628

    
629
                // load taxa for this batch
630
                List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
631

    
632
                taxa = taxonService.loadByIds(taxonIds, null);
633

    
634
//                if(logger.isDebugEnabled()){
635
//                           logger.debug("accumulateByRank() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
636
//                }
637

    
638
                for(TaxonBase taxonBase : taxa) {
639

    
640
                    batch.incementCounter();
641

    
642
                    Taxon taxon = (Taxon)taxonBase;
643
                    if (taxaProcessedIds.contains(taxon.getId())) {
644
                        if(logger.isDebugEnabled()){
645
                            logger.debug("accumulateByRank() - skipping already processed taxon :" + taxonToString(taxon));
646
                        }
647
                        continue;
648
                    }
649
                    taxaProcessedIds.add(taxon.getId());
650
                    if(logger.isDebugEnabled()){
651
                        logger.debug("accumulateByRank() [" + rank.getLabel() + "] - taxon :" + taxonToString(taxon));
652
                    }
653

    
654
                    // Step through direct taxonomic children for accumulation
655
                    Map<NamedArea, StatusAndSources> accumulatedStatusMap = new HashMap<NamedArea, StatusAndSources>();
656

    
657
                    List<Integer> childTaxonIds = new ArrayList<>();
658
                    Set<Integer> childSet = classificationLookupDao.getChildTaxonMap().get(taxon.getId());
659
                    if(childSet != null) {
660
                        childTaxonIds.addAll(childSet);
661
                    }
662
                    if(!childTaxonIds.isEmpty()) {
663
                        childTaxa = taxonService.loadByIds(childTaxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
664
                        LinkedList<TaxonBase> childStack = new LinkedList<TaxonBase>(childTaxa);
665
                        childTaxa = null; // allow to be garbage collected
666

    
667
                        while(childStack.size() > 0){
668

    
669
                            TaxonBase childTaxonBase = childStack.pop();
670
                            getSession().setReadOnly(childTaxonBase, true);
671

    
672
                            Taxon childTaxon = (Taxon) childTaxonBase;
673
                            getSession().setReadOnly(childTaxon, true);
674
                            if(logger.isTraceEnabled()){
675
                                logger.trace("                   subtaxon :" + taxonToString(childTaxon));
676
                            }
677

    
678
                            for(Distribution distribution : distributionsFor(childTaxon) ) {
679
                                PresenceAbsenceTerm status = distribution.getStatus();
680
                                NamedArea area = distribution.getArea();
681
                                if (status == null || getByRankIgnoreStatusList().contains(status)){
682
                                  continue;
683
                                }
684

    
685
                                StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
686
                                accumulatedStatusMap.put(area, choosePreferred(accumulatedStatusMap.get(area), subStatusAndSources, null));
687
                             }
688

    
689
                            // evict all initialized entities of the childTaxon
690
                            // TODO consider using cascade="evict" in the model classes
691
//                            for( TaxonDescription description : ((Taxon)childTaxonBase).getDescriptions()) {
692
//                                for (DescriptionElementBase deb : description.getElements()) {
693
//                                    getSession().evict(deb);
694
//                                }
695
//                                getSession().evict(description); // this causes in some cases the taxon object to be detached from the session
696
//                            }
697
                            getSession().evict(childTaxonBase); // no longer needed, save heap
698
                        }
699

    
700
                        if(accumulatedStatusMap.size() > 0) {
701
                            TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
702
                            for (NamedArea area : accumulatedStatusMap.keySet()) {
703
                                Distribution distribition = findDistribution(description, area, accumulatedStatusMap.get(area).status);
704
                                if(distribition == null) {
705
                                    // create a new distribution element
706
                                    distribition = Distribution.NewInstance(area, accumulatedStatusMap.get(area).status);
707
                                    distribition.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
708
                                }
709
                                addSourcesDeduplicated(distribition.getSources(), accumulatedStatusMap.get(area).sources);
710

    
711
                                description.addElement(distribition);
712
                            }
713
                            taxonService.saveOrUpdate(taxon);
714
                            descriptionService.saveOrUpdate(description);
715
                        }
716

    
717
                    }
718
                    taxonSubMonitor.worked(1); // one taxon worked
719
                    if(!batch.isWithinJvmLimits()) {
720
                        break; // flushAndClear and start with new batch
721
                    }
722

    
723
                } // next taxon ....
724

    
725
                flushAndClear();
726

    
727
                // commit for every batch, otherwise the persistent context
728
                // may grow too much and eats up all the heap
729
                commitTransaction(txStatus);
730
                txStatus = null;
731

    
732
                // flushing the session and to the index (flushAndClear() ) can impose a
733
                // massive heap consumption. therefore we explicitly do a check after the
734
                // flush to detect these situations and to reduce the batch size.
735
                if(batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) {
736
                    batch.reduceSize(0.5);
737
                }
738

    
739
                if(ONLY_FISRT_BATCH) {
740
                    break;
741
                }
742
            } // next batch
743

    
744
            taxonSubMonitor.done();
745
            subMonitor.worked(1);
746

    
747
            if(ONLY_FISRT_BATCH) {
748
                break;
749
            }
750
        } // next Rank
751

    
752
        logger.info("accumulateByRank() - done");
753
        subMonitor.done();
754
    }
755

    
756
/**
757
 * @param description
758
 * @param area
759
 * @param status
760
 * @return
761
 */
762
private Distribution findDistribution(TaxonDescription description, NamedArea area, PresenceAbsenceTerm status) {
763
    for(DescriptionElementBase item : description.getElements()) {
764
        if(!(item instanceof Distribution)) {
765
            continue;
766
        }
767
        Distribution distribution = ((Distribution)item);
768
        if(distribution.getArea().equals(area) && distribution.getStatus().equals(status)) {
769
            return distribution;
770
        }
771
    }
772
    return null;
773
}
774

    
775
/**
776
 * @param lowerRank
777
 * @param upperRank
778
 * @return
779
 */
780
private List<Rank> rankInterval(Rank lowerRank, Rank upperRank) {
781

    
782
    TransactionStatus txStatus = startTransaction(false);
783
    Rank currentRank = lowerRank;
784
    List<Rank> ranks = new ArrayList<Rank>();
785
    ranks.add(currentRank);
786
    while (!currentRank.isHigher(upperRank)) {
787
        currentRank = findNextHigherRank(currentRank);
788
        ranks.add(currentRank);
789
    }
790
    commitTransaction(txStatus);
791
    txStatus = null;
792
    return ranks;
793
}
794

    
795
    /**
     * Shortcut to the hibernate {@link Session} used by the description service.
     *
     * @return the current session of the descriptionService
     */
    private Session getSession() {
        return descriptionService.getSession();
    }
801

    
802
    /**
803
     *
804
     */
805
    private void flush() {
806
        logger.debug("flushing session ...");
807
        getSession().flush();
808
        try {
809
            logger.debug("flushing to indexes ...");
810
            Search.getFullTextSession(getSession()).flushToIndexes();
811
        } catch (HibernateException e) {
812
            /* IGNORE - Hibernate Search Event listeners not configured ... */
813
            if(!e.getMessage().startsWith("Hibernate Search Event listeners not configured")){
814
                throw e;
815
            }
816
        }
817
    }
818

    
819
    /**
     * Flushes the session (including the full text indexes, see {@link #flush()})
     * and afterwards clears the persistence context to free heap space.
     */
   private void flushAndClear() {
       flush();
       logger.debug("clearing session ...");
       getSession().clear();
   }
827

    
828

    
829
    // TODO merge with CdmApplicationDefaultConfiguration#startTransaction() into common base class
830
    public TransactionStatus startTransaction(Boolean readOnly) {
831

    
832
        DefaultTransactionDefinition defaultTxDef = new DefaultTransactionDefinition();
833
        defaultTxDef.setReadOnly(readOnly);
834
        TransactionDefinition txDef = defaultTxDef;
835

    
836
        // Log some transaction-related debug information.
837
        if (logger.isTraceEnabled()) {
838
            logger.trace("Transaction name = " + txDef.getName());
839
            logger.trace("Transaction facets:");
840
            logger.trace("Propagation behavior = " + txDef.getPropagationBehavior());
841
            logger.trace("Isolation level = " + txDef.getIsolationLevel());
842
            logger.trace("Timeout = " + txDef.getTimeout());
843
            logger.trace("Read Only = " + txDef.isReadOnly());
844
            // org.springframework.orm.hibernate5.HibernateTransactionManager
845
            // provides more transaction/session-related debug information.
846
        }
847

    
848
        TransactionStatus txStatus = transactionManager.getTransaction(txDef);
849

    
850
        getSession().setFlushMode(FlushMode.COMMIT);
851

    
852
        return txStatus;
853
    }
854

    
855
    // TODO merge with CdmApplicationDefaultConfiguration#startTransaction() into common base class
856
    public void commitTransaction(TransactionStatus txStatus){
857
        logger.debug("commiting transaction ...");
858
        transactionManager.commit(txStatus);
859
        return;
860
    }
861

    
862
    /**
863
     * returns the next higher rank
864
     *
865
     * TODO better implement OrderedTermBase.getNextHigherTerm() and OrderedTermBase.getNextLowerTerm()?
866
     *
867
     * @param rank
868
     * @return
869
     */
870
    private Rank findNextHigherRank(Rank rank) {
871
        rank = (Rank) termService.load(rank.getUuid());
872
        return rank.getNextHigherTerm();
873
//        OrderedTermVocabulary<Rank> rankVocabulary = mameService.getRankVocabulary();;
874
//        return rankVocabulary.getNextHigherTerm(rank);
875
    }
876

    
877
    /**
878
     * Either finds an existing taxon description of the given taxon or creates a new one.
879
     * If the doClear is set all existing description elements will be cleared.
880
     *
881
     * @param taxon
882
     * @param doClear will remove all existing Distributions if the taxon already
883
     * has a MarkerType.COMPUTED() TaxonDescription
884
     * @return
885
     */
886
    private TaxonDescription findComputedDescription(Taxon taxon, boolean doClear) {
887

    
888
        String descriptionTitle = this.getClass().getSimpleName();
889

    
890
        // find existing one
891
        for (TaxonDescription description : taxon.getDescriptions()) {
892
            if (description.hasMarker(MarkerType.COMPUTED(), true)) {
893
                logger.debug("reusing computed description for " + taxon.getTitleCache());
894
                if (doClear) {
895
                    int deleteCount = 0;
896
                    Set<DescriptionElementBase> deleteCandidates = new HashSet<DescriptionElementBase>();
897
                    for (DescriptionElementBase descriptionElement : description.getElements()) {
898
                        if(descriptionElement instanceof Distribution) {
899
                            deleteCandidates.add(descriptionElement);
900
                        }
901
                    }
902
                    if(deleteCandidates.size() > 0){
903
                        for(DescriptionElementBase descriptionElement : deleteCandidates) {
904
                            description.removeElement(descriptionElement);
905
                            descriptionService.deleteDescriptionElement(descriptionElement);
906
                            descriptionElement = null;
907
                            deleteCount++;
908
                        }
909
                        descriptionService.saveOrUpdate(description);
910
                        logger.debug("\t" + deleteCount +" distributions cleared");
911
                    }
912

    
913
                }
914
                return description;
915
            }
916
        }
917

    
918
        // create a new one
919
        logger.debug("creating new description for " + taxon.getTitleCache());
920
        TaxonDescription description = TaxonDescription.NewInstance(taxon);
921
        description.setTitleCache(descriptionTitle, true);
922
        description.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
923
        return description;
924
    }
925

    
926
    /**
927
     * @param superArea
928
     * @return
929
     */
930
    private Set<NamedArea> getSubAreasFor(NamedArea superArea) {
931

    
932
        if(!subAreaMap.containsKey(superArea)) {
933
            if(logger.isDebugEnabled()){
934
                logger.debug("loading included areas for " + superArea.getLabel());
935
            }
936
            subAreaMap.put(superArea, superArea.getIncludes());
937
        }
938
        return subAreaMap.get(superArea);
939
    }
940

    
941
    /**
942
     * @param taxon
943
     * @return
944
     */
945
    private List<Distribution> distributionsFor(Taxon taxon) {
946
        List<Distribution> distributions = new ArrayList<Distribution>();
947
        for(TaxonDescription description: taxon.getDescriptions()) {
948
            readOnlyIfInSession(description);
949
            for(DescriptionElementBase deb : description.getElements()) {
950
                if(deb instanceof Distribution) {
951
                    readOnlyIfInSession(deb);
952
                    distributions.add((Distribution)deb);
953
                }
954
            }
955
        }
956
        return distributions;
957
    }
958

    
959
    /**
960
     * This method avoids problems when running the TransmissionEngineDistribution test.
961
     * For some unknown reason entities are not in the PersitenceContext even if they are
962
     * loaded by a service method. Setting these entities to readonly would raise a
963
     * TransientObjectException("Instance was not associated with this persistence context")
964
     *
965
     * @param entity
966
     */
967
    private void readOnlyIfInSession(CdmBase entity) {
968
        if(getSession().contains(entity)) {
969
            getSession().setReadOnly(entity, true);
970
        }
971
    }
972

    
973
    /**
974
     * @param taxon
975
     * @param logger2
976
     * @return
977
     */
978
    private String taxonToString(TaxonBase taxon) {
979
        if(logger.isTraceEnabled()) {
980
            return taxon.getTitleCache();
981
        } else {
982
            return taxon.toString();
983
        }
984
    }
985

    
986
    /**
987
     * @param taxon
988
     * @param logger2
989
     * @return
990
     */
991
    private String termToString(OrderedTermBase<?> term) {
992
        if(logger.isTraceEnabled()) {
993
            return term.getLabel() + " [" + term.getIdInVocabulary() + "]";
994
        } else {
995
            return term.getIdInVocabulary();
996
        }
997
    }
998

    
999
    /**
1000
     * Sets the priorities for presence and absence terms, the priorities are stored in extensions.
1001
     * This method will start a new transaction and commits it after the work is done.
1002
     */
1003
    public void updatePriorities() {
1004

    
1005
        TransactionStatus txStatus = startTransaction(false);
1006

    
1007
        Map<PresenceAbsenceTerm, Integer> priorityMap = new HashMap<PresenceAbsenceTerm, Integer>();
1008

    
1009
        priorityMap.put(PresenceAbsenceTerm.CULTIVATED_REPORTED_IN_ERROR(), 1);
1010
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_UNCERTAIN_DEGREE_OF_NATURALISATION(), 2);
1011
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED(), 3);
1012
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_REPORTED_IN_ERROR(), 20);
1013
        priorityMap.put(PresenceAbsenceTerm.NATIVE_REPORTED_IN_ERROR(), 30);
1014
        priorityMap.put(PresenceAbsenceTerm.CULTIVATED(), 45);
1015
        priorityMap.put(PresenceAbsenceTerm.NATIVE_FORMERLY_NATIVE(), 40);
1016
        priorityMap.put(PresenceAbsenceTerm.NATIVE_PRESENCE_QUESTIONABLE(), 60);
1017
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_PRESENCE_QUESTIONABLE(), 50);
1018
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_DOUBTFULLY_INTRODUCED(), 80);
1019
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED(), 90);
1020
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_ADVENTITIOUS(), 100);
1021
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_NATURALIZED(), 110);
1022
        priorityMap.put(PresenceAbsenceTerm.NATIVE_DOUBTFULLY_NATIVE(), 120); // null
1023
        priorityMap.put(PresenceAbsenceTerm.NATIVE(), 130); // null
1024
        priorityMap.put(PresenceAbsenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA(), 999);
1025

    
1026
        for(PresenceAbsenceTerm term : priorityMap.keySet()) {
1027
            // load the term
1028
            term = (PresenceAbsenceTerm) termService.load(term.getUuid());
1029
            // find the extension
1030
            Extension priorityExtension = null;
1031
            Set<Extension> extensions = term.getExtensions();
1032
            for(Extension extension : extensions){
1033
                if (!extension.getType().equals(ExtensionType.ORDER())) {
1034
                    continue;
1035
                }
1036
                int pos = extension.getValue().indexOf(EXTENSION_VALUE_PREFIX);
1037
                if(pos == 0){ // if starts with EXTENSION_VALUE_PREFIX
1038
                    priorityExtension = extension;
1039
                    break;
1040
                }
1041
            }
1042
            if(priorityExtension == null) {
1043
                priorityExtension = Extension.NewInstance(term, null, ExtensionType.ORDER());
1044
            }
1045
            priorityExtension.setValue(EXTENSION_VALUE_PREFIX + priorityMap.get(term));
1046

    
1047
            // save the term
1048
            termService.saveOrUpdate(term);
1049
            if (logger.isDebugEnabled()) {
1050
                logger.debug("Priority updated for " + term.getLabel());
1051
            }
1052
        }
1053

    
1054
        commitTransaction(txStatus);
1055
    }
1056

    
1057
    public static void addSourcesDeduplicated(Set<DescriptionElementSource> target, Set<DescriptionElementSource> sources) {
1058
        for(DescriptionElementSource source : sources) {
1059
            boolean contained = false;
1060
            for(DescriptionElementSource existingSource: target) {
1061
                if(existingSource.equalsByShallowCompare(source)) {
1062
                    contained = true;
1063
                    break;
1064
                }
1065
            }
1066
            if(!contained) {
1067
                try {
1068
                    target.add((DescriptionElementSource)source.clone());
1069
                } catch (CloneNotSupportedException e) {
1070
                    // should never happen
1071
                    throw new RuntimeException(e);
1072
                }
1073
            }
1074
        }
1075
    }
1076

    
1077
    /**
     * The minimum amount of free heap (presumably in bytes — TODO confirm unit)
     * required for batch processing.
     *
     * @return the batchMinFreeHeap
     */
    public long getBatchMinFreeHeap() {
        return batchMinFreeHeap;
    }
1083

    
1084
    /**
     * Sets the minimum amount of free heap (presumably in bytes — TODO confirm
     * unit) required for batch processing.
     *
     * @param batchMinFreeHeap the batchMinFreeHeap to set
     */
    public void setBatchMinFreeHeap(long batchMinFreeHeap) {
        this.batchMinFreeHeap = batchMinFreeHeap;
    }
1090

    
1091
    /**
     * The supported aggregation modes: aggregate distribution information
     * per area, per rank, or by both dimensions in one run.
     */
    public enum AggregationMode {
        // aggregate sub-area statuses into their super areas
        byAreas,
        // aggregate statuses from lower to higher taxonomic ranks
        byRanks,
        // perform both aggregations
        byAreasAndRanks

    }
1097

    
1098
    /**
     * Internal value holder combining a presence/absence status with the
     * deduplicated set of sources the status was derived from.
     * Fields are read directly by the enclosing engine (e.g. in accumulateByRank).
     */
    private class StatusAndSources {

        // the status this holder represents; may be the result of choosePreferred()
        private final PresenceAbsenceTerm status;

        // sources supporting the status, deduplicated by shallow comparison
        private final Set<DescriptionElementSource> sources = new HashSet<>();

        /**
         * @param status the status, stored as is
         * @param sources the initial sources, added via addSourcesDeduplicated()
         */
        public StatusAndSources(PresenceAbsenceTerm status, Set<DescriptionElementSource> sources) {
            this.status = status;
            addSourcesDeduplicated(this.sources, sources);
        }

        /**
         * Adds the given sources to this holder, skipping shallow-equal duplicates.
         *
         * @param sources the sources to add
         */
        public void addSources(Set<DescriptionElementSource> sources) {
            addSourcesDeduplicated(this.sources, sources);
        }

    }
1117
}
    (1-1/1)