1
|
// $Id$
|
2
|
/**
|
3
|
* Copyright (C) 2007 EDIT
|
4
|
* European Distributed Institute of Taxonomy
|
5
|
* http://www.e-taxonomy.eu
|
6
|
*
|
7
|
* The contents of this file are subject to the Mozilla Public License Version 1.1
|
8
|
* See LICENSE.TXT at the top of this package for the full license terms.
|
9
|
*/
|
10
|
|
11
|
package eu.etaxonomy.cdm.io.common;
|
12
|
|
13
|
import java.sql.ResultSet;
|
14
|
import java.sql.ResultSetMetaData;
|
15
|
import java.sql.SQLException;
|
16
|
import java.sql.Types;
|
17
|
import java.util.ArrayList;
|
18
|
import java.util.List;
|
19
|
import java.util.Map;
|
20
|
|
21
|
import org.apache.log4j.Logger;
|
22
|
import org.springframework.transaction.TransactionStatus;
|
23
|
|
24
|
import eu.etaxonomy.cdm.common.CdmUtils;
|
25
|
import eu.etaxonomy.cdm.model.common.CdmBase;
|
26
|
|
27
|
/**
|
28
|
* @author a.mueller
|
29
|
* @created 16.02.2010
|
30
|
* @version 1.0
|
31
|
*/
|
32
|
public class ResultSetPartitioner<STATE extends IPartitionedState> {
|
33
|
private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
|
34
|
private PartitionerProfiler profiler = new PartitionerProfiler();
|
35
|
|
36
|
//************************* STATIC ***************************************************/
|
37
|
|
38
|
public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
|
39
|
ResultSetPartitioner resultSetPartitioner = new ResultSetPartitioner(source, strIdQuery, strRecordQuery, partitionSize);
|
40
|
return resultSetPartitioner;
|
41
|
}
|
42
|
|
43
|
//*********************** VARIABLES *************************************************/
|
44
|
|
45
|
/**
|
46
|
* The database
|
47
|
*/
|
48
|
private Source source;
|
49
|
|
50
|
/**
|
51
|
* The result set containing all records and at least the ids as a field. This result set
|
52
|
* will be used for partitioning
|
53
|
*/
|
54
|
private ResultSet idResultSet;
|
55
|
|
56
|
/**
|
57
|
* A template for a SQL Query returning all records and all values needed for a partition
|
58
|
* to be handled. The 'where' condition is filled by replacing the template's '@IdList' token.
|
59
|
*/
|
60
|
private String strRecordQueryTemplate;
|
61
|
|
62
|
/**
|
63
|
* The resultset returned for the strRecordQueryTemplate
|
64
|
*/
|
65
|
private ResultSet partitionResultSet;
|
66
|
|
67
|
/**
|
68
|
* A 2-key map holding all related objects needed during the handling of a partition (e.g. when
|
69
|
* creating a taxon partition the map holds all taxon names).
|
70
|
* The key is a combination of a namespace and the id in the original source
|
71
|
*/
|
72
|
private Map<Object, Map<String, CdmBase>> relatedObjects;
|
73
|
|
74
|
/**
|
75
|
* Number of records handled in the partition
|
76
|
*/
|
77
|
private int partitionSize;
|
78
|
|
79
|
/**
|
80
|
* Lists of ids handled in this partition (must be an array of lists because sometimes
|
81
|
* we have non-single keys).
|
82
|
*/
|
83
|
private List<String>[] currentIdLists;
|
84
|
|
85
|
/**
|
86
|
* The sql type of the id columns.
|
87
|
* @see Types
|
88
|
*/
|
89
|
private int[] currentIdListType;
|
90
|
|
91
|
/**
|
92
|
* counter for the partitions
|
93
|
*/
|
94
|
private int currentPartition;
|
95
|
|
96
|
/**
|
97
|
* number of records in the current partition
|
98
|
*/
|
99
|
private int rowsInCurrentPartition;
|
100
|
|
101
|
private TransactionStatus txStatus;
|
102
|
|
103
|
//*********************** CONSTRUCTOR *************************************************/
|
104
|
|
105
|
/**
 * Runs the id query against the source and stores the resulting result set together
 * with the source, the record query template and the partition size.
 *
 * @param source the database the queries are executed against
 * @param strIdQuery the query executed immediately to obtain the id result set
 * @param strRecordQuery the record query template
 * @param partitionSize the number of records handled per partition
 * @throws SQLException declared for callers; propagated if executing the id query throws it
 */
private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
    // the id query is executed first, exactly once, at construction time
    this.idResultSet = source.getResultSet(strIdQuery);
    this.source = source;
    this.strRecordQueryTemplate = strRecordQuery;
    this.partitionSize = partitionSize;
}
|
115
|
|
116
|
//************************ METHODS ****************************************************/
|
117
|
|
118
|
|
119
|
/**
|
120
|
* Import the whole partition of an input stream by starting a transaction, getting the related objects
|
121
|
* stored in the destination, invoke the IOs partition handling and commit the transaction
|
122
|
* @param partitionedIO
|
123
|
*/
|
124
|
public void doPartition(IPartitionedIO partitionedIO, STATE state) {
|
125
|
try{
|
126
|
profiler.startTx();
|
127
|
TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
|
128
|
|
129
|
profiler.startRs();
|
130
|
ResultSet rs = makePartitionResultSet();
|
131
|
|
132
|
profiler.startRelObjects();
|
133
|
this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs);
|
134
|
state.setRelatedObjects(relatedObjects);
|
135
|
|
136
|
profiler.startRs2();
|
137
|
partitionResultSet = makePartitionResultSet();
|
138
|
|
139
|
profiler.startDoPartition();
|
140
|
partitionedIO.doPartition(this, state);
|
141
|
|
142
|
profiler.startDoCommit();
|
143
|
partitionedIO.commitTransaction(txStatus);
|
144
|
|
145
|
profiler.end();
|
146
|
state.setRelatedObjects(null);
|
147
|
|
148
|
logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
|
149
|
profiler.print();
|
150
|
}catch(Exception e){
|
151
|
throw new RuntimeException(e);
|
152
|
}
|
153
|
}
|
154
|
|
155
|
|
156
|
public void startDoSave(){
|
157
|
profiler.startDoSave();
|
158
|
}
|
159
|
|
160
|
/**
|
161
|
* Increases the partition counter and generates the <code>currentIdLists</code> for this partition
|
162
|
* @return
|
163
|
* @throws SQLException
|
164
|
*/
|
165
|
public boolean nextPartition() throws SQLException{
|
166
|
boolean result = false;
|
167
|
ResultSetMetaData metaData = idResultSet.getMetaData();
|
168
|
int nOfIdColumns = metaData.getColumnCount();
|
169
|
currentPartition++;
|
170
|
currentIdLists = new ArrayList[nOfIdColumns];
|
171
|
currentIdListType = new int[nOfIdColumns];
|
172
|
|
173
|
for (int col = 0; col< currentIdLists.length; col++){
|
174
|
currentIdLists[col] = new ArrayList<String>();
|
175
|
currentIdListType[col] = metaData.getColumnType(col + 1);
|
176
|
}
|
177
|
List<String> currentIdList;
|
178
|
|
179
|
int i = 0;
|
180
|
//for each record
|
181
|
for (i = 0; i < partitionSize; i++){
|
182
|
if (idResultSet.next() == false){
|
183
|
break;
|
184
|
}
|
185
|
//for each column
|
186
|
for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
|
187
|
Object oNextId = idResultSet.getObject(colIndex + 1);
|
188
|
String strNextId = String.valueOf(oNextId);
|
189
|
currentIdList = currentIdLists[colIndex];
|
190
|
currentIdList.add(strNextId);
|
191
|
}
|
192
|
result = true; //true if at least one record was read
|
193
|
}
|
194
|
rowsInCurrentPartition = i;
|
195
|
|
196
|
return result;
|
197
|
}
|
198
|
|
199
|
|
200
|
|
201
|
/**
|
202
|
* Returns the underlying resultSet holding all records needed to handle the partition.<BR>
|
203
|
* @return
|
204
|
*/
|
205
|
public ResultSet getResultSet(){
|
206
|
return partitionResultSet;
|
207
|
}
|
208
|
|
209
|
|
210
|
|
211
|
/**
|
212
|
* Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
|
213
|
* created during {@link #nextPartition}
|
214
|
* @return ResultSet
|
215
|
*/
|
216
|
private ResultSet makePartitionResultSet(){
|
217
|
int nColumns = currentIdLists.length;
|
218
|
String[] strIdLists = new String[nColumns];
|
219
|
|
220
|
String strRecordQuery = strRecordQueryTemplate;
|
221
|
for (int col = 0; col < nColumns; col++){
|
222
|
for (String id: currentIdLists[col]){
|
223
|
id = addApostropheIfNeeded(id, currentIdListType[col]);
|
224
|
strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
|
225
|
}
|
226
|
strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
|
227
|
}
|
228
|
|
229
|
ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
|
230
|
|
231
|
return resultSet;
|
232
|
}
|
233
|
|
234
|
/**
|
235
|
* @param id
|
236
|
* @param i
|
237
|
* @return
|
238
|
*/
|
239
|
private String addApostropheIfNeeded(String id, int sqlType) {
|
240
|
String result = id;
|
241
|
if (isStringType(sqlType)){
|
242
|
result = "'" + id + "'";
|
243
|
}
|
244
|
return result;
|
245
|
}
|
246
|
|
247
|
/**
|
248
|
* @param sqlType
|
249
|
* @return
|
250
|
*/
|
251
|
private boolean isStringType(int sqlType) {
|
252
|
if(sqlType == Types.INTEGER){
|
253
|
return false; //standard case
|
254
|
}else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
|
255
|
sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
|
256
|
sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
|
257
|
return true;
|
258
|
}else{
|
259
|
return false;
|
260
|
}
|
261
|
}
|
262
|
|
263
|
public Map<String, ? extends CdmBase> getObjectMap(Object key){
|
264
|
Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
|
265
|
return objectMap;
|
266
|
}
|
267
|
|
268
|
/**
|
269
|
*
|
270
|
*/
|
271
|
private int getCurrentNumberOfRows() {
|
272
|
return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
|
273
|
}
|
274
|
|
275
|
|
276
|
/**
|
277
|
* @param recordsPerTransaction
|
278
|
* @param partitionedIO
|
279
|
* @param i
|
280
|
*/
|
281
|
protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
|
282
|
//if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
|
283
|
txStatus = partitionedIO.startTransaction();
|
284
|
if(logger.isInfoEnabled()) {
|
285
|
logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
|
286
|
}
|
287
|
//}
|
288
|
return txStatus;
|
289
|
}
|
290
|
|
291
|
// ************************** Not Needed ?? **************************************************
|
292
|
|
293
|
// protected void doLogPerLoop(int recordsPerLog, String pluralString){
|
294
|
// int count = getAbsoluteRow() - 1;
|
295
|
// if ((count % recordsPerLog ) == 0 && count!= 0 ){
|
296
|
// logger.info(pluralString + " handled: " + (count));
|
297
|
// }
|
298
|
// }
|
299
|
//
|
300
|
//
|
301
|
|
302
|
|
303
|
|
304
|
// public boolean nextRow() throws SQLException{
|
305
|
// if (currentRowInPartition >= partitionSize ){
|
306
|
// return false;
|
307
|
// }else{
|
308
|
// currentRowInPartition++;
|
309
|
// return resultSet.next();
|
310
|
// }
|
311
|
// }
|
312
|
//
|
313
|
|
314
|
|
315
|
|
316
|
}
|