cleanup
[cdmlib.git] / cdmlib-io / src / main / java / eu / etaxonomy / cdm / io / common / TaxonNodeOutStreamPartitioner.java
1 /**
2 * Copyright (C) 2007 EDIT
3 * European Distributed Institute of Taxonomy
4 * http://www.e-taxonomy.eu
5 *
6 * The contents of this file are subject to the Mozilla Public License Version 1.1
7 * See LICENSE.TXT at the top of this package for the full license terms.
8 */
9
10 package eu.etaxonomy.cdm.io.common;
11
12 import java.util.ArrayList;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.List;
16
17 import org.apache.log4j.Logger;
18 import org.springframework.transaction.TransactionStatus;
19
20 import eu.etaxonomy.cdm.api.application.ICdmRepository;
21 import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
22 import eu.etaxonomy.cdm.common.monitor.SubProgressMonitor;
23 import eu.etaxonomy.cdm.filter.TaxonNodeFilter;
24 import eu.etaxonomy.cdm.model.taxon.TaxonNode;
25
26 /**
27 * @author a.mueller
28 * @since 01.07.2017
29 */
30 public class TaxonNodeOutStreamPartitioner<STATE extends IoStateBase> {
31
32 private static final Logger logger = Logger.getLogger(TaxonNodeOutStreamPartitioner.class);
33
34 //************************* STATIC ***************************************************/
35
36 public static <ST extends XmlExportState> TaxonNodeOutStreamPartitioner NewInstance(
37 ICdmRepository repository, IoStateBase state,
38 TaxonNodeFilter filter, Integer partitionSize,
39 IProgressMonitor parentMonitor, Integer parentTicks){
40 TaxonNodeOutStreamPartitioner<ST> taxonNodePartitioner
41 = new TaxonNodeOutStreamPartitioner(repository, state, filter, partitionSize,
42 parentMonitor, parentTicks);
43 return taxonNodePartitioner;
44 }
45
46 //*********************** VARIABLES *************************************************/
47
48
49 /**
50 * counter for the partitions
51 */
52 private int currentPartition;
53
54 private TransactionStatus txStatus;
55
56 private boolean readOnly = true;
57
58 //******************
59
60 private final ICdmRepository repository;
61
62 private final TaxonNodeFilter filter;
63
64 private STATE state;
65
66 /**
67 * Number of records handled in the partition
68 */
69 private final int partitionSize;
70
71 private int totalCount = -1;
72 private List<Integer> idList;
73
74 private IProgressMonitor parentMonitor;
75 private Integer parentTicks;
76
77 private SubProgressMonitor monitor;
78
79 private LinkedList<TaxonNode> fifo = new LinkedList<>();
80
81 private Iterator<Integer> idIterator;
82
83 private int currentIndex;
84
85 private static final int retrieveFactor = 1;
86 private static final int iterateFactor = 2;
87
88
89 //*********************** CONSTRUCTOR *************************************************/
90
91 private TaxonNodeOutStreamPartitioner(ICdmRepository repository, STATE state,
92 TaxonNodeFilter filter, Integer partitionSize,
93 IProgressMonitor parentMonitor, Integer parentTicks){
94 this.repository = repository;
95 this.filter = filter;
96 this.partitionSize = partitionSize;
97 this.state = state;
98 this.parentMonitor = parentMonitor;
99 this.parentTicks = parentTicks;
100 }
101
102 //************************ METHODS ****************************************************/
103
104 public void initialize(){
105 if (totalCount < 0){
106
107 parentMonitor.subTask("Compute total number of records");
108 totalCount = ((Long)repository.getTaxonNodeService().count(filter)).intValue();
109 idList = repository.getTaxonNodeService().idList(filter);
110 int parTicks = this.parentTicks == null? totalCount : this.parentTicks;
111
112 monitor = SubProgressMonitor.NewStarted(parentMonitor, parTicks,
113 "Taxon node streamer", totalCount * (retrieveFactor + iterateFactor));
114 idIterator = idList.iterator();
115 monitor.subTask("id iterator created");
116 }
117 }
118
119 public TaxonNode next(){
120 int currentIndexAtStart = currentIndex;
121 initialize();
122 if(fifo.isEmpty()){
123 List<TaxonNode> list = getNextPartition();
124 fifo.addAll(list);
125 }
126 if (!fifo.isEmpty()){
127 TaxonNode result = fifo.removeFirst();
128 // worked should be called after each step is ready,
129 //this is usually after each next() call but not for the first
130 if (currentIndexAtStart > 0){
131 monitor.worked(iterateFactor);
132 }
133 return result;
134 }else{
135 commitTransaction();
136 return null;
137 }
138 }
139
140 public void close(){
141 monitor.done();
142 commitTransaction();
143 }
144
145 private List<TaxonNode> getNextPartition() {
146 List<Integer> partList = new ArrayList<>();
147
148 if (txStatus != null){
149 commitTransaction();
150 }
151 txStatus = startTransaction();
152 if (readOnly){
153 txStatus.setRollbackOnly();
154 }
155 while (partList.size() < partitionSize && idIterator.hasNext()){
156 partList.add(idIterator.next());
157 currentIndex++;
158 }
159 List<TaxonNode> partition = new ArrayList<>();
160 if (!partList.isEmpty()){
161 monitor.subTask(String.format("Reading partition %d/%d", currentPartition + 1, (totalCount / partitionSize) +1 ));
162 List<String> propertyPaths = new ArrayList<String>();
163 propertyPaths.add("taxon");
164 propertyPaths.add("taxon.name");
165 partition = repository.getTaxonNodeService().loadByIds(partList, propertyPaths);
166 monitor.worked(partition.size());
167 currentPartition++;
168 monitor.subTask(String.format("Writing partition %d/%d", currentPartition, (totalCount / partitionSize) +1 ));
169 }
170 return partition;
171 }
172
173 private void commitTransaction() {
174 if (!txStatus.isCompleted()){
175 if (this.readOnly){
176 repository.rollback(txStatus);
177 }else{
178 repository.commitTransaction(txStatus);
179 }
180 }
181 }
182
183 private TransactionStatus startTransaction() {
184 return repository.startTransaction(readOnly);
185 }
186
187 }