3 * Copyright (C) 2007 EDIT
4 * European Distributed Institute of Taxonomy
5 * http://www.e-taxonomy.eu
7 * The contents of this file are subject to the Mozilla Public License Version 1.1
8 * See LICENSE.TXT at the top of this package for the full license terms.
11 package eu
.etaxonomy
.cdm
.io
.common
;
13 import java
.sql
.ResultSet
;
14 import java
.sql
.ResultSetMetaData
;
15 import java
.sql
.SQLException
;
16 import java
.sql
.Types
;
17 import java
.util
.ArrayList
;
18 import java
.util
.List
;
21 import org
.apache
.log4j
.Logger
;
22 import org
.springframework
.transaction
.TransactionStatus
;
24 import eu
.etaxonomy
.cdm
.common
.CdmUtils
;
25 import eu
.etaxonomy
.cdm
.model
.common
.CdmBase
;
32 public class ResultSetPartitioner
<STATE
extends IPartitionedState
> {
33 private static final Logger logger
= Logger
.getLogger(ResultSetPartitioner
.class);
34 private PartitionerProfiler profiler
= new PartitionerProfiler();
36 //************************* STATIC ***************************************************/
38 public static ResultSetPartitioner
NewInstance(Source source
, String strIdQuery
, String strRecordQuery
, int partitionSize
) throws SQLException
{
39 ResultSetPartitioner resultSetPartitioner
= new ResultSetPartitioner(source
, strIdQuery
, strRecordQuery
, partitionSize
);
40 return resultSetPartitioner
;
43 //*********************** VARIABLES *************************************************/
48 private Source source
;
51 * The result set containing all records and at least the ids as a field. This result set
52 * will be used for partitioning
54 private ResultSet idResultSet
;
57 * A template for a SQL Query returning all records and all values needed for a partition
58 * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
60 private String strRecordQueryTemplate
;
63 * The resultset returned for the strRecordQueryTemplate
65 private ResultSet partitionResultSet
;
68 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
69 * creating a taxon partition the map holds all taxon names.
70 * The key is a combination of a namespace and the id in the original source
72 private Map
<Object
, Map
<String
, CdmBase
>> relatedObjects
;
75 * Number of records handled in the partition
77 private int partitionSize
;
80 * Lists of ids handled in this partition (must be an array of lists because sometimes
81 * we have non-single keys
83 private List
<String
>[] currentIdLists
;
86 * The sql type of the id columns.
89 private int[] currentIdListType
;
92 * counter for the partitions
94 private int currentPartition
;
97 * number of records in the current partition
99 private int rowsInCurrentPartition
;
101 private TransactionStatus txStatus
;
103 //*********************** CONSTRUCTOR *************************************************/
/**
 * Runs the id query eagerly and stores everything needed to build partitions later.
 *
 * @param source         source used for the id query and the record queries
 * @param strIdQuery     query returning the id column(s) of all records
 * @param strRecordQuery record query template containing the id-list token
 * @param partitionSize  maximum number of records per partition
 * @throws SQLException propagated from {@code source.getResultSet(strIdQuery)}
 */
private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery,
        int partitionSize) throws SQLException {
    this.idResultSet = source.getResultSet(strIdQuery);
    this.source = source;
    this.strRecordQueryTemplate = strRecordQuery;
    this.partitionSize = partitionSize;
}
116 //************************ METHODS ****************************************************/
120 * Import the whole partition of an input stream by starting a transaction, getting the related objects
121 * stored in the destination, invoke the IOs partition handling and commit the transaction
122 * @param partitionedIO
// Handles the current partition inside its own transaction. Steps:
//   start a transaction;
//   load the related objects for the partition's records;
//   let partitionedIO.doPartition handle the records;
//   commit, then log the running total of saved rows.
// Any exception is wrapped in a RuntimeException whose message names partitionedIO.getPluralString().
124 public void doPartition(IPartitionedIO partitionedIO
, STATE state
) {
// NOTE(review): this local shadows the txStatus field, which getTransaction also assigns.
127 TransactionStatus txStatus
= getTransaction(partitionSize
, partitionedIO
);
129 state
.makeTransactionalSourceReference(partitionedIO
.getReferenceService());
// First execution of the record query; its result set is handed to getRelatedObjectsForPartition.
132 ResultSet rs
= makePartitionResultSet();
134 profiler
.startRelObjects();
135 this.relatedObjects
= partitionedIO
.getRelatedObjectsForPartition(rs
);
136 state
.setRelatedObjects(relatedObjects
);
// The record query is executed a second time so that doPartition gets its own result set.
// Presumably this is needed because rs may already have been consumed above; the cost is
// one extra query per partition.
139 partitionResultSet
= makePartitionResultSet();
141 profiler
.startDoPartition();
142 partitionedIO
.doPartition(this, state
);
// Commit, then clear the per-partition state held by `state`.
144 profiler
.startDoCommit();
145 partitionedIO
.commitTransaction(txStatus
);
146 state
.resetTransactionalSourceReference();
149 state
.setRelatedObjects(null);
// Logs the cumulative number of rows handled so far (see getCurrentNumberOfRows).
151 logger
.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO
.getPluralString() );
// Error path: the catch clause declaring `e` is not visible here. The exception is wrapped
// with context and rethrown, keeping the original as cause.
// NOTE(review): no rollback of txStatus is visible on this path; confirm the transaction
// is not left open when handling fails.
154 String message
= "Exception (%s) occurred while handling import partition for %s.";
155 message
= String
.format(message
, e
.getLocalizedMessage(), partitionedIO
.getPluralString());
156 throw new RuntimeException(message
, e
);
161 public void startDoSave(){
162 profiler
.startDoSave();
166 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
168 * @throws SQLException
// Reads up to partitionSize rows from idResultSet and collects their ids, one list per column.
// Every column of idResultSet is treated as an id column.
// Returns true if at least one row was read, i.e. there is a partition to handle.
// NOTE(review): the partition-counter increment mentioned above is not visible here; confirm it exists.
170 public boolean nextPartition() throws SQLException
{
171 boolean result
= false;
172 ResultSetMetaData metaData
= idResultSet
.getMetaData();
173 int nOfIdColumns
= metaData
.getColumnCount();
// One id list and one java.sql.Types code per id column.
// NOTE(review): `new ArrayList[...]` creates a raw generic array (unchecked-conversion warning).
175 currentIdLists
= new ArrayList
[nOfIdColumns
];
176 currentIdListType
= new int[nOfIdColumns
];
178 for (int col
= 0; col
< currentIdLists
.length
; col
++){
179 currentIdLists
[col
] = new ArrayList
<String
>();
180 currentIdListType
[col
] = metaData
.getColumnType(col
+ 1);
182 List
<String
> currentIdList
;
// Read at most partitionSize rows.
// NOTE(review): the body of the `next() == false` branch is presumably a `break`; confirm.
186 for (i
= 0; i
< partitionSize
; i
++){
187 if (idResultSet
.next() == false){
// Ids are kept as strings; String.valueOf turns a SQL NULL into the text "null".
191 for (int colIndex
= 0; colIndex
< nOfIdColumns
; colIndex
++){
192 Object oNextId
= idResultSet
.getObject(colIndex
+ 1);
193 String strNextId
= String
.valueOf(oNextId
);
194 currentIdList
= currentIdLists
[colIndex
];
195 currentIdList
.add(strNextId
);
197 result
= true; //true if at least one record was read
// After the loop, i is the number of rows read into this partition (partitionSize when full).
199 rowsInCurrentPartition
= i
;
207 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
210 public ResultSet
getResultSet(){
211 return partitionResultSet
;
217 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
218 * created during {@link #nextPartition}
// Builds the record query for the current partition and executes it. Steps:
//   1. For each id column, join its ids with ",". Ids of string-typed columns are quoted
//      via addApostropheIfNeeded.
//   2. Substitute that column's list for the first remaining ID_LIST_TOKEN in the template.
//   3. Run the resulting query against source, wrapped by ResultSetProxy.
// NOTE(review): the SQL is assembled by string concatenation of id values.
221 private ResultSet
makePartitionResultSet(){
222 int nColumns
= currentIdLists
.length
;
// strIdLists[col] starts as null; presumably CdmUtils.concat ignores a null first part. Confirm.
223 String
[] strIdLists
= new String
[nColumns
];
225 String strRecordQuery
= strRecordQueryTemplate
;
226 for (int col
= 0; col
< nColumns
; col
++){
227 for (String id
: currentIdLists
[col
]){
228 id
= addApostropheIfNeeded(id
, currentIdListType
[col
]);
229 strIdLists
[col
] = CdmUtils
.concat(",", strIdLists
[col
], id
);
// NOTE(review): replaceFirst treats its replacement as a regex replacement string, so
// '$' or '\' inside an id would be misinterpreted (Matcher.quoteReplacement avoids this).
// If a column's id list is empty, strIdLists[col] is still null here, and replaceFirst
// throws NullPointerException for a null replacement.
231 strRecordQuery
= strRecordQuery
.replaceFirst(IPartitionedIO
.ID_LIST_TOKEN
, strIdLists
[col
]);
234 ResultSet resultSet
= ResultSetProxy
.NewInstance(source
.getResultSet(strRecordQuery
));
244 private String
addApostropheIfNeeded(String id
, int sqlType
) {
246 if (isStringType(sqlType
)){
247 result
= "'" + id
+ "'";
// Decides whether ids of a column with the given java.sql.Types code must be quoted as SQL
// string literals when building the record query.
// INTEGER is checked first as the common case and is not quoted.
256 private boolean isStringType(int sqlType
) {
257 if(sqlType
== Types
.INTEGER
){
258 return false; //standard case
// Character and character-LOB types (including the national-character variants) need quoting.
259 }else if (sqlType
== Types
.CHAR
|| sqlType
== Types
.CLOB
|| sqlType
== Types
.NVARCHAR
||
260 sqlType
== Types
.VARCHAR
|| sqlType
== Types
.LONGVARCHAR
|| sqlType
== Types
.NCHAR
||
261 sqlType
== Types
.LONGNVARCHAR
|| sqlType
== Types
.NCLOB
){
// Looks up the inner map of related objects (source id -> CDM object) for the given outer key
// in relatedObjects.
// NOTE(review): relatedObjects is assigned in doPartition; calling this before any partition
// has been handled dereferences a null field.
268 public Map
<String
, ?
extends CdmBase
> getObjectMap(Object key
){
269 Map
<String
, ?
extends CdmBase
> objectMap
= this.relatedObjects
.get(key
);
276 private int getCurrentNumberOfRows() {
277 return ((currentPartition
- 1) * partitionSize
+ rowsInCurrentPartition
);
282 * @param recordsPerTransaction
283 * @param partitionedIO
286 protected TransactionStatus
getTransaction(int recordsPerTransaction
, IPartitionedIO partitionedIO
) {
287 //if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
288 txStatus
= partitionedIO
.startTransaction();
289 if(logger
.isInfoEnabled()) {
290 logger
.debug("currentPartitionNumber = " + currentPartition
+ " - Transaction started");
296 // ************************** Not Needed ?? **************************************************
298 // protected void doLogPerLoop(int recordsPerLog, String pluralString){
299 // int count = getAbsoluteRow() - 1;
300 // if ((count % recordsPerLog ) == 0 && count!= 0 ){
301 // logger.info(pluralString + " handled: " + (count));
309 // public boolean nextRow() throws SQLException{
310 // if (currentRowInPartition >= partitionSize ){
313 // currentRowInPartition++;
314 // return resultSet.next();