14 |
14 |
import java.util.HashMap;
|
15 |
15 |
import java.util.HashSet;
|
16 |
16 |
import java.util.Iterator;
|
|
17 |
import java.util.LinkedList;
|
17 |
18 |
import java.util.List;
|
18 |
19 |
import java.util.Map;
|
19 |
20 |
import java.util.Set;
|
... | ... | |
37 |
38 |
import eu.etaxonomy.cdm.api.service.INameService;
|
38 |
39 |
import eu.etaxonomy.cdm.api.service.ITaxonService;
|
39 |
40 |
import eu.etaxonomy.cdm.api.service.ITermService;
|
|
41 |
import eu.etaxonomy.cdm.common.DynamicBatch;
|
|
42 |
import eu.etaxonomy.cdm.common.JvmLimitsException;
|
40 |
43 |
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
|
41 |
44 |
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
|
42 |
45 |
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
|
... | ... | |
57 |
60 |
import eu.etaxonomy.cdm.model.taxon.Taxon;
|
58 |
61 |
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
|
59 |
62 |
import eu.etaxonomy.cdm.persistence.dto.ClassificationLookupDTO;
|
60 |
|
import eu.etaxonomy.cdm.persistence.query.OrderHint;
|
61 |
63 |
|
62 |
64 |
/**
|
63 |
65 |
*
|
... | ... | |
91 |
93 |
@Service
|
92 |
94 |
public class TransmissionEngineDistribution { //TODO extends IoBase?
|
93 |
95 |
|
|
96 |
|
94 |
97 |
public static final String EXTENSION_VALUE_PREFIX = "transmissionEngineDistribution.priority:";
|
95 |
98 |
|
96 |
99 |
public static final Logger logger = Logger.getLogger(TransmissionEngineDistribution.class);
|
... | ... | |
143 |
146 |
|
144 |
147 |
private final Map<NamedArea, Set<NamedArea>> subAreaMap = new HashMap<NamedArea, Set<NamedArea>>();
|
145 |
148 |
|
146 |
|
private final List<OrderHint> emptyOrderHints = new ArrayList<OrderHint>(0);
|
147 |
|
|
148 |
149 |
int byRankTicks = 300;
|
149 |
150 |
int byAreasTicks = 100;
|
150 |
151 |
|
151 |
152 |
|
|
153 |
private static final long BATCH_MIN_FREE_HEAP = 800 * 1024 * 1024;
|
|
154 |
/**
|
|
155 |
* ratio of the initially free heap which should not be used
|
|
156 |
* during the batch processing. This amount of the heap is reserved
|
|
157 |
 * for flushing the session and writing to the search index
|
|
158 |
*/
|
|
159 |
private static final double BATCH_FREE_HEAP_RATIO = 0.9;
|
|
160 |
private static final int BATCH_SIZE_BY_AREA = 1000;
|
|
161 |
private static final int BATCH_SIZE_BY_RANK = 500;
|
|
162 |
|
|
163 |
|
|
164 |
|
152 |
165 |
/**
|
153 |
166 |
* byAreaIgnoreStatusList contains by default:
|
154 |
167 |
* <ul>
|
... | ... | |
334 |
347 |
* should be reported and that the operation cannot be cancelled.
|
335 |
348 |
*/
|
336 |
349 |
public void accumulate(AggregationMode mode, List<NamedArea> superAreas, Rank lowerRank, Rank upperRank,
|
337 |
|
Classification classification, IProgressMonitor monitor) {
|
|
350 |
Classification classification, IProgressMonitor monitor) throws JvmLimitsException {
|
338 |
351 |
|
339 |
352 |
if (monitor == null) {
|
340 |
353 |
monitor = new NullProgressMonitor();
|
341 |
354 |
}
|
342 |
355 |
|
343 |
|
|
344 |
356 |
// only for debugging:
|
345 |
357 |
//logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations
|
346 |
358 |
//Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG);
|
... | ... | |
434 |
446 |
* @param superAreas
|
435 |
447 |
* the areas to which the subordinate areas should be projected
|
436 |
448 |
* @param classificationLookupDao
|
|
449 |
* @throws JvmLimitsException
|
437 |
450 |
*
|
438 |
451 |
*/
|
439 |
|
protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao, IProgressMonitor subMonitor, boolean doClearDescriptions) {
|
|
452 |
protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao, IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
|
440 |
453 |
|
441 |
|
int batchSize = 1000;
|
|
454 |
DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_AREA, BATCH_MIN_FREE_HEAP);
|
|
455 |
batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
|
442 |
456 |
|
443 |
457 |
TransactionStatus txStatus = startTransaction(false);
|
444 |
458 |
|
... | ... | |
452 |
466 |
subMonitor.beginTask("Accumulating by area ", classificationLookupDao.getTaxonIds().size());
|
453 |
467 |
Iterator<Integer> taxonIdIterator = classificationLookupDao.getTaxonIds().iterator();
|
454 |
468 |
|
455 |
|
while (taxonIdIterator.hasNext()) {
|
|
469 |
while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
|
456 |
470 |
|
457 |
471 |
if(txStatus == null) {
|
458 |
472 |
// transaction has been committed at the end of this batch, start a new one
|
... | ... | |
463 |
477 |
List<NamedArea> superAreaList = (List)termService.find(superAreaUuids);
|
464 |
478 |
|
465 |
479 |
// load taxa for this batch
|
466 |
|
List<TaxonBase> taxa = null;
|
467 |
|
List<Integer> taxonIds = new ArrayList<Integer>(batchSize);
|
468 |
|
while(taxonIdIterator.hasNext() && taxonIds.size() < batchSize ) {
|
469 |
|
taxonIds.add(taxonIdIterator.next());
|
470 |
|
}
|
471 |
|
|
|
480 |
List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
|
472 |
481 |
// logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
|
473 |
|
|
474 |
|
taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
|
|
482 |
List<TaxonBase> taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
|
475 |
483 |
|
476 |
484 |
// iterate over the taxa and accumulate areas
|
|
485 |
// start processing the new batch
|
|
486 |
|
477 |
487 |
for(TaxonBase taxonBase : taxa) {
|
478 |
488 |
if(logger.isDebugEnabled()){
|
479 |
489 |
logger.debug("accumulateByArea() - taxon :" + taxonToString(taxonBase));
|
480 |
490 |
}
|
481 |
491 |
|
|
492 |
batch.incementCounter();
|
|
493 |
|
482 |
494 |
Taxon taxon = (Taxon)taxonBase;
|
483 |
495 |
TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
|
484 |
496 |
List<Distribution> distributions = distributionsFor(taxon);
|
... | ... | |
526 |
538 |
descriptionService.saveOrUpdate(description);
|
527 |
539 |
taxonService.saveOrUpdate(taxon);
|
528 |
540 |
subMonitor.worked(1);
|
|
541 |
if(!batch.isWithinJvmLimits()) {
|
|
542 |
break; // flushAndClear and start with new batch
|
|
543 |
}
|
529 |
544 |
|
530 |
545 |
} // next taxon
|
531 |
546 |
|
... | ... | |
557 |
572 |
* <li>the source reference of the accumulated distributions are also accumulated into the new distribution,
|
558 |
573 |
* this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
|
559 |
574 |
*</ul>
|
|
575 |
* @throws JvmLimitsException
|
560 |
576 |
*/
|
561 |
|
protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao, IProgressMonitor subMonitor, boolean doClearDescriptions) {
|
|
577 |
protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao, IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
|
|
578 |
|
|
579 |
DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_RANK, BATCH_MIN_FREE_HEAP);
|
|
580 |
batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
|
|
581 |
batch.setMaxAllowedGcIncreases(10);
|
562 |
582 |
|
563 |
|
int batchSize = 500;
|
564 |
583 |
int ticksPerRank = 100;
|
565 |
584 |
|
566 |
585 |
TransactionStatus txStatus = startTransaction(false);
|
... | ... | |
597 |
616 |
|
598 |
617 |
|
599 |
618 |
Iterator<Integer> taxonIdIterator = taxonIdsPerRank.iterator();
|
600 |
|
while (taxonIdIterator.hasNext()) {
|
|
619 |
while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
|
601 |
620 |
|
602 |
621 |
if(txStatus == null) {
|
603 |
622 |
// transaction has been committed at the end of this batch, start a new one
|
... | ... | |
605 |
624 |
}
|
606 |
625 |
|
607 |
626 |
// load taxa for this batch
|
608 |
|
List<Integer> taxonIds = new ArrayList<Integer>(batchSize);
|
609 |
|
while(taxonIdIterator.hasNext() && taxonIds.size() < batchSize ) {
|
610 |
|
taxonIds.add(taxonIdIterator.next());
|
611 |
|
}
|
|
627 |
List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
|
612 |
628 |
|
613 |
629 |
taxa = taxonService.loadByIds(taxonIds, null);
|
614 |
630 |
|
... | ... | |
618 |
634 |
|
619 |
635 |
for(TaxonBase taxonBase : taxa) {
|
620 |
636 |
|
|
637 |
batch.incementCounter();
|
|
638 |
|
621 |
639 |
Taxon taxon = (Taxon)taxonBase;
|
622 |
640 |
if (taxaProcessedIds.contains(taxon.getId())) {
|
623 |
641 |
if(logger.isDebugEnabled()){
|
... | ... | |
640 |
658 |
}
|
641 |
659 |
if(!childTaxonIds.isEmpty()) {
|
642 |
660 |
childTaxa = taxonService.loadByIds(childTaxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
|
|
661 |
LinkedList<TaxonBase> childStack = new LinkedList<TaxonBase>(childTaxa);
|
|
662 |
childTaxa = null; // allow to be garbage collected
|
|
663 |
|
|
664 |
while(childStack.size() > 0){
|
643 |
665 |
|
644 |
|
for (TaxonBase childTaxonBase : childTaxa){
|
|
666 |
TaxonBase childTaxonBase = childStack.pop();
|
|
667 |
getSession().setReadOnly(childTaxonBase, true);
|
645 |
668 |
|
646 |
669 |
Taxon childTaxon = (Taxon) childTaxonBase;
|
647 |
670 |
getSession().setReadOnly(childTaxon, true);
|
... | ... | |
659 |
682 |
StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
|
660 |
683 |
accumulatedStatusMap.put(area, choosePreferred(accumulatedStatusMap.get(area), subStatusAndSources, null));
|
661 |
684 |
}
|
|
685 |
|
|
686 |
// evict all initialized entities of the childTaxon
|
|
687 |
// TODO consider using cascade="evict" in the model classes
|
|
688 |
// for( TaxonDescription description : ((Taxon)childTaxonBase).getDescriptions()) {
|
|
689 |
// for (DescriptionElementBase deb : description.getElements()) {
|
|
690 |
// getSession().evict(deb);
|
|
691 |
// }
|
|
692 |
// getSession().evict(description); // this causes in some cases the taxon object to be detached from the session
|
|
693 |
// }
|
|
694 |
getSession().evict(childTaxonBase); // no longer needed, save heap
|
662 |
695 |
}
|
663 |
696 |
|
664 |
697 |
if(accumulatedStatusMap.size() > 0) {
|
... | ... | |
680 |
713 |
|
681 |
714 |
}
|
682 |
715 |
taxonSubMonitor.worked(1); // one taxon worked
|
|
716 |
if(!batch.isWithinJvmLimits()) {
|
|
717 |
break; // flushAndClear and start with new batch
|
|
718 |
}
|
683 |
719 |
|
684 |
720 |
} // next taxon ....
|
685 |
721 |
|
... | ... | |
690 |
726 |
commitTransaction(txStatus);
|
691 |
727 |
txStatus = null;
|
692 |
728 |
|
|
729 |
// flushing the session and to the index (flushAndClear() ) can impose a
|
|
730 |
// massive heap consumption. therefore we explicitly do a check after the
|
|
731 |
// flush to detect these situations and to reduce the batch size.
|
|
732 |
if(batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) {
|
|
733 |
batch.reduceSize(0.5);
|
|
734 |
}
|
|
735 |
|
693 |
736 |
if(ONLY_FISRT_BATCH) {
|
694 |
737 |
break;
|
695 |
738 |
}
|
... | ... | |
756 |
799 |
/**
|
757 |
800 |
*
|
758 |
801 |
*/
|
759 |
|
private void flushAndClear() {
|
760 |
|
logger.debug("flushing and clearing session ...");
|
|
802 |
private void flush() {
|
|
803 |
logger.debug("flushing session ...");
|
761 |
804 |
getSession().flush();
|
762 |
805 |
try {
|
|
806 |
logger.debug("flushing to indexes ...");
|
763 |
807 |
Search.getFullTextSession(getSession()).flushToIndexes();
|
764 |
808 |
} catch (HibernateException e) {
|
765 |
809 |
/* IGNORE - Hibernate Search Event listeners not configured ... */
|
... | ... | |
767 |
811 |
throw e;
|
768 |
812 |
}
|
769 |
813 |
}
|
770 |
|
getSession().clear();
|
771 |
814 |
}
|
772 |
815 |
|
|
816 |
/**
 * Flushes the hibernate session — including the Hibernate Search
 * indexes, see {@link #flush()} — and then clears the session so that
 * all entities loaded during the current batch can be garbage
 * collected, freeing heap between batches.
 */
private void flushAndClear() {
    flush();
    logger.debug("clearing session ...");
    getSession().clear();
}
|
|
824 |
|
773 |
825 |
|
774 |
826 |
// TODO merge with CdmApplicationDefaultConfiguration#startTransaction() into common base class
|
775 |
827 |
public TransactionStatus startTransaction(Boolean readOnly) {
|
... | ... | |
835 |
887 |
// find existing one
|
836 |
888 |
for (TaxonDescription description : taxon.getDescriptions()) {
|
837 |
889 |
if (description.hasMarker(MarkerType.COMPUTED(), true)) {
|
838 |
|
logger.debug("reusing description for " + taxon.getTitleCache());
|
|
890 |
logger.debug("reusing computed description for " + taxon.getTitleCache());
|
839 |
891 |
if (doClear) {
|
840 |
892 |
int deleteCount = 0;
|
841 |
893 |
Set<DescriptionElementBase> deleteCandidates = new HashSet<DescriptionElementBase>();
|
... | ... | |
890 |
942 |
private List<Distribution> distributionsFor(Taxon taxon) {
|
891 |
943 |
List<Distribution> distributions = new ArrayList<Distribution>();
|
892 |
944 |
for(TaxonDescription description: taxon.getDescriptions()) {
|
|
945 |
getSession().setReadOnly(description, true);
|
893 |
946 |
for(DescriptionElementBase deb : description.getElements()) {
|
894 |
947 |
if(deb instanceof Distribution) {
|
|
948 |
getSession().setReadOnly(deb, true);
|
895 |
949 |
distributions.add((Distribution)deb);
|
896 |
950 |
}
|
897 |
951 |
}
|
TransmissionEngineDistribution: solving problems with excessive heap consumption