Project

General

Profile

« Previous | Next » 

Revision 9b7db8db

Added by Andreas Kohlbecker almost 8 years ago

TransmissionEngineDistribution: solving problems with excessive heap consumption

View differences:

cdmlib-services/src/main/java/eu/etaxonomy/cdm/api/service/description/TransmissionEngineDistribution.java
14 14
import java.util.HashMap;
15 15
import java.util.HashSet;
16 16
import java.util.Iterator;
17
import java.util.LinkedList;
17 18
import java.util.List;
18 19
import java.util.Map;
19 20
import java.util.Set;
......
37 38
import eu.etaxonomy.cdm.api.service.INameService;
38 39
import eu.etaxonomy.cdm.api.service.ITaxonService;
39 40
import eu.etaxonomy.cdm.api.service.ITermService;
41
import eu.etaxonomy.cdm.common.DynamicBatch;
42
import eu.etaxonomy.cdm.common.JvmLimitsException;
40 43
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
41 44
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
42 45
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
......
57 60
import eu.etaxonomy.cdm.model.taxon.Taxon;
58 61
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
59 62
import eu.etaxonomy.cdm.persistence.dto.ClassificationLookupDTO;
60
import eu.etaxonomy.cdm.persistence.query.OrderHint;
61 63

  
62 64
/**
63 65
 *
......
91 93
@Service
92 94
public class TransmissionEngineDistribution { //TODO extends IoBase?
93 95

  
96

  
94 97
    public static final String EXTENSION_VALUE_PREFIX = "transmissionEngineDistribution.priority:";
95 98

  
96 99
    public static final Logger logger = Logger.getLogger(TransmissionEngineDistribution.class);
......
143 146

  
144 147
    private final Map<NamedArea, Set<NamedArea>> subAreaMap = new HashMap<NamedArea, Set<NamedArea>>();
145 148

  
146
    private final List<OrderHint> emptyOrderHints = new ArrayList<OrderHint>(0);
147

  
148 149
    int byRankTicks = 300;
149 150
    int byAreasTicks = 100;
150 151

  
151 152

  
153
    private static final long BATCH_MIN_FREE_HEAP = 800  * 1024 * 1024;
154
    /**
155
     * ratio of the initially free heap which should not be used
156
     * during the batch processing. This amount of the heap is reserved
157
     * for flushing the session and flushing to the index
158
     */
159
    private static final double BATCH_FREE_HEAP_RATIO = 0.9;
160
    private static final int BATCH_SIZE_BY_AREA = 1000;
161
    private static final int BATCH_SIZE_BY_RANK = 500;
162

  
163

  
164

  
152 165
    /**
153 166
     * byAreaIgnoreStatusList contains by default:
154 167
     *  <ul>
......
334 347
     *            should be reported and that the operation cannot be cancelled.
335 348
     */
336 349
    public void accumulate(AggregationMode mode, List<NamedArea> superAreas, Rank lowerRank, Rank upperRank,
337
            Classification classification, IProgressMonitor monitor) {
350
            Classification classification, IProgressMonitor monitor) throws JvmLimitsException {
338 351

  
339 352
        if (monitor == null) {
340 353
            monitor = new NullProgressMonitor();
341 354
        }
342 355

  
343

  
344 356
        // only for debugging:
345 357
        //logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations
346 358
        //Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG);
......
434 446
     * @param superAreas
435 447
     *      the areas to which the subordinate areas should be projected
436 448
     * @param classificationLookupDao
449
     * @throws JvmLimitsException
437 450
     *
438 451
     */
439
    protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) {
452
    protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
440 453

  
441
        int batchSize = 1000;
454
        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_AREA, BATCH_MIN_FREE_HEAP);
455
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
442 456

  
443 457
        TransactionStatus txStatus = startTransaction(false);
444 458

  
......
452 466
        subMonitor.beginTask("Accumulating by area ",  classificationLookupDao.getTaxonIds().size());
453 467
        Iterator<Integer> taxonIdIterator = classificationLookupDao.getTaxonIds().iterator();
454 468

  
455
        while (taxonIdIterator.hasNext()) {
469
        while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
456 470

  
457 471
            if(txStatus == null) {
458 472
                // transaction has been committed at the end of this batch, start a new one
......
463 477
            List<NamedArea> superAreaList = (List)termService.find(superAreaUuids);
464 478

  
465 479
            // load taxa for this batch
466
            List<TaxonBase> taxa = null;
467
            List<Integer> taxonIds = new ArrayList<Integer>(batchSize);
468
            while(taxonIdIterator.hasNext() && taxonIds.size() < batchSize ) {
469
                taxonIds.add(taxonIdIterator.next());
470
            }
471

  
480
            List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
472 481
//            logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
473

  
474
            taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
482
            List<TaxonBase> taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
475 483

  
476 484
            // iterate over the taxa and accumulate areas
485
            // start processing the new batch
486

  
477 487
            for(TaxonBase taxonBase : taxa) {
478 488
                if(logger.isDebugEnabled()){
479 489
                    logger.debug("accumulateByArea() - taxon :" + taxonToString(taxonBase));
480 490
                }
481 491

  
492
                batch.incementCounter();
493

  
482 494
                Taxon taxon = (Taxon)taxonBase;
483 495
                TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
484 496
                List<Distribution> distributions = distributionsFor(taxon);
......
526 538
                descriptionService.saveOrUpdate(description);
527 539
                taxonService.saveOrUpdate(taxon);
528 540
                subMonitor.worked(1);
541
                if(!batch.isWithinJvmLimits()) {
542
                    break; // flushAndClear and start with new batch
543
                }
529 544

  
530 545
            } // next taxon
531 546

  
......
557 572
    *  <li>the source reference of the accumulated distributions are also accumulated into the new distribution,
558 573
    *    this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
559 574
    *</ul>
575
 * @throws JvmLimitsException
560 576
    */
561
    protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) {
577
    protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
578

  
579
        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_RANK, BATCH_MIN_FREE_HEAP);
580
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
581
        batch.setMaxAllowedGcIncreases(10);
562 582

  
563
        int batchSize = 500;
564 583
        int ticksPerRank = 100;
565 584

  
566 585
        TransactionStatus txStatus = startTransaction(false);
......
597 616

  
598 617

  
599 618
            Iterator<Integer> taxonIdIterator = taxonIdsPerRank.iterator();
600
            while (taxonIdIterator.hasNext()) {
619
            while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
601 620

  
602 621
                if(txStatus == null) {
603 622
                    // transaction has been committed at the end of this batch, start a new one
......
605 624
                }
606 625

  
607 626
                // load taxa for this batch
608
                List<Integer> taxonIds = new ArrayList<Integer>(batchSize);
609
                while(taxonIdIterator.hasNext() && taxonIds.size() < batchSize ) {
610
                    taxonIds.add(taxonIdIterator.next());
611
                }
627
                List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
612 628

  
613 629
                taxa = taxonService.loadByIds(taxonIds, null);
614 630

  
......
618 634

  
619 635
                for(TaxonBase taxonBase : taxa) {
620 636

  
637
                    batch.incementCounter();
638

  
621 639
                    Taxon taxon = (Taxon)taxonBase;
622 640
                    if (taxaProcessedIds.contains(taxon.getId())) {
623 641
                        if(logger.isDebugEnabled()){
......
640 658
                    }
641 659
                    if(!childTaxonIds.isEmpty()) {
642 660
                        childTaxa = taxonService.loadByIds(childTaxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
661
                        LinkedList<TaxonBase> childStack = new LinkedList<TaxonBase>(childTaxa);
662
                        childTaxa = null; // allow to be garbage collected
663

  
664
                        while(childStack.size() > 0){
643 665

  
644
                        for (TaxonBase childTaxonBase : childTaxa){
666
                            TaxonBase childTaxonBase = childStack.pop();
667
                            getSession().setReadOnly(childTaxonBase, true);
645 668

  
646 669
                            Taxon childTaxon = (Taxon) childTaxonBase;
647 670
                            getSession().setReadOnly(childTaxon, true);
......
659 682
                                StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
660 683
                                accumulatedStatusMap.put(area, choosePreferred(accumulatedStatusMap.get(area), subStatusAndSources, null));
661 684
                             }
685

  
686
                            // evict all initialized entities of the childTaxon
687
                            // TODO consider using cascade="evict" in the model classes
688
//                            for( TaxonDescription description : ((Taxon)childTaxonBase).getDescriptions()) {
689
//                                for (DescriptionElementBase deb : description.getElements()) {
690
//                                    getSession().evict(deb);
691
//                                }
692
//                                getSession().evict(description); // this causes in some cases the taxon object to be detached from the session
693
//                            }
694
                            getSession().evict(childTaxonBase); // no longer needed, save heap
662 695
                        }
663 696

  
664 697
                        if(accumulatedStatusMap.size() > 0) {
......
680 713

  
681 714
                    }
682 715
                    taxonSubMonitor.worked(1); // one taxon worked
716
                    if(!batch.isWithinJvmLimits()) {
717
                        break; // flushAndClear and start with new batch
718
                    }
683 719

  
684 720
                } // next taxon ....
685 721

  
......
690 726
                commitTransaction(txStatus);
691 727
                txStatus = null;
692 728

  
729
                // flushing the session and to the index (flushAndClear() ) can impose a
730
                // massive heap consumption. Therefore we explicitly do a check after the
731
                // flush to detect these situations and to reduce the batch size.
732
                if(batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) {
733
                    batch.reduceSize(0.5);
734
                }
735

  
693 736
                if(ONLY_FISRT_BATCH) {
694 737
                    break;
695 738
                }
......
756 799
    /**
757 800
     *
758 801
     */
759
    private void flushAndClear() {
760
        logger.debug("flushing and clearing session ...");
802
    private void flush() {
803
        logger.debug("flushing session ...");
761 804
        getSession().flush();
762 805
        try {
806
            logger.debug("flushing to indexes ...");
763 807
            Search.getFullTextSession(getSession()).flushToIndexes();
764 808
        } catch (HibernateException e) {
765 809
            /* IGNORE - Hibernate Search Event listeners not configured ... */
......
767 811
                throw e;
768 812
            }
769 813
        }
770
        getSession().clear();
771 814
    }
772 815

  
816
    /**
817
    *
818
    */
819
   private void flushAndClear() {
820
       flush();
821
       logger.debug("clearing session ...");
822
       getSession().clear();
823
   }
824

  
773 825

  
774 826
    // TODO merge with CdmApplicationDefaultConfiguration#startTransaction() into common base class
775 827
    public TransactionStatus startTransaction(Boolean readOnly) {
......
835 887
        // find existing one
836 888
        for (TaxonDescription description : taxon.getDescriptions()) {
837 889
            if (description.hasMarker(MarkerType.COMPUTED(), true)) {
838
                logger.debug("reusing description for " + taxon.getTitleCache());
890
                logger.debug("reusing computed description for " + taxon.getTitleCache());
839 891
                if (doClear) {
840 892
                    int deleteCount = 0;
841 893
                    Set<DescriptionElementBase> deleteCandidates = new HashSet<DescriptionElementBase>();
......
890 942
    private List<Distribution> distributionsFor(Taxon taxon) {
891 943
        List<Distribution> distributions = new ArrayList<Distribution>();
892 944
        for(TaxonDescription description: taxon.getDescriptions()) {
945
            getSession().setReadOnly(description, true);
893 946
            for(DescriptionElementBase deb : description.getElements()) {
894 947
                if(deb instanceof Distribution) {
948
                    getSession().setReadOnly(deb, true);
895 949
                    distributions.add((Distribution)deb);
896 950
                }
897 951
            }

Also available in: Unified diff