Project

General

Profile

Download (10.7 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2007 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9

    
10
package eu.etaxonomy.cdm.io.common;
11

    
12
import java.sql.ResultSet;
13
import java.sql.ResultSetMetaData;
14
import java.sql.SQLException;
15
import java.sql.Types;
16
import java.util.ArrayList;
17
import java.util.List;
18
import java.util.Map;
19

    
20
import org.apache.log4j.Logger;
21
import org.springframework.transaction.TransactionStatus;
22

    
23
import eu.etaxonomy.cdm.common.CdmUtils;
24
import eu.etaxonomy.cdm.model.common.CdmBase;
25

    
26
/**
27
 * @author a.mueller
28
 * @since 16.02.2010
29
 */
30
public class ResultSetPartitioner<STATE extends IPartitionedState> {
31
	private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
32
	private final PartitionerProfiler profiler = new PartitionerProfiler();
33

    
34
//************************* STATIC ***************************************************/
35

    
36
	public static <T extends IPartitionedState> ResultSetPartitioner<T> NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
37
		ResultSetPartitioner<T> resultSetPartitioner
38
		        = new ResultSetPartitioner<>(source, strIdQuery, strRecordQuery, partitionSize);
39
		return resultSetPartitioner;
40
	}
41

    
42
//*********************** VARIABLES *************************************************/
43

    
44
	/**
45
	 * The database
46
	 */
47
	private final Source source;
48

    
49
	/**
50
	 * The result set containing all records and at least the ids as a field. This result set
51
	 * will be used for partitioning
52
	 */
53
	private final ResultSet idResultSet;
54

    
55
	/**
56
	 * A template for a SQL Query returning all records and all values needed for a partition
57
	 * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
58
	 */
59
	private final String strRecordQueryTemplate;
60

    
61
	/**
62
	 * The resultset returned for the strRecordQueryTemplate
63
	 */
64
	private ResultSet partitionResultSet;
65

    
66
	/**
67
	 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
68
	 * creating a taxon partition the map holds all taxon names).
69
	 * The key is a combination of a namespace and the id in the original source
70
	 */
71
	private Map<Object, Map<String, ? extends CdmBase>> relatedObjects;
72

    
73
	/**
74
	 * Number of records handled in the partition
75
	 */
76
	private final int partitionSize;
77

    
78
	/**
79
	 * Lists of ids handled in this partition (must be an array of lists because sometimes
80
	 * we have non-single keys)
81
	 */
82
	private List<String>[] currentIdLists;
83

    
84
	/**
85
	 * The sql type of the id columns.
86
	 * @see Types
87
	 */
88
	private int[] currentIdListType;
89

    
90
	/**
	 * String value of the first id column of the id record read last.
	 * Written by {@link #nextPartition()} for every record it reads and compared
	 * in <code>firstIdIsNew()</code> against the first id of the following record.
	 */
	private String lastPartitionHighestIDs;
91

    
92
	/**
	 * <code>true</code> if <code>firstIdIsNew()</code> has already moved the id result set to a
	 * record that has not been read into an id list yet. {@link #nextPartition()} then reads
	 * that record without advancing the cursor again and resets the flag.
	 */
	boolean nextAlreadyCalled = false;
93

    
94
	/**
95
	 * counter for the partitions
96
	 */
97
	private int currentPartition;
98

    
99

    
100
	/**
101
	 * counter for all records
102
	 */
103
	private int allRecords;
104

    
105
	/**
106
	 * number of records in the current partition
107
	 */
108
	private int rowsInCurrentPartition;
109

    
110
	/**
	 * The transaction status returned by the last call of
	 * <code>partitionedIO.startTransaction()</code> in <code>getTransaction()</code>.
	 */
	private TransactionStatus txStatus;
111

    
112
//*********************** CONSTRUCTOR *************************************************/
113

    
114
	/**
	 * Stores the source, the record query template and the partition size and executes the
	 * id query once. The partitions are later taken from the resulting id result set.
	 *
	 * @param source the database the queries are run against
	 * @param strIdQuery the SQL query returning the ids to partition
	 * @param strRecordQuery the record query template
	 * @param partitionSize the number of records per partition
	 * @throws SQLException declared for callers; see {@link #NewInstance}
	 */
	private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
		this.source = source;
		this.strRecordQueryTemplate = strRecordQuery;
		this.partitionSize = partitionSize;
		this.idResultSet = source.getResultSet(strIdQuery);
	}
124

    
125
//************************ METHODS ****************************************************/
126

    
127

    
128
	/**
129
	 * Import the whole partition of an input stream by starting a transaction, getting the related objects
130
	 * stored in the destination, invoke the IOs partition handling and commit the transaction
131
	 * @param partitionedIO
132
	 */
133
	public void doPartition(IPartitionedIO<STATE> partitionedIO, STATE state) {
134
		int i = 0;
135
	    try{
136
			profiler.startTx();
137
			TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
138

    
139
			i = 1;
140
			state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
141

    
142
			profiler.startRs();
143
			ResultSet rs = makePartitionResultSet();
144
            i = 2;
145
			profiler.startRelObjects();
146
			this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs, state);
147
            i = 3;
148
            state.setRelatedObjects(relatedObjects);
149
            i = 4;
150
			profiler.startRs2();
151
			partitionResultSet = makePartitionResultSet();
152
            i = 5;
153
			profiler.startDoPartition();
154
			partitionedIO.doPartition(this, state);
155
            i = 6;
156
			profiler.startDoCommit();
157
			partitionedIO.commitTransaction(txStatus);
158
			state.resetTransactionalSourceReference();
159
            i = 7;
160
			profiler.end();
161
			state.setRelatedObjects(null);
162
            i = 8;
163
			logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
164
			profiler.print();
165
		}catch(Exception e){
166
			String message = "Exception (%s) occurred at position " + i + " while handling import partition for %s.";
167
			message = String.format(message, e.getMessage(), partitionedIO.getPluralString());
168
			e.printStackTrace();
169
			throw new RuntimeException(message, e);
170
		}
171
	}
172

    
173

    
174
	public void startDoSave(){
175
		profiler.startDoSave();
176
	}
177

    
178
	/**
179
	 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
180
	 * @return
181
	 * @throws SQLException
182
	 */
183
    public boolean nextPartition() throws SQLException{
184
		boolean result = false;
185
		ResultSetMetaData metaData = idResultSet.getMetaData();
186
		int nOfIdColumns = metaData.getColumnCount();
187
		currentPartition++;
188

    
189
		currentIdLists = new List[nOfIdColumns];
190
		currentIdListType = new int[nOfIdColumns];
191

    
192
		for (int col = 0; col< currentIdLists.length; col++){
193
			currentIdLists[col] = new ArrayList<>();
194
			currentIdListType[col] = metaData.getColumnType(col + 1);
195
		}
196

    
197
		int i = 0;
198
		//for each record
199
		for (i = 0; i < partitionSize || !firstIdIsNew(); i++){
200
			if( !nextAlreadyCalled){
201
			    if (!idResultSet.next()){
202
			        break;
203
			    }
204
			}else if (idResultSet.isAfterLast()){
205
			    break;
206
			}
207
			nextAlreadyCalled = false;
208
		    allRecords++;
209
			//for each column
210
			for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
211

    
212
			    String strNextId = String.valueOf(idResultSet.getObject(colIndex + 1));
213
				List<String> currentIdList = currentIdLists[colIndex];
214
				currentIdList.add(strNextId);
215
				if(colIndex == 0){
216
				    lastPartitionHighestIDs = strNextId;
217
				}
218
			}
219
			result = true; //true if at least one record was read
220
		}
221
		rowsInCurrentPartition = i;
222

    
223
		return result;
224
	}
225

    
226
    /**
227
     * Checks if the current partition may have duplicates that were handled already.
228
     * This may happen if the result set has >1 columns and if the first column does
229
     * not change it value with the first record but with a later record and the
230
     * following columns jump back with there values.
231
     * E.g. first result set contains (x1=1,x2=3) and with the second result set
232
     * we ask for x1 in (1,2) x2 in (1-5, 10-15) where the 1-5 comes from x1=2.
233
     * This should not happen and therefore we increase the partition a bit such that
234
     * the first column always changes its value and therefore it is guranteed that
235
     * such duplicates will never exist.
236
     * The reason for this problem is, that we do not create tuples in the WHERE clause
237
     * of getIdRecord but we handle the range for each column separately. This is not correct
238
     * but handling of tuples is more difficult in SQL.
239
     * As multiple columns do not appear so often this workaround seems acceptable.
240
     * @return
241
     * @throws SQLException
242
     */
243
    private boolean firstIdIsNew() throws SQLException {
244
        String last = lastPartitionHighestIDs;
245
        if (!idResultSet.next()){
246
            return true;
247
        }
248
        nextAlreadyCalled = true;
249
        String current = String.valueOf(idResultSet.getObject(1));
250
        return !current.equals(last);
251
    }
252

    
253
    /**
254
	 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
255
	 * @return
256
	 */
257
	public ResultSet getResultSet(){
258
		return partitionResultSet;
259
	}
260

    
261
	/**
262
	 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
263
	 * created during {@link #nextPartition}
264
	 * @return ResultSet
265
	 */
266
	private ResultSet makePartitionResultSet(){
267
		int nColumns = currentIdLists.length;
268
		String[] strIdLists = new String[nColumns];
269

    
270
		String strRecordQuery = strRecordQueryTemplate;
271
		for (int col = 0; col < nColumns; col++){
272
			for (String id: currentIdLists[col]){
273
				id = addApostropheIfNeeded(id, currentIdListType[col]);
274
				strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
275
			}
276
			strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
277
		}
278

    
279
		ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
280

    
281
		return resultSet;
282
	}
283

    
284
	private String addApostropheIfNeeded(String id, int sqlType) {
285
		String result = id;
286
		if (isStringType(sqlType)){
287
			result = "'" + id + "'";
288
		}
289
		return result;
290
	}
291

    
292
	private boolean isStringType(int sqlType) {
293
		if(sqlType == Types.INTEGER){
294
			return false;  //standard case
295
		}else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
296
				sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
297
				sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
298
			return true;
299
		}else{
300
			return false;
301
		}
302
	}
303

    
304
	public Map<String, ? extends CdmBase> getObjectMap(Object key){
305
		Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
306
		return objectMap;
307
	}
308

    
309
	/**
310
	 *
311
	 */
312
	private int getCurrentNumberOfRows() {
313
	    return allRecords;
314
//		return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
315
	}
316

    
317

    
318
	/**
319
	 * @param recordsPerTransaction
320
	 * @param partitionedIO
321
	 * @param i
322
	 */
323
	protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
324
		//if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
325
			txStatus = partitionedIO.startTransaction();
326
			if(logger.isInfoEnabled()) {
327
				logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
328
			}
329
		//}
330
		return txStatus;
331
	}
332

    
333
// ************************** Not Needed ?? **************************************************
334

    
335
//	protected void doLogPerLoop(int recordsPerLog, String pluralString){
336
//		int count = getAbsoluteRow() - 1;
337
//		if ((count % recordsPerLog ) == 0 && count!= 0 ){
338
//			logger.info(pluralString + " handled: " + (count));
339
//		}
340
//	}
341
//
342
//
343

    
344

    
345

    
346
//	public boolean nextRow() throws SQLException{
347
//		if (currentRowInPartition >= partitionSize ){
348
//			return false;
349
//		}else{
350
//			currentRowInPartition++;
351
//			return resultSet.next();
352
//		}
353
//	}
354
//
355

    
356

    
357

    
358
}
(51-51/65)