cleanup
[cdmlib.git] / cdmlib-services / src / main / java / eu / etaxonomy / cdm / api / service / description / TransmissionEngineDistribution.java
1 /**
2 * Copyright (C) 2013 EDIT
3 * European Distributed Institute of Taxonomy
4 * http://www.e-taxonomy.eu
5 *
6 * The contents of this file are subject to the Mozilla Public License Version 1.1
7 * See LICENSE.TXT at the top of this package for the full license terms.
8 */
9 package eu.etaxonomy.cdm.api.service.description;
10
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.HashMap;
14 import java.util.HashSet;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.UUID;
21
22 import org.apache.log4j.Logger;
23 import org.hibernate.FlushMode;
24 import org.hibernate.HibernateException;
25 import org.hibernate.Session;
26 import org.hibernate.engine.spi.SessionFactoryImplementor;
27 import org.hibernate.search.Search;
28 import org.springframework.beans.factory.annotation.Autowired;
29 import org.springframework.orm.hibernate5.HibernateTransactionManager;
30 import org.springframework.stereotype.Service;
31 import org.springframework.transaction.TransactionDefinition;
32 import org.springframework.transaction.TransactionStatus;
33 import org.springframework.transaction.support.DefaultTransactionDefinition;
34
35 import eu.etaxonomy.cdm.api.service.IClassificationService;
36 import eu.etaxonomy.cdm.api.service.IDescriptionService;
37 import eu.etaxonomy.cdm.api.service.INameService;
38 import eu.etaxonomy.cdm.api.service.ITaxonService;
39 import eu.etaxonomy.cdm.api.service.ITermService;
40 import eu.etaxonomy.cdm.common.DynamicBatch;
41 import eu.etaxonomy.cdm.common.JvmLimitsException;
42 import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
43 import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
44 import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
45 import eu.etaxonomy.cdm.model.common.CdmBase;
46 import eu.etaxonomy.cdm.model.common.DefinedTermBase;
47 import eu.etaxonomy.cdm.model.common.Extension;
48 import eu.etaxonomy.cdm.model.common.ExtensionType;
49 import eu.etaxonomy.cdm.model.common.Marker;
50 import eu.etaxonomy.cdm.model.common.MarkerType;
51 import eu.etaxonomy.cdm.model.common.OrderedTermBase;
52 import eu.etaxonomy.cdm.model.description.DescriptionElementBase;
53 import eu.etaxonomy.cdm.model.description.DescriptionElementSource;
54 import eu.etaxonomy.cdm.model.description.Distribution;
55 import eu.etaxonomy.cdm.model.description.PresenceAbsenceTerm;
56 import eu.etaxonomy.cdm.model.description.TaxonDescription;
57 import eu.etaxonomy.cdm.model.location.NamedArea;
58 import eu.etaxonomy.cdm.model.name.Rank;
59 import eu.etaxonomy.cdm.model.taxon.Classification;
60 import eu.etaxonomy.cdm.model.taxon.Taxon;
61 import eu.etaxonomy.cdm.model.taxon.TaxonBase;
62 import eu.etaxonomy.cdm.persistence.dto.ClassificationLookupDTO;
63
64 /**
65 *
66 * <h2>GENERAL NOTES </h2>
67 * <em>TODO: These notes are directly taken from original Transmission Engine Occurrence
68 * version 14 written in Visual Basic and still need to be
69 * adapted to the java version of the transmission engine!</em>
70 *
71 * <h3>summaryStatus</h3>
72 *
 * Each distribution information has a summaryStatus, this is a summary of the status codes
74 * as stored in the fields of emOccurrence native, introduced, cultivated, ...
75 * The summaryStatus seems to be equivalent to the CDM DistributionStatus
76 *
77 * <h3>map generation</h3>
78 *
79 * When generating maps from the accumulated distribution information some special cases have to be handled:
80 * <ol>
 * <li>if an entered or imported status information exists for the same area for which calculated (accumulated)
82 * data is available, the calculated data has to be given preference over other data.
83 * </li>
84 * <li>If there is an area with a sub area and both areas have the same calculated status only the subarea
85 * status should be shown in the map, whereas the super area should be ignored.
86 * </li>
87 * </ol>
88 *
89 * @author Anton Güntsch (author of original Transmission Engine Occurrence version 14 written in Visual Basic)
90 * @author Andreas Kohlbecker (2013, porting Transmission Engine Occurrence to Java)
91 * @date Feb 22, 2013
92 */
93 @Service
94 public class TransmissionEngineDistribution { //TODO extends IoBase?
95
96
    /**
     * Prefix of the {@link ExtensionType#ORDER()} extension value in which the
     * priority of a status term is stored, see {@link #getPriorityFor(DefinedTermBase)}.
     */
    public static final String EXTENSION_VALUE_PREFIX = "transmissionEngineDistribution.priority:";

    public static final Logger logger = Logger.getLogger(TransmissionEngineDistribution.class);

    /**
     * only used for performance testing
     */
    // NOTE(review): name contains a typo ("FISRT" -> "FIRST"); kept as is since it is referenced throughout this class
    final boolean ONLY_FISRT_BATCH = false;


    // property paths to initialize when loading taxa, so that markers, areas,
    // status and sources of the distributions are available without lazy loading
    protected static final List<String> TAXONDESCRIPTION_INIT_STRATEGY = Arrays.asList(new String [] {
            "description.markers.markerType",
            "description.elements.markers.markerType",
            "description.elements.area",
            "description.elements.status",
            "description.elements.sources.citation.authorship",
//            "description.elements.sources.nameUsedInSource",
//            "description.elements.multilanguageText",
//            "name.status.type",
    });


    /**
     * A map which contains the status terms as key and the priority as value
     * The map will contain both, the PresenceTerms and the AbsenceTerms
     * Lazily built by {@link #initializeStatusPriorityMap()}.
     */
    private Map<PresenceAbsenceTerm, Integer> statusPriorityMap = null;

    @Autowired
    private IDescriptionService descriptionService;

    @Autowired
    private ITermService termService;

    @Autowired
    private ITaxonService taxonService;

    @Autowired
    private IClassificationService classificationService;

    // NOTE(review): field name looks like a typo for "nameService"; only referenced from commented-out code below
    @Autowired
    private INameService mameService;

    @Autowired
    private HibernateTransactionManager transactionManager;

    // status terms ignored during by-area aggregation, see getByAreaIgnoreStatusList()
    private List<PresenceAbsenceTerm> byAreaIgnoreStatusList = null;

    // status terms ignored during by-rank aggregation, see getByRankIgnoreStatusList()
    private List<PresenceAbsenceTerm> byRankIgnoreStatusList = null;

    // cache of super area -> sub areas, filled by getSubAreasFor()
    private final Map<NamedArea, Set<NamedArea>> subAreaMap = new HashMap<NamedArea, Set<NamedArea>>();

    // progress monitor tick budgets for the two aggregation steps
    int byRankTicks = 300;
    int byAreasTicks = 100;


    // minimum free heap (bytes) required before a new batch is started
    private static final long BATCH_MIN_FREE_HEAP = 800 * 1024 * 1024;
    /**
     * ratio of the initially free heap which should not be used
     * during the batch processing. This amount of the heap is reserved
     * for the flushing of the session and to the index
     */
    private static final double BATCH_FREE_HEAP_RATIO = 0.9;
    private static final int BATCH_SIZE_BY_AREA = 1000;
    private static final int BATCH_SIZE_BY_RANK = 500;

    // instance copy of BATCH_MIN_FREE_HEAP, overridable e.g. for tests
    private long batchMinFreeHeap = BATCH_MIN_FREE_HEAP;
164
165
166
167 /**
168 * byAreaIgnoreStatusList contains by default:
169 * <ul>
170 * <li>AbsenceTerm.CULTIVATED_REPORTED_IN_ERROR()</li>
171 * <li>AbsenceTerm.INTRODUCED_REPORTED_IN_ERROR()</li>
172 * <li>AbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED()</li>
173 * <li>AbsenceTerm.NATIVE_REPORTED_IN_ERROR()</li>
174 * <li>AbsenceTerm.NATIVE_FORMERLY_NATIVE()</li>
175 * </ul>
176 *
177 * @return the byAreaIgnoreStatusList
178 */
179 public List<PresenceAbsenceTerm> getByAreaIgnoreStatusList() {
180 if(byAreaIgnoreStatusList == null ){
181 byAreaIgnoreStatusList = Arrays.asList(
182 new PresenceAbsenceTerm[] {
183 PresenceAbsenceTerm.CULTIVATED_REPORTED_IN_ERROR(),
184 PresenceAbsenceTerm.INTRODUCED_REPORTED_IN_ERROR(),
185 PresenceAbsenceTerm.NATIVE_REPORTED_IN_ERROR(),
186 PresenceAbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED(),
187 PresenceAbsenceTerm.NATIVE_FORMERLY_NATIVE()
188 // TODO what about PresenceAbsenceTerm.ABSENT() also ignore?
189 });
190 }
191 return byAreaIgnoreStatusList;
192 }
193
    /**
     * Replaces the default list of status terms ignored during by-area accumulation.
     *
     * @param byAreaIgnoreStatusList the byAreaIgnoreStatusList to set
     */
    public void setByAreaIgnoreStatusList(List<PresenceAbsenceTerm> byAreaIgnoreStatusList) {
        this.byAreaIgnoreStatusList = byAreaIgnoreStatusList;
    }
200
201 /**
202 * byRankIgnoreStatusList contains by default
203 * <ul>
204 * <li>PresenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA()</li>
205 * </ul>
206 *
207 * @return the byRankIgnoreStatusList
208 */
209 public List<PresenceAbsenceTerm> getByRankIgnoreStatusList() {
210
211 if (byRankIgnoreStatusList == null) {
212 byRankIgnoreStatusList = Arrays.asList(
213 new PresenceAbsenceTerm[] {
214 PresenceAbsenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA()
215 });
216 }
217 return byRankIgnoreStatusList;
218 }
219
    /**
     * Replaces the default list of status terms ignored during by-rank accumulation.
     *
     * @param byRankIgnoreStatusList the byRankIgnoreStatusList to set
     */
    public void setByRankIgnoreStatusList(List<PresenceAbsenceTerm> byRankIgnoreStatusList) {
        this.byRankIgnoreStatusList = byRankIgnoreStatusList;
    }
226
    /**
     * Default constructor; all collaborators are injected via {@code @Autowired}
     * fields, so no arguments are required here.
     */
    public TransmissionEngineDistribution() {
    }
233
234 /**
235 * initializes the map which contains the status terms as key and the priority as value
236 * The map will contain both, the PresenceTerms and the AbsenceTerms
237 */
238 private void initializeStatusPriorityMap() {
239
240 statusPriorityMap = new HashMap<PresenceAbsenceTerm, Integer>();
241 Integer priority;
242
243 // PresenceTerms
244 for(PresenceAbsenceTerm term : termService.list(PresenceAbsenceTerm.class, null, null, null, null)){
245 priority = getPriorityFor(term);
246 if(priority != null){
247 statusPriorityMap.put(term, priority);
248 }
249 }
250 }
251
252 /**
253 * Compares the PresenceAbsenceTermBase terms contained in <code>a.status</code> and <code>b.status</code> after
254 * the priority as stored in the statusPriorityMap. The StatusAndSources object with
255 * the higher priority is returned. In the case of <code>a == b</code> the sources of b will be added to the sources
256 * of a.
257 *
258 * If either a or b or the status are null b or a is returned.
259 *
260 * @see initializeStatusPriorityMap()
261 *
262 * @param a
263 * @param b
264 * @param sourcesForWinnerB
265 * In the case when <code>b</code> is preferred over <code>a</code> these Set of sources will be added to the sources of <code>b</code>
266 * @return
267 */
268 private StatusAndSources choosePreferred(StatusAndSources a, StatusAndSources b, Set<DescriptionElementSource> sourcesForWinnerB){
269
270 if (statusPriorityMap == null) {
271 initializeStatusPriorityMap();
272 }
273
274 if (b == null || b.status == null) {
275 return a;
276 }
277 if (a == null || a.status == null) {
278 return b;
279 }
280
281 if (statusPriorityMap.get(a.status) == null) {
282 logger.warn("No priority found in map for " + a.status.getLabel());
283 return b;
284 }
285 if (statusPriorityMap.get(b.status) == null) {
286 logger.warn("No priority found in map for " + b.status.getLabel());
287 return a;
288 }
289 if(statusPriorityMap.get(a.status) < statusPriorityMap.get(b.status)){
290 if(sourcesForWinnerB != null) {
291 b.addSources(sourcesForWinnerB);
292 }
293 return b;
294 } else if (statusPriorityMap.get(a.status) == statusPriorityMap.get(b.status)){
295 a.addSources(b.sources);
296 return a;
297 } else {
298 return a;
299 }
300 }
301
302 /**
303 * reads the priority for the given status term from the extensions.
304 *
305 * @param term
306 * @return the priority value
307 */
308 private Integer getPriorityFor(DefinedTermBase<?> term) {
309 Set<Extension> extensions = term.getExtensions();
310 for(Extension extension : extensions){
311 if(!extension.getType().equals(ExtensionType.ORDER())) {
312 continue;
313 }
314 int pos = extension.getValue().indexOf(EXTENSION_VALUE_PREFIX);
315 if(pos == 0){ // if starts with EXTENSION_VALUE_PREFIX
316 try {
317 Integer priority = Integer.valueOf(extension.getValue().substring(EXTENSION_VALUE_PREFIX.length()));
318 return priority;
319 } catch (NumberFormatException e) {
320 logger.warn("Invalid number format in Extension:" + extension.getValue());
321 }
322 }
323 }
324 logger.warn("no priority defined for '" + term.getLabel() + "'");
325 return null;
326 }
327
    /**
     * runs both steps
     * <ul>
     * <li>Step 1: Accumulate occurrence records by area</li>
     * <li>Step 2: Accumulate by ranks starting from lower rank to upper rank,
     * the status of all children are accumulated on each rank starting from
     * lower rank to upper rank.</li>
     * </ul>
     *
     * @param mode
     *      selects which of the two steps are executed
     * @param superAreas
     *      the areas to which the subordinate areas should be projected.
     * @param lowerRank
     * @param upperRank
     * @param classification
     *      limit the accumulation process to a specific classification
     *      (not yet implemented); <code>null</code> means all classifications
     * @param monitor
     *      the progress monitor to use for reporting progress to the
     *      user. It is the caller's responsibility to call done() on the
     *      given monitor. Accepts null, indicating that no progress
     *      should be reported and that the operation cannot be cancelled.
     */
    public void accumulate(AggregationMode mode, List<NamedArea> superAreas, Rank lowerRank, Rank upperRank,
            Classification classification, IProgressMonitor monitor) throws JvmLimitsException {

        if (monitor == null) {
            monitor = new NullProgressMonitor();
        }

        // only for debugging:
        //logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations
        //Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG);

        logger.info("Hibernate JDBC Batch size: "
                + ((SessionFactoryImplementor) getSession().getSessionFactory()).getSettings().getJdbcBatchSize());

        // null classification means: process all classifications
        Set<Classification> classifications = new HashSet<Classification>();
        if(classification == null) {
            classifications.addAll(classificationService.listClassifications(null, null, null, null));
        } else {
            classifications.add(classification);
        }

        // total tick budget depends on which steps will run
        int aggregationWorkTicks;
        switch(mode){
        case byAreasAndRanks:
            aggregationWorkTicks = byAreasTicks + byRankTicks;
            break;
        case byAreas:
            aggregationWorkTicks = byAreasTicks;
            break;
        case byRanks:
            aggregationWorkTicks = byRankTicks;
            break;
        default:
            aggregationWorkTicks = 0;
            break;
        }

        // take start time for performance testing
        // NOTE: use ONLY_FISRT_BATCH = true to measure only one batch
        double start = System.currentTimeMillis();

        monitor.beginTask("Accumulating distributions", (classifications.size() * aggregationWorkTicks) + 1 );

        // re-read the status priorities from the term extensions
        updatePriorities();

        List<Rank> ranks = rankInterval(lowerRank, upperRank);

        monitor.worked(1);


        for(Classification _classification : classifications) {

            ClassificationLookupDTO classificationLookupDao = classificationService.classificationLookup(_classification);
            classificationLookupDao.filterInclude(ranks);

            double end1 = System.currentTimeMillis();
            logger.info("Time elapsed for classificationLookup() : " + (end1 - start) / (1000) + "s");
            double start2 = System.currentTimeMillis();

            monitor.subTask("Accumulating distributions to super areas for " + _classification.getTitleCache());
            if (mode.equals(AggregationMode.byAreas) || mode.equals(AggregationMode.byAreasAndRanks)) {
                accumulateByArea(superAreas, classificationLookupDao, new SubProgressMonitor(monitor, byAreasTicks), true);
            }
            monitor.subTask("Accumulating distributions to higher ranks for " + _classification.getTitleCache());

            double end2 = System.currentTimeMillis();
            logger.info("Time elapsed for accumulateByArea() : " + (end2 - start2) / (1000) + "s");

            double start3 = System.currentTimeMillis();
            if (mode.equals(AggregationMode.byRanks) || mode.equals(AggregationMode.byAreasAndRanks)) {
                // when running byRanks only, the existing computed descriptions are cleared first
                accumulateByRank(ranks, classificationLookupDao, new SubProgressMonitor(monitor, byRankTicks), mode.equals(AggregationMode.byRanks));
            }

            double end3 = System.currentTimeMillis();
            logger.info("Time elapsed for accumulateByRank() : " + (end3 - start3) / (1000) + "s");
            logger.info("Time elapsed for accumulate(): " + (end3 - start) / (1000) + "s");

            if(ONLY_FISRT_BATCH) {
                monitor.done();
                break;
            }
        }
        monitor.done();
    }
435
436
    /**
     * Step 1: Accumulate occurrence records by area
     * <ul>
     * <li>areas are projected to super areas e.g.:  HS <-- HS(A), HS(G), HS(S)</li>
     * <li>super areas do initially not have a status set ==> Prerequisite to check in CDM</li>
     * <li>areas having a summary status of summary value different from {@link #getByAreaIgnoreStatusList()} are ignored</li>
     * <li>areas have a priority value, the status of the area with highest priority determines the status of the super area</li>
     * <li>the source references of the accumulated distributions are also accumulated into the new distribution,</li>
     * <li>this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
     * </ul>
     *
     * @param superAreas
     *      the areas to which the subordinate areas should be projected
     * @param classificationLookupDao
     *      lookup of taxon ids to process
     * @param subMonitor
     *      progress monitor for this step
     * @param doClearDescriptions
     *      if true, existing computed distributions are removed before accumulation
     * @throws JvmLimitsException
     *
     */
    protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {

        // batch size shrinks dynamically when the JVM runs low on heap
        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_AREA, batchMinFreeHeap);
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);

        TransactionStatus txStatus = startTransaction(false);

        // reload superAreas TODO is it faster to getSession().merge(object) ??
        Set<UUID> superAreaUuids = new HashSet<>(superAreas.size());
        for (NamedArea superArea : superAreas){
            superAreaUuids.add(superArea.getUuid());
        }

        // visit all accepted taxa
        subMonitor.beginTask("Accumulating by area ",  classificationLookupDao.getTaxonIds().size());
        Iterator<Integer> taxonIdIterator = classificationLookupDao.getTaxonIds().iterator();

        while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {

            if(txStatus == null) {
                // transaction has been comitted at the end of this batch, start a new one
                txStatus = startTransaction(false);
            }

            // the session is cleared after each batch, so load the superAreaList for each batch
            List<NamedArea> superAreaList = (List)termService.find(superAreaUuids);

            // load taxa for this batch
            List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
//            logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
            List<TaxonBase> taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);

            // iterate over the taxa and accumulate areas
            // start processing the new batch

            for(TaxonBase<?> taxonBase : taxa) {
                if(logger.isDebugEnabled()){
                    logger.debug("accumulateByArea() - taxon :" + taxonToString(taxonBase));
                }

                batch.incementCounter();

                Taxon taxon = (Taxon)taxonBase;
                TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
                List<Distribution> distributions = distributionsFor(taxon);

                // Step through superAreas for accumulation of subAreas
                for (NamedArea superArea : superAreaList){

                    // accumulate all sub area status
                    StatusAndSources accumulatedStatusAndSources = null;
                    // TODO consider using the TermHierarchyLookup (only in local branch a.kohlbecker)
                    Set<NamedArea> subAreas = getSubAreasFor(superArea);
                    for(NamedArea subArea : subAreas){
                        if(logger.isTraceEnabled()){
                            logger.trace("accumulateByArea() - \t\t" + termToString(subArea));
                        }
                        // step through all distributions for the given subArea
                        for(Distribution distribution : distributions){
                            // area and status may be null on stored distributions, guard both
                            if(distribution.getArea() != null && distribution.getArea().equals(subArea) && distribution.getStatus() != null) {
                                PresenceAbsenceTerm status = distribution.getStatus();
                                if(logger.isTraceEnabled()){
                                    logger.trace("accumulateByArea() - \t\t" + termToString(subArea) + ": " + termToString(status));
                                }
                                // skip all having a status value different of those in byAreaIgnoreStatusList
                                if (getByAreaIgnoreStatusList().contains(status)){
                                    continue;
                                }
                                StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
                                accumulatedStatusAndSources = choosePreferred(accumulatedStatusAndSources, subStatusAndSources, null);
                            }
                        }
                    } // next sub area
                    if (accumulatedStatusAndSources != null) {
                        if(logger.isDebugEnabled()){
                            logger.debug("accumulateByArea() - \t >> " + termToString(superArea) + ": " + termToString(accumulatedStatusAndSources.status));
                        }
                        // store new distribution element for superArea in taxon description
                        Distribution newDistribitionElement = Distribution.NewInstance(superArea, accumulatedStatusAndSources.status);
                        newDistribitionElement.getSources().addAll(accumulatedStatusAndSources.sources);
                        // mark as computed so it can be found/cleared on the next run
                        newDistribitionElement.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
                        description.addElement(newDistribitionElement);
                    }

                } // next super area ....

                descriptionService.saveOrUpdate(description);
                taxonService.saveOrUpdate(taxon);
                subMonitor.worked(1);
                if(!batch.isWithinJvmLimits()) {
                    break; // flushAndClear and start with new batch
                }

            } // next taxon

            flushAndClear();

            // commit for every batch, otherwise the persistent context
            // may grow too much and eats up all the heap
            commitTransaction(txStatus);
            txStatus = null;

            if(ONLY_FISRT_BATCH) {
                break;
            }

        } // next batch of taxa

        subMonitor.done();
    }
564
    /**
     * Step 2: Accumulate by ranks starting from lower rank to upper rank, the status of all children
     * are accumulated on each rank starting from lower rank to upper rank.
     * <ul>
     * <li>aggregate distribution of included taxa of the next lower rank for any rank level starting from the lower rank (e.g. sub species)
     *    up to upper rank (e.g. Genus)</li>
     *  <li>the accumulation is done for each distribution area found in the included taxa</li>
     *  <li>areas of subtaxa with status endemic are ignored</li>
     *  <li>the status with the highest priority determines the value for the accumulated distribution</li>
     *  <li>the source reference of the accumulated distributions are also accumulated into the new distribution,
     *    this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
     *</ul>
     * @throws JvmLimitsException
     */
    protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {

        // batch size shrinks dynamically when the JVM runs low on heap
        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_RANK, batchMinFreeHeap);
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
        batch.setMaxAllowedGcIncreases(10);

        int ticksPerRank = 100;

        TransactionStatus txStatus = startTransaction(false);

        // the loadRankSpecificRootNodes() method not only finds
        // taxa of the specified rank but also taxa of lower ranks
        // if no taxon of the specified rank exists, so we need to
        // remember which taxa have been processed already
        Set<Integer> taxaProcessedIds = new HashSet<>();
        List<TaxonBase> taxa = null;
        List<TaxonBase> childTaxa = null;

        List<Rank> ranks = rankInterval;

        subMonitor.beginTask("Accumulating by rank", ranks.size() * ticksPerRank);

        for (Rank rank : ranks) {

            if(logger.isDebugEnabled()){
                logger.debug("accumulateByRank() - at Rank '" + termToString(rank) + "'");
            }

            Set<Integer> taxonIdsPerRank = classificationLookupDao.getTaxonIdByRank().get(rank);

            int taxonCountperRank = taxonIdsPerRank != null ? taxonIdsPerRank.size() : 0;

            SubProgressMonitor taxonSubMonitor = new SubProgressMonitor(subMonitor, ticksPerRank);
            taxonSubMonitor.beginTask("Accumulating by rank " + termToString(rank), taxonCountperRank);

            if(taxonCountperRank == 0) {
                taxonSubMonitor.done();
                continue;
            }


            Iterator<Integer> taxonIdIterator = taxonIdsPerRank.iterator();
            while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {

                if(txStatus == null) {
                    // transaction has been committed at the end of this batch, start a new one
                    txStatus = startTransaction(false);
                }

                // load taxa for this batch
                List<Integer> taxonIds = batch.nextItems(taxonIdIterator);

                taxa = taxonService.loadByIds(taxonIds, null);

//                if(logger.isDebugEnabled()){
//                    logger.debug("accumulateByRank() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
//                }

                for(TaxonBase<?> taxonBase : taxa) {

                    batch.incementCounter();

                    Taxon taxon = (Taxon)taxonBase;
                    // skip taxa which already have been processed for a lower rank
                    if (taxaProcessedIds.contains(taxon.getId())) {
                        if(logger.isDebugEnabled()){
                            logger.debug("accumulateByRank() - skipping already processed taxon :" + taxonToString(taxon));
                        }
                        continue;
                    }
                    taxaProcessedIds.add(taxon.getId());
                    if(logger.isDebugEnabled()){
                        logger.debug("accumulateByRank() [" + rank.getLabel() + "] - taxon :" + taxonToString(taxon));
                    }

                    // Step through direct taxonomic children for accumulation
                    Map<NamedArea, StatusAndSources> accumulatedStatusMap = new HashMap<NamedArea, StatusAndSources>();

                    List<Integer> childTaxonIds = new ArrayList<>();
                    Set<Integer> childSet = classificationLookupDao.getChildTaxonMap().get(taxon.getId());
                    if(childSet != null) {
                        childTaxonIds.addAll(childSet);
                    }
                    if(!childTaxonIds.isEmpty()) {
                        childTaxa = taxonService.loadByIds(childTaxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
                        LinkedList<TaxonBase> childStack = new LinkedList<>(childTaxa);
                        childTaxa = null; // allow to be garbage collected

                        while(childStack.size() > 0){

                            TaxonBase<?> childTaxonBase = childStack.pop();
                            // children are only read, never modified in this loop
                            getSession().setReadOnly(childTaxonBase, true);

                            Taxon childTaxon = (Taxon) childTaxonBase;
                            getSession().setReadOnly(childTaxon, true);
                            if(logger.isTraceEnabled()){
                                logger.trace("                   subtaxon :" + taxonToString(childTaxon));
                            }

                            for(Distribution distribution : distributionsFor(childTaxon) ) {
                                PresenceAbsenceTerm status = distribution.getStatus();
                                NamedArea area = distribution.getArea();
                                // e.g. endemic status is not aggregated to the parent
                                if (status == null || getByRankIgnoreStatusList().contains(status)){
                                  continue;
                                }

                                StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
                                accumulatedStatusMap.put(area, choosePreferred(accumulatedStatusMap.get(area), subStatusAndSources, null));
                             }

                            // evict all initialized entities of the childTaxon
                            // TODO consider using cascade="evict" in the model classes
//                            for( TaxonDescription description : ((Taxon)childTaxonBase).getDescriptions()) {
//                                for (DescriptionElementBase deb : description.getElements()) {
//                                    getSession().evict(deb);
//                                }
//                                getSession().evict(description); // this causes in some cases the taxon object to be detached from the session
//                            }
                            getSession().evict(childTaxonBase); // no longer needed, save heap
                        }

                        if(accumulatedStatusMap.size() > 0) {
                            TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
                            for (NamedArea area : accumulatedStatusMap.keySet()) {
                                // reuse an existing computed distribution if present, otherwise create one
                                Distribution distribition = findDistribution(description, area, accumulatedStatusMap.get(area).status);
                                if(distribition == null) {
                                    // create a new distribution element
                                    distribition = Distribution.NewInstance(area, accumulatedStatusMap.get(area).status);
                                    distribition.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
                                }
                                addSourcesDeduplicated(distribition.getSources(), accumulatedStatusMap.get(area).sources);

                                description.addElement(distribition);
                            }
                            taxonService.saveOrUpdate(taxon);
                            descriptionService.saveOrUpdate(description);
                        }

                    }
                    taxonSubMonitor.worked(1); // one taxon worked
                    if(!batch.isWithinJvmLimits()) {
                        break; // flushAndClear and start with new batch
                    }

                } // next taxon ....

                flushAndClear();

                // commit for every batch, otherwise the persistent context
                // may grow too much and eats up all the heap
                commitTransaction(txStatus);
                txStatus = null;

                // flushing the session and to the index (flushAndClear() ) can impose a
                // massive heap consumption. therefore we explicitly do a check after the
                // flush to detect these situations and to reduce the batch size.
                if(batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) {
                    batch.reduceSize(0.5);
                }

                if(ONLY_FISRT_BATCH) {
                    break;
                }
            } // next batch

            taxonSubMonitor.done();
            subMonitor.worked(1);

            if(ONLY_FISRT_BATCH) {
                break;
            }
        } // next Rank

        logger.info("accumulateByRank() - done");
        subMonitor.done();
    }
754
755 /**
756 * @param description
757 * @param area
758 * @param status
759 * @return
760 */
761 private Distribution findDistribution(TaxonDescription description, NamedArea area, PresenceAbsenceTerm status) {
762 for(DescriptionElementBase item : description.getElements()) {
763 if(!(item instanceof Distribution)) {
764 continue;
765 }
766 Distribution distribution = ((Distribution)item);
767 if(distribution.getArea().equals(area) && distribution.getStatus().equals(status)) {
768 return distribution;
769 }
770 }
771 return null;
772 }
773
    /**
     * Builds the list of ranks starting at <code>lowerRank</code> and walking up
     * the rank hierarchy via {@link #findNextHigherRank(Rank)}.
     *
     * NOTE(review): the loop body adds the next higher rank BEFORE the condition
     * is re-checked, so the returned list also contains the first rank that is
     * strictly higher than <code>upperRank</code> — confirm whether this
     * off-by-one is intended by the aggregation logic.
     *
     * @param lowerRank the (inclusive) lower bound of the interval
     * @param upperRank the intended upper bound of the interval
     * @return the ranks from lowerRank upwards, ordered from lower to higher
     */
    private List<Rank> rankInterval(Rank lowerRank, Rank upperRank) {

        // a transaction is needed since findNextHigherRank() loads terms from the db
        TransactionStatus txStatus = startTransaction(false);
        Rank currentRank = lowerRank;
        List<Rank> ranks = new ArrayList<>();
        ranks.add(currentRank);
        while (!currentRank.isHigher(upperRank)) {
            currentRank = findNextHigherRank(currentRank);
            ranks.add(currentRank);
        }
        commitTransaction(txStatus);
        txStatus = null;
        return ranks;
    }
793
    /**
     * Returns the current Hibernate session, obtained via the description service.
     *
     * @return the current Session
     */
    private Session getSession() {
        return descriptionService.getSession();
    }
800
    /**
     * Flushes the Hibernate session and the full-text (Hibernate Search) indexes.
     * A missing Hibernate Search configuration is tolerated; any other
     * HibernateException is rethrown.
     */
    private void flush() {
        logger.debug("flushing session ...");
        getSession().flush();
        try {
            logger.debug("flushing to indexes ...");
            Search.getFullTextSession(getSession()).flushToIndexes();
        } catch (HibernateException e) {
            /* IGNORE - Hibernate Search Event listeners not configured ... */
            // only the "listeners not configured" case is deliberately swallowed;
            // everything else is a real error and must propagate
            if(!e.getMessage().startsWith("Hibernate Search Event listeners not configured")){
                throw e;
            }
        }
    }
817
    /**
     * Flushes the session and indexes (see {@link #flush()}) and then clears the
     * session to release all managed entities and free heap.
     */
    private void flushAndClear() {
        flush();
        logger.debug("clearing session ...");
        getSession().clear();
    }
826
827
828 // TODO merge with CdmRepository#startTransaction() into common base class
829 public TransactionStatus startTransaction(Boolean readOnly) {
830
831 DefaultTransactionDefinition defaultTxDef = new DefaultTransactionDefinition();
832 defaultTxDef.setReadOnly(readOnly);
833 TransactionDefinition txDef = defaultTxDef;
834
835 // Log some transaction-related debug information.
836 if (logger.isTraceEnabled()) {
837 logger.trace("Transaction name = " + txDef.getName());
838 logger.trace("Transaction facets:");
839 logger.trace("Propagation behavior = " + txDef.getPropagationBehavior());
840 logger.trace("Isolation level = " + txDef.getIsolationLevel());
841 logger.trace("Timeout = " + txDef.getTimeout());
842 logger.trace("Read Only = " + txDef.isReadOnly());
843 // org.springframework.orm.hibernate5.HibernateTransactionManager
844 // provides more transaction/session-related debug information.
845 }
846
847 TransactionStatus txStatus = transactionManager.getTransaction(txDef);
848
849 getSession().setFlushMode(FlushMode.COMMIT);
850
851 return txStatus;
852 }
853
854 // TODO merge with CdmRepository#startTransaction() into common base class
855 public void commitTransaction(TransactionStatus txStatus){
856 logger.debug("commiting transaction ...");
857 transactionManager.commit(txStatus);
858 return;
859 }
860
861 /**
862 * returns the next higher rank
863 *
864 * TODO better implement OrderedTermBase.getNextHigherTerm() and OrderedTermBase.getNextLowerTerm()?
865 *
866 * @param rank
867 * @return
868 */
869 private Rank findNextHigherRank(Rank rank) {
870 rank = (Rank) termService.load(rank.getUuid());
871 return rank.getNextHigherTerm();
872 // OrderedTermVocabulary<Rank> rankVocabulary = mameService.getRankVocabulary();;
873 // return rankVocabulary.getNextHigherTerm(rank);
874 }
875
876 /**
877 * Either finds an existing taxon description of the given taxon or creates a new one.
878 * If the doClear is set all existing description elements will be cleared.
879 *
880 * @param taxon
881 * @param doClear will remove all existing Distributions if the taxon already
882 * has a MarkerType.COMPUTED() TaxonDescription
883 * @return
884 */
885 private TaxonDescription findComputedDescription(Taxon taxon, boolean doClear) {
886
887 String descriptionTitle = this.getClass().getSimpleName();
888
889 // find existing one
890 for (TaxonDescription description : taxon.getDescriptions()) {
891 if (description.hasMarker(MarkerType.COMPUTED(), true)) {
892 logger.debug("reusing computed description for " + taxon.getTitleCache());
893 if (doClear) {
894 int deleteCount = 0;
895 Set<DescriptionElementBase> deleteCandidates = new HashSet<>();
896 for (DescriptionElementBase descriptionElement : description.getElements()) {
897 if(descriptionElement instanceof Distribution) {
898 deleteCandidates.add(descriptionElement);
899 }
900 }
901 if(deleteCandidates.size() > 0){
902 for(DescriptionElementBase descriptionElement : deleteCandidates) {
903 description.removeElement(descriptionElement);
904 descriptionService.deleteDescriptionElement(descriptionElement);
905 descriptionElement = null;
906 deleteCount++;
907 }
908 descriptionService.saveOrUpdate(description);
909 logger.debug("\t" + deleteCount +" distributions cleared");
910 }
911
912 }
913 return description;
914 }
915 }
916
917 // create a new one
918 logger.debug("creating new description for " + taxon.getTitleCache());
919 TaxonDescription description = TaxonDescription.NewInstance(taxon);
920 description.setTitleCache(descriptionTitle, true);
921 description.addMarker(Marker.NewInstance(MarkerType.COMPUTED(), true));
922 return description;
923 }
924
925 /**
926 * @param superArea
927 * @return
928 */
929 private Set<NamedArea> getSubAreasFor(NamedArea superArea) {
930
931 if(!subAreaMap.containsKey(superArea)) {
932 if(logger.isDebugEnabled()){
933 logger.debug("loading included areas for " + superArea.getLabel());
934 }
935 subAreaMap.put(superArea, superArea.getIncludes());
936 }
937 return subAreaMap.get(superArea);
938 }
939
940 /**
941 * @param taxon
942 * @return
943 */
944 private List<Distribution> distributionsFor(Taxon taxon) {
945 List<Distribution> distributions = new ArrayList<>();
946 for(TaxonDescription description: taxon.getDescriptions()) {
947 readOnlyIfInSession(description);
948 for(DescriptionElementBase deb : description.getElements()) {
949 if(deb instanceof Distribution) {
950 readOnlyIfInSession(deb);
951 distributions.add((Distribution)deb);
952 }
953 }
954 }
955 return distributions;
956 }
957
958 /**
959 * This method avoids problems when running the TransmissionEngineDistribution test.
960 * For some unknown reason entities are not in the PersitenceContext even if they are
961 * loaded by a service method. Setting these entities to readonly would raise a
962 * TransientObjectException("Instance was not associated with this persistence context")
963 *
964 * @param entity
965 */
966 private void readOnlyIfInSession(CdmBase entity) {
967 if(getSession().contains(entity)) {
968 getSession().setReadOnly(entity, true);
969 }
970 }
971
    /**
     * Renders the given taxon for log output: the full title cache when trace
     * logging is enabled, otherwise the shorter default {@code toString()}.
     *
     * @param taxon the taxon to render
     * @return a log-friendly string representation of the taxon
     */
    private String taxonToString(TaxonBase<?> taxon) {
        if(logger.isTraceEnabled()) {
            return taxon.getTitleCache();
        } else {
            return taxon.toString();
        }
    }
984
    /**
     * Renders the given term for log output: label plus id-in-vocabulary when
     * trace logging is enabled, otherwise only the id-in-vocabulary.
     *
     * @param term the term to render
     * @return a log-friendly string representation of the term
     */
    private String termToString(OrderedTermBase<?> term) {
        if(logger.isTraceEnabled()) {
            return term.getLabel() + " [" + term.getIdInVocabulary() + "]";
        } else {
            return term.getIdInVocabulary();
        }
    }
997
998 /**
999 * Sets the priorities for presence and absence terms, the priorities are stored in extensions.
1000 * This method will start a new transaction and commits it after the work is done.
1001 */
1002 public void updatePriorities() {
1003
1004 TransactionStatus txStatus = startTransaction(false);
1005
1006 Map<PresenceAbsenceTerm, Integer> priorityMap = new HashMap<>();
1007
1008 priorityMap.put(PresenceAbsenceTerm.CULTIVATED_REPORTED_IN_ERROR(), 1);
1009 priorityMap.put(PresenceAbsenceTerm.INTRODUCED_UNCERTAIN_DEGREE_OF_NATURALISATION(), 2);
1010 priorityMap.put(PresenceAbsenceTerm.INTRODUCED_FORMERLY_INTRODUCED(), 3);
1011 priorityMap.put(PresenceAbsenceTerm.INTRODUCED_REPORTED_IN_ERROR(), 20);
1012 priorityMap.put(PresenceAbsenceTerm.NATIVE_REPORTED_IN_ERROR(), 30);
1013 priorityMap.put(PresenceAbsenceTerm.CULTIVATED(), 45);
1014 priorityMap.put(PresenceAbsenceTerm.NATIVE_FORMERLY_NATIVE(), 40);
1015 priorityMap.put(PresenceAbsenceTerm.NATIVE_PRESENCE_QUESTIONABLE(), 60);
1016 priorityMap.put(PresenceAbsenceTerm.INTRODUCED_PRESENCE_QUESTIONABLE(), 50);
1017 priorityMap.put(PresenceAbsenceTerm.INTRODUCED_DOUBTFULLY_INTRODUCED(), 80);
1018 priorityMap.put(PresenceAbsenceTerm.INTRODUCED(), 90);
1019 priorityMap.put(PresenceAbsenceTerm.CASUAL(), 100);
1020 priorityMap.put(PresenceAbsenceTerm.NATURALISED(), 110);
1021 priorityMap.put(PresenceAbsenceTerm.NATIVE_DOUBTFULLY_NATIVE(), 120); // null
1022 priorityMap.put(PresenceAbsenceTerm.NATIVE(), 130); // null
1023 priorityMap.put(PresenceAbsenceTerm.ENDEMIC_FOR_THE_RELEVANT_AREA(), 999);
1024
1025 for(PresenceAbsenceTerm term : priorityMap.keySet()) {
1026 // load the term
1027 term = (PresenceAbsenceTerm) termService.load(term.getUuid());
1028 // find the extension
1029 Extension priorityExtension = null;
1030 Set<Extension> extensions = term.getExtensions();
1031 for(Extension extension : extensions){
1032 if (!extension.getType().equals(ExtensionType.ORDER())) {
1033 continue;
1034 }
1035 int pos = extension.getValue().indexOf(EXTENSION_VALUE_PREFIX);
1036 if(pos == 0){ // if starts with EXTENSION_VALUE_PREFIX
1037 priorityExtension = extension;
1038 break;
1039 }
1040 }
1041 if(priorityExtension == null) {
1042 priorityExtension = Extension.NewInstance(term, null, ExtensionType.ORDER());
1043 }
1044 priorityExtension.setValue(EXTENSION_VALUE_PREFIX + priorityMap.get(term));
1045
1046 // save the term
1047 termService.saveOrUpdate(term);
1048 if (logger.isDebugEnabled()) {
1049 logger.debug("Priority updated for " + term.getLabel());
1050 }
1051 }
1052
1053 commitTransaction(txStatus);
1054 }
1055
1056 public static void addSourcesDeduplicated(Set<DescriptionElementSource> target, Set<DescriptionElementSource> sources) {
1057 for(DescriptionElementSource source : sources) {
1058 boolean contained = false;
1059 for(DescriptionElementSource existingSource: target) {
1060 if(existingSource.equalsByShallowCompare(source)) {
1061 contained = true;
1062 break;
1063 }
1064 }
1065 if(!contained) {
1066 try {
1067 target.add((DescriptionElementSource)source.clone());
1068 } catch (CloneNotSupportedException e) {
1069 // should never happen
1070 throw new RuntimeException(e);
1071 }
1072 }
1073 }
1074 }
1075
    /**
     * @return the minimum amount of free heap required for batch processing
     *         (presumably in bytes — TODO confirm against DynamicBatch usage)
     */
    public long getBatchMinFreeHeap() {
        return batchMinFreeHeap;
    }
1082
    /**
     * @param batchMinFreeHeap the minimum amount of free heap required for
     *        batch processing to set (presumably in bytes — TODO confirm)
     */
    public void setBatchMinFreeHeap(long batchMinFreeHeap) {
        this.batchMinFreeHeap = batchMinFreeHeap;
    }
1089
    /**
     * The aggregation modes supported by this engine: by areas, by ranks,
     * or both combined.
     */
    public enum AggregationMode {
        byAreas,
        byRanks,
        byAreasAndRanks

    }
1096
1097 private class StatusAndSources {
1098
1099 private final PresenceAbsenceTerm status;
1100
1101 private final Set<DescriptionElementSource> sources = new HashSet<>();
1102
1103 public StatusAndSources(PresenceAbsenceTerm status, Set<DescriptionElementSource> sources) {
1104 this.status = status;
1105 addSourcesDeduplicated(this.sources, sources);
1106 }
1107
1108 /**
1109 * @param sources
1110 */
1111 public void addSources(Set<DescriptionElementSource> sources) {
1112 addSourcesDeduplicated(this.sources, sources);
1113 }
1114
1115 }
1116 }