Project

General

Profile

Download (9.09 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2007 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9

    
10
package eu.etaxonomy.cdm.io.common;
11

    
12
import java.sql.ResultSet;
13
import java.sql.ResultSetMetaData;
14
import java.sql.SQLException;
15
import java.sql.Types;
16
import java.util.ArrayList;
17
import java.util.List;
18
import java.util.Map;
19

    
20
import org.apache.log4j.Logger;
21
import org.springframework.transaction.TransactionStatus;
22

    
23
import eu.etaxonomy.cdm.common.CdmUtils;
24
import eu.etaxonomy.cdm.model.common.CdmBase;
25

    
26
/**
27
 * @author a.mueller
28
 * @since 16.02.2010
29
 */
30
public class ResultSetPartitioner<STATE extends IPartitionedState> {
31
	private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
32
	private final PartitionerProfiler profiler = new PartitionerProfiler();
33

    
34
//************************* STATIC ***************************************************/
35

    
36
	public static <T extends IPartitionedState> ResultSetPartitioner<T> NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
37
		ResultSetPartitioner<T> resultSetPartitioner
38
		        = new ResultSetPartitioner<>(source, strIdQuery, strRecordQuery, partitionSize);
39
		return resultSetPartitioner;
40
	}
41

    
42
//*********************** VARIABLES *************************************************/
43

    
44
	/**
45
	 * The database
46
	 */
47
	private final Source source;
48

    
49
	/**
50
	 * The result set containing all records and at least the ids as a field. This result set
51
	 * will be used for partitioning
52
	 */
53
	private final ResultSet idResultSet;
54

    
55
	/**
56
	 * A template for a SQL Query returning all records and all values needed for a partition
57
	 * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
58
	 */
59
	private final String strRecordQueryTemplate;
60

    
61
	/**
62
	 * The resultset returned for the strRecordQueryTemplate
63
	 */
64
	private ResultSet partitionResultSet;
65

    
66
	/**
67
	 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
68
	 * creating a taxon partition the map holds all taxon names.
69
	 * The key is a combination of a namespace and the id in the original source
70
	 */
71
	private Map<Object, Map<String, ? extends CdmBase>> relatedObjects;
72

    
73
	/**
74
	 * Number of records handled in the partition
75
	 */
76
	private final int partitionSize;
77

    
78
	/**
79
	 * Lists of ids handled in this partition (must be an array of lists because sometimes
80
	 * we have non-single keys
81
	 */
82
	private List<String>[] currentIdLists;
83

    
84
	/**
85
	 * The sql type of the id columns.
86
	 * @see Types
87
	 */
88
	private int[] currentIdListType;
89

    
90
	/**
91
	 * counter for the partitions
92
	 */
93
	private int currentPartition;
94

    
95
	/**
96
	 * number of records in the current partition
97
	 */
98
	private int rowsInCurrentPartition;
99

    
100
	private TransactionStatus txStatus;
101

    
102
//*********************** CONSTRUCTOR *************************************************/
103

    
104
	private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
105
		ResultSet idResultSet = source.getResultSet(strIdQuery);
106
//		if (! idResultSet.isBeforeFirst()){
107
//			idResultSet.beforeFirst();
108
//		}
109
		this.source = source;
110
		this.idResultSet = idResultSet;
111
		this.strRecordQueryTemplate = strRecordQuery;
112
		this.partitionSize = partitionSize;
113
	}
114

    
115
//************************ METHODS ****************************************************/
116

    
117

    
118
	/**
119
	 * Import the whole partition of an input stream by starting a transaction, getting the related objects
120
	 * stored in the destination, invoke the IOs partition handling and commit the transaction
121
	 * @param partitionedIO
122
	 */
123
	public void doPartition(IPartitionedIO<STATE> partitionedIO, STATE state) {
124
		int i = 0;
125
	    try{
126
			profiler.startTx();
127
			TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
128

    
129
			i = 1;
130
			state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
131

    
132
			profiler.startRs();
133
			ResultSet rs = makePartitionResultSet();
134
            i = 2;
135
			profiler.startRelObjects();
136
			this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs, state);
137
            i = 3;
138
            state.setRelatedObjects(relatedObjects);
139
            i = 4;
140
			profiler.startRs2();
141
			partitionResultSet = makePartitionResultSet();
142
            i = 5;
143
			profiler.startDoPartition();
144
			partitionedIO.doPartition(this, state);
145
            i = 6;
146
			profiler.startDoCommit();
147
			partitionedIO.commitTransaction(txStatus);
148
			state.resetTransactionalSourceReference();
149
            i = 7;
150
			profiler.end();
151
			state.setRelatedObjects(null);
152
            i = 8;
153
			logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
154
			profiler.print();
155
		}catch(Exception e){
156
			String message = "Exception (%s) occurred at position " + i + " while handling import partition for %s.";
157
			message = String.format(message, e.getMessage(), partitionedIO.getPluralString());
158
			e.printStackTrace();
159
			throw new RuntimeException(message, e);
160
		}
161
	}
162

    
163

    
164
	public void startDoSave(){
165
		profiler.startDoSave();
166
	}
167

    
168
	/**
169
	 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
170
	 * @return
171
	 * @throws SQLException
172
	 */
173
	public boolean nextPartition() throws SQLException{
174
		boolean result = false;
175
		ResultSetMetaData metaData = idResultSet.getMetaData();
176
		int nOfIdColumns = metaData.getColumnCount();
177
		currentPartition++;
178
		currentIdLists = new List[nOfIdColumns];
179
		currentIdListType = new int[nOfIdColumns];
180

    
181
		for (int col = 0; col< currentIdLists.length; col++){
182
			currentIdLists[col] = new ArrayList<>();
183
			currentIdListType[col] = metaData.getColumnType(col + 1);
184
		}
185
		List<String> currentIdList;
186

    
187
		int i = 0;
188
		//for each record
189
		for (i = 0; i < partitionSize; i++){
190
			if (idResultSet.next() == false){
191
				break;
192
			}
193
			//for each column
194
			for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
195
				Object oNextId = idResultSet.getObject(colIndex + 1);
196
				String strNextId = String.valueOf(oNextId);
197
				currentIdList = currentIdLists[colIndex];
198
				currentIdList.add(strNextId);
199
			}
200
			result = true; //true if at least one record was read
201
		}
202
		rowsInCurrentPartition = i;
203

    
204
		return result;
205
	}
206

    
207

    
208

    
209
	/**
210
	 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
211
	 * @return
212
	 */
213
	public ResultSet getResultSet(){
214
		return partitionResultSet;
215
	}
216

    
217

    
218

    
219
	/**
220
	 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
221
	 * created during {@link #nextPartition}
222
	 * @return ResultSet
223
	 */
224
	private ResultSet makePartitionResultSet(){
225
		int nColumns = currentIdLists.length;
226
		String[] strIdLists = new String[nColumns];
227

    
228
		String strRecordQuery = strRecordQueryTemplate;
229
		for (int col = 0; col < nColumns; col++){
230
			for (String id: currentIdLists[col]){
231
				id = addApostropheIfNeeded(id, currentIdListType[col]);
232
				strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
233
			}
234
			strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
235
		}
236

    
237
		ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
238

    
239
		return resultSet;
240
	}
241

    
242
	/**
243
	 * @param id
244
	 * @param i
245
	 * @return
246
	 */
247
	private String addApostropheIfNeeded(String id, int sqlType) {
248
		String result = id;
249
		if (isStringType(sqlType)){
250
			result = "'" + id + "'";
251
		}
252
		return result;
253
	}
254

    
255
	/**
256
	 * @param sqlType
257
	 * @return
258
	 */
259
	private boolean isStringType(int sqlType) {
260
		if(sqlType == Types.INTEGER){
261
			return false;  //standard case
262
		}else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
263
				sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
264
				sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
265
			return true;
266
		}else{
267
			return false;
268
		}
269
	}
270

    
271
	public Map<String, ? extends CdmBase> getObjectMap(Object key){
272
		Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
273
		return objectMap;
274
	}
275

    
276
	/**
277
	 *
278
	 */
279
	private int getCurrentNumberOfRows() {
280
		return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
281
	}
282

    
283

    
284
	/**
285
	 * @param recordsPerTransaction
286
	 * @param partitionedIO
287
	 * @param i
288
	 */
289
	protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
290
		//if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
291
			txStatus = partitionedIO.startTransaction();
292
			if(logger.isInfoEnabled()) {
293
				logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
294
			}
295
		//}
296
		return txStatus;
297
	}
298

    
299
// ************************** Not Needed ?? **************************************************
300

    
301
//	protected void doLogPerLoop(int recordsPerLog, String pluralString){
302
//		int count = getAbsoluteRow() - 1;
303
//		if ((count % recordsPerLog ) == 0 && count!= 0 ){
304
//			logger.info(pluralString + " handled: " + (count));
305
//		}
306
//	}
307
//
308
//
309

    
310

    
311

    
312
//	public boolean nextRow() throws SQLException{
313
//		if (currentRowInPartition >= partitionSize ){
314
//			return false;
315
//		}else{
316
//			currentRowInPartition++;
317
//			return resultSet.next();
318
//		}
319
//	}
320
//
321

    
322

    
323

    
324
}
(50-50/63)