1
|
/**
|
2
|
* Copyright (C) 2007 EDIT
|
3
|
* European Distributed Institute of Taxonomy
|
4
|
* http://www.e-taxonomy.eu
|
5
|
*
|
6
|
* The contents of this file are subject to the Mozilla Public License Version 1.1
|
7
|
* See LICENSE.TXT at the top of this package for the full license terms.
|
8
|
*/
|
9
|
|
10
|
package eu.etaxonomy.cdm.io.common;
|
11
|
|
12
|
import java.sql.ResultSet;
|
13
|
import java.sql.ResultSetMetaData;
|
14
|
import java.sql.SQLException;
|
15
|
import java.sql.Types;
|
16
|
import java.util.ArrayList;
|
17
|
import java.util.List;
|
18
|
import java.util.Map;
|
19
|
|
20
|
import org.apache.log4j.Logger;
|
21
|
import org.springframework.transaction.TransactionStatus;
|
22
|
|
23
|
import eu.etaxonomy.cdm.common.CdmUtils;
|
24
|
import eu.etaxonomy.cdm.model.common.CdmBase;
|
25
|
|
26
|
/**
|
27
|
* @author a.mueller
|
28
|
* @since 16.02.2010
|
29
|
*/
|
30
|
public class ResultSetPartitioner<STATE extends IPartitionedState> {
|
31
|
private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
|
32
|
private final PartitionerProfiler profiler = new PartitionerProfiler();
|
33
|
|
34
|
//************************* STATIC ***************************************************/
|
35
|
|
36
|
public static <T extends IPartitionedState> ResultSetPartitioner<T> NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
|
37
|
ResultSetPartitioner<T> resultSetPartitioner
|
38
|
= new ResultSetPartitioner<>(source, strIdQuery, strRecordQuery, partitionSize);
|
39
|
return resultSetPartitioner;
|
40
|
}
|
41
|
|
42
|
//*********************** VARIABLES *************************************************/
|
43
|
|
44
|
/**
|
45
|
* The database
|
46
|
*/
|
47
|
private final Source source;
|
48
|
|
49
|
/**
|
50
|
* The result set containing all records and at least the ids as a field. This result set
|
51
|
* will be used for partitioning
|
52
|
*/
|
53
|
private final ResultSet idResultSet;
|
54
|
|
55
|
/**
|
56
|
* A template for a SQL Query returning all records and all values needed for a partition
|
57
|
* to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
|
58
|
*/
|
59
|
private final String strRecordQueryTemplate;
|
60
|
|
61
|
/**
|
62
|
* The resultset returned for the strRecordQueryTemplate
|
63
|
*/
|
64
|
private ResultSet partitionResultSet;
|
65
|
|
66
|
/**
|
67
|
* A 2-key map holding all related objects needed during the handling of a partition (e.g. when
|
68
|
* creating a taxon partition the map holds all taxon names.
|
69
|
* The key is a combination of a namespace and the id in the original source
|
70
|
*/
|
71
|
private Map<Object, Map<String, ? extends CdmBase>> relatedObjects;
|
72
|
|
73
|
/**
|
74
|
* Number of records handled in the partition
|
75
|
*/
|
76
|
private final int partitionSize;
|
77
|
|
78
|
/**
|
79
|
* Lists of ids handled in this partition (must be an array of lists because sometimes
|
80
|
* we have non-single keys
|
81
|
*/
|
82
|
private List<String>[] currentIdLists;
|
83
|
|
84
|
/**
|
85
|
* The sql type of the id columns.
|
86
|
* @see Types
|
87
|
*/
|
88
|
private int[] currentIdListType;
|
89
|
|
90
|
/**
|
91
|
* counter for the partitions
|
92
|
*/
|
93
|
private int currentPartition;
|
94
|
|
95
|
/**
|
96
|
* number of records in the current partition
|
97
|
*/
|
98
|
private int rowsInCurrentPartition;
|
99
|
|
100
|
private TransactionStatus txStatus;
|
101
|
|
102
|
//*********************** CONSTRUCTOR *************************************************/
|
103
|
|
104
|
private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
|
105
|
ResultSet idResultSet = source.getResultSet(strIdQuery);
|
106
|
// if (! idResultSet.isBeforeFirst()){
|
107
|
// idResultSet.beforeFirst();
|
108
|
// }
|
109
|
this.source = source;
|
110
|
this.idResultSet = idResultSet;
|
111
|
this.strRecordQueryTemplate = strRecordQuery;
|
112
|
this.partitionSize = partitionSize;
|
113
|
}
|
114
|
|
115
|
//************************ METHODS ****************************************************/
|
116
|
|
117
|
|
118
|
/**
|
119
|
* Import the whole partition of an input stream by starting a transaction, getting the related objects
|
120
|
* stored in the destination, invoke the IOs partition handling and commit the transaction
|
121
|
* @param partitionedIO
|
122
|
*/
|
123
|
public void doPartition(IPartitionedIO<STATE> partitionedIO, STATE state) {
|
124
|
int i = 0;
|
125
|
try{
|
126
|
profiler.startTx();
|
127
|
TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
|
128
|
|
129
|
i = 1;
|
130
|
state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
|
131
|
|
132
|
profiler.startRs();
|
133
|
ResultSet rs = makePartitionResultSet();
|
134
|
i = 2;
|
135
|
profiler.startRelObjects();
|
136
|
this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs, state);
|
137
|
i = 3;
|
138
|
state.setRelatedObjects(relatedObjects);
|
139
|
i = 4;
|
140
|
profiler.startRs2();
|
141
|
partitionResultSet = makePartitionResultSet();
|
142
|
i = 5;
|
143
|
profiler.startDoPartition();
|
144
|
partitionedIO.doPartition(this, state);
|
145
|
i = 6;
|
146
|
profiler.startDoCommit();
|
147
|
partitionedIO.commitTransaction(txStatus);
|
148
|
state.resetTransactionalSourceReference();
|
149
|
i = 7;
|
150
|
profiler.end();
|
151
|
state.setRelatedObjects(null);
|
152
|
i = 8;
|
153
|
logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
|
154
|
profiler.print();
|
155
|
}catch(Exception e){
|
156
|
String message = "Exception (%s) occurred at position " + i + " while handling import partition for %s.";
|
157
|
message = String.format(message, e.getMessage(), partitionedIO.getPluralString());
|
158
|
e.printStackTrace();
|
159
|
throw new RuntimeException(message, e);
|
160
|
}
|
161
|
}
|
162
|
|
163
|
|
164
|
public void startDoSave(){
|
165
|
profiler.startDoSave();
|
166
|
}
|
167
|
|
168
|
/**
|
169
|
* Increases the partition counter and generates the <code>currentIdLists</code> for this partition
|
170
|
* @return
|
171
|
* @throws SQLException
|
172
|
*/
|
173
|
public boolean nextPartition() throws SQLException{
|
174
|
boolean result = false;
|
175
|
ResultSetMetaData metaData = idResultSet.getMetaData();
|
176
|
int nOfIdColumns = metaData.getColumnCount();
|
177
|
currentPartition++;
|
178
|
currentIdLists = new ArrayList[nOfIdColumns];
|
179
|
currentIdListType = new int[nOfIdColumns];
|
180
|
|
181
|
for (int col = 0; col< currentIdLists.length; col++){
|
182
|
currentIdLists[col] = new ArrayList<>();
|
183
|
currentIdListType[col] = metaData.getColumnType(col + 1);
|
184
|
}
|
185
|
List<String> currentIdList;
|
186
|
|
187
|
int i = 0;
|
188
|
//for each record
|
189
|
for (i = 0; i < partitionSize; i++){
|
190
|
if (idResultSet.next() == false){
|
191
|
break;
|
192
|
}
|
193
|
//for each column
|
194
|
for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
|
195
|
Object oNextId = idResultSet.getObject(colIndex + 1);
|
196
|
String strNextId = String.valueOf(oNextId);
|
197
|
currentIdList = currentIdLists[colIndex];
|
198
|
currentIdList.add(strNextId);
|
199
|
}
|
200
|
result = true; //true if at least one record was read
|
201
|
}
|
202
|
rowsInCurrentPartition = i;
|
203
|
|
204
|
return result;
|
205
|
}
|
206
|
|
207
|
|
208
|
|
209
|
/**
|
210
|
* Returns the underlying resultSet holding all records needed to handle the partition.<BR>
|
211
|
* @return
|
212
|
*/
|
213
|
public ResultSet getResultSet(){
|
214
|
return partitionResultSet;
|
215
|
}
|
216
|
|
217
|
|
218
|
|
219
|
/**
|
220
|
* Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
|
221
|
* created during {@link #nextPartition}
|
222
|
* @return ResultSet
|
223
|
*/
|
224
|
private ResultSet makePartitionResultSet(){
|
225
|
int nColumns = currentIdLists.length;
|
226
|
String[] strIdLists = new String[nColumns];
|
227
|
|
228
|
String strRecordQuery = strRecordQueryTemplate;
|
229
|
for (int col = 0; col < nColumns; col++){
|
230
|
for (String id: currentIdLists[col]){
|
231
|
id = addApostropheIfNeeded(id, currentIdListType[col]);
|
232
|
strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
|
233
|
}
|
234
|
strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
|
235
|
}
|
236
|
|
237
|
ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
|
238
|
|
239
|
return resultSet;
|
240
|
}
|
241
|
|
242
|
/**
|
243
|
* @param id
|
244
|
* @param i
|
245
|
* @return
|
246
|
*/
|
247
|
private String addApostropheIfNeeded(String id, int sqlType) {
|
248
|
String result = id;
|
249
|
if (isStringType(sqlType)){
|
250
|
result = "'" + id + "'";
|
251
|
}
|
252
|
return result;
|
253
|
}
|
254
|
|
255
|
/**
|
256
|
* @param sqlType
|
257
|
* @return
|
258
|
*/
|
259
|
private boolean isStringType(int sqlType) {
|
260
|
if(sqlType == Types.INTEGER){
|
261
|
return false; //standard case
|
262
|
}else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
|
263
|
sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
|
264
|
sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
|
265
|
return true;
|
266
|
}else{
|
267
|
return false;
|
268
|
}
|
269
|
}
|
270
|
|
271
|
public Map<String, ? extends CdmBase> getObjectMap(Object key){
|
272
|
Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
|
273
|
return objectMap;
|
274
|
}
|
275
|
|
276
|
/**
|
277
|
*
|
278
|
*/
|
279
|
private int getCurrentNumberOfRows() {
|
280
|
return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
|
281
|
}
|
282
|
|
283
|
|
284
|
/**
|
285
|
* @param recordsPerTransaction
|
286
|
* @param partitionedIO
|
287
|
* @param i
|
288
|
*/
|
289
|
protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
|
290
|
//if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
|
291
|
txStatus = partitionedIO.startTransaction();
|
292
|
if(logger.isInfoEnabled()) {
|
293
|
logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
|
294
|
}
|
295
|
//}
|
296
|
return txStatus;
|
297
|
}
|
298
|
|
299
|
// ************************** Not Needed ?? **************************************************
|
300
|
|
301
|
// protected void doLogPerLoop(int recordsPerLog, String pluralString){
|
302
|
// int count = getAbsoluteRow() - 1;
|
303
|
// if ((count % recordsPerLog ) == 0 && count!= 0 ){
|
304
|
// logger.info(pluralString + " handled: " + (count));
|
305
|
// }
|
306
|
// }
|
307
|
//
|
308
|
//
|
309
|
|
310
|
|
311
|
|
312
|
// public boolean nextRow() throws SQLException{
|
313
|
// if (currentRowInPartition >= partitionSize ){
|
314
|
// return false;
|
315
|
// }else{
|
316
|
// currentRowInPartition++;
|
317
|
// return resultSet.next();
|
318
|
// }
|
319
|
// }
|
320
|
//
|
321
|
|
322
|
|
323
|
|
324
|
}
|