Project

General

Profile

Download (8.9 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2007 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9

    
10
package eu.etaxonomy.cdm.io.common;
11

    
12
import java.sql.ResultSet;
13
import java.sql.ResultSetMetaData;
14
import java.sql.SQLException;
15
import java.sql.Types;
16
import java.util.ArrayList;
17
import java.util.List;
18
import java.util.Map;
19

    
20
import org.apache.log4j.Logger;
21
import org.springframework.transaction.TransactionStatus;
22

    
23
import eu.etaxonomy.cdm.common.CdmUtils;
24
import eu.etaxonomy.cdm.model.common.CdmBase;
25

    
26
/**
27
 * @author a.mueller
28
 * @since 16.02.2010
29
 */
30
public class ResultSetPartitioner<STATE extends IPartitionedState> {
31
	private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
32
	private final PartitionerProfiler profiler = new PartitionerProfiler();
33

    
34
//************************* STATIC ***************************************************/
35

    
36
	public static <T extends IPartitionedState> ResultSetPartitioner<T> NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
37
		ResultSetPartitioner<T> resultSetPartitioner
38
		        = new ResultSetPartitioner<>(source, strIdQuery, strRecordQuery, partitionSize);
39
		return resultSetPartitioner;
40
	}
41

    
42
//*********************** VARIABLES *************************************************/
43

    
44
	/**
45
	 * The database
46
	 */
47
	private final Source source;
48

    
49
	/**
50
	 * The result set containing all records and at least the ids as a field. This result set
51
	 * will be used for partitioning
52
	 */
53
	private final ResultSet idResultSet;
54

    
55
	/**
56
	 * A template for a SQL Query returning all records and all values needed for a partition
57
	 * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
58
	 */
59
	private final String strRecordQueryTemplate;
60

    
61
	/**
62
	 * The resultset returned for the strRecordQueryTemplate
63
	 */
64
	private ResultSet partitionResultSet;
65

    
66
	/**
67
	 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
68
	 * creating a taxon partition the map holds all taxon names.
69
	 * The key is a combination of a namespace and the id in the original source
70
	 */
71
	private Map<Object, Map<String, ? extends CdmBase>> relatedObjects;
72

    
73
	/**
74
	 * Number of records handled in the partition
75
	 */
76
	private final int partitionSize;
77

    
78
	/**
79
	 * Lists of ids handled in this partition (must be an array of lists because sometimes
80
	 * we have non-single keys
81
	 */
82
	private List<String>[] currentIdLists;
83

    
84
	/**
85
	 * The sql type of the id columns.
86
	 * @see Types
87
	 */
88
	private int[] currentIdListType;
89

    
90
	/**
91
	 * counter for the partitions
92
	 */
93
	private int currentPartition;
94

    
95
	/**
96
	 * number of records in the current partition
97
	 */
98
	private int rowsInCurrentPartition;
99

    
100
	private TransactionStatus txStatus;
101

    
102
//*********************** CONSTRUCTOR *************************************************/
103

    
104
	private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
105
		ResultSet idResultSet = source.getResultSet(strIdQuery);
106
//		if (! idResultSet.isBeforeFirst()){
107
//			idResultSet.beforeFirst();
108
//		}
109
		this.source = source;
110
		this.idResultSet = idResultSet;
111
		this.strRecordQueryTemplate = strRecordQuery;
112
		this.partitionSize = partitionSize;
113
	}
114

    
115
//************************ METHODS ****************************************************/
116

    
117

    
118
	/**
119
	 * Import the whole partition of an input stream by starting a transaction, getting the related objects
120
	 * stored in the destination, invoke the IOs partition handling and commit the transaction
121
	 * @param partitionedIO
122
	 */
123
	public void doPartition(IPartitionedIO<STATE> partitionedIO, STATE state) {
124
		try{
125
			profiler.startTx();
126
			TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
127

    
128
			state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
129

    
130
			profiler.startRs();
131
			ResultSet rs = makePartitionResultSet();
132

    
133
			profiler.startRelObjects();
134
			this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs, state);
135
			state.setRelatedObjects(relatedObjects);
136

    
137
			profiler.startRs2();
138
			partitionResultSet = makePartitionResultSet();
139

    
140
			profiler.startDoPartition();
141
			partitionedIO.doPartition(this, state);
142

    
143
			profiler.startDoCommit();
144
			partitionedIO.commitTransaction(txStatus);
145
			state.resetTransactionalSourceReference();
146

    
147
			profiler.end();
148
			state.setRelatedObjects(null);
149

    
150
			logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
151
			profiler.print();
152
		}catch(Exception e){
153
			String message = "Exception (%s) occurred while handling import partition for %s.";
154
			message = String.format(message, e.getLocalizedMessage(), partitionedIO.getPluralString());
155
			throw new RuntimeException(message, e);
156
		}
157
	}
158

    
159

    
160
	public void startDoSave(){
161
		profiler.startDoSave();
162
	}
163

    
164
	/**
165
	 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
166
	 * @return
167
	 * @throws SQLException
168
	 */
169
	public boolean nextPartition() throws SQLException{
170
		boolean result = false;
171
		ResultSetMetaData metaData = idResultSet.getMetaData();
172
		int nOfIdColumns = metaData.getColumnCount();
173
		currentPartition++;
174
		currentIdLists = new List[nOfIdColumns];
175
		currentIdListType = new int[nOfIdColumns];
176

    
177
		for (int col = 0; col< currentIdLists.length; col++){
178
			currentIdLists[col] = new ArrayList<>();
179
			currentIdListType[col] = metaData.getColumnType(col + 1);
180
		}
181
		List<String> currentIdList;
182

    
183
		int i = 0;
184
		//for each record
185
		for (i = 0; i < partitionSize; i++){
186
			if (idResultSet.next() == false){
187
				break;
188
			}
189
			//for each column
190
			for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
191
				Object oNextId = idResultSet.getObject(colIndex + 1);
192
				String strNextId = String.valueOf(oNextId);
193
				currentIdList = currentIdLists[colIndex];
194
				currentIdList.add(strNextId);
195
			}
196
			result = true; //true if at least one record was read
197
		}
198
		rowsInCurrentPartition = i;
199

    
200
		return result;
201
	}
202

    
203

    
204

    
205
	/**
206
	 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
207
	 * @return
208
	 */
209
	public ResultSet getResultSet(){
210
		return partitionResultSet;
211
	}
212

    
213

    
214

    
215
	/**
216
	 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
217
	 * created during {@link #nextPartition}
218
	 * @return ResultSet
219
	 */
220
	private ResultSet makePartitionResultSet(){
221
		int nColumns = currentIdLists.length;
222
		String[] strIdLists = new String[nColumns];
223

    
224
		String strRecordQuery = strRecordQueryTemplate;
225
		for (int col = 0; col < nColumns; col++){
226
			for (String id: currentIdLists[col]){
227
				id = addApostropheIfNeeded(id, currentIdListType[col]);
228
				strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
229
			}
230
			strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
231
		}
232

    
233
		ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
234

    
235
		return resultSet;
236
	}
237

    
238
	/**
239
	 * @param id
240
	 * @param i
241
	 * @return
242
	 */
243
	private String addApostropheIfNeeded(String id, int sqlType) {
244
		String result = id;
245
		if (isStringType(sqlType)){
246
			result = "'" + id + "'";
247
		}
248
		return result;
249
	}
250

    
251
	/**
252
	 * @param sqlType
253
	 * @return
254
	 */
255
	private boolean isStringType(int sqlType) {
256
		if(sqlType == Types.INTEGER){
257
			return false;  //standard case
258
		}else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
259
				sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
260
				sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
261
			return true;
262
		}else{
263
			return false;
264
		}
265
	}
266

    
267
	public Map<String, ? extends CdmBase> getObjectMap(Object key){
268
		Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
269
		return objectMap;
270
	}
271

    
272
	/**
273
	 *
274
	 */
275
	private int getCurrentNumberOfRows() {
276
		return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
277
	}
278

    
279

    
280
	/**
281
	 * @param recordsPerTransaction
282
	 * @param partitionedIO
283
	 * @param i
284
	 */
285
	protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
286
		//if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
287
			txStatus = partitionedIO.startTransaction();
288
			if(logger.isInfoEnabled()) {
289
				logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
290
			}
291
		//}
292
		return txStatus;
293
	}
294

    
295
// ************************** Not Needed ?? **************************************************
296

    
297
//	protected void doLogPerLoop(int recordsPerLog, String pluralString){
298
//		int count = getAbsoluteRow() - 1;
299
//		if ((count % recordsPerLog ) == 0 && count!= 0 ){
300
//			logger.info(pluralString + " handled: " + (count));
301
//		}
302
//	}
303
//
304
//
305

    
306

    
307

    
308
//	public boolean nextRow() throws SQLException{
309
//		if (currentRowInPartition >= partitionSize ){
310
//			return false;
311
//		}else{
312
//			currentRowInPartition++;
313
//			return resultSet.next();
314
//		}
315
//	}
316
//
317

    
318

    
319

    
320
}
(50-50/63)