minor
[cdmlib.git] / cdmlib-io / src / main / java / eu / etaxonomy / cdm / io / common / ResultSetPartitioner.java
1 // $Id$
2 /**
3 * Copyright (C) 2007 EDIT
4 * European Distributed Institute of Taxonomy
5 * http://www.e-taxonomy.eu
6 *
7 * The contents of this file are subject to the Mozilla Public License Version 1.1
8 * See LICENSE.TXT at the top of this package for the full license terms.
9 */
10
11 package eu.etaxonomy.cdm.io.common;
12
13 import java.sql.ResultSet;
14 import java.sql.ResultSetMetaData;
15 import java.sql.SQLException;
16 import java.sql.Types;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Map;
20
21 import org.apache.log4j.Logger;
22 import org.springframework.transaction.TransactionStatus;
23
24 import eu.etaxonomy.cdm.common.CdmUtils;
25 import eu.etaxonomy.cdm.model.common.CdmBase;
26
27 /**
28 * @author a.mueller
29 * @created 16.02.2010
30 */
31 public class ResultSetPartitioner<STATE extends IPartitionedState> {
32 private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
33 private PartitionerProfiler profiler = new PartitionerProfiler();
34
35 //************************* STATIC ***************************************************/
36
37 public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
38 ResultSetPartitioner<IPartitionedState> resultSetPartitioner = new ResultSetPartitioner<IPartitionedState>(source, strIdQuery, strRecordQuery, partitionSize);
39 return resultSetPartitioner;
40 }
41
42 //*********************** VARIABLES *************************************************/
43
44 /**
45 * The database
46 */
47 private Source source;
48
49 /**
50 * The result set containing all records and at least the ids as a field. This result set
51 * will be used for partitioning
52 */
53 private ResultSet idResultSet;
54
55 /**
 * A template for a SQL query returning all records and all values needed for a partition
 * to be handled. The 'where' condition is filled by replacing the template's '@IdList' token.
58 */
59 private String strRecordQueryTemplate;
60
61 /**
62 * The resultset returned for the strRecordQueryTemplate
63 */
64 private ResultSet partitionResultSet;
65
66 /**
 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
 * creating a taxon partition the map holds all taxon names).
 * The key is a combination of a namespace and the id in the original source.
70 */
71 private Map<Object, Map<String, ? extends CdmBase>> relatedObjects;
72
73 /**
74 * Number of records handled in the partition
75 */
76 private int partitionSize;
77
78 /**
 * Lists of ids handled in this partition (must be an array of lists because sometimes
 * the key consists of more than one column).
81 */
82 private List<String>[] currentIdLists;
83
84 /**
85 * The sql type of the id columns.
86 * @see Types
87 */
88 private int[] currentIdListType;
89
90 /**
91 * counter for the partitions
92 */
93 private int currentPartition;
94
95 /**
96 * number of records in the current partition
97 */
98 private int rowsInCurrentPartition;
99
100 private TransactionStatus txStatus;
101
102 //*********************** CONSTRUCTOR *************************************************/
103
/**
 * Stores the configuration and immediately runs the id query against the source.
 * The resulting id result set is kept and consumed step by step by
 * {@link #nextPartition()}.
 *
 * @param source the database the queries are run against
 * @param strIdQuery the SQL query delivering the ids used for partitioning
 * @param strRecordQuery the record query template
 * @param partitionSize the number of records per partition
 * @throws SQLException declared for callers; kept as part of the signature
 */
private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
    this.source = source;
    this.partitionSize = partitionSize;
    this.strRecordQueryTemplate = strRecordQuery;
    //the id query is executed once, here
    this.idResultSet = source.getResultSet(strIdQuery);
}
114
115 //************************ METHODS ****************************************************/
116
117
118 /**
119 * Import the whole partition of an input stream by starting a transaction, getting the related objects
120 * stored in the destination, invoke the IOs partition handling and commit the transaction
121 * @param partitionedIO
122 */
123 public void doPartition(IPartitionedIO<STATE> partitionedIO, STATE state) {
124 try{
125 profiler.startTx();
126 TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
127
128 state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
129
130 profiler.startRs();
131 ResultSet rs = makePartitionResultSet();
132
133 profiler.startRelObjects();
134 this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs, state);
135 state.setRelatedObjects(relatedObjects);
136
137 profiler.startRs2();
138 partitionResultSet = makePartitionResultSet();
139
140 profiler.startDoPartition();
141 partitionedIO.doPartition(this, state);
142
143 profiler.startDoCommit();
144 partitionedIO.commitTransaction(txStatus);
145 state.resetTransactionalSourceReference();
146
147 profiler.end();
148 state.setRelatedObjects(null);
149
150 logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
151 profiler.print();
152 }catch(Exception e){
153 String message = "Exception (%s) occurred while handling import partition for %s.";
154 message = String.format(message, e.getLocalizedMessage(), partitionedIO.getPluralString());
155 throw new RuntimeException(message, e);
156 }
157 }
158
159
160 public void startDoSave(){
161 profiler.startDoSave();
162 }
163
164 /**
165 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
166 * @return
167 * @throws SQLException
168 */
169 public boolean nextPartition() throws SQLException{
170 boolean result = false;
171 ResultSetMetaData metaData = idResultSet.getMetaData();
172 int nOfIdColumns = metaData.getColumnCount();
173 currentPartition++;
174 currentIdLists = new ArrayList[nOfIdColumns];
175 currentIdListType = new int[nOfIdColumns];
176
177 for (int col = 0; col< currentIdLists.length; col++){
178 currentIdLists[col] = new ArrayList<String>();
179 currentIdListType[col] = metaData.getColumnType(col + 1);
180 }
181 List<String> currentIdList;
182
183 int i = 0;
184 //for each record
185 for (i = 0; i < partitionSize; i++){
186 if (idResultSet.next() == false){
187 break;
188 }
189 //for each column
190 for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
191 Object oNextId = idResultSet.getObject(colIndex + 1);
192 String strNextId = String.valueOf(oNextId);
193 currentIdList = currentIdLists[colIndex];
194 currentIdList.add(strNextId);
195 }
196 result = true; //true if at least one record was read
197 }
198 rowsInCurrentPartition = i;
199
200 return result;
201 }
202
203
204
205 /**
206 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
207 * @return
208 */
209 public ResultSet getResultSet(){
210 return partitionResultSet;
211 }
212
213
214
215 /**
216 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
217 * created during {@link #nextPartition}
218 * @return ResultSet
219 */
220 private ResultSet makePartitionResultSet(){
221 int nColumns = currentIdLists.length;
222 String[] strIdLists = new String[nColumns];
223
224 String strRecordQuery = strRecordQueryTemplate;
225 for (int col = 0; col < nColumns; col++){
226 for (String id: currentIdLists[col]){
227 id = addApostropheIfNeeded(id, currentIdListType[col]);
228 strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
229 }
230 strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
231 }
232
233 ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
234
235 return resultSet;
236 }
237
238 /**
239 * @param id
240 * @param i
241 * @return
242 */
243 private String addApostropheIfNeeded(String id, int sqlType) {
244 String result = id;
245 if (isStringType(sqlType)){
246 result = "'" + id + "'";
247 }
248 return result;
249 }
250
251 /**
252 * @param sqlType
253 * @return
254 */
255 private boolean isStringType(int sqlType) {
256 if(sqlType == Types.INTEGER){
257 return false; //standard case
258 }else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
259 sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
260 sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
261 return true;
262 }else{
263 return false;
264 }
265 }
266
267 public Map<String, ? extends CdmBase> getObjectMap(Object key){
268 Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
269 return objectMap;
270 }
271
272 /**
273 *
274 */
275 private int getCurrentNumberOfRows() {
276 return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
277 }
278
279
280 /**
281 * @param recordsPerTransaction
282 * @param partitionedIO
283 * @param i
284 */
285 protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
286 //if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
287 txStatus = partitionedIO.startTransaction();
288 if(logger.isInfoEnabled()) {
289 logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
290 }
291 //}
292 return txStatus;
293 }
294
295 // ************************** Not Needed ?? **************************************************
296
297 // protected void doLogPerLoop(int recordsPerLog, String pluralString){
298 // int count = getAbsoluteRow() - 1;
299 // if ((count % recordsPerLog ) == 0 && count!= 0 ){
300 // logger.info(pluralString + " handled: " + (count));
301 // }
302 // }
303 //
304 //
305
306
307
308 // public boolean nextRow() throws SQLException{
309 // if (currentRowInPartition >= partitionSize ){
310 // return false;
311 // }else{
312 // currentRowInPartition++;
313 // return resultSet.next();
314 // }
315 // }
316 //
317
318
319
320 }