Project

General

Profile

Download (17.3 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2019 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.api.service.description;
10

    
11
import java.util.ArrayList;
12
import java.util.HashSet;
13
import java.util.Iterator;
14
import java.util.List;
15
import java.util.Set;
16

    
17
import org.apache.log4j.Logger;
18
import org.hibernate.FlushMode;
19
import org.hibernate.Session;
20
import org.springframework.transaction.PlatformTransactionManager;
21
import org.springframework.transaction.TransactionDefinition;
22
import org.springframework.transaction.TransactionStatus;
23
import org.springframework.transaction.support.DefaultTransactionDefinition;
24

    
25
import eu.etaxonomy.cdm.api.application.ICdmRepository;
26
import eu.etaxonomy.cdm.api.service.IClassificationService;
27
import eu.etaxonomy.cdm.api.service.IDescriptionService;
28
import eu.etaxonomy.cdm.api.service.IDescriptiveDataSetService;
29
import eu.etaxonomy.cdm.api.service.ITaxonNodeService;
30
import eu.etaxonomy.cdm.api.service.ITaxonService;
31
import eu.etaxonomy.cdm.api.service.ITermService;
32
import eu.etaxonomy.cdm.api.service.UpdateResult;
33
import eu.etaxonomy.cdm.common.DynamicBatch;
34
import eu.etaxonomy.cdm.common.JvmLimitsException;
35
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
36
import eu.etaxonomy.cdm.common.monitor.NullProgressMonitor;
37
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
38
import eu.etaxonomy.cdm.filter.TaxonNodeFilter;
39
import eu.etaxonomy.cdm.filter.TaxonNodeFilter.ORDER;
40
import eu.etaxonomy.cdm.model.common.CdmBase;
41
import eu.etaxonomy.cdm.model.description.CategoricalData;
42
import eu.etaxonomy.cdm.model.description.DescriptionElementSource;
43
import eu.etaxonomy.cdm.model.description.Distribution;
44
import eu.etaxonomy.cdm.model.description.TaxonDescription;
45
import eu.etaxonomy.cdm.model.media.Media;
46
import eu.etaxonomy.cdm.model.reference.OriginalSourceType;
47
import eu.etaxonomy.cdm.model.taxon.Taxon;
48
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
49
import eu.etaxonomy.cdm.model.taxon.TaxonNode;
50
import eu.etaxonomy.cdm.persistence.query.OrderHint;
51

    
52
/**
53
 * A common base class to run aggregation tasks on descriptive data.
54
 *
55
 * Usable for all types of descriptive data like structured descriptive data
 * ({@link CategoricalData} and QuantitativeData), {@link Distribution},
 * {@link Media}, etc.
58
 *
59
 * @author a.mueller
60
 * @since 03.11.2019
61
 */
62
public abstract class DescriptionAggregationBase<T extends DescriptionAggregationBase<T, CONFIG>, CONFIG extends DescriptionAggregationConfigurationBase<T>> {

    public static final Logger logger = Logger.getLogger(DescriptionAggregationBase.class);

    // default minimum free heap required by the dynamic batching (see setBatchMinFreeHeap())
    private static final long BATCH_MIN_FREE_HEAP = 800  * 1024 * 1024;  //800 MB
    /**
     * ratio of the initially free heap which should not be used
     * during the batch processing. This amount of the heap is reserved
     * for the flushing of the session and to the index
     */
    private static final double BATCH_FREE_HEAP_RATIO = 0.9;
//    private static final int BATCH_SIZE_BY_AREA = 1000;
//    private static final int BATCH_SIZE_BY_RANK = 500;
    private static final int BATCH_SIZE_BY_TAXON = 200;

    // gateway to all CDM services and transactions; set in init()
    private ICdmRepository repository;
    // configuration of this aggregation run; set in init()
    private CONFIG config;
    // collects status and exceptions for the whole run; created in init()
    private UpdateResult result;

    // minimum free heap for batching, overridable via setBatchMinFreeHeap()
    private long batchMinFreeHeap = BATCH_MIN_FREE_HEAP;

84
    /**
     * Entry point: initializes this aggregation with the given configuration
     * and repository and then runs the aggregation.
     *
     * @param config the configuration for this aggregation run
     * @param repository the CDM repository providing services and transactions
     * @return the {@link UpdateResult} collecting status and exceptions
     * @throws JvmLimitsException if JVM memory limits are hit during batch processing
     */
    public final UpdateResult invoke(CONFIG config, ICdmRepository repository) throws JvmLimitsException{
        init(config, repository);
        return doInvoke();
    }
88

    
89
    protected UpdateResult doInvoke() {
90

    
91
        try {
92
            //TODO FIXME use UpdateResult
93

    
94
            double start = System.currentTimeMillis();
95
            IProgressMonitor monitor = getConfig().getMonitor();
96

    
97
            // only for debugging:
98
            //logger.setLevel(Level.TRACE); // TRACE will slow down a lot since it forces loading all term representations
99
            //Logger.getLogger("org.hibernate.SQL").setLevel(Level.DEBUG);
100
            logger.info("Hibernate JDBC Batch size: " +  getSession().getSessionFactory().getSessionFactoryOptions().getJdbcBatchSize());
101

    
102
            TaxonNodeFilter filter = getConfig().getTaxonNodeFilter();
103
            filter.setOrder(ORDER.TREEINDEX_DESC); //DESC guarantees that child taxa are aggregated before parent
104
            filter.setIncludeRootNodes(false);  //root nodes do not make sense for aggregation
105

    
106
            monitor.beginTask("Accumulating " + pluralDataType(), 100);
107
            Long countTaxonNodes = getTaxonNodeService().count(filter);
108
            int aggregationWorkTicks = countTaxonNodes.intValue();
109
            logger.info(aggregationWorkTicks + " taxa to aggregate");
110
            int getIdListTicks = 1;
111
            int preAccumulateTicks = 1;
112
            monitor.worked(5);
113
            SubProgressMonitor subMonitor = SubProgressMonitor.NewStarted(monitor,
114
                    95, "Accumulating " + pluralDataType(), aggregationWorkTicks + getIdListTicks + preAccumulateTicks);
115

    
116
            subMonitor.subTask("Get taxon node ID list");
117
            List<Integer> taxonNodeIdList = getTaxonNodeService().idList(filter);
118

    
119
            subMonitor.worked(getIdListTicks);
120

    
121
            try {
122
                preAggregate(subMonitor);
123
            } catch (Exception e) {
124
                result.addException(new RuntimeException("Unhandled error during pre-aggregation", e));
125
                result.setError();
126
                done();
127
                return result;
128
            }
129

    
130
            subMonitor.worked(preAccumulateTicks);
131
            subMonitor.subTask("Accumulating "+pluralDataType()+" per taxon for taxon filter " + filter.toString());
132

    
133
            double startAccumulate = System.currentTimeMillis();
134

    
135
            //TODO AM move to invokeOnSingleTaxon()
136
            IProgressMonitor aggregateMonitor = new SubProgressMonitor(subMonitor, aggregationWorkTicks);
137
            try {
138
                aggregate(taxonNodeIdList, aggregateMonitor);
139
            } catch (Exception e) {
140
                result.addException(new RuntimeException("Unhandled error during aggregation", e));
141
                result.setError();
142
                done();
143
                return result;
144
            }
145

    
146
            double end = System.currentTimeMillis();
147
            logger.info("Time elapsed for accumulate only(): " + (end - startAccumulate) / (1000) + "s");
148
            logger.info("Time elapsed for invoking task(): " + (end - start) / (1000) + "s");
149

    
150
            done();
151
            return result;
152
        } catch (Exception e) {
153
            result.addException(new RuntimeException("Unhandled error during doInvoke", e));
154
            return result;
155
        }
156

    
157
    }
158

    
159
    /**
     * Batch-wise bottom-up aggregation over the given taxon node ids.
     * <p>
     * The node ids are processed in batches sized dynamically by {@link DynamicBatch}
     * depending on free heap. Each batch runs in its own transaction which is
     * committed at the end of the batch to keep the persistence context small.
     *
     * @param taxonNodeIdList ids of the taxon nodes to aggregate, expected in
     *        bottom-up (tree index descending) order
     * @param subMonitor progress monitor, advanced by one tick per taxon
     * @throws JvmLimitsException if the JVM memory limits are exceeded
     */
    protected void aggregate(List<Integer> taxonNodeIdList, IProgressMonitor subMonitor)  throws JvmLimitsException {

        DynamicBatch batch = new DynamicBatch(BATCH_SIZE_BY_TAXON, batchMinFreeHeap);
        batch.setRequiredFreeHeap(BATCH_FREE_HEAP_RATIO);
        //TODO AM from aggByRank          batch.setMaxAllowedGcIncreases(10);

        TransactionStatus txStatus = startTransaction(false);
        initTransaction();

        // visit all accepted taxa
//        subMonitor.beginTask("Work on taxa.", taxonNodeIdList.size());
        subMonitor.subTask("Accumulating bottom up " + taxonNodeIdList.size() + " taxa.");

        //TODO FIXME this was a Taxon not a TaxonNode id list
        Iterator<Integer> taxonIdIterator = taxonNodeIdList.iterator();

        // loop until both the id iterator and the batch's carry-over items are exhausted
        while (taxonIdIterator.hasNext() || batch.hasUnprocessedItems()) {
            if(getConfig().getMonitor().isCanceled()){
                break;
            }

            if(txStatus == null) {
                // transaction has been committed at the end of this batch, start a new one
                txStatus = startTransaction(false);
                initTransaction();
            }

            // load taxa for this batch
            List<Integer> taxonIds = batch.nextItems(taxonIdIterator);
//            logger.debug("accumulateByArea() - taxon " + taxonPager.getFirstRecord() + " to " + taxonPager.getLastRecord() + " of " + taxonPager.getCount() + "]");

            //TODO AM adapt init-strat to taxonnode if it stays a taxon node list
            List<OrderHint> orderHints = new ArrayList<>();
            orderHints.add(OrderHint.BY_TREE_INDEX_DESC);
            List<TaxonNode> taxonNodes = getTaxonNodeService().loadByIds(taxonIds, orderHints, descriptionInitStrategy());

            // iterate over the taxa and accumulate areas
            // start processing the new batch

            for(TaxonNode taxonNode : taxonNodes) {
                if(getConfig().getMonitor().isCanceled()){
                    break;
                }
                subMonitor.subTask("Accumulating " + taxonNode.getTaxon().getTitleCache());

                accumulateSingleTaxon(taxonNode);
                batch.incrementCounter();

                subMonitor.worked(1);

                //TODO handle canceled better if needed
                // NOTE(review): returning here skips the commit below and leaves the
                // current transaction open — confirm this is intended on cancel
                if(subMonitor.isCanceled()){
                    return;
                }

                if(!batch.isWithinJvmLimits()) {
                    break; // flushAndClear and start with new batch
                }
            } // next taxon

//            flushAndClear();

            // commit for every batch, otherwise the persistent context
            // may grow too much and eats up all the heap
            commitTransaction(txStatus);
            txStatus = null;

            // flushing the session and to the index (flushAndClear() ) can impose a
            // massive heap consumption. therefore we explicitly do a check after the
            // flush to detect these situations and to reduce the batch size.
            if(getConfig().isAdaptBatchSize() && batch.getJvmMonitor().getGCRateSiceLastCheck() > 0.05) {
                batch.reduceSize(0.5);
            }

        } // next batch of taxa

    }
237

    
238
    /**
     * Marker interface for the mutable per-taxon aggregation state.
     * Concrete subclasses provide their own implementation via
     * {@link #createResultHolder()}.
     */
    protected interface ResultHolder{

    }
241

    
242
    protected void accumulateSingleTaxon(TaxonNode taxonNode){
243

    
244
        Taxon taxon = CdmBase.deproxy(taxonNode.getTaxon());
245
        if(logger.isDebugEnabled()){
246
            logger.debug("accumulate - taxon :" + taxonToString(taxon));
247
        }
248

    
249
        TaxonDescription targetDescription = getAggregatedDescription(taxon);
250
        ResultHolder resultHolder = createResultHolder();
251
        for (AggregationMode mode : getConfig().getAggregationModes()){
252
            if (mode == AggregationMode.ToParent){
253
                Set<TaxonDescription> excludedDescriptions = new HashSet<>();
254
//            excludedDescriptions.add(targetDescription); //not possible because aggregating from children
255
                aggregateToParentTaxon(taxonNode, resultHolder, excludedDescriptions);
256
            }
257
            if (mode == AggregationMode.WithinTaxon){
258
                Set<TaxonDescription> excludedDescriptions = new HashSet<>();
259
                excludedDescriptions.add(targetDescription);
260
                aggregateWithinSingleTaxon(taxon, resultHolder, excludedDescriptions);
261
            }
262
        }
263
        addAggregationResultToDescription(targetDescription, resultHolder);
264
        removeDescriptionIfEmpty(targetDescription);
265
    }
266

    
267
    protected void removeDescriptionIfEmpty(TaxonDescription description) {
268
        if (description.getElements().isEmpty()){
269
            description.getTaxon().removeDescription(description);
270
        }
271
    }
272

    
273
    /**
     * Writes the accumulated data from the result holder into the aggregated
     * target description of the taxon.
     */
    protected abstract void addAggregationResultToDescription(TaxonDescription targetDescription,
            ResultHolder resultHolder);

    /**
     * Accumulates data from the children of the given taxon node into the
     * result holder (aggregation mode {@code ToParent}).
     */
    protected abstract void aggregateToParentTaxon(TaxonNode taxonNode, ResultHolder resultHolder,
            Set<TaxonDescription> excludedDescriptions);

    /**
     * Accumulates data from the taxon's own descriptions into the result holder
     * (aggregation mode {@code WithinTaxon}); descriptions in
     * {@code excludedDescriptions} must be ignored.
     */
    protected abstract void aggregateWithinSingleTaxon(Taxon taxon, ResultHolder resultHolder,
            Set<TaxonDescription> excludedDescriptions);

    /**
     * Creates a new, empty holder for the intermediate per-taxon aggregation state.
     */
    protected abstract ResultHolder createResultHolder();
283

    
284
    /**
285
     * Either finds an existing taxon description of the given taxon or creates a new one.
286
     */
287
    private TaxonDescription getAggregatedDescription(Taxon taxon) {
288

    
289
        // find existing one
290
        for (TaxonDescription description : taxon.getDescriptions()) {
291
            if (hasDescriptionType(description)){
292
                logger.debug("reusing existing aggregated description for " + taxonToString(taxon));
293
                setDescriptionTitle(description, taxon);  //maybe we want to redefine the title
294
                return description;
295
            }
296
        }
297

    
298
        // create a new one
299
        return createNewDescription(taxon);
300
    }
301

    
302
    /**
     * Creates a new, empty aggregated description for the given taxon.
     */
    protected abstract TaxonDescription createNewDescription(Taxon taxon);

    /**
     * @return whether the given description is an aggregated description of the
     * type handled by this aggregation (used to find reusable descriptions)
     */
    protected abstract boolean hasDescriptionType(TaxonDescription description);

    /**
     * Sets (or redefines) the title of the given aggregated description.
     */
    protected abstract void setDescriptionTitle(TaxonDescription description, Taxon taxon) ;
307

    
308
    protected String taxonToString(TaxonBase<?> taxon) {
309
        if(logger.isTraceEnabled()) {
310
            return taxon.getTitleCache();
311
        } else {
312
            return taxon.toString();
313
        }
314
    }
315

    
316
    /**
     * @return the property path used to initialize the taxon nodes loaded
     * for each batch in {@link #aggregate(List, IProgressMonitor)}
     */
    protected abstract List<String> descriptionInitStrategy();

    /**
     * Hook called once before the batch-wise aggregation starts,
     * e.g. to load or prepare data needed by the whole run.
     */
    protected abstract void preAggregate(IProgressMonitor monitor);

    /**
     * hook for initializing object when a new transaction starts
     */
    protected abstract void initTransaction();

    /**
     * @return the plural name of the aggregated data type, used in
     * progress monitor and log messages
     */
    protected abstract String pluralDataType();
326

    
327
    private void init(CONFIG config, ICdmRepository repository) {
328
        this.repository = repository;
329
        this.config = config;
330
        if(config.getMonitor() == null){
331
            config.setMonitor(new NullProgressMonitor());
332
        }
333
        result = new UpdateResult();
334
    }
335

    
336
    protected void addSourcesDeduplicated(Set<DescriptionElementSource> target, Set<DescriptionElementSource> sourcesToAdd) {
337
        for(DescriptionElementSource source : sourcesToAdd) {
338
            boolean contained = false;
339
            if (!hasValidSourceType(source)&& !isAggregationSource(source)){  //only aggregate sources of defined source types
340
                continue;
341
            }
342
            for(DescriptionElementSource existingSource: target) {
343
                if(existingSource.equalsByShallowCompare(source)) {
344
                    contained = true;
345
                    break;
346
                }
347
            }
348
            if(!contained) {
349
                try {
350
                    target.add(source.clone());
351
                } catch (CloneNotSupportedException e) {
352
                    // should never happen
353
                    throw new RuntimeException(e);
354
                }
355
            }
356
        }
357
    }
358

    
359
    private boolean hasValidSourceType(DescriptionElementSource source) {
360
        return getConfig().getAggregatingSourceTypes().contains(source.getType());
361
    }
362

    
363
    private boolean isAggregationSource(DescriptionElementSource source) {
364
        return source.getType().equals(OriginalSourceType.Aggregation) && source.getCdmSource() != null;
365
    }
366

    
367
// ******************** GETTER / SETTER *************************/
368

    
369
    /** @return the description service of the repository */
    protected IDescriptionService getDescriptionService(){
        return repository.getDescriptionService();
    }

    /** @return the descriptive data set service of the repository */
    protected IDescriptiveDataSetService getDescriptiveDatasetService() {
        return repository.getDescriptiveDataSetService();
    }

    /** @return the taxon service of the repository */
    protected ITaxonService getTaxonService() {
        return repository.getTaxonService();
    }

    /** @return the taxon node service of the repository */
    protected ITaxonNodeService getTaxonNodeService() {
        return repository.getTaxonNodeService();
    }

    /** @return the term service of the repository */
    protected ITermService getTermService() {
        return repository.getTermService();
    }

    /** @return the classification service of the repository */
    protected IClassificationService getClassificationService() {
        return repository.getClassificationService();
    }

    /** @return the transaction manager of the repository */
    protected PlatformTransactionManager getTransactionManager(){
        return repository.getTransactionManager();
    }
396

    
397
    // TODO merge with CdmRepository#startTransaction() into common base class
398
    protected void commitTransaction(TransactionStatus txStatus){
399
        logger.debug("commiting transaction ...");
400
        repository.commitTransaction(txStatus);
401
        return;
402
    }
403

    
404
    protected TransactionStatus startTransaction(Boolean readOnly) {
405

    
406
        DefaultTransactionDefinition defaultTxDef = new DefaultTransactionDefinition();
407
        defaultTxDef.setReadOnly(readOnly);
408
        TransactionDefinition txDef = defaultTxDef;
409

    
410
        // Log some transaction-related debug information.
411
        if (logger.isTraceEnabled()) {
412
            logger.trace("Transaction name = " + txDef.getName());
413
            logger.trace("Transaction facets:");
414
            logger.trace("Propagation behavior = " + txDef.getPropagationBehavior());
415
            logger.trace("Isolation level = " + txDef.getIsolationLevel());
416
            logger.trace("Timeout = " + txDef.getTimeout());
417
            logger.trace("Read Only = " + txDef.isReadOnly());
418
            // org.springframework.orm.hibernate5.HibernateTransactionManager
419
            // provides more transaction/session-related debug information.
420
        }
421

    
422
        TransactionStatus txStatus = getTransactionManager().getTransaction(txDef);
423
        getSession().setFlushMode(FlushMode.COMMIT);
424

    
425
        return txStatus;
426
    }
427

    
428
    /** @return the current hibernate session (obtained via the description service) */
    protected Session getSession() {
        return getDescriptionService().getSession();
    }

    /** @return the repository this aggregation works on */
    protected ICdmRepository getRepository() {
        return repository;
    }

    /** @return the configuration of this aggregation run */
    protected CONFIG getConfig() {
        return config;
    }

    /** @return the update result collecting status and exceptions of this run */
    protected UpdateResult getResult() {
        return result;
    }

    /** Signals the progress monitor that the whole task is finished. */
    protected void done(){
        getConfig().getMonitor().done();
    }

    /**
     * Sets the minimum free heap (in bytes) required by the dynamic batching.
     * Defaults to {@code BATCH_MIN_FREE_HEAP} (800 MB).
     */
    public void setBatchMinFreeHeap(long batchMinFreeHeap) {
        this.batchMinFreeHeap = batchMinFreeHeap;
    }
451

    
452
}
(4-4/11)