Project

General

Profile

Download (8.87 KB) Statistics
| Branch: | Tag: | Revision:
1
// $Id$
2
/**
3
* Copyright (C) 2007 EDIT
4
* European Distributed Institute of Taxonomy 
5
* http://www.e-taxonomy.eu
6
* 
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10

    
11
package eu.etaxonomy.cdm.io.common;
12

    
13
import java.sql.ResultSet;
14
import java.sql.ResultSetMetaData;
15
import java.sql.SQLException;
16
import java.sql.Types;
17
import java.util.ArrayList;
18
import java.util.List;
19
import java.util.Map;
20

    
21
import org.apache.log4j.Logger;
22
import org.springframework.transaction.TransactionStatus;
23

    
24
import eu.etaxonomy.cdm.common.CdmUtils;
25
import eu.etaxonomy.cdm.model.common.CdmBase;
26

    
27
/**
28
 * @author a.mueller
29
 * @created 16.02.2010
30
 * @version 1.0
31
 */
32
public class ResultSetPartitioner<STATE extends IPartitionedState> {
33
	private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
34
	private PartitionerProfiler profiler = new PartitionerProfiler();
35

    
36
//************************* STATIC ***************************************************/
37
	
38
	public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
39
		ResultSetPartitioner resultSetPartitioner = new ResultSetPartitioner(source, strIdQuery, strRecordQuery, partitionSize);
40
		return resultSetPartitioner;
41
	}
42

    
43
//*********************** VARIABLES *************************************************/
44
	
45
	/**
46
	 * The database
47
	 */
48
	private Source source;
49
	
50
	/**
51
	 * The result set containing all records and at least the ids as a field. This result set
52
	 * will be used for partitioning
53
	 */
54
	private ResultSet idResultSet;
55
	
56
	/**
57
	 * A template for a SQL Query returning all records and all values needed for a partition
58
	 * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
59
	 */
60
	private String strRecordQueryTemplate;
61
	
62
	/**
63
	 * The resultset returned for the strRecordQueryTemplate 
64
	 */
65
	private ResultSet partitionResultSet;
66
	
67
	/**
68
	 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when 
69
	 * creating a taxon partition the map holds all taxon names.
70
	 * The key is a combination of a namespace and the id in the original source
71
	 */
72
	private Map<Object, Map<String, CdmBase>> relatedObjects;
73
	
74
	/**
75
	 * Number of records handled in the partition
76
	 */
77
	private int partitionSize;
78
	
79
	/**
80
	 * Lists of ids handled in this partition (must be an array of lists because sometimes 
81
	 * we have non-single keys 
82
	 */
83
	private List<String>[] currentIdLists;
84
	
85
	/**
86
	 * The sql type of the id columns.
87
	 * @see Types
88
	 */
89
	private int[] currentIdListType;
90
	
91
	/**
92
	 * counter for the partitions
93
	 */
94
	private int currentPartition;
95
	
96
	/**
97
	 * number of records in the current partition
98
	 */
99
	private int rowsInCurrentPartition;
100
	
101
	private TransactionStatus txStatus;
102
	
103
//*********************** CONSTRUCTOR *************************************************/
104

    
105
	private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
106
		ResultSet idResultSet = source.getResultSet(strIdQuery);
107
//		if (! idResultSet.isBeforeFirst()){
108
//			idResultSet.beforeFirst();
109
//		}
110
		this.source = source;
111
		this.idResultSet = idResultSet;
112
		this.strRecordQueryTemplate = strRecordQuery;
113
		this.partitionSize = partitionSize;
114
	}
115
	
116
//************************ METHODS ****************************************************/
117
	
118

    
119
	/**
120
	 * Import the whole partition of an input stream by starting a transaction, getting the related objects
121
	 * stored in the destination, invoke the IOs partition handling and commit the transaction
122
	 * @param partitionedIO
123
	 */
124
	public void doPartition(IPartitionedIO partitionedIO, STATE state) {
125
		try{
126
			profiler.startTx();
127
			TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
128
			
129
			profiler.startRs();
130
			ResultSet rs = makePartitionResultSet();
131

    
132
			profiler.startRelObjects();
133
			this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs);
134
			state.setRelatedObjects(relatedObjects);
135
			
136
			profiler.startRs2();
137
			partitionResultSet = makePartitionResultSet();
138
			
139
			profiler.startDoPartition(); 
140
			partitionedIO.doPartition(this, state);
141
			
142
			profiler.startDoCommit();
143
			partitionedIO.commitTransaction(txStatus);
144
			
145
			profiler.end();
146
			state.setRelatedObjects(null);
147
			
148
			logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
149
			profiler.print();
150
		}catch(Exception e){
151
			throw new RuntimeException(e);
152
		}
153
	}
154
	
155
	
156
	public void startDoSave(){
157
		profiler.startDoSave();
158
	}
159
	
160
	/**
161
	 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
162
	 * @return
163
	 * @throws SQLException
164
	 */
165
	public boolean nextPartition() throws SQLException{
166
		boolean result = false;
167
		ResultSetMetaData metaData = idResultSet.getMetaData();
168
		int nOfIdColumns = metaData.getColumnCount();
169
		currentPartition++;
170
		currentIdLists = new ArrayList[nOfIdColumns];
171
		currentIdListType = new int[nOfIdColumns];
172
		
173
		for (int col = 0; col< currentIdLists.length; col++){
174
			currentIdLists[col] = new ArrayList<String>();
175
			currentIdListType[col] = metaData.getColumnType(col + 1);
176
		}
177
		List<String> currentIdList;
178
		
179
		int i = 0;
180
		//for each record
181
		for (i = 0; i < partitionSize; i++){
182
			if (idResultSet.next() == false){
183
				break; 
184
			}
185
			//for each column
186
			for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
187
				Object oNextId = idResultSet.getObject(colIndex + 1);
188
				String strNextId = String.valueOf(oNextId); 
189
				currentIdList = currentIdLists[colIndex];
190
				currentIdList.add(strNextId);
191
			}
192
			result = true; //true if at least one record was read
193
		}
194
		rowsInCurrentPartition = i;
195
		
196
		return result;
197
	}
198

    
199

    
200

    
201
	/**
202
	 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
203
	 * @return
204
	 */
205
	public ResultSet getResultSet(){
206
		return partitionResultSet;
207
	}
208

    
209
	
210
	
211
	/**
212
	 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
213
	 * created during {@link #nextPartition}
214
	 * @return ResultSet
215
	 */
216
	private ResultSet makePartitionResultSet(){
217
		int nColumns = currentIdLists.length;
218
		String[] strIdLists = new String[nColumns];
219
		
220
		String strRecordQuery = strRecordQueryTemplate;
221
		for (int col = 0; col < nColumns; col++){
222
			for (String id: currentIdLists[col]){
223
				id = addApostropheIfNeeded(id, currentIdListType[col]);
224
				strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
225
			}
226
			strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
227
		}
228
		
229
		ResultSet resultSet = source.getResultSet(strRecordQuery);
230
		return resultSet;
231
	}
232
	
233
	/**
234
	 * @param id
235
	 * @param i
236
	 * @return
237
	 */
238
	private String addApostropheIfNeeded(String id, int sqlType) {
239
		String result = id;
240
		if (isStringType(sqlType)){
241
			result = "'" + id + "'";
242
		}
243
		return result;
244
	}
245

    
246
	/**
247
	 * @param sqlType
248
	 * @return
249
	 */
250
	private boolean isStringType(int sqlType) {
251
		if(sqlType == Types.INTEGER){
252
			return false;  //standard case
253
		}else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR || 
254
				sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
255
				sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
256
			return true;
257
		}else{
258
			return false;
259
		}
260
	}
261

    
262
	public Map<String, ? extends CdmBase> getObjectMap(Object key){
263
		Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
264
		return objectMap;
265
	}
266
	
267
	/**
268
	 * 
269
	 */
270
	private int getCurrentNumberOfRows() {
271
		return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
272
	}
273
	
274

    
275
	/**
276
	 * @param recordsPerTransaction
277
	 * @param partitionedIO 
278
	 * @param i
279
	 */
280
	protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
281
		//if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
282
			txStatus = partitionedIO.startTransaction();
283
			if(logger.isInfoEnabled()) {
284
				logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started"); 
285
			}
286
		//}
287
		return txStatus;
288
	}
289
	
290
// ************************** Not Needed ?? **************************************************
291
	
292
//	protected void doLogPerLoop(int recordsPerLog, String pluralString){
293
//		int count = getAbsoluteRow() - 1;
294
//		if ((count % recordsPerLog ) == 0 && count!= 0 ){ 
295
//			logger.info(pluralString + " handled: " + (count));
296
//		}
297
//	}
298
//
299
//	
300

    
301
	
302

    
303
//	public boolean nextRow() throws SQLException{
304
//		if (currentRowInPartition >= partitionSize ){
305
//			return false;
306
//		}else{
307
//			currentRowInPartition++;
308
//			return resultSet.next();
309
//		}
310
//	}
311
//	
312
	
313
	
314
	
315
}
(36-36/41)