Revision 9b7db8db
Added by Andreas Kohlbecker almost 8 years ago
cdmlib-services/src/main/java/eu/etaxonomy/cdm/api/service/description/TransmissionEngineDistribution.java | ||
---|---|---|
14 | 14 |
import java.util.HashMap; |
15 | 15 |
import java.util.HashSet; |
16 | 16 |
import java.util.Iterator; |
17 |
import java.util.LinkedList; |
|
17 | 18 |
import java.util.List; |
18 | 19 |
import java.util.Map; |
19 | 20 |
import java.util.Set; |
... | ... | |
37 | 38 |
import eu.etaxonomy.cdm.api.service.INameService; |
38 | 39 |
import eu.etaxonomy.cdm.api.service.ITaxonService; |
39 | 40 |
import eu.etaxonomy.cdm.api.service.ITermService; |
41 |
import eu.etaxonomy.cdm.common.DynamicBatch; |
|
42 |
import eu.etaxonomy.cdm.common.JvmLimitsException; |
|
40 | 43 |
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor; |
41 | 44 |
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor; |
42 | 45 |
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor; |
... | ... | |
57 | 60 |
import eu.etaxonomy.cdm.model.taxon.Taxon; |
58 | 61 |
import eu.etaxonomy.cdm.model.taxon.TaxonBase; |
59 | 62 |
import eu.etaxonomy.cdm.persistence.dto.ClassificationLookupDTO; |
60 |
import eu.etaxonomy.cdm.persistence.query.OrderHint; |
|
61 | 63 |
|
62 | 64 |
/** |
63 | 65 |
* |
... | ... | |
91 | 93 |
@Service |
92 | 94 |
public class TransmissionEngineDistribution { //TODO extends IoBase? |
93 | 95 |
|
96 |
|
|
94 | 97 |
public static final String EXTENSION_VALUE_PREFIX = "transmissionEngineDistribution.priority:"; |
95 | 98 |
|
96 | 99 |
public static final Logger logger = Logger.getLogger(TransmissionEngineDistribution.class); |
... | ... | |
143 | 146 |
|
144 | 147 |
private final Map<NamedArea, Set<NamedArea>> subAreaMap = new HashMap<NamedArea, Set<NamedArea>>(); |
145 | 148 |
|
146 |
private final List<OrderHint> emptyOrderHints = new ArrayList<OrderHint>(0); |
|
147 |
|
|
148 | 149 |
int byRankTicks = 300; |
149 | 150 |
int byAreasTicks = 100; |
150 | 151 |
|
151 | 152 |
|
153 |
private static final long BATCH_MIN_FREE_HEAP = 800 * 1024 * 1024; |
|
154 |
/** |
|
155 |
* ratio of the initially free heap which should not be used |
|
156 |
* during the batch processing. This amount of the heap is reserved |
|
157 |
* for the flushing of the session and to the index |
|
158 |
*/ |
|
159 |
private static final double BATCH_FREE_HEAP_RATIO = 0.9; |
|
160 |
private static final int BATCH_SIZE_BY_AREA = 1000; |
|
161 |
private static final int BATCH_SIZE_BY_RANK = 500; |
|
162 |
|
|
163 |
|
|
164 |
|
|
152 | 165 |
/** |
153 | 166 |
* byAreaIgnoreStatusList contains by default: |
154 | 167 |
* <ul> |
... | ... | |
334 | 347 |
* should be reported and that the operation cannot be cancelled. |
335 | 348 |
*/ |
336 | 349 |
public void accumulate(AggregationMode mode, List<NamedArea> superAreas, Rank lowerRank, Rank upperRank, |
337 |
Classification classification, IProgressMonitor monitor) { |
|
350 |
Classification classification, IProgressMonitor monitor) throws JvmLimitsException {
|
|
338 | 351 |
|
339 | 352 |
if (monitor == null) { |
340 | 353 |
monitor = new NullProgressMonitor(); |
341 | 354 |
} |
342 | 355 |
|
343 |
|
|
344 | 356 |
// only for debugging: |
345 | 357 |
//logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations |
346 | 358 |
//Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG); |
... | ... | |
434 | 446 |
* @param superAreas |
435 | 447 |
* the areas to which the subordinate areas should be projected |
436 | 448 |
* @param classificationLookupDao |
449 |
* @throws JvmLimitsException |
|
437 | 450 |
* |
438 | 451 |
*/ |
439 |
protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao, IProgressMonitor subMonitor, boolean doClearDescriptions) { |
|
452 |
protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao, IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
|
|
440 | 453 |
|
441 |
int batchSize = 1000; |
|
454 |
DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_AREA, BATCH_MIN_FREE_HEAP); |
|
455 |
batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO); |
|
442 | 456 |
|
443 | 457 |
TransactionStatus txStatus = startTransaction(false); |
444 | 458 |
|
... | ... | |
452 | 466 |
subMonitor.beginTask("Accumulating by area ", classificationLookupDao.getTaxonIds().size()); |
453 | 467 |
Iterator<Integer> taxonIdIterator = classificationLookupDao.getTaxonIds().iterator(); |
454 | 468 |
|
455 |
while (taxonIdIterator.hasNext()) { |
|
469 |
while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
|
|
456 | 470 |
|
457 | 471 |
if(txStatus == null) { |
458 | 472 |
// transaction has been committed at the end of this batch, start a new one |
... | ... | |
463 | 477 |
List<NamedArea> superAreaList = (List)termService.find(superAreaUuids); |
464 | 478 |
|
465 | 479 |
// load taxa for this batch |
466 |
List<TaxonBase> taxa = null; |
|
467 |
List<Integer> taxonIds = new ArrayList<Integer>(batchSize); |
|
468 |
while(taxonIdIterator.hasNext() && taxonIds.size() < batchSize ) { |
|
469 |
taxonIds.add(taxonIdIterator.next()); |
|
470 |
} |
|
471 |
|
|
480 |
List<Integer> taxonIds = batch.nextItems(taxonIdIterator); |
|
472 | 481 |
// logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]"); |
473 |
|
|
474 |
taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY); |
|
482 |
List<TaxonBase> taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY); |
|
475 | 483 |
|
476 | 484 |
// iterate over the taxa and accumulate areas |
485 |
// start processing the new batch |
|
486 |
|
|
477 | 487 |
for(TaxonBase taxonBase : taxa) { |
478 | 488 |
if(logger.isDebugEnabled()){ |
479 | 489 |
logger.debug("accumulateByArea() - taxon :" + taxonToString(taxonBase)); |
480 | 490 |
} |
481 | 491 |
|
492 |
batch.incementCounter(); |
|
493 |
|
|
482 | 494 |
Taxon taxon = (Taxon)taxonBase; |
483 | 495 |
TaxonDescription description = findComputedDescription(taxon, doClearDescriptions); |
484 | 496 |
List<Distribution> distributions = distributionsFor(taxon); |
... | ... | |
526 | 538 |
descriptionService.saveOrUpdate(description); |
527 | 539 |
taxonService.saveOrUpdate(taxon); |
528 | 540 |
subMonitor.worked(1); |
541 |
if(!batch.isWithinJvmLimits()) { |
|
542 |
break; // flushAndClear and start with new batch |
|
543 |
} |
|
529 | 544 |
|
530 | 545 |
} // next taxon |
531 | 546 |
|
... | ... | |
557 | 572 |
* <li>the source reference of the accumulated distributions are also accumulated into the new distribution, |
558 | 573 |
* this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li> |
559 | 574 |
*</ul> |
575 |
* @throws JvmLimitsException |
|
560 | 576 |
*/ |
561 |
protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao, IProgressMonitor subMonitor, boolean doClearDescriptions) { |
|
577 |
protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao, IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException { |
|
578 |
|
|
579 |
DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_RANK, BATCH_MIN_FREE_HEAP); |
|
580 |
batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO); |
|
581 |
batch.setMaxAllowedGcIncreases(10); |
|
562 | 582 |
|
563 |
int batchSize = 500; |
|
564 | 583 |
int ticksPerRank = 100; |
565 | 584 |
|
566 | 585 |
TransactionStatus txStatus = startTransaction(false); |
... | ... | |
597 | 616 |
|
598 | 617 |
|
599 | 618 |
Iterator<Integer> taxonIdIterator = taxonIdsPerRank.iterator(); |
600 |
while (taxonIdIterator.hasNext()) { |
|
619 |
while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
|
|
601 | 620 |
|
602 | 621 |
if(txStatus == null) { |
603 | 622 |
// transaction has been committed at the end of this batch, start a new one |
... | ... | |
605 | 624 |
} |
606 | 625 |
|
607 | 626 |
// load taxa for this batch |
608 |
List<Integer> taxonIds = new ArrayList<Integer>(batchSize); |
|
609 |
while(taxonIdIterator.hasNext() && taxonIds.size() < batchSize ) { |
|
610 |
taxonIds.add(taxonIdIterator.next()); |
|
611 |
} |
|
627 |
List<Integer> taxonIds = batch.nextItems(taxonIdIterator); |
|
612 | 628 |
|
613 | 629 |
taxa = taxonService.loadByIds(taxonIds, null); |
614 | 630 |
|
... | ... | |
618 | 634 |
|
619 | 635 |
for(TaxonBase taxonBase : taxa) { |
620 | 636 |
|
637 |
batch.incementCounter(); |
|
638 |
|
|
621 | 639 |
Taxon taxon = (Taxon)taxonBase; |
622 | 640 |
if (taxaProcessedIds.contains(taxon.getId())) { |
623 | 641 |
if(logger.isDebugEnabled()){ |
... | ... | |
640 | 658 |
} |
641 | 659 |
if(!childTaxonIds.isEmpty()) { |
642 | 660 |
childTaxa = taxonService.loadByIds(childTaxonIds, TAXONDESCRIPTION_INIT_STRATEGY); |
661 |
LinkedList<TaxonBase> childStack = new LinkedList<TaxonBase>(childTaxa); |
|
662 |
childTaxa = null; // allow to be garbage collected |
|
663 |
|
|
664 |
while(childStack.size() > 0){ |
|
643 | 665 |
|
644 |
for (TaxonBase childTaxonBase : childTaxa){ |
|
666 |
TaxonBase childTaxonBase = childStack.pop(); |
|
667 |
getSession().setReadOnly(childTaxonBase, true); |
|
645 | 668 |
|
646 | 669 |
Taxon childTaxon = (Taxon) childTaxonBase; |
647 | 670 |
getSession().setReadOnly(childTaxon, true); |
... | ... | |
659 | 682 |
StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources()); |
660 | 683 |
accumulatedStatusMap.put(area, choosePreferred(accumulatedStatusMap.get(area), subStatusAndSources, null)); |
661 | 684 |
} |
685 |
|
|
686 |
// evict all initialized entities of the childTaxon |
|
687 |
// TODO consider using cascade="evict" in the model classes |
|
688 |
// for( TaxonDescription description : ((Taxon)childTaxonBase).getDescriptions()) { |
|
689 |
// for (DescriptionElementBase deb : description.getElements()) { |
|
690 |
// getSession().evict(deb); |
|
691 |
// } |
|
692 |
// getSession().evict(description); // this causes in some cases the taxon object to be detached from the session |
|
693 |
// } |
|
694 |
getSession().evict(childTaxonBase); // no longer needed, save heap |
|
662 | 695 |
} |
663 | 696 |
|
664 | 697 |
if(accumulatedStatusMap.size() > 0) { |
... | ... | |
680 | 713 |
|
681 | 714 |
} |
682 | 715 |
taxonSubMonitor.worked(1); // one taxon worked |
716 |
if(!batch.isWithinJvmLimits()) { |
|
717 |
break; // flushAndClear and start with new batch |
|
718 |
} |
|
683 | 719 |
|
684 | 720 |
} // next taxon .... |
685 | 721 |
|
... | ... | |
690 | 726 |
commitTransaction(txStatus); |
691 | 727 |
txStatus = null; |
692 | 728 |
|
729 |
// flushing the session and to the index (flushAndClear() ) can impose a |
|
730 |
// massive heap consumption. therefore we explicitly do a check after the |
|
731 |
// flush to detect these situations and to reduce the batch size. |
|
732 |
if(batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) { |
|
733 |
batch.reduceSize(0.5); |
|
734 |
} |
|
735 |
|
|
693 | 736 |
if(ONLY_FISRT_BATCH) { |
694 | 737 |
break; |
695 | 738 |
} |
... | ... | |
756 | 799 |
/** |
757 | 800 |
* |
758 | 801 |
*/ |
759 |
private void flushAndClear() {
|
|
760 |
logger.debug("flushing and clearing session ...");
|
|
802 |
private void flush() { |
|
803 |
logger.debug("flushing session ..."); |
|
761 | 804 |
getSession().flush(); |
762 | 805 |
try { |
806 |
logger.debug("flushing to indexes ..."); |
|
763 | 807 |
Search.getFullTextSession(getSession()).flushToIndexes(); |
764 | 808 |
} catch (HibernateException e) { |
765 | 809 |
/* IGNORE - Hibernate Search Event listeners not configured ... */ |
... | ... | |
767 | 811 |
throw e; |
768 | 812 |
} |
769 | 813 |
} |
770 |
getSession().clear(); |
|
771 | 814 |
} |
772 | 815 |
|
816 |
/** |
|
817 |
* |
|
818 |
*/ |
|
819 |
private void flushAndClear() { |
|
820 |
flush(); |
|
821 |
logger.debug("clearing session ..."); |
|
822 |
getSession().clear(); |
|
823 |
} |
|
824 |
|
|
773 | 825 |
|
774 | 826 |
// TODO merge with CdmApplicationDefaultConfiguration#startTransaction() into common base class |
775 | 827 |
public TransactionStatus startTransaction(Boolean readOnly) { |
... | ... | |
835 | 887 |
// find existing one |
836 | 888 |
for (TaxonDescription description : taxon.getDescriptions()) { |
837 | 889 |
if (description.hasMarker(MarkerType.COMPUTED(), true)) { |
838 |
logger.debug("reusing description for " + taxon.getTitleCache()); |
|
890 |
logger.debug("reusing computed description for " + taxon.getTitleCache());
|
|
839 | 891 |
if (doClear) { |
840 | 892 |
int deleteCount = 0; |
841 | 893 |
Set<DescriptionElementBase> deleteCandidates = new HashSet<DescriptionElementBase>(); |
... | ... | |
890 | 942 |
private List<Distribution> distributionsFor(Taxon taxon) { |
891 | 943 |
List<Distribution> distributions = new ArrayList<Distribution>(); |
892 | 944 |
for(TaxonDescription description: taxon.getDescriptions()) { |
945 |
getSession().setReadOnly(description, true); |
|
893 | 946 |
for(DescriptionElementBase deb : description.getElements()) { |
894 | 947 |
if(deb instanceof Distribution) { |
948 |
getSession().setReadOnly(deb, true); |
|
895 | 949 |
distributions.add((Distribution)deb); |
896 | 950 |
} |
897 | 951 |
} |
Also available in: Unified diff
TransmissionEngineDistribution: solving problems with excessive heap consumption