Project

General

Profile

Download (18.9 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2019 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.api.service.description;
10

    
11
import java.util.ArrayList;
12
import java.util.HashSet;
13
import java.util.Iterator;
14
import java.util.List;
15
import java.util.Set;
16

    
17
import org.apache.log4j.Logger;
18
import org.hibernate.FlushMode;
19
import org.hibernate.Session;
20
import org.springframework.transaction.PlatformTransactionManager;
21
import org.springframework.transaction.TransactionDefinition;
22
import org.springframework.transaction.TransactionStatus;
23
import org.springframework.transaction.support.DefaultTransactionDefinition;
24

    
25
import eu.etaxonomy.cdm.api.application.ICdmRepository;
26
import eu.etaxonomy.cdm.api.service.DeleteResult;
27
import eu.etaxonomy.cdm.api.service.IClassificationService;
28
import eu.etaxonomy.cdm.api.service.IDescriptionService;
29
import eu.etaxonomy.cdm.api.service.IDescriptiveDataSetService;
30
import eu.etaxonomy.cdm.api.service.ITaxonNodeService;
31
import eu.etaxonomy.cdm.api.service.ITaxonService;
32
import eu.etaxonomy.cdm.api.service.ITermService;
33
import eu.etaxonomy.cdm.api.service.UpdateResult;
34
import eu.etaxonomy.cdm.common.DynamicBatch;
35
import eu.etaxonomy.cdm.common.JvmLimitsException;
36
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
37
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
38
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
39
import eu.etaxonomy.cdm.filter.TaxonNodeFilter;
40
import eu.etaxonomy.cdm.filter.TaxonNodeFilter.ORDER;
41
import eu.etaxonomy.cdm.model.common.CdmBase;
42
import eu.etaxonomy.cdm.model.description.CategoricalData;
43
import eu.etaxonomy.cdm.model.description.DescriptionBase;
44
import eu.etaxonomy.cdm.model.description.DescriptionElementSource;
45
import eu.etaxonomy.cdm.model.description.Distribution;
46
import eu.etaxonomy.cdm.model.description.TaxonDescription;
47
import eu.etaxonomy.cdm.model.media.Media;
48
import eu.etaxonomy.cdm.model.reference.OriginalSourceType;
49
import eu.etaxonomy.cdm.model.taxon.Taxon;
50
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
51
import eu.etaxonomy.cdm.model.taxon.TaxonNode;
52
import eu.etaxonomy.cdm.persistence.query.OrderHint;
53

    
54
/**
55
 * A common base class to run aggregation tasks on descriptive data.
56
 *
57
 * Usable for all types of descriptive data like structured descriptive data,
58
 * ( {@link CategoricalData and QuantitativeData}, {@link Distribution},
59
 * {@link Media}, etc.
60
 *
61
 * @author a.mueller
62
 * @since 03.11.2019
63
 */
64
public abstract class DescriptionAggregationBase<T extends DescriptionAggregationBase<T, CONFIG>, CONFIG extends DescriptionAggregationConfigurationBase<T>> {
65

    
66
    public static final Logger logger = Logger.getLogger(DescriptionAggregationBase.class);

    //minimum amount of heap that must remain free during batch processing
    private static final long BATCH_MIN_FREE_HEAP = 150  * 1024 * 1024;  //150 MB (previous comment said 800 MB, which did not match the value)
    /**
     * ratio of the initially free heap which should not be used
     * during the batch processing. This amount of the heap is reserved
     * for the flushing of the session and to the index
     */
    private static final double BATCH_FREE_HEAP_RATIO = 0.9;
//    private static final int BATCH_SIZE_BY_AREA = 1000;
//    private static final int BATCH_SIZE_BY_RANK = 500;
    //initial number of taxa handled per batch/transaction (may be reduced adaptively, see aggregate())
    private static final int BATCH_SIZE_BY_TAXON = 200;

    //repository all service and transaction calls are delegated to
    private ICdmRepository repository;
    //configuration of the current aggregation run (filter, modes, monitor, ...)
    private CONFIG config;
    //collects status and exceptions of the current aggregation run
    private UpdateResult result;

    //configurable minimum free heap for batch processing, defaults to BATCH_MIN_FREE_HEAP
    private long batchMinFreeHeap = BATCH_MIN_FREE_HEAP;
84

    
85

    
86
    /**
     * Runs this aggregation with the given configuration on the given repository.
     *
     * @param config the aggregation configuration (taxon node filter, aggregation modes, monitor, ...)
     * @param repository the repository providing services and transaction handling
     * @return the result holding the aggregation status and any collected exceptions
     */
    public final UpdateResult invoke(CONFIG config, ICdmRepository repository){
        init(config, repository);
        return doInvoke();
    }
90

    
91
    /**
     * Template method running the complete aggregation workflow:
     * configures the taxon node filter (children before parents, no root nodes),
     * computes progress ticks, then runs {@code preAggregate}, {@code verifyConfiguration}
     * and finally {@code aggregate} over all taxon nodes matching the filter.
     * Exceptions from the individual phases are reported via {@code handleException};
     * any remaining exception is caught and attached to the {@link UpdateResult}.
     *
     * @return the result holding the aggregation status and any collected exceptions
     */
    protected UpdateResult doInvoke() {

        try {
            //TODO FIXME use UpdateResult

            double start = System.currentTimeMillis();
            IProgressMonitor monitor = getConfig().getMonitor();

            // only for debugging:
            //logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations
            //Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG);
            logger.info("Hibernate JDBC Batch size: " +  getSession().getSessionFactory().getSessionFactoryOptions().getJdbcBatchSize());

            TaxonNodeFilter filter = getConfig().getTaxonNodeFilter();
            filter.setOrder(ORDER.TREEINDEX_DESC); //DESC guarantees that child taxa are aggregated before parent
            filter.setIncludeRootNodes(false);  //root nodes do not make sense for aggregation

            monitor.beginTask("Accumulating " + pluralDataType(), 100);
            Long countTaxonNodes = getTaxonNodeService().count(filter);
            int aggregationWorkTicks = countTaxonNodes.intValue();
            logger.info(aggregationWorkTicks + " taxa to aggregate");
            //1 tick each for fetching the id list and for the pre-aggregation phase
            int getIdListTicks = 1;
            int preAccumulateTicks = 1;
            monitor.worked(5);
            SubProgressMonitor subMonitor = SubProgressMonitor.NewStarted(monitor,
                    95, "Accumulating " + pluralDataType(), aggregationWorkTicks + getIdListTicks + preAccumulateTicks);

            subMonitor.subTask("Get taxon node ID list");
            List<Integer> taxonNodeIdList = getTaxonNodeService().idList(filter);

            subMonitor.worked(getIdListTicks);

            try {
                preAggregate(subMonitor);
            } catch (Exception e) {
                return handleException(e, "Unhandled error during pre-aggregation");
            }

            try {
                verifyConfiguration(subMonitor);
            } catch (Exception e) {
                return handleException(e, "Unhandled error during configuration check");
            }

            subMonitor.worked(preAccumulateTicks);
            subMonitor.subTask("Accumulating "+pluralDataType()+" per taxon for taxon filter " + filter.toString());

            double startAccumulate = System.currentTimeMillis();

            //TODO AM move to invokeOnSingleTaxon()
            IProgressMonitor aggregateMonitor = new SubProgressMonitor(subMonitor, aggregationWorkTicks);
            try {
                aggregate(taxonNodeIdList, aggregateMonitor);
            } catch (Exception e) {
                return handleException(e, "Unhandled error during aggregation");
            }

            double end = System.currentTimeMillis();
            logger.info("Time elapsed for accumulate only(): " + (end - startAccumulate) / (1000) + "s");
            logger.info("Time elapsed for invoking task(): " + (end - start) / (1000) + "s");

            done();
            return result;
        } catch (Exception e) {
            result.addException(new RuntimeException("Unhandled error during doInvoke", e));
            return result;
        }
    }
159

    
160
    private UpdateResult handleException(Exception e, String unhandledMessage) {
161
        Exception ex;
162
        if (e instanceof AggregationException){
163
            ex = e;
164
        }else{
165
            ex = new RuntimeException(unhandledMessage + ": " + e.getMessage() , e);
166
            e.printStackTrace();
167
        }
168
        result.addException(ex);
169
        result.setError();
170
        done();
171
        return result;
172
    }
173

    
174
    /**
     * Iterates over the given taxon node ids in dynamically sized batches,
     * loads the corresponding taxon nodes and calls
     * {@code accumulateSingleTaxon(TaxonNode)} for each of them.
     * Each batch runs in its own transaction which is committed at the end of
     * the batch to keep the persistent context small; the batch size is reduced
     * when garbage collection activity indicates memory pressure.
     *
     * @param taxonNodeIdList taxon node ids, expected in TREEINDEX_DESC order
     *        so that children are processed before their parents
     * @param subMonitor progress monitor, advanced by one tick per processed taxon
     * @throws JvmLimitsException if the dynamic batch hits the configured JVM limits
     */
    protected void aggregate(List<Integer> taxonNodeIdList, IProgressMonitor subMonitor)  throws JvmLimitsException {

        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_TAXON, batchMinFreeHeap);
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
        //TODO AM from aggByRank          batch.setMaxAllowedGcIncreases(10);

        TransactionStatus txStatus = startTransaction(false);
        initTransaction();

        // visit all accepted taxa
//        subMonitor.beginTask("Work on taxa.", taxonNodeIdList.size());
        subMonitor.subTask("Accumulating bottom up " + taxonNodeIdList.size() + " taxa.");

        //TODO FIXME this was a Taxon not a TaxonNode id list
        Iterator<Integer> taxonIdIterator = taxonNodeIdList.iterator();

        //loop until all ids are consumed AND the batch has no leftover items
        while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
            if(getConfig().getMonitor().isCanceled()){
                break;
            }

            if(txStatus == null) {
                // transaction has been committed at the end of this batch, start a new one
                txStatus = startTransaction(false);
                initTransaction();
            }

            // load taxa for this batch
            List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
//            logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");

            //TODO AM adapt init-strat to taxonnode if it stays a taxon node list
            List<OrderHint> orderHints = new ArrayList<>();
            orderHints.add(OrderHint.BY_TREE_INDEX_DESC);
            List<TaxonNode> taxonNodes = getTaxonNodeService().loadByIds(taxonIds, orderHints, descriptionInitStrategy());

            // iterate over the taxa and accumulate areas
            // start processing the new batch

            for(TaxonNode taxonNode : taxonNodes) {
                if(getConfig().getMonitor().isCanceled()){
                    break;
                }
                subMonitor.subTask("Accumulating " + taxonNode.getTaxon().getTitleCache());

                accumulateSingleTaxon(taxonNode);
                batch.incrementCounter();

                subMonitor.worked(1);

                //TODO handle canceled better if needed
                if(subMonitor.isCanceled()){
                    return;
                }

                if(!batch.isWithinJvmLimits()) {
                    break; // flushAndClear and start with new batch
                }
            } // next taxon

//            flushAndClear();

            // commit for every batch, otherwise the persistent context
            // may grow too much and eats up all the heap
            commitTransaction(txStatus);
            txStatus = null;

            // flushing the session and to the index (flushAndClear() ) can impose a
            // massive heap consumption. therefore we explicitly do a check after the
            // flush to detect these situations and to reduce the batch size.
            if(getConfig().isAdaptBatchSize() && batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) {
                batch.reduceSize(0.5);
            }

        } // next batch of taxa

    }
252

    
253
    protected class ResultHolder{
254
        //descriptions are identifiable and therefore are not deleted automatically by removing them from taxon or specimen
255
        //here we store all descriptions that need to be deleted after aggregation as they are not needed anymore
256
        Set<DescriptionBase<?>> descriptionsToDelete = new HashSet<>();;
257
    }
258

    
259
    /**
     * Runs all configured aggregation modes for a single taxon: aggregation
     * from its children ({@code AggregationMode.ToParent}) and/or aggregation
     * within the taxon's own descriptions ({@code AggregationMode.WithinTaxon}).
     * The collected data is written to the aggregated target description, which
     * is removed again if it ends up empty; obsolete descriptions collected in
     * the result holder are deleted afterwards.
     *
     * @throws IllegalArgumentException if an unsupported aggregation mode is configured
     */
    protected void accumulateSingleTaxon(TaxonNode taxonNode){

        Taxon taxon = CdmBase.deproxy(taxonNode.getTaxon());
        if(logger.isDebugEnabled()){
            logger.debug("accumulate - taxon :" + taxonToString(taxon));
        }

        TaxonDescription targetDescription = getAggregatedDescription(taxon);
        ResultHolder resultHolder = createResultHolder();
        for (AggregationMode mode : getConfig().getAggregationModes()){
            if (mode == AggregationMode.ToParent){
                aggregateToParentTaxon(taxonNode, resultHolder, new HashSet<>()); //empty excludedDescriptions because we aggregate from children
            } else if (mode == AggregationMode.WithinTaxon){
                //exclude the target description itself from the aggregation
                Set<TaxonDescription> excludedDescriptions = new HashSet<>();
                excludedDescriptions.add(targetDescription);
                aggregateWithinSingleTaxon(taxon, resultHolder, excludedDescriptions);
            }else{
                throw new IllegalArgumentException("Mode " + mode + " not yet supported");
            }
        }
        addAggregationResultToDescription(targetDescription, resultHolder);
        removeDescriptionIfEmpty(targetDescription, resultHolder);
        deleteDescriptionsToDelete(resultHolder);
    }
283

    
284
    private void deleteDescriptionsToDelete(DescriptionAggregationBase<T, CONFIG>.ResultHolder resultHolder) {
285
        for (DescriptionBase<?> descriptionToDelete : resultHolder.descriptionsToDelete){
286
            if (descriptionToDelete.isPersited()){
287
                getSession().flush(); // move to service method #9801
288
                DeleteResult result = repository.getDescriptionService().deleteDescription(descriptionToDelete);
289
                //TODO handle result somehow if not OK, but careful, descriptions may be linked >1x and therefore maybe deleted only after last link was removed
290
            }
291
        }
292
    }
293

    
294
    protected void removeDescriptionIfEmpty(TaxonDescription description, ResultHolder resultHolder) {
295
        if (description.getElements().isEmpty()){
296
            description.getTaxon().removeDescription(description);
297
            resultHolder.descriptionsToDelete.add(description);
298
        }
299
    }
300

    
301
    /**
     * Writes the data collected in the result holder to the target description.
     */
    protected abstract void addAggregationResultToDescription(TaxonDescription targetDescription,
            ResultHolder resultHolder);

    /**
     * Aggregates data from the children of the given taxon node into the result holder.
     */
    protected abstract void aggregateToParentTaxon(TaxonNode taxonNode, ResultHolder resultHolder,
            Set<TaxonDescription> excludedDescriptions);

    /**
     * Aggregates data from the taxon's own (non-excluded) descriptions into the result holder.
     */
    protected abstract void aggregateWithinSingleTaxon(Taxon taxon, ResultHolder resultHolder,
            Set<TaxonDescription> excludedDescriptions);

    /**
     * Creates the subclass-specific holder for intermediate aggregation results.
     */
    protected abstract ResultHolder createResultHolder();
311

    
312
    /**
313
     * Either finds an existing taxon description of the given taxon or creates a new one.
314
     */
315
    private TaxonDescription getAggregatedDescription(Taxon taxon) {
316

    
317
        // find existing one
318
        for (TaxonDescription description : taxon.getDescriptions()) {
319
            if (hasDescriptionType(description)){
320
                if (logger.isDebugEnabled()){logger.debug("reusing existing aggregated description for " + taxonToString(taxon));}
321
                setDescriptionTitle(description, taxon);  //maybe we want to redefine the title
322
                return description;
323
            }
324
        }
325

    
326
        // create a new one
327
        return createNewDescription(taxon);
328
    }
329

    
330
    /**
     * Creates a new (empty) aggregated description for the given taxon.
     */
    protected abstract TaxonDescription createNewDescription(Taxon taxon);

    /**
     * Returns whether the given description is of the aggregation type handled
     * by this aggregation, i.e. whether it can be reused as target description.
     */
    protected abstract boolean hasDescriptionType(TaxonDescription description);

    /**
     * Sets the title of the (aggregated) description for the given taxon.
     */
    protected abstract void setDescriptionTitle(TaxonDescription description, Taxon taxon) ;
335

    
336
    protected String taxonToString(TaxonBase<?> taxon) {
337
        if(logger.isTraceEnabled()) {
338
            return taxon.getTitleCache();
339
        } else {
340
            return taxon.toString();
341
        }
342
    }
343

    
344
    /**
     * Property path used to initialize the taxon nodes loaded during aggregation.
     */
    protected abstract List<String> descriptionInitStrategy();

    /**
     * Hook executed once before the aggregation loop starts
     * (e.g. for loading required terms).
     */
    protected abstract void preAggregate(IProgressMonitor monitor);

    /**
     * Hook for validating the configuration before the aggregation starts.
     */
    protected abstract void verifyConfiguration(IProgressMonitor monitor);

    /**
     * hook for initializing object when a new transaction starts
     */
    protected abstract void initTransaction();

    /**
     * Plural name of the aggregated data type, used in progress messages.
     */
    protected abstract String pluralDataType();
356

    
357
    private void init(CONFIG config, ICdmRepository repository) {
358
        this.repository = repository;
359
        this.config = config;
360
        if(config.getMonitor() == null){
361
            config.setMonitor(new NullProgressMonitor());
362
        }
363
        result = new UpdateResult();
364
    }
365

    
366
    protected void addSourcesDeduplicated(Set<DescriptionElementSource> target, Set<DescriptionElementSource> sourcesToAdd) {
367
        for(DescriptionElementSource source : sourcesToAdd) {
368
            boolean contained = false;
369
            if (!hasValidSourceType(source)&& !isAggregationSource(source)){  //only aggregate sources of defined source types
370
                continue;
371
            }
372
            for(DescriptionElementSource existingSource: target) {
373
                if(existingSource.equalsByShallowCompare(source)) {
374
                    contained = true;
375
                    break;
376
                }
377
            }
378
            if(!contained) {
379
                try {
380
                    target.add(source.clone());
381
                } catch (CloneNotSupportedException e) {
382
                    // should never happen
383
                    throw new RuntimeException(e);
384
                }
385
            }
386
        }
387
    }
388

    
389
    /**
     * Returns whether the source has one of the source types configured
     * for aggregation.
     */
    private boolean hasValidSourceType(DescriptionElementSource source) {
        return getConfig().getAggregatingSourceTypes().contains(source.getType());
    }

    /**
     * Returns whether the source is itself an aggregation source pointing
     * to an existing CDM source object.
     */
    private boolean isAggregationSource(DescriptionElementSource source) {
        return source.getType().equals(OriginalSourceType.Aggregation) && source.getCdmSource() != null;
    }
396

    
397
// ******************** GETTER / SETTER *************************/
398

    
399
    //shortcut to the repository's description service
    protected IDescriptionService getDescriptionService(){
        return repository.getDescriptionService();
    }

    //shortcut to the repository's descriptive dataset service
    protected IDescriptiveDataSetService getDescriptiveDatasetService() {
        return repository.getDescriptiveDataSetService();
    }

    //shortcut to the repository's taxon service
    protected ITaxonService getTaxonService() {
        return repository.getTaxonService();
    }

    //shortcut to the repository's taxon node service
    protected ITaxonNodeService getTaxonNodeService() {
        return repository.getTaxonNodeService();
    }

    //shortcut to the repository's term service
    protected ITermService getTermService() {
        return repository.getTermService();
    }

    //shortcut to the repository's classification service
    protected IClassificationService getClassificationService() {
        return repository.getClassificationService();
    }

    //shortcut to the repository's transaction manager
    protected PlatformTransactionManager getTransactionManager(){
        return repository.getTransactionManager();
    }
426

    
427
    // TODO merge with CdmRepository#startTransaction() into common base class
428
    protected void commitTransaction(TransactionStatus txStatus){
429
        logger.debug("commiting transaction ...");
430
        repository.commitTransaction(txStatus);
431
        return;
432
    }
433

    
434
    /**
     * Starts a new transaction via the repository's transaction manager and
     * sets the session flush mode to COMMIT so changes are only flushed on
     * commit (keeps batch processing fast).
     *
     * @param readOnly whether the transaction is marked read-only
     * @return the status of the newly started transaction
     */
    protected TransactionStatus startTransaction(Boolean readOnly) {

        DefaultTransactionDefinition defaultTxDef = new DefaultTransactionDefinition();
        defaultTxDef.setReadOnly(readOnly);
        TransactionDefinition txDef = defaultTxDef;

        // Log some transaction-related debug information.
        if (logger.isTraceEnabled()) {
            logger.trace("Transaction name = " + txDef.getName());
            logger.trace("Transaction facets:");
            logger.trace("Propagation behavior = " + txDef.getPropagationBehavior());
            logger.trace("Isolation level = " + txDef.getIsolationLevel());
            logger.trace("Timeout = " + txDef.getTimeout());
            logger.trace("Read Only = " + txDef.isReadOnly());
            // org.springframework.orm.hibernate5.HibernateTransactionManager
            // provides more transaction/session-related debug information.
        }

        TransactionStatus txStatus = getTransactionManager().getTransaction(txDef);
        getSession().setFlushMode(FlushMode.COMMIT);

        return txStatus;
    }
457

    
458
    //current hibernate session, obtained from the description service
    protected Session getSession() {
        return getDescriptionService().getSession();
    }

    //repository this aggregation was invoked on
    protected ICdmRepository getRepository() {
        return repository;
    }

    //configuration of the current aggregation run
    protected CONFIG getConfig() {
        return config;
    }

    //the (mutable) result of the current aggregation run
    protected UpdateResult getResult() {
        return result;
    }

    //signals the progress monitor that all work is done
    protected void done(){
        getConfig().getMonitor().done();
    }

    /**
     * Allows overriding the minimum free heap required during batch
     * processing (e.g. for tests with small heaps).
     */
    public void setBatchMinFreeHeap(long batchMinFreeHeap) {
        this.batchMinFreeHeap = batchMinFreeHeap;
    }
481

    
482
}
(5-5/12)