1
|
/**
|
2
|
* Copyright (C) 2007 EDIT
|
3
|
* European Distributed Institute of Taxonomy
|
4
|
* http://www.e-taxonomy.eu
|
5
|
*
|
6
|
* The contents of this file are subject to the Mozilla Public License Version 1.1
|
7
|
* See LICENSE.TXT at the top of this package for the full license terms.
|
8
|
*/
|
9
|
|
10
|
package eu.etaxonomy.cdm.io.common;
|
11
|
|
12
|
import java.sql.ResultSet;
|
13
|
import java.sql.ResultSetMetaData;
|
14
|
import java.sql.SQLException;
|
15
|
import java.sql.Types;
|
16
|
import java.util.ArrayList;
|
17
|
import java.util.List;
|
18
|
import java.util.Map;
|
19
|
|
20
|
import org.apache.log4j.Logger;
|
21
|
import org.springframework.transaction.TransactionStatus;
|
22
|
|
23
|
import eu.etaxonomy.cdm.common.CdmUtils;
|
24
|
import eu.etaxonomy.cdm.model.common.CdmBase;
|
25
|
|
26
|
/**
|
27
|
* @author a.mueller
|
28
|
* @since 16.02.2010
|
29
|
*/
|
30
|
public class ResultSetPartitioner<STATE extends IPartitionedState> {
|
31
|
private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
|
32
|
private final PartitionerProfiler profiler = new PartitionerProfiler();
|
33
|
|
34
|
//************************* STATIC ***************************************************/
|
35
|
|
36
|
public static <T extends IPartitionedState> ResultSetPartitioner<T> NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
|
37
|
ResultSetPartitioner<T> resultSetPartitioner
|
38
|
= new ResultSetPartitioner<>(source, strIdQuery, strRecordQuery, partitionSize);
|
39
|
return resultSetPartitioner;
|
40
|
}
|
41
|
|
42
|
//*********************** VARIABLES *************************************************/
|
43
|
|
44
|
/**
|
45
|
* The database
|
46
|
*/
|
47
|
private final Source source;
|
48
|
|
49
|
/**
|
50
|
* The result set containing all records and at least the ids as a field. This result set
|
51
|
* will be used for partitioning
|
52
|
*/
|
53
|
private final ResultSet idResultSet;
|
54
|
|
55
|
/**
|
56
|
* A template for a SQL Query returning all records and all values needed for a partition
|
57
|
* to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
|
58
|
*/
|
59
|
private final String strRecordQueryTemplate;
|
60
|
|
61
|
/**
|
62
|
* The resultset returned for the strRecordQueryTemplate
|
63
|
*/
|
64
|
private ResultSet partitionResultSet;
|
65
|
|
66
|
/**
|
67
|
* A 2-key map holding all related objects needed during the handling of a partition (e.g. when
|
68
|
* creating a taxon partition the map holds all taxon names.
|
69
|
* The key is a combination of a namespace and the id in the original source
|
70
|
*/
|
71
|
private Map<Object, Map<String, ? extends CdmBase>> relatedObjects;
|
72
|
|
73
|
/**
|
74
|
* Number of records handled in the partition
|
75
|
*/
|
76
|
private final int partitionSize;
|
77
|
|
78
|
/**
|
79
|
* Lists of ids handled in this partition (must be an array of lists because sometimes
|
80
|
* we have non-single keys
|
81
|
*/
|
82
|
private List<String>[] currentIdLists;
|
83
|
|
84
|
/**
|
85
|
* The sql type of the id columns.
|
86
|
* @see Types
|
87
|
*/
|
88
|
private int[] currentIdListType;
|
89
|
|
90
|
private String lastPartitionHighestIDs;
|
91
|
|
92
|
boolean nextAlreadyCalled = false;
|
93
|
|
94
|
/**
|
95
|
* counter for the partitions
|
96
|
*/
|
97
|
private int currentPartition;
|
98
|
|
99
|
|
100
|
/**
|
101
|
* counter for all records
|
102
|
*/
|
103
|
private int allRecords;
|
104
|
|
105
|
/**
|
106
|
* number of records in the current partition
|
107
|
*/
|
108
|
private int rowsInCurrentPartition;
|
109
|
|
110
|
private TransactionStatus txStatus;
|
111
|
|
112
|
//*********************** CONSTRUCTOR *************************************************/
|
113
|
|
114
|
private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
|
115
|
ResultSet idResultSet = source.getResultSet(strIdQuery);
|
116
|
// if (! idResultSet.isBeforeFirst()){
|
117
|
// idResultSet.beforeFirst();
|
118
|
// }
|
119
|
this.source = source;
|
120
|
this.idResultSet = idResultSet;
|
121
|
this.strRecordQueryTemplate = strRecordQuery;
|
122
|
this.partitionSize = partitionSize;
|
123
|
}
|
124
|
|
125
|
//************************ METHODS ****************************************************/
|
126
|
|
127
|
|
128
|
/**
|
129
|
* Import the whole partition of an input stream by starting a transaction, getting the related objects
|
130
|
* stored in the destination, invoke the IOs partition handling and commit the transaction
|
131
|
* @param partitionedIO
|
132
|
*/
|
133
|
public void doPartition(IPartitionedIO<STATE> partitionedIO, STATE state) {
|
134
|
int i = 0;
|
135
|
try{
|
136
|
profiler.startTx();
|
137
|
TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
|
138
|
|
139
|
i = 1;
|
140
|
state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
|
141
|
|
142
|
profiler.startRs();
|
143
|
ResultSet rs = makePartitionResultSet();
|
144
|
i = 2;
|
145
|
profiler.startRelObjects();
|
146
|
this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs, state);
|
147
|
i = 3;
|
148
|
state.setRelatedObjects(relatedObjects);
|
149
|
i = 4;
|
150
|
profiler.startRs2();
|
151
|
partitionResultSet = makePartitionResultSet();
|
152
|
i = 5;
|
153
|
profiler.startDoPartition();
|
154
|
partitionedIO.doPartition(this, state);
|
155
|
i = 6;
|
156
|
profiler.startDoCommit();
|
157
|
partitionedIO.commitTransaction(txStatus);
|
158
|
state.resetTransactionalSourceReference();
|
159
|
i = 7;
|
160
|
profiler.end();
|
161
|
state.setRelatedObjects(null);
|
162
|
i = 8;
|
163
|
logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
|
164
|
profiler.print();
|
165
|
}catch(Exception e){
|
166
|
String message = "Exception (%s) occurred at position " + i + " while handling import partition for %s.";
|
167
|
message = String.format(message, e.getMessage(), partitionedIO.getPluralString());
|
168
|
e.printStackTrace();
|
169
|
throw new RuntimeException(message, e);
|
170
|
}
|
171
|
}
|
172
|
|
173
|
|
174
|
public void startDoSave(){
|
175
|
profiler.startDoSave();
|
176
|
}
|
177
|
|
178
|
/**
|
179
|
* Increases the partition counter and generates the <code>currentIdLists</code> for this partition
|
180
|
* @return
|
181
|
* @throws SQLException
|
182
|
*/
|
183
|
public boolean nextPartition() throws SQLException{
|
184
|
boolean result = false;
|
185
|
ResultSetMetaData metaData = idResultSet.getMetaData();
|
186
|
int nOfIdColumns = metaData.getColumnCount();
|
187
|
currentPartition++;
|
188
|
|
189
|
currentIdLists = new List[nOfIdColumns];
|
190
|
currentIdListType = new int[nOfIdColumns];
|
191
|
|
192
|
for (int col = 0; col< currentIdLists.length; col++){
|
193
|
currentIdLists[col] = new ArrayList<>();
|
194
|
currentIdListType[col] = metaData.getColumnType(col + 1);
|
195
|
}
|
196
|
|
197
|
int i = 0;
|
198
|
//for each record
|
199
|
for (i = 0; i < partitionSize || !firstIdIsNew(); i++){
|
200
|
if( !nextAlreadyCalled){
|
201
|
if (!idResultSet.next()){
|
202
|
break;
|
203
|
}
|
204
|
}else if (idResultSet.isAfterLast()){
|
205
|
break;
|
206
|
}
|
207
|
nextAlreadyCalled = false;
|
208
|
allRecords++;
|
209
|
//for each column
|
210
|
for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
|
211
|
|
212
|
String strNextId = String.valueOf(idResultSet.getObject(colIndex + 1));
|
213
|
List<String> currentIdList = currentIdLists[colIndex];
|
214
|
currentIdList.add(strNextId);
|
215
|
if(colIndex == 0){
|
216
|
lastPartitionHighestIDs = strNextId;
|
217
|
}
|
218
|
}
|
219
|
result = true; //true if at least one record was read
|
220
|
}
|
221
|
rowsInCurrentPartition = i;
|
222
|
|
223
|
return result;
|
224
|
}
|
225
|
|
226
|
/**
|
227
|
* Checks if the current partition may have duplicates that were handled already.
|
228
|
* This may happen if the result set has >1 columns and if the first column does
|
229
|
* not change it value with the first record but with a later record and the
|
230
|
* following columns jump back with there values.
|
231
|
* E.g. first result set contains (x1=1,x2=3) and with the second result set
|
232
|
* we ask for x1 in (1,2) x2 in (1-5, 10-15) where the 1-5 comes from x1=2.
|
233
|
* This should not happen and therefore we increase the partition a bit such that
|
234
|
* the first column always changes its value and therefore it is guranteed that
|
235
|
* such duplicates will never exist.
|
236
|
* The reason for this problem is, that we do not create tuples in the WHERE clause
|
237
|
* of getIdRecord but we handle the range for each column separately. This is not correct
|
238
|
* but handling of tuples is more difficult in SQL.
|
239
|
* As multiple columns do not appear so often this workaround seems acceptable.
|
240
|
* @return
|
241
|
* @throws SQLException
|
242
|
*/
|
243
|
private boolean firstIdIsNew() throws SQLException {
|
244
|
String last = lastPartitionHighestIDs;
|
245
|
if (!idResultSet.next()){
|
246
|
return true;
|
247
|
}
|
248
|
nextAlreadyCalled = true;
|
249
|
String current = String.valueOf(idResultSet.getObject(1));
|
250
|
return !current.equals(last);
|
251
|
}
|
252
|
|
253
|
/**
|
254
|
* Returns the underlying resultSet holding all records needed to handle the partition.<BR>
|
255
|
* @return
|
256
|
*/
|
257
|
public ResultSet getResultSet(){
|
258
|
return partitionResultSet;
|
259
|
}
|
260
|
|
261
|
/**
|
262
|
* Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
|
263
|
* created during {@link #nextPartition}
|
264
|
* @return ResultSet
|
265
|
*/
|
266
|
private ResultSet makePartitionResultSet(){
|
267
|
int nColumns = currentIdLists.length;
|
268
|
String[] strIdLists = new String[nColumns];
|
269
|
|
270
|
String strRecordQuery = strRecordQueryTemplate;
|
271
|
for (int col = 0; col < nColumns; col++){
|
272
|
for (String id: currentIdLists[col]){
|
273
|
id = addApostropheIfNeeded(id, currentIdListType[col]);
|
274
|
strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
|
275
|
}
|
276
|
strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
|
277
|
}
|
278
|
|
279
|
ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
|
280
|
|
281
|
return resultSet;
|
282
|
}
|
283
|
|
284
|
private String addApostropheIfNeeded(String id, int sqlType) {
|
285
|
String result = id;
|
286
|
if (isStringType(sqlType)){
|
287
|
result = "'" + id + "'";
|
288
|
}
|
289
|
return result;
|
290
|
}
|
291
|
|
292
|
private boolean isStringType(int sqlType) {
|
293
|
if(sqlType == Types.INTEGER){
|
294
|
return false; //standard case
|
295
|
}else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
|
296
|
sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
|
297
|
sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
|
298
|
return true;
|
299
|
}else{
|
300
|
return false;
|
301
|
}
|
302
|
}
|
303
|
|
304
|
public Map<String, ? extends CdmBase> getObjectMap(Object key){
|
305
|
Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
|
306
|
return objectMap;
|
307
|
}
|
308
|
|
309
|
/**
|
310
|
*
|
311
|
*/
|
312
|
private int getCurrentNumberOfRows() {
|
313
|
return allRecords;
|
314
|
// return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
|
315
|
}
|
316
|
|
317
|
|
318
|
/**
|
319
|
* @param recordsPerTransaction
|
320
|
* @param partitionedIO
|
321
|
* @param i
|
322
|
*/
|
323
|
protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
|
324
|
//if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
|
325
|
txStatus = partitionedIO.startTransaction();
|
326
|
if(logger.isInfoEnabled()) {
|
327
|
logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
|
328
|
}
|
329
|
//}
|
330
|
return txStatus;
|
331
|
}
|
332
|
|
333
|
// ************************** Not Needed ?? **************************************************
|
334
|
|
335
|
// protected void doLogPerLoop(int recordsPerLog, String pluralString){
|
336
|
// int count = getAbsoluteRow() - 1;
|
337
|
// if ((count % recordsPerLog ) == 0 && count!= 0 ){
|
338
|
// logger.info(pluralString + " handled: " + (count));
|
339
|
// }
|
340
|
// }
|
341
|
//
|
342
|
//
|
343
|
|
344
|
|
345
|
|
346
|
// public boolean nextRow() throws SQLException{
|
347
|
// if (currentRowInPartition >= partitionSize ){
|
348
|
// return false;
|
349
|
// }else{
|
350
|
// currentRowInPartition++;
|
351
|
// return resultSet.next();
|
352
|
// }
|
353
|
// }
|
354
|
//
|
355
|
|
356
|
|
357
|
|
358
|
}
|