Project

General

Profile

Download (8.9 KB) Statistics
| Branch: | Tag: | Revision:
1
// $Id$
2
/**
3
* Copyright (C) 2007 EDIT
4
* European Distributed Institute of Taxonomy 
5
* http://www.e-taxonomy.eu
6
* 
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10

    
11
package eu.etaxonomy.cdm.io.common;
12

    
13
import java.sql.ResultSet;
14
import java.sql.ResultSetMetaData;
15
import java.sql.SQLException;
16
import java.sql.Types;
17
import java.util.ArrayList;
18
import java.util.List;
19
import java.util.Map;
20

    
21
import org.apache.log4j.Logger;
22
import org.springframework.transaction.TransactionStatus;
23

    
24
import eu.etaxonomy.cdm.common.CdmUtils;
25
import eu.etaxonomy.cdm.model.common.CdmBase;
26

    
27
/**
28
 * @author a.mueller
29
 * @created 16.02.2010
30
 * @version 1.0
31
 */
32
public class ResultSetPartitioner<STATE extends IPartitionedState> {
33
	private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
34
	private PartitionerProfiler profiler = new PartitionerProfiler();
35

    
36
//************************* STATIC ***************************************************/
37
	
38
	public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
39
		ResultSetPartitioner resultSetPartitioner = new ResultSetPartitioner(source, strIdQuery, strRecordQuery, partitionSize);
40
		return resultSetPartitioner;
41
	}
42

    
43
//*********************** VARIABLES *************************************************/
44
	
45
	/**
46
	 * The database
47
	 */
48
	private Source source;
49
	
50
	/**
51
	 * The result set containing all records and at least the ids as a field. This result set
52
	 * will be used for partitioning
53
	 */
54
	private ResultSet idResultSet;
55
	
56
	/**
57
	 * A template for a SQL Query returning all records and all values needed for a partition
58
	 * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
59
	 */
60
	private String strRecordQueryTemplate;
61
	
62
	/**
63
	 * The resultset returned for the strRecordQueryTemplate 
64
	 */
65
	private ResultSet partitionResultSet;
66
	
67
	/**
68
	 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when 
69
	 * creating a taxon partition the map holds all taxon names.
70
	 * The key is a combination of a namespace and the id in the original source
71
	 */
72
	private Map<Object, Map<String, CdmBase>> relatedObjects;
73
	
74
	/**
75
	 * Number of records handled in the partition
76
	 */
77
	private int partitionSize;
78
	
79
	/**
80
	 * Lists of ids handled in this partition (must be an array of lists because sometimes 
81
	 * we have non-single keys 
82
	 */
83
	private List<String>[] currentIdLists;
84
	
85
	/**
86
	 * The sql type of the id columns.
87
	 * @see Types
88
	 */
89
	private int[] currentIdListType;
90
	
91
	/**
92
	 * counter for the partitions
93
	 */
94
	private int currentPartition;
95
	
96
	/**
97
	 * number of records in the current partition
98
	 */
99
	private int rowsInCurrentPartition;
100
	
101
	private TransactionStatus txStatus;
102
	
103
//*********************** CONSTRUCTOR *************************************************/
104

    
105
	private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
106
		ResultSet idResultSet = source.getResultSet(strIdQuery);
107
//		if (! idResultSet.isBeforeFirst()){
108
//			idResultSet.beforeFirst();
109
//		}
110
		this.source = source;
111
		this.idResultSet = idResultSet;
112
		this.strRecordQueryTemplate = strRecordQuery;
113
		this.partitionSize = partitionSize;
114
	}
115
	
116
//************************ METHODS ****************************************************/
117
	
118

    
119
	/**
120
	 * Import the whole partition of an input stream by starting a transaction, getting the related objects
121
	 * stored in the destination, invoke the IOs partition handling and commit the transaction
122
	 * @param partitionedIO
123
	 */
124
	public void doPartition(IPartitionedIO partitionedIO, STATE state) {
125
		try{
126
			profiler.startTx();
127
			TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
128
			
129
			profiler.startRs();
130
			ResultSet rs = makePartitionResultSet();
131

    
132
			profiler.startRelObjects();
133
			this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs);
134
			state.setRelatedObjects(relatedObjects);
135
			
136
			profiler.startRs2();
137
			partitionResultSet = makePartitionResultSet();
138
			
139
			profiler.startDoPartition(); 
140
			partitionedIO.doPartition(this, state);
141
			
142
			profiler.startDoCommit();
143
			partitionedIO.commitTransaction(txStatus);
144
			
145
			profiler.end();
146
			state.setRelatedObjects(null);
147
			
148
			logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
149
			profiler.print();
150
		}catch(Exception e){
151
			throw new RuntimeException(e);
152
		}
153
	}
154
	
155
	
156
	public void startDoSave(){
157
		profiler.startDoSave();
158
	}
159
	
160
	/**
161
	 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
162
	 * @return
163
	 * @throws SQLException
164
	 */
165
	public boolean nextPartition() throws SQLException{
166
		boolean result = false;
167
		ResultSetMetaData metaData = idResultSet.getMetaData();
168
		int nOfIdColumns = metaData.getColumnCount();
169
		currentPartition++;
170
		currentIdLists = new ArrayList[nOfIdColumns];
171
		currentIdListType = new int[nOfIdColumns];
172
		
173
		for (int col = 0; col< currentIdLists.length; col++){
174
			currentIdLists[col] = new ArrayList<String>();
175
			currentIdListType[col] = metaData.getColumnType(col + 1);
176
		}
177
		List<String> currentIdList;
178
		
179
		int i = 0;
180
		//for each record
181
		for (i = 0; i < partitionSize; i++){
182
			if (idResultSet.next() == false){
183
				break; 
184
			}
185
			//for each column
186
			for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
187
				Object oNextId = idResultSet.getObject(colIndex + 1);
188
				String strNextId = String.valueOf(oNextId); 
189
				currentIdList = currentIdLists[colIndex];
190
				currentIdList.add(strNextId);
191
			}
192
			result = true; //true if at least one record was read
193
		}
194
		rowsInCurrentPartition = i;
195
		
196
		return result;
197
	}
198

    
199

    
200

    
201
	/**
202
	 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
203
	 * @return
204
	 */
205
	public ResultSet getResultSet(){
206
		return partitionResultSet;
207
	}
208

    
209
	
210
	
211
	/**
212
	 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
213
	 * created during {@link #nextPartition}
214
	 * @return ResultSet
215
	 */
216
	private ResultSet makePartitionResultSet(){
217
		int nColumns = currentIdLists.length;
218
		String[] strIdLists = new String[nColumns];
219
		
220
		String strRecordQuery = strRecordQueryTemplate;
221
		for (int col = 0; col < nColumns; col++){
222
			for (String id: currentIdLists[col]){
223
				id = addApostropheIfNeeded(id, currentIdListType[col]);
224
				strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
225
			}
226
			strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
227
		}
228
		
229
		ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
230
		
231
		return resultSet;
232
	}
233
	
234
	/**
235
	 * @param id
236
	 * @param i
237
	 * @return
238
	 */
239
	private String addApostropheIfNeeded(String id, int sqlType) {
240
		String result = id;
241
		if (isStringType(sqlType)){
242
			result = "'" + id + "'";
243
		}
244
		return result;
245
	}
246

    
247
	/**
248
	 * @param sqlType
249
	 * @return
250
	 */
251
	private boolean isStringType(int sqlType) {
252
		if(sqlType == Types.INTEGER){
253
			return false;  //standard case
254
		}else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR || 
255
				sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
256
				sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
257
			return true;
258
		}else{
259
			return false;
260
		}
261
	}
262

    
263
	public Map<String, ? extends CdmBase> getObjectMap(Object key){
264
		Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
265
		return objectMap;
266
	}
267
	
268
	/**
269
	 * 
270
	 */
271
	private int getCurrentNumberOfRows() {
272
		return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
273
	}
274
	
275

    
276
	/**
277
	 * @param recordsPerTransaction
278
	 * @param partitionedIO 
279
	 * @param i
280
	 */
281
	protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
282
		//if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
283
			txStatus = partitionedIO.startTransaction();
284
			if(logger.isInfoEnabled()) {
285
				logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started"); 
286
			}
287
		//}
288
		return txStatus;
289
	}
290
	
291
// ************************** Not Needed ?? **************************************************
292
	
293
//	protected void doLogPerLoop(int recordsPerLog, String pluralString){
294
//		int count = getAbsoluteRow() - 1;
295
//		if ((count % recordsPerLog ) == 0 && count!= 0 ){ 
296
//			logger.info(pluralString + " handled: " + (count));
297
//		}
298
//	}
299
//
300
//	
301

    
302
	
303

    
304
//	public boolean nextRow() throws SQLException{
305
//		if (currentRowInPartition >= partitionSize ){
306
//			return false;
307
//		}else{
308
//			currentRowInPartition++;
309
//			return resultSet.next();
310
//		}
311
//	}
312
//	
313
	
314
	
315
	
316
}
(40-40/48)