
Revision 9b7db8db

Added by Andreas Kohlbecker almost 8 years ago

TransmissionEngineDistribution: solving problems with excessive heap consumption

View differences:

cdmlib-remote/src/main/java/eu/etaxonomy/cdm/remote/controller/DescriptionListController.java

@@ -46,6 +46,8 @@
 import eu.etaxonomy.cdm.api.service.dto.DistributionInfoDTO.InfoPart;
 import eu.etaxonomy.cdm.api.service.pager.Pager;
 import eu.etaxonomy.cdm.api.utility.DistributionOrder;
+import eu.etaxonomy.cdm.common.JvmLimitsException;
+import eu.etaxonomy.cdm.common.monitor.IRestServiceProgressMonitor;
 import eu.etaxonomy.cdm.ext.geo.CondensedDistributionRecipe;
 import eu.etaxonomy.cdm.ext.geo.EditGeoServiceUtilities;
 import eu.etaxonomy.cdm.ext.geo.IEditGeoService;
@@ -187,8 +189,14 @@
                 public void run() {
                     Pager<NamedArea> areaPager = termService.list(targetAreaLevel, (NamedAreaType) null,
                             null, null, (List<OrderHint>) null, term_init_strategy);
-                    transmissionEngineDistribution.accumulate(mode, areaPager.getRecords(), _lowerRank, _upperRank,
-                            null, progressMonitorController.getMonitor(transmissionEngineMonitorUuid));
+                    try {
+                        transmissionEngineDistribution.accumulate(mode, areaPager.getRecords(), _lowerRank, _upperRank,
+                                null, progressMonitorController.getMonitor(transmissionEngineMonitorUuid));
+                    } catch (JvmLimitsException e) {
+                        IRestServiceProgressMonitor monitor = progressMonitorController.getMonitor(transmissionEngineMonitorUuid);
+                        monitor.setIsFailed(true);
+                        monitor.setFeedback(e);
+                    }
                 }
             };
             if(priority == null) {

cdmlib-services/src/main/java/eu/etaxonomy/cdm/api/service/description/TransmissionEngineDistribution.java

@@ -14,6 +14,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,6 +38,8 @@
 import eu.etaxonomy.cdm.api.service.INameService;
 import eu.etaxonomy.cdm.api.service.ITaxonService;
 import eu.etaxonomy.cdm.api.service.ITermService;
+import eu.etaxonomy.cdm.common.DynamicBatch;
+import eu.etaxonomy.cdm.common.JvmLimitsException;
 import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
 import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
 import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
@@ -57,7 +60,6 @@
 import eu.etaxonomy.cdm.model.taxon.Taxon;
 import eu.etaxonomy.cdm.model.taxon.TaxonBase;
 import eu.etaxonomy.cdm.persistence.dto.ClassificationLookupDTO;
-import eu.etaxonomy.cdm.persistence.query.OrderHint;
 
 /**
  *
@@ -91,6 +93,7 @@
 @Service
 public class TransmissionEngineDistribution { //TODO extends IoBase?
 
+
     public static final String EXTENSION_VALUE_PREFIX = "transmissionEngineDistribution.priority:";
 
     public static final Logger logger = Logger.getLogger(TransmissionEngineDistribution.class);
@@ -143,12 +146,22 @@
 
     private final Map<NamedArea, Set<NamedArea>> subAreaMap = new HashMap<NamedArea, Set<NamedArea>>();
 
-    private final List<OrderHint> emptyOrderHints = new ArrayList<OrderHint>(0);
-
     int byRankTicks = 300;
     int byAreasTicks = 100;
 
 
+    private static final long BATCH_MIN_FREE_HEAP = 800  * 1024 * 1024;
+    /**
+     * ratio of the initially free heap which should not be used
+     * during the batch processing. This amount of the heap is reserved
+     * for the flushing of the session and to the index
+     */
+    private static final double BATCH_FREE_HEAP_RATIO = 0.9;
+    private static final int BATCH_SIZE_BY_AREA = 1000;
+    private static final int BATCH_SIZE_BY_RANK = 500;
+
+
+
     /**
      * byAreaIgnoreStatusList contains by default:
      *  <ul>
@@ -334,13 +347,12 @@
      *            should be reported and that the operation cannot be cancelled.
      */
     public void accumulate(AggregationMode mode, List<NamedArea> superAreas, Rank lowerRank, Rank upperRank,
-            Classification classification, IProgressMonitor monitor) {
+            Classification classification, IProgressMonitor monitor) throws JvmLimitsException {
 
         if (monitor == null) {
             monitor = new NullProgressMonitor();
         }
 
-
         // only for debugging:
         //logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations
         //Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG);
@@ -434,11 +446,13 @@
      * @param superAreas
      *      the areas to which the subordinate areas should be projected
      * @param classificationLookupDao
+     * @throws JvmLimitsException
      *
      */
-    protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) {
+    protected void accumulateByArea(List<NamedArea> superAreas, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
 
-        int batchSize = 1000;
+        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_AREA, BATCH_MIN_FREE_HEAP);
+        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
 
         TransactionStatus txStatus = startTransaction(false);
 
@@ -452,7 +466,7 @@
         subMonitor.beginTask("Accumulating by area ",  classificationLookupDao.getTaxonIds().size());
         Iterator<Integer> taxonIdIterator = classificationLookupDao.getTaxonIds().iterator();
 
-        while (taxonIdIterator.hasNext()) {
+        while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
 
             if(txStatus == null) {
                 // transaction has been comitted at the end of this batch, start a new one
@@ -463,22 +477,20 @@
             List<NamedArea> superAreaList = (List)termService.find(superAreaUuids);
 
             // load taxa for this batch
-            List<TaxonBase> taxa = null;
-            List<Integer> taxonIds = new ArrayList<Integer>(batchSize);
-            while(taxonIdIterator.hasNext() && taxonIds.size() < batchSize ) {
-                taxonIds.add(taxonIdIterator.next());
-            }
-
+            List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
 //            logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");
-
-            taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
+            List<TaxonBase> taxa = taxonService.loadByIds(taxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
 
             // iterate over the taxa and accumulate areas
+            // start processing the new batch
+
             for(TaxonBase taxonBase : taxa) {
                 if(logger.isDebugEnabled()){
                     logger.debug("accumulateByArea() - taxon :" + taxonToString(taxonBase));
                 }
 
+                batch.incementCounter();
+
                 Taxon taxon = (Taxon)taxonBase;
                 TaxonDescription description = findComputedDescription(taxon, doClearDescriptions);
                 List<Distribution> distributions = distributionsFor(taxon);
@@ -526,6 +538,9 @@
                 descriptionService.saveOrUpdate(description);
                 taxonService.saveOrUpdate(taxon);
                 subMonitor.worked(1);
+                if(!batch.isWithinJvmLimits()) {
+                    break; // flushAndClear and start with new batch
+                }
 
             } // next taxon
 
@@ -557,10 +572,14 @@
     *  <li>the source reference of the accumulated distributions are also accumulated into the new distribution,
     *    this has been especially implemented for the EuroMed Checklist Vol2 and might not be a general requirement</li>
     *</ul>
+ * @throws JvmLimitsException
     */
-    protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) {
+    protected void accumulateByRank(List<Rank> rankInterval, ClassificationLookupDTO classificationLookupDao,  IProgressMonitor subMonitor, boolean doClearDescriptions) throws JvmLimitsException {
+
+        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_RANK, BATCH_MIN_FREE_HEAP);
+        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
+        batch.setMaxAllowedGcIncreases(10);
 
-        int batchSize = 500;
         int ticksPerRank = 100;
 
         TransactionStatus txStatus = startTransaction(false);
@@ -597,7 +616,7 @@
 
 
             Iterator<Integer> taxonIdIterator = taxonIdsPerRank.iterator();
-            while (taxonIdIterator.hasNext()) {
+            while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
 
                 if(txStatus == null) {
                     // transaction has been committed at the end of this batch, start a new one
@@ -605,10 +624,7 @@
                 }
 
                 // load taxa for this batch
-                List<Integer> taxonIds = new ArrayList<Integer>(batchSize);
-                while(taxonIdIterator.hasNext() && taxonIds.size() < batchSize ) {
-                    taxonIds.add(taxonIdIterator.next());
-                }
+                List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
 
                 taxa = taxonService.loadByIds(taxonIds, null);
 
@@ -618,6 +634,8 @@
 
                 for(TaxonBase taxonBase : taxa) {
 
+                    batch.incementCounter();
+
                     Taxon taxon = (Taxon)taxonBase;
                     if (taxaProcessedIds.contains(taxon.getId())) {
                         if(logger.isDebugEnabled()){
@@ -640,8 +658,13 @@
                     }
                     if(!childTaxonIds.isEmpty()) {
                         childTaxa = taxonService.loadByIds(childTaxonIds, TAXONDESCRIPTION_INIT_STRATEGY);
+                        LinkedList<TaxonBase> childStack = new LinkedList<TaxonBase>(childTaxa);
+                        childTaxa = null; // allow to be garbage collected
+
+                        while(childStack.size() > 0){
 
-                        for (TaxonBase childTaxonBase : childTaxa){
+                            TaxonBase childTaxonBase = childStack.pop();
+                            getSession().setReadOnly(childTaxonBase, true);
 
                             Taxon childTaxon = (Taxon) childTaxonBase;
                             getSession().setReadOnly(childTaxon, true);
@@ -659,6 +682,16 @@
                                 StatusAndSources subStatusAndSources = new StatusAndSources(status, distribution.getSources());
                                 accumulatedStatusMap.put(area, choosePreferred(accumulatedStatusMap.get(area), subStatusAndSources, null));
                              }
+
+                            // evict all initialized entities of the childTaxon
+                            // TODO consider using cascade="evict" in the model classes
+//                            for( TaxonDescription description : ((Taxon)childTaxonBase).getDescriptions()) {
+//                                for (DescriptionElementBase deb : description.getElements()) {
+//                                    getSession().evict(deb);
+//                                }
+//                                getSession().evict(description); // this causes in some cases the taxon object to be detached from the session
+//                            }
+                            getSession().evict(childTaxonBase); // no longer needed, save heap
                         }
 
                         if(accumulatedStatusMap.size() > 0) {
@@ -680,6 +713,9 @@
 
                     }
                     taxonSubMonitor.worked(1); // one taxon worked
+                    if(!batch.isWithinJvmLimits()) {
+                        break; // flushAndClear and start with new batch
+                    }
 
                 } // next taxon ....
 
@@ -690,6 +726,13 @@
                 commitTransaction(txStatus);
                 txStatus = null;
 
+                // flushing the session and to the index (flushAndClear() ) can impose a
+                // massive heap consumption. therefore we explicitly do a check after the
+                // flush to detect these situations and to reduce the batch size.
+                if(batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) {
+                    batch.reduceSize(0.5);
+                }
+
                 if(ONLY_FISRT_BATCH) {
                     break;
                 }
@@ -756,10 +799,11 @@
     /**
      *
     */
-    private void flushAndClear() {
-        logger.debug("flushing and clearing session ...");
+    private void flush() {
+        logger.debug("flushing session ...");
         getSession().flush();
         try {
+            logger.debug("flushing to indexes ...");
            Search.getFullTextSession(getSession()).flushToIndexes();
         } catch (HibernateException e) {
             /* IGNORE - Hibernate Search Event listeners not configured ... */
@@ -767,9 +811,17 @@
                 throw e;
             }
         }
-        getSession().clear();
     }
 
+    /**
+    *
+    */
+   private void flushAndClear() {
+       flush();
+       logger.debug("clearing session ...");
+       getSession().clear();
+   }
+
 
     // TODO merge with CdmApplicationDefaultConfiguration#startTransaction() into common base class
     public TransactionStatus startTransaction(Boolean readOnly) {
@@ -835,7 +887,7 @@
         // find existing one
         for (TaxonDescription description : taxon.getDescriptions()) {
             if (description.hasMarker(MarkerType.COMPUTED(), true)) {
-                logger.debug("reusing description for " + taxon.getTitleCache());
+                logger.debug("reusing computed description for " + taxon.getTitleCache());
                 if (doClear) {
                     int deleteCount = 0;
                     Set<DescriptionElementBase> deleteCandidates = new HashSet<DescriptionElementBase>();
@@ -890,8 +942,10 @@
     private List<Distribution> distributionsFor(Taxon taxon) {
        List<Distribution> distributions = new ArrayList<Distribution>();
        for(TaxonDescription description: taxon.getDescriptions()) {
+            getSession().setReadOnly(description, true);
            for(DescriptionElementBase deb : description.getElements()) {
                if(deb instanceof Distribution) {
+                    getSession().setReadOnly(deb, true);
                    distributions.add((Distribution)deb);
                }
            }

cdmlib-services/src/test/java/eu/etaxonomy/cdm/api/service/TransmissionEngineDistributionTest.java

@@ -33,6 +33,7 @@
 
 import eu.etaxonomy.cdm.api.service.description.TransmissionEngineDistribution;
 import eu.etaxonomy.cdm.api.service.description.TransmissionEngineDistribution.AggregationMode;
+import eu.etaxonomy.cdm.common.JvmLimitsException;
 import eu.etaxonomy.cdm.model.common.Extension;
 import eu.etaxonomy.cdm.model.common.MarkerType;
 import eu.etaxonomy.cdm.model.description.DescriptionElementBase;
@@ -154,7 +155,7 @@
         @DataSet(value="TransmissionEngineDistributionTest.xml"),
     })
 //  @DataSet(loadStrategy=CleanSweepInsertLoadStrategy.class) //, value="./BlankDataSet.xml")
-    public void test_ignore() {
+    public void test_ignore() throws JvmLimitsException {
 
         addDistributions(
                 T_LAPSANA_COMMUNIS_ALPINA_UUID,
@@ -196,7 +197,7 @@
         @DataSet(value="/eu/etaxonomy/cdm/database/TermsDataSet-with_auditing_info.xml"),
         @DataSet(value="TransmissionEngineDistributionTest.xml"),
     })
-    public void testArea_area() {
+    public void testArea_area() throws JvmLimitsException {
 
         Set<Distribution> distributions_LCA = new HashSet<>();
 
@@ -242,7 +243,7 @@
         @DataSet(value="/eu/etaxonomy/cdm/database/TermsDataSet-with_auditing_info.xml"),
         @DataSet(value="TransmissionEngineDistributionTest.xml"),
     })
-    public void testArea_rank_and_area_1() {
+    public void testArea_rank_and_area_1() throws JvmLimitsException {
 
         Set<Distribution> distributions_LCA = new HashSet<>();
         distributions_LCA.add(newDistribution(book_a, yug_mn, PresenceAbsenceTerm.CULTIVATED(), "1"));
@@ -314,6 +315,7 @@
      *
      * This test relies on {@link #testArea_rank_and_area_1()}
     * an makes assertions only on the alternative source references
+     * @throws JvmLimitsException
     */
     @Test
     @DataSets({
@@ -321,7 +323,7 @@
         @DataSet(value="/eu/etaxonomy/cdm/database/TermsDataSet-with_auditing_info.xml"),
         @DataSet(value="TransmissionEngineDistributionTest.xml"),
     })
-    public void testArea_rank_and_area_2() {
+    public void testArea_rank_and_area_2() throws JvmLimitsException {
 
         Set<Distribution> distributions_LCA = new HashSet<Distribution>();
         distributions_LCA.add(newDistribution(book_a, yug_ko, PresenceAbsenceTerm.NATIVE(), "1"));
@@ -361,6 +363,7 @@
     *
     * This test relies on {@link #testArea_rank_and_area_1()}
    * an makes assertions only on the alternative source references
+     * @throws JvmLimitsException
    */
    @Test
    @DataSets({
@@ -368,7 +371,7 @@
        @DataSet(value="/eu/etaxonomy/cdm/database/TermsDataSet-with_auditing_info.xml"),
        @DataSet(value="TransmissionEngineDistributionTest.xml"),
    })
-    public void testArea_rank_and_area_3() {
+    public void testArea_rank_and_area_3() throws JvmLimitsException {
 
        Set<Distribution> distributions_LCA = new HashSet<Distribution>();
        distributions_LCA.add(newDistribution(book_a, yug_ko, PresenceAbsenceTerm.NATIVE(), "1"));
@@ -417,6 +420,7 @@
     *
     * This test relies on {@link #testArea_rank_and_area_1()}
    * an makes assertions only on the alternative source references
+     * @throws JvmLimitsException
    */
    @Test
    @Ignore
@@ -425,7 +429,7 @@
        @DataSet(value="/eu/etaxonomy/cdm/database/TermsDataSet-with_auditing_info.xml"),
        @DataSet(value="TransmissionEngineDistributionTest.xml"),
    })
-    public void testArea_rank_and_area_4() {
+    public void testArea_rank_and_area_4() throws JvmLimitsException {
 
        Set<Distribution> distributions_LCA = new HashSet<>();
        distributions_LCA.add(newDistribution(book_a, yug_ko, PresenceAbsenceTerm.NATIVE(), "1"));

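Note: the batching in this changeset relies on the helper class eu.etaxonomy.cdm.common.DynamicBatch (and its JvmMonitor), which is added elsewhere and not part of this diff. As a rough, illustrative sketch of the pattern the calling code expects — carry over unprocessed ids after a heap-limit break, count processed items (incementCounter in the diff), check free heap before continuing, and shrink the batch when memory gets tight — something along the following lines would do. The class name SimpleDynamicBatch and all method bodies below are assumptions for illustration only, not the actual cdmlib implementation; the GC-rate monitoring (setMaxAllowedGcIncreases, getGCRateSiceLastCheck) is omitted.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Illustrative sketch only; the real implementation lives in
 * eu.etaxonomy.cdm.common.DynamicBatch and may differ in detail.
 */
public class SimpleDynamicBatch<T> {

    private int batchSize;            // current number of items per batch, may shrink over time
    private final long minFreeHeap;   // stop processing the current batch below this free-heap threshold
    private List<T> currentBatch = new ArrayList<T>();
    private int processed = 0;        // items of the current batch already handled

    public SimpleDynamicBatch(int initialBatchSize, long minFreeHeap) {
        this.batchSize = initialBatchSize;
        this.minFreeHeap = minFreeHeap;
    }

    /** The consumer reports every item it has finished ("incementCounter" in the diff). */
    public void incrementCounter() {
        processed++;
    }

    /** True if the last batch was cut short by the heap check. */
    public boolean hasUnprocessedItems() {
        return processed < currentBatch.size();
    }

    /** Carry over the unprocessed remainder, then top up from the iterator. */
    public List<T> nextItems(Iterator<T> iterator) {
        List<T> next = new ArrayList<T>(currentBatch.subList(processed, currentBatch.size()));
        while (iterator.hasNext() && next.size() < batchSize) {
            next.add(iterator.next());
        }
        currentBatch = next;
        processed = 0;
        return next;
    }

    /** True while enough heap is free to keep processing the current batch. */
    public boolean isWithinJvmLimits() {
        Runtime rt = Runtime.getRuntime();
        long usedHeap = rt.totalMemory() - rt.freeMemory();
        long freeHeap = rt.maxMemory() - usedHeap;
        return freeHeap > minFreeHeap;
    }

    /** Shrink the batch size when a post-flush check indicates memory pressure. */
    public void reduceSize(double factor) {
        batchSize = Math.max(1, (int) (batchSize * factor));
    }
}

Used this way, the loop condition while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) ensures that ids skipped by an early break on !batch.isWithinJvmLimits() are picked up again by the next call to nextItems(), so no taxa are lost when a batch is aborted to protect the heap.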