Project

General

Profile

Download (44.1 KB) Statistics
| Branch: | Tag: | Revision:
1
// $Id$
2
/**
3
* Copyright (C) 2013 EDIT
4
* European Distributed Institute of Taxonomy
5
* http://www.e-taxonomy.eu
6
*
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10
package eu.etaxonomy.cdm.api.service.description;
11

    
12
import java.util.ArrayList;
13
import java.util.Arrays;
14
import java.util.HashMap;
15
import java.util.HashSet;
16
import java.util.Iterator;
17
import java.util.LinkedList;
18
import java.util.List;
19
import java.util.Map;
20
import java.util.Set;
21
import java.util.UUID;
22

    
23
import org.apache.log4j.Logger;
24
import org.hibernate.FlushMode;
25
import org.hibernate.HibernateException;
26
import org.hibernate.Session;
27
import org.hibernate.engine.spi.SessionFactoryImplementor;
28
import org.hibernate.search.Search;
29
import org.springframework.beans.factory.annotation.Autowired;
30
import org.springframework.orm.hibernate5.HibernateTransactionManager;
31
import org.springframework.stereotype.Service;
32
import org.springframework.transaction.TransactionDefinition;
33
import org.springframework.transaction.TransactionStatus;
34
import org.springframework.transaction.support.DefaultTransactionDefinition;
35

    
36
import eu.etaxonomy.cdm.api.service.IClassificationService;
37
import eu.etaxonomy.cdm.api.service.IDescriptionService;
38
import eu.etaxonomy.cdm.api.service.INameService;
39
import eu.etaxonomy.cdm.api.service.ITaxonService;
40
import eu.etaxonomy.cdm.api.service.ITermService;
41
import eu.etaxonomy.cdm.common.DynamicBatch;
42
import eu.etaxonomy.cdm.common.JvmLimitsException;
43
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
44
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
45
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
46
import eu.etaxonomy.cdm.model.common.DefinedTermBase;
47
import eu.etaxonomy.cdm.model.common.Extension;
48
import eu.etaxonomy.cdm.model.common.ExtensionType;
49
import eu.etaxonomy.cdm.model.common.Marker;
50
import eu.etaxonomy.cdm.model.common.MarkerType;
51
import eu.etaxonomy.cdm.model.common.OrderedTermBase;
52
import eu.etaxonomy.cdm.model.description.DescriptionElementBase;
53
import eu.etaxonomy.cdm.model.description.DescriptionElementSource;
54
import eu.etaxonomy.cdm.model.description.Distribution;
55
import eu.etaxonomy.cdm.model.description.PresenceAbsenceTerm;
56
import eu.etaxonomy.cdm.model.description.TaxonDescription;
57
import eu.etaxonomy.cdm.model.location.NamedArea;
58
import eu.etaxonomy.cdm.model.name.Rank;
59
import eu.etaxonomy.cdm.model.taxon.Classification;
60
import eu.etaxonomy.cdm.model.taxon.Taxon;
61
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
62
import eu.etaxonomy.cdm.persistence.dto.ClassificationLookupDTO;
63

    
64
/**
65
 *
66
 * <h2>GENERAL NOTES </h2>
67
 * <em>TODO: These notes are directly taken from original Transmission Engine Occurrence
68
 * version 14 written in Visual Basic and still need to be
69
 * adapted to the java version of the transmission engine!</em>
70
 *
71
 * <h3>summaryStatus</h3>
72
 *
73
 *   Each distribution information has a summaryStatus, this is a summary of the status codes
74
 *   as stored in the fields of emOccurrence native, introduced, cultivated, ...
75
 *   The summaryStatus seems to be equivalent to the CDM DistributionStatus
76
 *
77
 * <h3>map generation</h3>
78
 *
79
 *   When generating maps from the accumulated distribution information some special cases have to be handled:
80
 * <ol>
81
 *   <li>if an entered or imported status information exists for the same area for which calculated (accumulated)
82
 *       data is available, the calculated data has to be given preference over other data.
83
 *   </li>
84
 *   <li>If there is an area with a sub area and both areas have the same calculated status only the subarea
85
 *       status should be shown in the map, whereas the super area should be ignored.
86
 *   </li>
87
 * </ol>
88
 *
89
 * @author Anton Güntsch (author of original Transmission Engine Occurrence version 14 written in Visual Basic)
90
 * @author Andreas Kohlbecker (2013, porting Transmission Engine Occurrence to Java)
91
 * @date Feb 22, 2013
92
 */
93
@Service
94
public class TransmissionEngineDistribution { //TODO extends IoBase?
95

    
96

    
97
    // Prefix used in ORDER-type term extensions that carry the numeric priority of a status term,
    // e.g. "transmissionEngineDistribution.priority:10" (see getPriorityFor()).
    public static final String EXTENSION_VALUE_PREFIX = "transmissionEngineDistribution.priority:";

    public static final Logger logger = Logger.getLogger(TransmissionEngineDistribution.class);

    /**
     * only used for performance testing
     */
    // NOTE(review): name contains a typo ("FISRT" -> "FIRST"); kept as-is since the flag
    // is referenced elsewhere in this file.
    final boolean ONLY_FISRT_BATCH = false;

    // Property paths initialized when taxa are loaded for the by-area accumulation,
    // so that markers, areas, status and sources are available without lazy loading.
    protected static final List<String> TAXONDESCRIPTION_INIT_STRATEGY = Arrays.asList(new String [] {
            "description.markers.markerType",
            "description.elements.markers.markerType",
            "description.elements.area",
            "description.elements.status",
            "description.elements.sources.citation.authorship",
//            "description.elements.sources.nameUsedInSource",
//            "description.elements.multilanguageText",
//            "name.status.type",
    });

    /**
     * A map which contains the status terms as key and the priority as value
     * The map will contain both, the PresenceTerms and the AbsenceTerms
     */
    // Lazily initialized by initializeStatusPriorityMap().
    private Map<PresenceAbsenceTerm, Integer> statusPriorityMap = null;

    @Autowired
    private IDescriptionService descriptionService;

    @Autowired
    private ITermService termService;

    @Autowired
    private ITaxonService taxonService;

    @Autowired
    private IClassificationService classificationService;

    // NOTE(review): field name looks like a typo for "nameService"; renaming is deferred
    // since usages may exist beyond this view.
    @Autowired
    private INameService mameService;

    @Autowired
    private HibernateTransactionManager transactionManager;

    // Status terms to be skipped during the by-area accumulation; see getByAreaIgnoreStatusList().
    private List<PresenceAbsenceTerm> byAreaIgnoreStatusList = null;

    // Status terms to be skipped during the by-rank accumulation; see getByRankIgnoreStatusList().
    private List<PresenceAbsenceTerm> byRankIgnoreStatusList = null;

    // Cache mapping a super area to its sub areas, filled on demand (see getSubAreasFor()).
    private final Map<NamedArea, Set<NamedArea>> subAreaMap = new HashMap<NamedArea, Set<NamedArea>>();

    // Progress-monitor work tick budgets for the two accumulation steps.
    int byRankTicks = 300;
    int byAreasTicks = 100;

    // Minimum free heap (800 MiB) required before a new dynamic batch is started.
    private static final long BATCH_MIN_FREE_HEAP = 800  * 1024 * 1024;
    /**
     * ratio of the initially free heap which should not be used
     * during the batch processing. This amount of the heap is reserved
     * for the flushing of the session and to the index
     */
    private static final double BATCH_FREE_HEAP_RATIO = 0.9;
    private static final int BATCH_SIZE_BY_AREA = 1000;
    private static final int BATCH_SIZE_BY_RANK = 500;
162

    
163

    
164

    
165
    /**
166
     * byAreaIgnoreStatusList contains by default:
167
     *  <ul>
168
     *    <li>AbsenceTerm.CULTIVATED_REPORTED_IN_ERROR()</li>
169
     *    <li>AbsenceTerm.INTRODUCED_REPORTED_IN_ERROR()</li>
170
     *    <li>AbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED()</li>
171
     *    <li>AbsenceTerm.NATIVE_REPORTED_IN_ERROR()</li>
172
     *    <li>AbsenceTerm.NATIVE_FORMERLY_NATIVE()</li>
173
     *  </ul>
174
     *
175
     * @return the byAreaIgnoreStatusList
176
     */
177
    public List<PresenceAbsenceTerm> getByAreaIgnoreStatusList() {
178
        if(byAreaIgnoreStatusList == null ){
179
            byAreaIgnoreStatusList = Arrays.asList(
180
                    new PresenceAbsenceTerm[] {
181
                    		PresenceAbsenceTerm.CULTIVATED_REPORTED_IN_ERROR(),
182
                    		PresenceAbsenceTerm.INTRODUCED_REPORTED_IN_ERROR(),
183
                    		PresenceAbsenceTerm.NATIVE_REPORTED_IN_ERROR(),
184
                    		PresenceAbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED(),
185
                    		PresenceAbsenceTerm.NATIVE_FORMERLY_NATIVE()
186
                            // TODO what about PresenceAbsenceTerm.ABSENT() also ignore?
187
                    });
188
        }
189
        return byAreaIgnoreStatusList;
190
    }
191

    
192
    /**
193
     * @param byAreaIgnoreStatusList the byAreaIgnoreStatusList to set
194
     */
195
    public void setByAreaIgnoreStatusList(List<PresenceAbsenceTerm> byAreaIgnoreStatusList) {
196
        this.byAreaIgnoreStatusList = byAreaIgnoreStatusList;
197
    }
198

    
199
    /**
200
     * byRankIgnoreStatusList contains by default
201
     *  <ul>
202
     *    <li>PresenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA()</li>
203
     *  </ul>
204
     *
205
     * @return the byRankIgnoreStatusList
206
     */
207
    public List<PresenceAbsenceTerm> getByRankIgnoreStatusList() {
208

    
209
        if (byRankIgnoreStatusList == null) {
210
            byRankIgnoreStatusList = Arrays.asList(
211
                    new PresenceAbsenceTerm[] {
212
                    		PresenceAbsenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA()
213
                    });
214
        }
215
        return byRankIgnoreStatusList;
216
    }
217

    
218
    /**
219
     * @param byRankIgnoreStatusList the byRankIgnoreStatusList to set
220
     */
221
    public void setByRankIgnoreStatusList(List<PresenceAbsenceTerm> byRankIgnoreStatusList) {
222
        this.byRankIgnoreStatusList = byRankIgnoreStatusList;
223
    }
224

    
225
    /**
     * Default constructor. Dependencies are injected by Spring via the
     * {@code @Autowired} fields; no further initialization is done here.
     */
    public TransmissionEngineDistribution() {
    }
231

    
232
    /**
233
     * initializes the map which contains the status terms as key and the priority as value
234
     * The map will contain both, the PresenceTerms and the AbsenceTerms
235
     */
236
    private void initializeStatusPriorityMap() {
237

    
238
        statusPriorityMap = new HashMap<PresenceAbsenceTerm, Integer>();
239
        Integer priority;
240

    
241
        // PresenceTerms
242
        for(PresenceAbsenceTerm term : termService.list(PresenceAbsenceTerm.class, null, null, null, null)){
243
            priority = getPriorityFor(term);
244
            if(priority != null){
245
                statusPriorityMap.put(term, priority);
246
            }
247
        }
248
    }
249

    
250
    /**
251
     * Compares the PresenceAbsenceTermBase terms contained in <code>a.status</code> and <code>b.status</code> after
252
     * the priority as stored in the statusPriorityMap. The StatusAndSources object with
253
     * the higher priority is returned. In the case of <code>a == b</code> the sources of b will be added to the sources
254
     * of a.
255
     *
256
     * If either a or b or the status are null b or a is returned.
257
     *
258
     * @see initializeStatusPriorityMap()
259
     *
260
     * @param a
261
     * @param b
262
     * @param sourcesForWinnerB
263
     *  In the case when <code>b</code> is preferred over <code>a</code> these Set of sources will be added to the sources of <code>b</code>
264
     * @return
265
     */
266
    private StatusAndSources choosePreferred(StatusAndSources a, StatusAndSources b, Set<DescriptionElementSource> sourcesForWinnerB){
267

    
268
        if (statusPriorityMap == null) {
269
            initializeStatusPriorityMap();
270
        }
271

    
272
        if (b == null || b.status == null) {
273
            return a;
274
        }
275
        if (a == null || a.status == null) {
276
            return b;
277
        }
278

    
279
        if (statusPriorityMap.get(a.status) == null) {
280
            logger.warn("No priority found in map for " + a.status.getLabel());
281
            return b;
282
        }
283
        if (statusPriorityMap.get(b.status) == null) {
284
            logger.warn("No priority found in map for " + b.status.getLabel());
285
            return a;
286
        }
287
        if(statusPriorityMap.get(a.status) < statusPriorityMap.get(b.status)){
288
            if(sourcesForWinnerB != null) {
289
                b.addSources(sourcesForWinnerB);
290
            }
291
            return b;
292
        } else if (statusPriorityMap.get(a.status) == statusPriorityMap.get(b.status)){
293
            a.addSources(b.sources);
294
            return a;
295
        } else {
296
            return a;
297
        }
298
    }
299

    
300
    /**
301
     * reads the priority for the given status term from the extensions.
302
     *
303
     * @param term
304
     * @return the priority value
305
     */
306
    private Integer getPriorityFor(DefinedTermBase<?> term) {
307
        Set<Extension> extensions = term.getExtensions();
308
        for(Extension extension : extensions){
309
            if(!extension.getType().equals(ExtensionType.ORDER())) {
310
                continue;
311
            }
312
            int pos = extension.getValue().indexOf(EXTENSION_VALUE_PREFIX);
313
            if(pos == 0){ // if starts with EXTENSION_VALUE_PREFIX
314
                try {
315
                    Integer priority = Integer.valueOf(extension.getValue().substring(EXTENSION_VALUE_PREFIX.length()));
316
                    return priority;
317
                } catch (NumberFormatException e) {
318
                    logger.warn("Invalid number format in Extension:" + extension.getValue());
319
                }
320
            }
321
        }
322
        logger.warn("no priority defined for '" + term.getLabel() + "'");
323
        return null;
324
    }
325

    
326
    /**
     * runs both steps
     * <ul>
     * <li>Step 1: Accumulate occurrence records by area</li>
     * <li>Step 2: Accumulate by ranks starting from lower rank to upper rank,
     * the status of all children are accumulated on each rank starting from
     * lower rank to upper rank.</li>
     * </ul>
     *
     * @param mode
     *            selects which of the two steps are executed (by area, by rank, or both)
     * @param superAreas
     *            the areas to which the subordinate areas should be projected.
     * @param lowerRank
     * @param upperRank
     * @param classification
     *            limit the accumulation process to a specific classification;
     *            if <code>null</code> all classifications are processed
     * @param monitor
     *            the progress monitor to use for reporting progress to the
     *            user. It is the caller's responsibility to call done() on the
     *            given monitor. Accepts null, indicating that no progress
     *            should be reported and that the operation cannot be cancelled.
     * @throws JvmLimitsException if the batch processing exceeds the configured JVM limits
     */
    public void accumulate(AggregationMode mode, List<NamedArea> superAreas, Rank lowerRank, Rank upperRank,
            Classification classification, IProgressMonitor monitor) throws JvmLimitsException {

        if (monitor == null) {
            monitor = new NullProgressMonitor();
        }

        // only for debugging:
        //logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations
        //Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG);

        logger.info("Hibernate JDBC Batch size: "
                + ((SessionFactoryImplementor) getSession().getSessionFactory()).getSettings().getJdbcBatchSize());

        // either the single given classification or all known classifications
        Set<Classification> classifications = new HashSet<Classification>();
        if(classification == null) {
            classifications.addAll(classificationService.listClassifications(null, null, null, null));
        } else {
            classifications.add(classification);
        }

        // work ticks per classification depend on which accumulation steps will run
        int aggregationWorkTicks;
        switch(mode){
        case byAreasAndRanks:
            aggregationWorkTicks = byAreasTicks + byRankTicks;
            break;
        case byAreas:
            aggregationWorkTicks = byAreasTicks;
            break;
        case byRanks:
            aggregationWorkTicks = byRankTicks;
            break;
        default:
            aggregationWorkTicks = 0;
            break;
        }

        // take start time for performance testing
        // NOTE: use ONLY_FISRT_BATCH = true to measure only one batch
        double start = System.currentTimeMillis();

        monitor.beginTask("Accumulating distributions", (classifications.size() * aggregationWorkTicks) + 1 );

        // re-read the status priorities from the term extensions before accumulating
        updatePriorities();

        List<Rank> ranks = rankInterval(lowerRank, upperRank);

        monitor.worked(1);

        for(Classification _classification : classifications) {

            // lightweight id/rank lookup for the classification, restricted to the rank interval
            ClassificationLookupDTO classificationLookupDao = classificationService.classificationLookup(_classification);
            classificationLookupDao.filterInclude(ranks);

            double end1 = System.currentTimeMillis();
            logger.info("Time elapsed for classificationLookup() : " + (end1 - start) / (1000) + "s");
            double start2 = System.currentTimeMillis();

            monitor.subTask("Accumulating distributions to super areas for " + _classification.getTitleCache());
            if (mode.equals(AggregationMode.byAreas) || mode.equals(AggregationMode.byAreasAndRanks)) {
                accumulateByArea(superAreas, classificationLookupDao, new SubProgressMonitor(monitor, byAreasTicks), true);
            }
            monitor.subTask("Accumulating distributions to higher ranks for " + _classification.getTitleCache());

            double end2 = System.currentTimeMillis();
            logger.info("Time elapsed for accumulateByArea() : " + (end2 - start2) / (1000) + "s");

            double start3 = System.currentTimeMillis();
            // when running byRanks only, the computed descriptions have not been cleared
            // by the area step yet, hence doClearDescriptions is passed as true in that case
            if (mode.equals(AggregationMode.byRanks) || mode.equals(AggregationMode.byAreasAndRanks)) {
                accumulateByRank(ranks, classificationLookupDao, new SubProgressMonitor(monitor, byRankTicks), mode.equals(AggregationMode.byRanks));
            }

            double end3 = System.currentTimeMillis();
            logger.info("Time elapsed for accumulateByRank() : " + (end3 - start3) / (1000) + "s");
            logger.info("Time elapsed for accumulate(): " + (end3 - start) / (1000) + "s");

            // performance testing only: stop after the first classification
            if(ONLY_FISRT_BATCH) {
                monitor.done();
                break;
            }
        }
        monitor.done();
    }
433

    
434

    
435
    /**
     * Step 1: Accumulate occurrence records by area
     * <ul>
     * <li>areas are projected to super areas e.g.:  HS <-- HS(A), HS(G), HS(S)</li>
     * <li>super areas do initially not have a status set ==> Prerequisite to check in CDM</li>
     * <li>areas having a summary status of summary value different from {@link #getByAreaIgnoreStatusList()} are ignored</li>
     * <li>areas have a priority value, the status of the area with highest priority determines the status of the super area</li>
     * <li>the source references of the accumulated distributions are also accumulated into the new distribution,</li>
     * <li>this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
     * </ul>
     *
     * @param superAreas
     *      the areas to which the subordinate areas should be projected
     * @param classificationLookupDao
     *      id/rank lookup for the classification being processed
     * @param subMonitor
     *      progress monitor for this step; done() is called on it before returning
     * @param doClearDescriptions
     *      whether existing computed descriptions are cleared before accumulating
     * @throws JvmLimitsException
     *      if the dynamic batch processing exceeds the configured JVM limits
     */
    protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {

        // batch size shrinks dynamically when free heap drops below the configured limits
        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_AREA, BATCH_MIN_FREE_HEAP);
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);

        TransactionStatus txStatus = startTransaction(false);

        // reload superAreas TODO is it faster to getSession().merge(object) ??
        // only the UUIDs are kept; the terms themselves are re-loaded per batch
        // because the session is cleared after each batch
        Set<UUID> superAreaUuids = new HashSet<UUID>(superAreas.size());
        for (NamedArea superArea : superAreas){
            superAreaUuids.add(superArea.getUuid());
        }

        // visit all accepted taxa
        subMonitor.beginTask("Accumulating by area ",  classificationLookupDao.getTaxonIds().size());
        Iterator<Integer> taxonIdIterator = classificationLookupDao.getTaxonIds().iterator();

        while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {

            if(txStatus == null) {
                // transaction has been committed at the end of this batch, start a new one
                txStatus = startTransaction(false);
            }

            // the session is cleared after each batch, so load the superAreaList for each batch
            List<NamedArea> superAreaList = (List)termService.find(superAreaUuids);

            // load taxa for this batch
            List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
//            logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
            List<TaxonBase> taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);

            // iterate over the taxa and accumulate areas
            // start processing the new batch

            for(TaxonBase taxonBase : taxa) {
                if(logger.isDebugEnabled()){
                    logger.debug("accumulateByArea() - taxon :" + taxonToString(taxonBase));
                }

                batch.incementCounter();

                Taxon taxon = (Taxon)taxonBase;
                TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
                List<Distribution> distributions = distributionsFor(taxon);

                // Step through superAreas for accumulation of subAreas
                for (NamedArea superArea : superAreaList){

                    // accumulate all sub area status
                    StatusAndSources accumulatedStatusAndSources = null;
                    // TODO consider using the TermHierarchyLookup (only in local branch a.kohlbecker)
                    Set<NamedArea> subAreas = getSubAreasFor(superArea);
                    for(NamedArea subArea : subAreas){
                        if(logger.isTraceEnabled()){
                            logger.trace("accumulateByArea() - \t\t" + termToString(subArea));
                        }
                        // step through all distributions for the given subArea
                        for(Distribution distribution : distributions){
                            if(distribution.getArea() != null && distribution.getArea().equals(subArea) && distribution.getStatus() != null) {
                                PresenceAbsenceTerm status = distribution.getStatus();
                                if(logger.isTraceEnabled()){
                                    logger.trace("accumulateByArea() - \t\t" + termToString(subArea) + ": " + termToString(status));
                                }
                                // skip all having a status value different of those in byAreaIgnoreStatusList
                                if (getByAreaIgnoreStatusList().contains(status)){
                                    continue;
                                }
                                // keep the status with the highest priority (and merge sources on ties)
                                StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
                                accumulatedStatusAndSources = choosePreferred(accumulatedStatusAndSources, subStatusAndSources, null);
                            }
                        }
                    } // next sub area
                    if (accumulatedStatusAndSources != null) {
                        if(logger.isDebugEnabled()){
                            logger.debug("accumulateByArea() - \t >> " + termToString(superArea) + ": " + termToString(accumulatedStatusAndSources.status));
                        }
                        // store new distribution element for superArea in taxon description
                        Distribution newDistribitionElement = Distribution.NewInstance(superArea, accumulatedStatusAndSources.status);
                        newDistribitionElement.getSources().addAll(accumulatedStatusAndSources.sources);
                        // mark the element as computed so it can be distinguished from entered data
                        newDistribitionElement.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
                        description.addElement(newDistribitionElement);
                    }

                } // next super area ....

                descriptionService.saveOrUpdate(description);
                taxonService.saveOrUpdate(taxon);
                subMonitor.worked(1);
                if(!batch.isWithinJvmLimits()) {
                    break; // flushAndClear and start with new batch
                }

            } // next taxon

            flushAndClear();

            // commit for every batch, otherwise the persistent context
            // may grow too much and eats up all the heap
            commitTransaction(txStatus);
            txStatus = null;

            // performance testing only: stop after the first batch
            if(ONLY_FISRT_BATCH) {
                break;
            }

        } // next batch of taxa

        subMonitor.done();
    }
562

    
563
   /**
564
    * Step 2: Accumulate by ranks starting from lower rank to upper rank, the status of all children
565
    * are accumulated on each rank starting from lower rank to upper rank.
566
    * <ul>
567
    * <li>aggregate distribution of included taxa of the next lower rank for any rank level starting from the lower rank (e.g. sub species)
568
    *    up to upper rank (e.g. Genus)</li>
569
    *  <li>the accumulation is done for each distribution area found in the included taxa</li>
570
    *  <li>areas of subtaxa with status endemic are ignored</li>
571
    *  <li>the status with the highest priority determines the value for the accumulated distribution</li>
572
    *  <li>the source reference of the accumulated distributions are also accumulated into the new distribution,
573
    *    this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
574
    *</ul>
575
 * @throws JvmLimitsException
576
    */
577
    protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
578

    
579
        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_RANK, BATCH_MIN_FREE_HEAP);
580
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
581
        batch.setMaxAllowedGcIncreases(10);
582

    
583
        int ticksPerRank = 100;
584

    
585
        TransactionStatus txStatus = startTransaction(false);
586

    
587
        // the loadRankSpecificRootNodes() method not only finds
588
        // taxa of the specified rank but also taxa of lower ranks
589
        // if no taxon of the specified rank exists, so we need to
590
        // remember which taxa have been processed already
591
        Set<Integer> taxaProcessedIds = new HashSet<Integer>();
592
        List<TaxonBase> taxa = null;
593
        List<TaxonBase> childTaxa = null;
594

    
595
        List<Rank> ranks = rankInterval;
596

    
597
        subMonitor.beginTask("Accumulating by rank", ranks.size() * ticksPerRank);
598

    
599
        for (Rank rank : ranks) {
600

    
601
            if(logger.isDebugEnabled()){
602
                logger.debug("accumulateByRank() - at Rank '" + termToString(rank) + "'");
603
            }
604

    
605
            Set<Integer> taxonIdsPerRank = classificationLookupDao.getTaxonIdByRank().get(rank);
606

    
607
            int taxonCountperRank = taxonIdsPerRank != null ? taxonIdsPerRank.size() : 0;
608

    
609
            SubProgressMonitor taxonSubMonitor = new SubProgressMonitor(subMonitor, ticksPerRank);
610
            taxonSubMonitor.beginTask("Accumulating by rank " + termToString(rank), taxonCountperRank);
611

    
612
            if(taxonCountperRank == 0) {
613
                taxonSubMonitor.done();
614
                continue;
615
            }
616

    
617

    
618
            Iterator<Integer> taxonIdIterator = taxonIdsPerRank.iterator();
619
            while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
620

    
621
                if(txStatus == null) {
622
                    // transaction has been committed at the end of this batch, start a new one
623
                    txStatus = startTransaction(false);
624
                }
625

    
626
                // load taxa for this batch
627
                List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
628

    
629
                taxa = taxonService.loadByIds(taxonIds, null);
630

    
631
//                if(logger.isDebugEnabled()){
632
//                           logger.debug("accumulateByRank() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
633
//                }
634

    
635
                for(TaxonBase taxonBase : taxa) {
636

    
637
                    batch.incementCounter();
638

    
639
                    Taxon taxon = (Taxon)taxonBase;
640
                    if (taxaProcessedIds.contains(taxon.getId())) {
641
                        if(logger.isDebugEnabled()){
642
                            logger.debug("accumulateByRank() - skipping already processed taxon :" + taxonToString(taxon));
643
                        }
644
                        continue;
645
                    }
646
                    taxaProcessedIds.add(taxon.getId());
647
                    if(logger.isDebugEnabled()){
648
                        logger.debug("accumulateByRank() [" + rank.getLabel() + "] - taxon :" + taxonToString(taxon));
649
                    }
650

    
651
                    // Step through direct taxonomic children for accumulation
652
                    Map<NamedArea, StatusAndSources> accumulatedStatusMap = new HashMap<NamedArea, StatusAndSources>();
653

    
654
                    List<Integer> childTaxonIds = new ArrayList<>();
655
                    Set<Integer> childSet = classificationLookupDao.getChildTaxonMap().get(taxon.getId());
656
                    if(childSet != null) {
657
                        childTaxonIds.addAll(childSet);
658
                    }
659
                    if(!childTaxonIds.isEmpty()) {
660
                        childTaxa = taxonService.loadByIds(childTaxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
661
                        LinkedList<TaxonBase> childStack = new LinkedList<TaxonBase>(childTaxa);
662
                        childTaxa = null; // allow to be garbage collected
663

    
664
                        while(childStack.size() > 0){
665

    
666
                            TaxonBase childTaxonBase = childStack.pop();
667
                            getSession().setReadOnly(childTaxonBase, true);
668

    
669
                            Taxon childTaxon = (Taxon) childTaxonBase;
670
                            getSession().setReadOnly(childTaxon, true);
671
                            if(logger.isTraceEnabled()){
672
                                logger.trace("                   subtaxon :" + taxonToString(childTaxon));
673
                            }
674

    
675
                            for(Distribution distribution : distributionsFor(childTaxon) ) {
676
                                PresenceAbsenceTerm status = distribution.getStatus();
677
                                NamedArea area = distribution.getArea();
678
                                if (status == null || getByRankIgnoreStatusList().contains(status)){
679
                                  continue;
680
                                }
681

    
682
                                StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
683
                                accumulatedStatusMap.put(area, choosePreferred(accumulatedStatusMap.get(area), subStatusAndSources, null));
684
                             }
685

    
686
                            // evict all initialized entities of the childTaxon
687
                            // TODO consider using cascade="evict" in the model classes
688
//                            for( TaxonDescription description : ((Taxon)childTaxonBase).getDescriptions()) {
689
//                                for (DescriptionElementBase deb : description.getElements()) {
690
//                                    getSession().evict(deb);
691
//                                }
692
//                                getSession().evict(description); // this causes in some cases the taxon object to be detached from the session
693
//                            }
694
                            getSession().evict(childTaxonBase); // no longer needed, save heap
695
                        }
696

    
697
                        if(accumulatedStatusMap.size() > 0) {
698
                            TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
699
                            for (NamedArea area : accumulatedStatusMap.keySet()) {
700
                                Distribution distribition = findDistribution(description, area, accumulatedStatusMap.get(area).status);
701
                                if(distribition == null) {
702
                                    // create a new distribution element
703
                                    distribition = Distribution.NewInstance(area, accumulatedStatusMap.get(area).status);
704
                                    distribition.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
705
                                }
706
                                addSourcesDeduplicated(distribition.getSources(), accumulatedStatusMap.get(area).sources);
707

    
708
                                description.addElement(distribition);
709
                            }
710
                            taxonService.saveOrUpdate(taxon);
711
                            descriptionService.saveOrUpdate(description);
712
                        }
713

    
714
                    }
715
                    taxonSubMonitor.worked(1); // one taxon worked
716
                    if(!batch.isWithinJvmLimits()) {
717
                        break; // flushAndClear and start with new batch
718
                    }
719

    
720
                } // next taxon ....
721

    
722
                flushAndClear();
723

    
724
                // commit for every batch, otherwise the persistent context
725
                // may grow too much and eats up all the heap
726
                commitTransaction(txStatus);
727
                txStatus = null;
728

    
729
                // flushing the session and to the index (flushAndClear() ) can impose a
730
                // massive heap consumption. therefore we explicitly do a check after the
731
                // flush to detect these situations and to reduce the batch size.
732
                if(batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) {
733
                    batch.reduceSize(0.5);
734
                }
735

    
736
                if(ONLY_FISRT_BATCH) {
737
                    break;
738
                }
739
            } // next batch
740

    
741
            taxonSubMonitor.done();
742
            subMonitor.worked(1);
743

    
744
            if(ONLY_FISRT_BATCH) {
745
                break;
746
            }
747
        } // next Rank
748

    
749
        logger.info("accumulateByRank() - done");
750
        subMonitor.done();
751
    }
752

    
753
/**
754
 * @param description
755
 * @param area
756
 * @param status
757
 * @return
758
 */
759
private Distribution findDistribution(TaxonDescription description, NamedArea area, PresenceAbsenceTerm status) {
760
    for(DescriptionElementBase item : description.getElements()) {
761
        if(!(item instanceof Distribution)) {
762
            continue;
763
        }
764
        Distribution distribution = ((Distribution)item);
765
        if(distribution.getArea().equals(area) && distribution.getStatus().equals(status)) {
766
            return distribution;
767
        }
768
    }
769
    return null;
770
}
771

    
772
/**
773
 * @param lowerRank
774
 * @param upperRank
775
 * @return
776
 */
777
private List<Rank> rankInterval(Rank lowerRank, Rank upperRank) {
778

    
779
    TransactionStatus txStatus = startTransaction(false);
780
    Rank currentRank = lowerRank;
781
    List<Rank> ranks = new ArrayList<Rank>();
782
    ranks.add(currentRank);
783
    while (!currentRank.isHigher(upperRank)) {
784
        currentRank = findNextHigherRank(currentRank);
785
        ranks.add(currentRank);
786
    }
787
    commitTransaction(txStatus);
788
    txStatus = null;
789
    return ranks;
790
}
791

    
792
    /**
793
     * @return
794
     */
795
    private Session getSession() {
796
        return descriptionService.getSession();
797
    }
798

    
799
    /**
800
     *
801
     */
802
    private void flush() {
803
        logger.debug("flushing session ...");
804
        getSession().flush();
805
        try {
806
            logger.debug("flushing to indexes ...");
807
            Search.getFullTextSession(getSession()).flushToIndexes();
808
        } catch (HibernateException e) {
809
            /* IGNORE - Hibernate Search Event listeners not configured ... */
810
            if(!e.getMessage().startsWith("Hibernate Search Event listeners not configured")){
811
                throw e;
812
            }
813
        }
814
    }
815

    
816
    /**
817
    *
818
    */
819
   private void flushAndClear() {
820
       flush();
821
       logger.debug("clearing session ...");
822
       getSession().clear();
823
   }
824

    
825

    
826
    // TODO merge with CdmApplicationDefaultConfiguration#startTransaction() into common base class
827
    public TransactionStatus startTransaction(Boolean readOnly) {
828

    
829
        DefaultTransactionDefinition defaultTxDef = new DefaultTransactionDefinition();
830
        defaultTxDef.setReadOnly(readOnly);
831
        TransactionDefinition txDef = defaultTxDef;
832

    
833
        // Log some transaction-related debug information.
834
        if (logger.isTraceEnabled()) {
835
            logger.trace("Transaction name = " + txDef.getName());
836
            logger.trace("Transaction facets:");
837
            logger.trace("Propagation behavior = " + txDef.getPropagationBehavior());
838
            logger.trace("Isolation level = " + txDef.getIsolationLevel());
839
            logger.trace("Timeout = " + txDef.getTimeout());
840
            logger.trace("Read Only = " + txDef.isReadOnly());
841
            // org.springframework.orm.hibernate5.HibernateTransactionManager
842
            // provides more transaction/session-related debug information.
843
        }
844

    
845
        TransactionStatus txStatus = transactionManager.getTransaction(txDef);
846

    
847
        getSession().setFlushMode(FlushMode.COMMIT);
848

    
849
        return txStatus;
850
    }
851

    
852
    // TODO merge with CdmApplicationDefaultConfiguration#startTransaction() into common base class
853
    public void commitTransaction(TransactionStatus txStatus){
854
        logger.debug("commiting transaction ...");
855
        transactionManager.commit(txStatus);
856
        return;
857
    }
858

    
859
    /**
860
     * returns the next higher rank
861
     *
862
     * TODO better implement OrderedTermBase.getNextHigherTerm() and OrderedTermBase.getNextLowerTerm()?
863
     *
864
     * @param rank
865
     * @return
866
     */
867
    private Rank findNextHigherRank(Rank rank) {
868
        rank = (Rank) termService.load(rank.getUuid());
869
        return rank.getNextHigherTerm();
870
//        OrderedTermVocabulary<Rank> rankVocabulary = mameService.getRankVocabulary();;
871
//        return rankVocabulary.getNextHigherTerm(rank);
872
    }
873

    
874
    /**
875
     * Either finds an existing taxon description of the given taxon or creates a new one.
876
     * If the doClear is set all existing description elements will be cleared.
877
     *
878
     * @param taxon
879
     * @param doClear will remove all existing Distributions if the taxon already
880
     * has a MarkerType.COMPUTED() TaxonDescription
881
     * @return
882
     */
883
    private TaxonDescription findComputedDescription(Taxon taxon, boolean doClear) {
884

    
885
        String descriptionTitle = this.getClass().getSimpleName();
886

    
887
        // find existing one
888
        for (TaxonDescription description : taxon.getDescriptions()) {
889
            if (description.hasMarker(MarkerType.COMPUTED(), true)) {
890
                logger.debug("reusing computed description for " + taxon.getTitleCache());
891
                if (doClear) {
892
                    int deleteCount = 0;
893
                    Set<DescriptionElementBase> deleteCandidates = new HashSet<DescriptionElementBase>();
894
                    for (DescriptionElementBase descriptionElement : description.getElements()) {
895
                        if(descriptionElement instanceof Distribution) {
896
                            deleteCandidates.add(descriptionElement);
897
                        }
898
                    }
899
                    if(deleteCandidates.size() > 0){
900
                        for(DescriptionElementBase descriptionElement : deleteCandidates) {
901
                            description.removeElement(descriptionElement);
902
                            descriptionService.deleteDescriptionElement(descriptionElement);
903
                            descriptionElement = null;
904
                            deleteCount++;
905
                        }
906
                        descriptionService.saveOrUpdate(description);
907
                        logger.debug("\t" + deleteCount +" distributions cleared");
908
                    }
909

    
910
                }
911
                return description;
912
            }
913
        }
914

    
915
        // create a new one
916
        logger.debug("creating new description for " + taxon.getTitleCache());
917
        TaxonDescription description = TaxonDescription.NewInstance(taxon);
918
        description.setTitleCache(descriptionTitle, true);
919
        description.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
920
        return description;
921
    }
922

    
923
    /**
924
     * @param superArea
925
     * @return
926
     */
927
    private Set<NamedArea> getSubAreasFor(NamedArea superArea) {
928

    
929
        if(!subAreaMap.containsKey(superArea)) {
930
            if(logger.isDebugEnabled()){
931
                logger.debug("loading included areas for " + superArea.getLabel());
932
            }
933
            subAreaMap.put(superArea, superArea.getIncludes());
934
        }
935
        return subAreaMap.get(superArea);
936
    }
937

    
938
    /**
939
     * @param taxon
940
     * @return
941
     */
942
    private List<Distribution> distributionsFor(Taxon taxon) {
943
        List<Distribution> distributions = new ArrayList<Distribution>();
944
        for(TaxonDescription description: taxon.getDescriptions()) {
945
            getSession().setReadOnly(description, true);
946
            for(DescriptionElementBase deb : description.getElements()) {
947
                if(deb instanceof Distribution) {
948
                    getSession().setReadOnly(deb, true);
949
                    distributions.add((Distribution)deb);
950
                }
951
            }
952
        }
953
        return distributions;
954
    }
955

    
956
    /**
957
     * @param taxon
958
     * @param logger2
959
     * @return
960
     */
961
    private String taxonToString(TaxonBase taxon) {
962
        if(logger.isTraceEnabled()) {
963
            return taxon.getTitleCache();
964
        } else {
965
            return taxon.toString();
966
        }
967
    }
968

    
969
    /**
970
     * @param taxon
971
     * @param logger2
972
     * @return
973
     */
974
    private String termToString(OrderedTermBase<?> term) {
975
        if(logger.isTraceEnabled()) {
976
            return term.getLabel() + " [" + term.getIdInVocabulary() + "]";
977
        } else {
978
            return term.getIdInVocabulary();
979
        }
980
    }
981

    
982
    /**
983
     * Sets the priorities for presence and absence terms, the priorities are stored in extensions.
984
     * This method will start a new transaction and commits it after the work is done.
985
     */
986
    public void updatePriorities() {
987

    
988
        TransactionStatus txStatus = startTransaction(false);
989

    
990
        Map<PresenceAbsenceTerm, Integer> priorityMap = new HashMap<PresenceAbsenceTerm, Integer>();
991

    
992
        priorityMap.put(PresenceAbsenceTerm.CULTIVATED_REPORTED_IN_ERROR(), 1);
993
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_UNCERTAIN_DEGREE_OF_NATURALISATION(), 2);
994
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED(), 3);
995
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_REPORTED_IN_ERROR(), 20);
996
        priorityMap.put(PresenceAbsenceTerm.NATIVE_REPORTED_IN_ERROR(), 30);
997
        priorityMap.put(PresenceAbsenceTerm.CULTIVATED(), 45);
998
        priorityMap.put(PresenceAbsenceTerm.NATIVE_FORMERLY_NATIVE(), 40);
999
        priorityMap.put(PresenceAbsenceTerm.NATIVE_PRESENCE_QUESTIONABLE(), 60);
1000
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_PRESENCE_QUESTIONABLE(), 50);
1001
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_DOUBTFULLY_INTRODUCED(), 80);
1002
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED(), 90);
1003
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_ADVENTITIOUS(), 100);
1004
        priorityMap.put(PresenceAbsenceTerm.INTRODUCED_NATURALIZED(), 110);
1005
        priorityMap.put(PresenceAbsenceTerm.NATIVE_DOUBTFULLY_NATIVE(), 120); // null
1006
        priorityMap.put(PresenceAbsenceTerm.NATIVE(), 130); // null
1007
        priorityMap.put(PresenceAbsenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA(), 999);
1008

    
1009
        for(PresenceAbsenceTerm term : priorityMap.keySet()) {
1010
            // load the term
1011
            term = (PresenceAbsenceTerm) termService.load(term.getUuid());
1012
            // find the extension
1013
            Extension priorityExtension = null;
1014
            Set<Extension> extensions = term.getExtensions();
1015
            for(Extension extension : extensions){
1016
                if (!extension.getType().equals(ExtensionType.ORDER())) {
1017
                    continue;
1018
                }
1019
                int pos = extension.getValue().indexOf(EXTENSION_VALUE_PREFIX);
1020
                if(pos == 0){ // if starts with EXTENSION_VALUE_PREFIX
1021
                    priorityExtension = extension;
1022
                    break;
1023
                }
1024
            }
1025
            if(priorityExtension == null) {
1026
                priorityExtension = Extension.NewInstance(term, null, ExtensionType.ORDER());
1027
            }
1028
            priorityExtension.setValue(EXTENSION_VALUE_PREFIX + priorityMap.get(term));
1029

    
1030
            // save the term
1031
            termService.saveOrUpdate(term);
1032
            if (logger.isDebugEnabled()) {
1033
                logger.debug("Priority updated for " + term.getLabel());
1034
            }
1035
        }
1036

    
1037
        commitTransaction(txStatus);
1038
    }
1039

    
1040
    public static void addSourcesDeduplicated(Set<DescriptionElementSource> target, Set<DescriptionElementSource> sources) {
1041
        for(DescriptionElementSource source : sources) {
1042
            boolean contained = false;
1043
            for(DescriptionElementSource existingSource: target) {
1044
                if(existingSource.equalsByShallowCompare(source)) {
1045
                    contained = true;
1046
                    break;
1047
                }
1048
            }
1049
            if(!contained) {
1050
                try {
1051
                    target.add((DescriptionElementSource)source.clone());
1052
                } catch (CloneNotSupportedException e) {
1053
                    // should never happen
1054
                    throw new RuntimeException(e);
1055
                }
1056
            }
1057
        }
1058
    }
1059

    
1060
    /**
     * Selects which aggregation steps are performed: by area, by taxonomic
     * rank, or both in sequence.
     */
    public enum AggregationMode {
        byAreas,
        byRanks,
        byAreasAndRanks

    }
1066

    
1067
    private class StatusAndSources {
1068

    
1069
        private final PresenceAbsenceTerm status;
1070

    
1071
        private final Set<DescriptionElementSource> sources = new HashSet<>();
1072

    
1073
        public StatusAndSources(PresenceAbsenceTerm status, Set<DescriptionElementSource> sources) {
1074
            this.status = status;
1075
            addSourcesDeduplicated(this.sources, sources);
1076
        }
1077

    
1078
        /**
1079
         * @param sources
1080
         */
1081
        public void addSources(Set<DescriptionElementSource> sources) {
1082
            addSourcesDeduplicated(this.sources, sources);
1083
        }
1084

    
1085
    }
1086
}
    (1-1/1)