Project

General

Profile

« Previous | Next » 

Revision b440115a

Added by Andreas Müller over 4 years ago

ref #8479 concurrency in FauEu2CdmImport and some other improvements(?) like parent handling

View differences:

cdm-pesi/src/main/java/eu/etaxonomy/cdm/app/pesi/FauEu2CdmActivator.java
13 13
import org.apache.log4j.Logger;
14 14

  
15 15
import eu.etaxonomy.cdm.app.common.CdmDestinations;
16
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
16 17
import eu.etaxonomy.cdm.database.DbSchemaValidation;
17 18
import eu.etaxonomy.cdm.database.ICdmDataSource;
19
import eu.etaxonomy.cdm.filter.TaxonNodeFilter.ORDER;
18 20
import eu.etaxonomy.cdm.io.common.CdmDefaultImport;
19 21
import eu.etaxonomy.cdm.io.common.IImportConfigurator.CHECK;
22
import eu.etaxonomy.cdm.io.common.ITaxonNodeOutStreamPartitioner;
23
import eu.etaxonomy.cdm.io.common.TaxonNodeOutStreamPartitioner;
24
import eu.etaxonomy.cdm.io.common.TaxonNodeOutStreamPartitionerConcurrent;
20 25
import eu.etaxonomy.cdm.io.pesi.fauEu2Cdm.FauEu2CdmImportConfigurator;
21 26

  
22 27
/**
......
36 41

  
37 42
    static final int partitionSize = 5000;
38 43

  
44
    static final boolean doConcurrent = false;
45

  
39 46
// ***************** ALL ************************************************//
40 47

  
41 48
//    >50 records
......
51 58

  
52 59
        FauEu2CdmImportConfigurator config = FauEu2CdmImportConfigurator.NewInstance(source,  destination);
53 60

  
61
        IProgressMonitor monitor = config.getProgressMonitor();
62

  
54 63
//        config.setDoTaxa(doTaxa);
55 64
        config.setDbSchemaValidation(hbm2dll);
56 65
        config.getTaxonNodeFilter().orSubtree(uuidTaxonNodeFilter);
66
        config.getTaxonNodeFilter().setOrder(ORDER.TREEINDEX);
67
        if (doConcurrent){
68
            ITaxonNodeOutStreamPartitioner partitioner = TaxonNodeOutStreamPartitionerConcurrent
69
                    .NewInstance(config.getSource(), config.getTaxonNodeFilter(),
70
                            8, monitor, 1, TaxonNodeOutStreamPartitioner.fullPropertyPaths);
71
            config.setPartitioner(partitioner);
72
        }
57 73

  
58 74
        config.setCheck(check);
59 75
//        config.setRecordsPerTransaction(partitionSize);
cdm-pesi/src/main/java/eu/etaxonomy/cdm/io/pesi/fauEu2Cdm/FauEu2CdmImport.java
25 25
import org.springframework.transaction.TransactionStatus;
26 26

  
27 27
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
28
import eu.etaxonomy.cdm.io.common.ITaxonNodeOutStreamPartitioner;
28 29
import eu.etaxonomy.cdm.io.common.TaxonNodeOutStreamPartitioner;
30
import eu.etaxonomy.cdm.io.common.TaxonNodeOutStreamPartitionerConcurrent;
29 31
import eu.etaxonomy.cdm.model.agent.AgentBase;
30 32
import eu.etaxonomy.cdm.model.agent.Contact;
31 33
import eu.etaxonomy.cdm.model.agent.Institution;
......
124 126
    @Override
125 127
    protected void doInvoke(FauEu2CdmImportState state) {
126 128
        IProgressMonitor monitor = state.getConfig().getProgressMonitor();
127
        System.out.println("start source repo");
128
        source(state);
129
        System.out.println("end source repo");
129

  
130 130
        FauEu2CdmImportConfigurator config = state.getConfig();
131
        setRootId(state, config); //necessary?
132

  
133
        ITaxonNodeOutStreamPartitioner partitioner = config.getPartitioner();
134
        if (partitioner == null){
135
//        @SuppressWarnings("unchecked")
136
//        TaxonNodeOutStreamPartitioner<FauEu2CdmImportState> partitioner = TaxonNodeOutStreamPartitioner
137
//                .NewInstance(source(state), state, state.getConfig().getTaxonNodeFilter(), 100, monitor, null);
138
//        partitioner.setLastCommitManually(true);
139

  
140
            partitioner = TaxonNodeOutStreamPartitionerConcurrent
141
                    .NewInstance(state.getConfig().getSource(), state.getConfig().getTaxonNodeFilter(),
142
                            1000, monitor, 1, TaxonNodeOutStreamPartitioner.fullPropertyPaths);
143

  
144
        }
145
        monitor.subTask("Start partitioning");
146
        doData(state, partitioner);
147
    }
148

  
149
    /**
150
     * @param state
151
     * @param config
152
     */
153
    private void setRootId(FauEu2CdmImportState state, FauEu2CdmImportConfigurator config) {
131 154
        if (config.getTaxonNodeFilter().hasClassificationFilter()) {
132 155
            Classification classification = getClassificationService()
133 156
                    .load(config.getTaxonNodeFilter().getClassificationFilter().get(0).getUuid());
......
135 158
        } else if (config.getTaxonNodeFilter().hasSubtreeFilter()) {
136 159
            state.setRootId(config.getTaxonNodeFilter().getSubtreeFilter().get(0).getUuid());
137 160
        }
138
        @SuppressWarnings("unchecked")
139
        TaxonNodeOutStreamPartitioner<FauEu2CdmImportState> partitioner = TaxonNodeOutStreamPartitioner
140
                .NewInstance(source(state), state, state.getConfig().getTaxonNodeFilter(), 100, monitor, null);
141
        monitor.subTask("Start partitioning");
142
        partitioner.setLastCommitManually(true);
143
        doData(state, partitioner);
144 161
    }
145 162

  
146
    /**
147
     * @param state
148
     * @param partitioner
149
     * @throws IllegalAccessException
150
     * @throws InvocationTargetException
151
     */
152
    private void doData(FauEu2CdmImportState state, TaxonNodeOutStreamPartitioner<FauEu2CdmImportState> partitioner){
163
    private void doData(FauEu2CdmImportState state, ITaxonNodeOutStreamPartitioner partitioner){
153 164
        TaxonNode node = partitioner.next();
154 165
        int partitionSize = 100;
155 166
        int count = 0;
......
174 185
        partitioner.close();
175 186
    }
176 187

  
177
    /**
178
     * @param state
179
     * @param node
180
     * @return
181
     */
182 188
    private TaxonNode doSingleNode(FauEu2CdmImportState state, TaxonNode node) {
183 189
        TaxonNode result = null;
184 190
        logger.info(node.treeIndex());
......
224 230
    }
225 231

  
226 232
    private void handleParentTaxonNode(TaxonNode childNode) throws IllegalAccessException, InvocationTargetException, NoSuchFieldException, SecurityException, IllegalArgumentException, NoSuchMethodException {
227
        TaxonNode parent = detache(childNode.getParent());
228
        if (parent == null){
229
            return;
230
        }
233
        TaxonNode parent = detache(childNode.getParent(), true);
231 234
        //TODO
232 235
        String microReference = null;
233 236
        Reference reference = null;
234
        parent.addChildNode(childNode, reference, microReference);
237
        if (parent == null && childNode.getClassification().getRootNode().equals(childNode)){
238
            //do nothing
239
        }else if (parent == null ){
240
            childNode.getClassification().addChildNode(childNode, reference, microReference) ;
241
        }else{
242
            parent.addChildNode(childNode, reference, microReference);
243
        }
235 244
    }
236 245

  
237 246
    private void setInvisible(Object holder, String fieldName, Object value) throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
......
983 992
        return (IIntextReferenceTarget)detache((CdmBase)cdmBase);
984 993
    }
985 994

  
986

  
987 995
    private <T extends CdmBase> T detache(T cdmBase) throws IllegalAccessException, InvocationTargetException, NoSuchFieldException, SecurityException, IllegalArgumentException, NoSuchMethodException {
996
        return detache(cdmBase, false);
997
    }
998

  
999
    private <T extends CdmBase> T detache(T cdmBase, boolean notFromSource) throws IllegalAccessException, InvocationTargetException, NoSuchFieldException, SecurityException, IllegalArgumentException, NoSuchMethodException {
988 1000
        cdmBase = CdmBase.deproxy(cdmBase);
989 1001
        if (cdmBase == null ){
990 1002
            return cdmBase;
......
1006 1018
            logger.warn("Non persisted object not in cache and not in target DB. This should not happen: " + cdmBase.getUuid());
1007 1019
            return cdmBase; //should not happen anymore; either in cache or in target or persisted in source
1008 1020
        }else{
1009
            return (T)handlePersisted(cdmBase);
1021
            return notFromSource? null : (T)handlePersisted(cdmBase);
1010 1022
        }
1011 1023
    }
1012 1024

  
cdm-pesi/src/main/java/eu/etaxonomy/cdm/io/pesi/fauEu2Cdm/FauEu2CdmImportBase.java
23 23
    private static final long serialVersionUID = 8917991155285743172L;
24 24

  
25 25
    protected ICdmRepository source(FauEu2CdmImportState state){
26

  
27 26
        ICdmRepository repo = state.getSourceRepository();
28 27
        if (repo == null){
28
            System.out.println("start source repo");
29 29
            boolean omitTermLoading = true;
30 30
            repo = CdmApplicationController.NewInstance(state.getConfig().getSource(),
31 31
                    DbSchemaValidation.VALIDATE, omitTermLoading);
32 32
            state.setSourceRepository(repo);
33
            System.out.println("end source repo");
33 34
        }
34 35
        return repo;
35 36
    }
cdm-pesi/src/main/java/eu/etaxonomy/cdm/io/pesi/fauEu2Cdm/FauEu2CdmImportConfigurator.java
10 10

  
11 11
import eu.etaxonomy.cdm.database.ICdmDataSource;
12 12
import eu.etaxonomy.cdm.filter.TaxonNodeFilter;
13
import eu.etaxonomy.cdm.io.common.ITaxonNodeOutStreamPartitioner;
13 14
import eu.etaxonomy.cdm.io.common.ImportConfiguratorBase;
14 15
import eu.etaxonomy.cdm.io.common.mapping.IInputTransformer;
15 16
import eu.etaxonomy.cdm.model.reference.Reference;
......
26 27
    private static IInputTransformer myTransformer = null;
27 28

  
28 29
    private TaxonNodeFilter taxonNodeFilter = new TaxonNodeFilter();
30
    private ITaxonNodeOutStreamPartitioner partitioner;
29 31

  
30 32
    public static FauEu2CdmImportConfigurator NewInstance(ICdmDataSource source, ICdmDataSource destination) {
31 33
        return new FauEu2CdmImportConfigurator(source, destination);
......
64 66
        this.taxonNodeFilter = taxonNodeFilter;
65 67
    }
66 68

  
69
    public ITaxonNodeOutStreamPartitioner getPartitioner() {
70
        return partitioner;
71
    }
72
    public void setPartitioner(ITaxonNodeOutStreamPartitioner partitioner) {
73
        this.partitioner = partitioner;
74
    }
75

  
67 76

  
68 77
}

Also available in: Unified diff