adding Vaadin compatibility to trunk
[cdmlib.git] / cdmlib-io / src / main / java / eu / etaxonomy / cdm / io / common / ResultSetPartitioner.java
1 // $Id$
2 /**
3 * Copyright (C) 2007 EDIT
4 * European Distributed Institute of Taxonomy
5 * http://www.e-taxonomy.eu
6 *
7 * The contents of this file are subject to the Mozilla Public License Version 1.1
8 * See LICENSE.TXT at the top of this package for the full license terms.
9 */
10
11 package eu.etaxonomy.cdm.io.common;
12
13 import java.sql.ResultSet;
14 import java.sql.ResultSetMetaData;
15 import java.sql.SQLException;
16 import java.sql.Types;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Map;
20
21 import org.apache.log4j.Logger;
22 import org.springframework.transaction.TransactionStatus;
23
24 import eu.etaxonomy.cdm.common.CdmUtils;
25 import eu.etaxonomy.cdm.model.common.CdmBase;
26
27 /**
28 * @author a.mueller
29 * @created 16.02.2010
30 * @version 1.0
31 */
32 public class ResultSetPartitioner<STATE extends IPartitionedState> {
33 private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
34 private PartitionerProfiler profiler = new PartitionerProfiler();
35
36 //************************* STATIC ***************************************************/
37
38 public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
39 ResultSetPartitioner resultSetPartitioner = new ResultSetPartitioner(source, strIdQuery, strRecordQuery, partitionSize);
40 return resultSetPartitioner;
41 }
42
43 //*********************** VARIABLES *************************************************/
44
45 /**
46 * The database the id query and the record queries are executed against.
47 */
48 private Source source;
49
50 /**
51 * The result set containing all records and at least the ids as a field. This result set
52 * is read sequentially by {@link #nextPartition()} and is used for partitioning.
53 */
54 private ResultSet idResultSet;
55
56 /**
57 * A template for a SQL Query returning all records and all values needed for a partition
58 * to be handled. The 'where' condition is filled by replacing the template's id list token
 * ({@link IPartitionedIO#ID_LIST_TOKEN}, e.g. '@IdList') with the ids of the current partition.
59 */
60 private String strRecordQueryTemplate;
61
62 /**
63 * The result set returned for the strRecordQueryTemplate filled with the ids of the current partition
64 */
65 private ResultSet partitionResultSet;
66
67 /**
68 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
69 * creating a taxon partition the map holds all taxon names).
70 * The key is a combination of a namespace and the id in the original source
71 */
72 private Map<Object, Map<String, CdmBase>> relatedObjects;
73
74 /**
75 * Maximum number of records handled per partition
76 */
77 private int partitionSize;
78
79 /**
80 * Lists of ids handled in this partition, one list per column of the id result set
81 * (must be an array of lists because sometimes we have non-single keys)
82 */
83 private List<String>[] currentIdLists;
84
85 /**
86 * The sql type of the id columns.
87 * @see Types
88 */
89 private int[] currentIdListType;
90
91 /**
92 * Counter for the partitions, increased by each call of {@link #nextPartition()}
93 */
94 private int currentPartition;
95
96 /**
97 * Number of records read for the current partition
98 */
99 private int rowsInCurrentPartition;
100
/**
 * Status of the transaction most recently started by {@link #getTransaction(int, IPartitionedIO)}
 */
101 private TransactionStatus txStatus;
102
103 //*********************** CONSTRUCTOR *************************************************/
104
/**
 * Stores the configuration of the partitioner and executes the id query on the
 * given source. Use {@link #NewInstance(Source, String, String, int)} to create instances.
 *
 * @param source the database both queries are executed against
 * @param strIdQuery query returning the ids of all records to be partitioned
 * @param strRecordQuery query template used to load the records of one partition
 * @param partitionSize maximum number of records per partition
 * @throws SQLException declared for callers; see the factory method
 */
private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
    this.source = source;
    this.strRecordQueryTemplate = strRecordQuery;
    this.partitionSize = partitionSize;
    this.idResultSet = source.getResultSet(strIdQuery);
}
115
116 //************************ METHODS ****************************************************/
117
118
119 /**
120 * Import the whole partition of an input stream by starting a transaction, getting the related objects
121 * stored in the destination, invoke the IOs partition handling and commit the transaction
122 * @param partitionedIO
123 */
124 public void doPartition(IPartitionedIO partitionedIO, STATE state) {
125 try{
126 profiler.startTx();
127 TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
128
129 state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
130
131 profiler.startRs();
132 ResultSet rs = makePartitionResultSet();
133
134 profiler.startRelObjects();
135 this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs);
136 state.setRelatedObjects(relatedObjects);
137
138 profiler.startRs2();
139 partitionResultSet = makePartitionResultSet();
140
141 profiler.startDoPartition();
142 partitionedIO.doPartition(this, state);
143
144 profiler.startDoCommit();
145 partitionedIO.commitTransaction(txStatus);
146 state.resetTransactionalSourceReference();
147
148 profiler.end();
149 state.setRelatedObjects(null);
150
151 logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
152 profiler.print();
153 }catch(Exception e){
154 String message = "Exception (%s) occurred while handling import partition for %s.";
155 message = String.format(message, e.getLocalizedMessage(), partitionedIO.getPluralString());
156 throw new RuntimeException(message, e);
157 }
158 }
159
160
161 public void startDoSave(){
162 profiler.startDoSave();
163 }
164
165 /**
166 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
167 * @return
168 * @throws SQLException
169 */
170 public boolean nextPartition() throws SQLException{
171 boolean result = false;
172 ResultSetMetaData metaData = idResultSet.getMetaData();
173 int nOfIdColumns = metaData.getColumnCount();
174 currentPartition++;
175 currentIdLists = new ArrayList[nOfIdColumns];
176 currentIdListType = new int[nOfIdColumns];
177
178 for (int col = 0; col< currentIdLists.length; col++){
179 currentIdLists[col] = new ArrayList<String>();
180 currentIdListType[col] = metaData.getColumnType(col + 1);
181 }
182 List<String> currentIdList;
183
184 int i = 0;
185 //for each record
186 for (i = 0; i < partitionSize; i++){
187 if (idResultSet.next() == false){
188 break;
189 }
190 //for each column
191 for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
192 Object oNextId = idResultSet.getObject(colIndex + 1);
193 String strNextId = String.valueOf(oNextId);
194 currentIdList = currentIdLists[colIndex];
195 currentIdList.add(strNextId);
196 }
197 result = true; //true if at least one record was read
198 }
199 rowsInCurrentPartition = i;
200
201 return result;
202 }
203
204
205
206 /**
207 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
208 * @return
209 */
210 public ResultSet getResultSet(){
211 return partitionResultSet;
212 }
213
214
215
216 /**
217 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
218 * created during {@link #nextPartition}
219 * @return ResultSet
220 */
221 private ResultSet makePartitionResultSet(){
222 int nColumns = currentIdLists.length;
223 String[] strIdLists = new String[nColumns];
224
225 String strRecordQuery = strRecordQueryTemplate;
226 for (int col = 0; col < nColumns; col++){
227 for (String id: currentIdLists[col]){
228 id = addApostropheIfNeeded(id, currentIdListType[col]);
229 strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
230 }
231 strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
232 }
233
234 ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
235
236 return resultSet;
237 }
238
239 /**
240 * @param id
241 * @param i
242 * @return
243 */
244 private String addApostropheIfNeeded(String id, int sqlType) {
245 String result = id;
246 if (isStringType(sqlType)){
247 result = "'" + id + "'";
248 }
249 return result;
250 }
251
252 /**
253 * @param sqlType
254 * @return
255 */
256 private boolean isStringType(int sqlType) {
257 if(sqlType == Types.INTEGER){
258 return false; //standard case
259 }else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
260 sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
261 sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
262 return true;
263 }else{
264 return false;
265 }
266 }
267
268 public Map<String, ? extends CdmBase> getObjectMap(Object key){
269 Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
270 return objectMap;
271 }
272
273 /**
274 *
275 */
276 private int getCurrentNumberOfRows() {
277 return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
278 }
279
280
281 /**
282 * @param recordsPerTransaction
283 * @param partitionedIO
284 * @param i
285 */
286 protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
287 //if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
288 txStatus = partitionedIO.startTransaction();
289 if(logger.isInfoEnabled()) {
290 logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
291 }
292 //}
293 return txStatus;
294 }
295
296 // ************************** Not Needed ?? **************************************************
297
298 // protected void doLogPerLoop(int recordsPerLog, String pluralString){
299 // int count = getAbsoluteRow() - 1;
300 // if ((count % recordsPerLog ) == 0 && count!= 0 ){
301 // logger.info(pluralString + " handled: " + (count));
302 // }
303 // }
304 //
305 //
306
307
308
309 // public boolean nextRow() throws SQLException{
310 // if (currentRowInPartition >= partitionSize ){
311 // return false;
312 // }else{
313 // currentRowInPartition++;
314 // return resultSet.next();
315 // }
316 // }
317 //
318
319
320
321 }