Project

General

Profile

Download (11.6 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2007 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9

    
10
package eu.etaxonomy.cdm.io.common;
11

    
12
import java.util.ArrayList;
13
import java.util.Arrays;
14
import java.util.Iterator;
15
import java.util.LinkedList;
16
import java.util.List;
17

    
18
import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;
19
import org.springframework.transaction.TransactionStatus;
20

    
21
import eu.etaxonomy.cdm.api.application.ICdmRepository;
22
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
23
import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
24
import eu.etaxonomy.cdm.filter.TaxonNodeFilter;
25
import eu.etaxonomy.cdm.model.taxon.TaxonNode;
26
import eu.etaxonomy.cdm.persistence.query.OrderHint;
27
import eu.etaxonomy.cdm.persistence.query.OrderHint.SortOrder;
28

    
29
/**
30
 * @author a.mueller
31
 * @since 01.07.2017
32
 */
33
public class TaxonNodeOutStreamPartitioner<STATE extends IoStateBase>
34
        implements ITaxonNodeOutStreamPartitioner {
35

    
36
    @SuppressWarnings("unused")
37
    private static final Logger logger = LogManager.getLogger(TaxonNodeOutStreamPartitioner.class);
38

    
39

    
40
    private static final List<String> defaultPropertyPaths = Arrays.asList(new String[]{"taxon","taxon.name"});
41

    
42
    public static final List<String> fullPropertyPaths = Arrays.asList(new String[]{
43
            "*",
44
            "excludedNote.*",
45
            "classification.*",
46
            "classification.name.*",
47
            "classification.description.*",
48
            "classification.rootNode.*",
49
            "classification.rootNode.excludedNote.*",
50
            "parent.*",
51
            "agentRelations.*",
52
            "agentRelations.agent.*",
53
            "agentRelations.agent.sources.*",
54
            "agentRelations.agent.sources.citation.*",
55
            "agentRelations.agent.contact.*",
56
            "agentRelations.agent.institutionalMemberships.*",
57
            "agentRelations.agent.institutionalMemberships.institute.*",
58
            "agentRelations.agent.institutionalMemberships.institute.contact.*",
59
            "agentRelations.type.*",
60
            "agentRelations.type.representations.*",
61
            "taxon.*",
62
            "taxon.extensions.type.*",
63
            "taxon.extensions.type.representations.*",
64
            "taxon.extensions.type.vocabulary.*",
65
            "taxon.extensions.type.vocabulary.representations.*",
66
            "taxon.extensions.type.vocabulary.termRelations.*",
67
            "taxon.sources.*",
68

    
69
            "taxon.name.*",
70
            "taxon.name.relationsFromThisName.*",
71
            "taxon.name.relationsToThisName.*",
72
            "taxon.name.sources.*",
73
            "taxon.name.extensions.type.*",
74
            "taxon.name.extensions.type.representations.*",
75
            "taxon.name.extensions.type.vocabulary.*",
76
            "taxon.name.extensions.type.vocabulary.terms.*",
77
            "taxon.name.extensions.type.vocabulary.terms.type.*",
78
            "taxon.name.extensions.type.vocabulary.terms.representations.*",
79
            "taxon.name.homotypicalGroup.*",
80

    
81
            "taxon.synonyms.*",
82
            "taxon.synonyms.name.*",
83
            "taxon.synonyms.name.relationsFromThisName.*",
84
            "taxon.synonyms.name.relationsToThisName.*",
85
            "taxon.synonyms.name.sources.*",
86
            "taxon.synonyms.markers.type.*",
87
            "taxon.synonyms.markers.type.representations.*",
88
            "taxon.synonyms.markers.type.vocabulary.*",
89
            "taxon.synonyms.markers.type.vocabulary.terms.*",
90
            "taxon.synonyms.markers.type.vocabulary.terms.type.*",
91
            "taxon.synonyms.markers.type.vocabulary.terms.representations.*",
92

    
93
            "taxon.name.combinationAuthorship.*",
94
            "taxon.name.combinationAuthorship.sources.*",
95
            "taxon.name.combinationAuthorship.contact.*",
96
            "taxon.name.combinationAuthorship.teamMembers.*",
97
            "taxon.name.combinationAuthorship.teamMembers.contact.*",
98
            "taxon.name.combinationAuthorship.teamMembers.sources.*",
99

    
100
            "taxon.name.exCombinationAuthorship.*",
101
            "taxon.name.basionymAuthorship.*",
102
            "taxon.name.basionymAuthorship.sources.*",
103
            "taxon.name.basionymAuthorship.contact.*",
104
            "taxon.name.basionymAuthorship.teamMembers.*",
105
            "taxon.name.basionymAuthorship.teamMembers.contact.*",
106
            "taxon.name.basionymAuthorship.teamMembers.sources.*",
107
            "taxon.name.exBasionymAuthorship.*",
108

    
109
            "taxon.descriptions.*",
110
            "taxon.descriptions.elements",
111
            "taxon.descriptions.elements.*",
112
            "taxon.descriptions.elements.modifyingText.*",
113
            "taxon.descriptions.elements.sources.*",
114
            "taxon.descriptions.elements.sources.citation.*",
115
            "taxon.descriptions.elements.area.*",
116
            "taxon.descriptions.elements.area.representations.*",
117
            "taxon.descriptions.elements.area.annotations.*",
118
            "taxon.descriptions.elements.area.vocabulary.*",
119
            "taxon.descriptions.elements.area.vocabulary.terms.*",
120
            "taxon.descriptions.elements.area.vocabulary.terms.type.*",
121
            "taxon.descriptions.elements.area.vocabulary.terms.annotations.*",
122
            "taxon.descriptions.elements.area.vocabulary.terms.representations.*",
123
//            "taxon.descriptions.elements.area.vocabulary.terms.representations.annotations.*",
124
            "taxon.descriptions.elements.area.vocabulary.representations.*",
125

    
126
    });
127

    
128
//************************* STATIC ***************************************************/
129

    
130
	public static <ST  extends IoStateBase>  TaxonNodeOutStreamPartitioner NewInstance(
131
	        ICdmRepository repository, ST state,
132
            TaxonNodeFilter filter, Integer partitionSize,
133
            IProgressMonitor parentMonitor, Integer parentTicks){
134

    
135
	    TaxonNodeOutStreamPartitioner<ST> taxonNodePartitioner
136
		        = new TaxonNodeOutStreamPartitioner(repository, state, filter, partitionSize,
137
		                parentMonitor, parentTicks, null);
138
		return taxonNodePartitioner;
139
	}
140

    
141
    public static <ST  extends IoStateBase> TaxonNodeOutStreamPartitioner NewInstance(
142
            ICdmRepository repository, ST state,
143
            TaxonNodeFilter filter, Integer partitionSize,
144
            IProgressMonitor parentMonitor, Integer parentTicks, List<String> propertyPath){
145

    
146
        TaxonNodeOutStreamPartitioner<ST> taxonNodePartitioner
147
                = new TaxonNodeOutStreamPartitioner(repository, state, filter, partitionSize,
148
                        parentMonitor, parentTicks, propertyPath);
149
        return taxonNodePartitioner;
150
    }
151

    
152
//*********************** VARIABLES *************************************************/
153

    
154

    
155
	/**
156
	 * counter for the partitions
157
	 */
158
	private int currentPartition;
159

    
160
	private TransactionStatus txStatus;
161

    
162
    private boolean readOnly = true;
163

    
164
    /**
165
     * If <code>true</code> the final commit/rollback is executed only by calling
166
     * {@link #close()}
167
     */
168
    private boolean lastCommitManually = false;
169

    
170

    
171

    
172
    private List<String> propertyPaths = defaultPropertyPaths;
173

    
174
//******************
175

    
176

    
177
    private final ICdmRepository repository;
178

    
179
	private final TaxonNodeFilter filter;
180

    
181
	private STATE state;
182

    
183
    /**
184
     * Number of records handled in the partition
185
     */
186
	private final int partitionSize;
187

    
188
	private int totalCount = -1;
189
	private List<Integer> idList;
190

    
191
	private IProgressMonitor parentMonitor;
192
	private Integer parentTicks;
193

    
194
	private SubProgressMonitor monitor;
195

    
196
	private LinkedList<TaxonNode> fifo = new LinkedList<>();
197

    
198
	private Iterator<Integer> idIterator;
199

    
200
	private int currentIndex;
201

    
202
	private static final int retrieveFactor = 1;
203
	private static final int iterateFactor = 2;
204

    
205

    
206
	//*********************** CONSTRUCTOR *************************************************/
207

    
208
	private TaxonNodeOutStreamPartitioner(ICdmRepository repository, STATE state,
209
	        TaxonNodeFilter filter, Integer partitionSize,
210
	        IProgressMonitor parentMonitor, Integer parentTicks, List<String> propertyPaths){
211
		this.repository = repository;
212
		this.filter = filter;
213
		this.partitionSize = partitionSize;
214
		this.state = state;
215
		this.parentMonitor = parentMonitor;
216
		this.parentTicks = parentTicks;
217
		if (propertyPaths != null){
218
		    this.propertyPaths = propertyPaths;
219
		}
220
	}
221

    
222
//************************ METHODS ****************************************************/
223

    
224
    public void initialize(){
225
	    if (totalCount < 0){
226

    
227
	        parentMonitor.subTask("Compute total number of records");
228
	        totalCount = ((Long)repository.getTaxonNodeService().count(filter)).intValue();
229
	        idList = repository.getTaxonNodeService().idList(filter);
230
	        int parTicks = this.parentTicks == null? totalCount : this.parentTicks;
231

    
232
	        monitor = SubProgressMonitor.NewStarted(parentMonitor, parTicks,
233
	                "Taxon node streamer", totalCount * (retrieveFactor +  iterateFactor));
234
	        idIterator = idList.iterator();
235
	        monitor.subTask("id iterator created");
236
	    }
237
	}
238

    
239

    
240
	@Override
241
    public TaxonNode next(){
242
	    int currentIndexAtStart = currentIndex;
243
	    initialize();
244
	    if(fifo.isEmpty()){
245
	        List<TaxonNode> list = getNextPartition();
246
	        fifo.addAll(list);
247
	    }
248
	    if (!fifo.isEmpty()){
249
	        TaxonNode result = fifo.removeFirst();
250
	        // worked should be called after each step is ready,
251
	        //this is usually after each next() call but not for the first
252
	        if (currentIndexAtStart > 0){
253
	            monitor.worked(iterateFactor);
254
	        }
255
	        return result;
256
	    }else{
257
	        if(!lastCommitManually){
258
	            commitTransaction();
259
	        }
260
	        return null;
261
	    }
262
	}
263

    
264
	@Override
265
    public void close(){
266
	    monitor.done();
267
	    commitTransaction();
268
	}
269

    
270
    private List<TaxonNode> getNextPartition() {
271
        List<Integer> partList = new ArrayList<>();
272

    
273
        if (txStatus != null){
274
            commitTransaction();
275
        }
276

    
277
        txStatus = startTransaction();
278
//        if (readOnly){
279
//            txStatus.setRollbackOnly();  //unclear if this is correct way to handle rollback, see comment on method
280
//        }
281
        while (partList.size() < partitionSize && idIterator.hasNext()){
282
            partList.add(idIterator.next());
283
            currentIndex++;
284
        }
285
        List<TaxonNode> partition = new ArrayList<>();
286
        if (!partList.isEmpty()){
287
            monitor.subTask(String.format("Reading partition %d/%d", currentPartition + 1, (totalCount / partitionSize) +1 ));
288
            OrderHint orderHint = new OrderHint("treeIndex", SortOrder.ASCENDING);
289
            List<OrderHint> orderHints = Arrays.asList(new OrderHint[]{orderHint});
290
            partition = repository.getTaxonNodeService().loadByIds(partList, orderHints, propertyPaths);
291
            monitor.worked(partition.size());
292
            currentPartition++;
293
            monitor.subTask(String.format("Writing partition %d/%d", currentPartition, (totalCount / partitionSize) +1 ));
294
        }
295
        return partition;
296
    }
297

    
298
    private void commitTransaction() {
299
        if (!txStatus.isCompleted()){
300
            if (this.readOnly){
301
                repository.rollbackTransaction(txStatus);
302
            }else{
303
                repository.commitTransaction(txStatus);
304
            }
305
        }
306
    }
307

    
308
    private TransactionStatus startTransaction() {
309
        return repository.startTransaction(readOnly);
310
    }
311

    
312
    @Override
313
    public void setReadOnly(boolean readOnly) {
314
        this.readOnly = readOnly;
315
    }
316

    
317
    public boolean isLastCommitManually() {
318
        return lastCommitManually;
319
    }
320

    
321
    public void setLastCommitManually(boolean lastCommitManually) {
322
        this.lastCommitManually = lastCommitManually;
323
    }
324
}
(57-57/65)