3 * Copyright (C) 2007 EDIT
4 * European Distributed Institute of Taxonomy
5 * http://www.e-taxonomy.eu
7 * The contents of this file are subject to the Mozilla Public License Version 1.1
8 * See LICENSE.TXT at the top of this package for the full license terms.
11 package eu
.etaxonomy
.cdm
.io
.common
;
13 import java
.sql
.ResultSet
;
14 import java
.sql
.ResultSetMetaData
;
15 import java
.sql
.SQLException
;
16 import java
.sql
.Types
;
17 import java
.util
.ArrayList
;
18 import java
.util
.List
;
21 import org
.apache
.log4j
.Logger
;
22 import org
.springframework
.transaction
.TransactionStatus
;
24 import eu
.etaxonomy
.cdm
.common
.CdmUtils
;
25 import eu
.etaxonomy
.cdm
.model
.common
.CdmBase
;
// Splits the rows of an id query into partitions of at most partitionSize rows (see
// nextPartition) and, per partition, runs a record query restricted to those ids (see
// doPartition / makePartitionResultSet). STATE is bounded by IPartitionedState.
// NOTE(review): this view of the file has lines dropped (braces, try/catch, some imports such
// as java.util.Map); the comments in this class describe only the lines that are visible.
31 public class ResultSetPartitioner
<STATE
extends IPartitionedState
> {
32 private static final Logger logger
= Logger
.getLogger(ResultSetPartitioner
.class);
// Profiler whose start* methods are called around the phases of doPartition and by
// startDoSave (presumably timing them - confirm against PartitionerProfiler).
33 private PartitionerProfiler profiler
= new PartitionerProfiler();
35 //************************* STATIC ***************************************************/
// Static factory: builds a partitioner whose constructor immediately runs strIdQuery against
// source, and keeps strRecordQuery as the per-partition record query template.
// NOTE(review): the declared return type is the raw ResultSetPartitioner although a
// ResultSetPartitioner<IPartitionedState> is created, so callers get unchecked conversions.
// @throws SQLException declared by the constructor (presumably from running strIdQuery)
37 public static ResultSetPartitioner
NewInstance(Source source
, String strIdQuery
, String strRecordQuery
, int partitionSize
) throws SQLException
{
38 ResultSetPartitioner
<IPartitionedState
> resultSetPartitioner
= new ResultSetPartitioner
<IPartitionedState
>(source
, strIdQuery
, strRecordQuery
, partitionSize
);
39 return resultSetPartitioner
;
42 //*********************** VARIABLES *************************************************/
// Source that both the id query and the per-partition record queries are run against.
47 private Source source
;
50 * The result set containing all records and at least the ids as a field. This result set
51 * will be used for partitioning
// Advanced row by row in nextPartition(); its column types drive the quoting of ids.
53 private ResultSet idResultSet
;
56 * A template for a SQL Query returning all records and all values needed for a partition
57 * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
// The token actually replaced in makePartitionResultSet() is IPartitionedIO.ID_LIST_TOKEN,
// once per id column, in column order.
59 private String strRecordQueryTemplate
;
62 * The resultset returned for the strRecordQueryTemplate
// Assigned in doPartition() and exposed through getResultSet().
64 private ResultSet partitionResultSet
;
67 * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
68 * creating a taxon partition the map holds all taxon names.
69 * The key is a combination of a namespace and the id in the original source
// Outer key: the Object passed to getObjectMap(); inner key: a String id.
// Filled by partitionedIO.getRelatedObjectsForPartition(...) in doPartition().
71 private Map
<Object
, Map
<String
, ?
extends CdmBase
>> relatedObjects
;
74 * Number of records handled in the partition
// NOTE(review): despite the text above, this is the maximum number of id rows per partition
// (nextPartition reads up to partitionSize rows); the actual count is rowsInCurrentPartition.
76 private int partitionSize
;
79 * Lists of ids handled in this partition (must be an array of lists because sometimes
80 * we have non-single keys
// One list per id column; index = column index - 1 of idResultSet.
82 private List
<String
>[] currentIdLists
;
85 * The sql type of the id columns.
// java.sql.Types codes from ResultSetMetaData.getColumnType, parallel to currentIdLists.
88 private int[] currentIdListType
;
91 * counter for the partitions
93 private int currentPartition
;
96 * number of records in the current partition
98 private int rowsInCurrentPartition
;
// Assigned in getTransaction().
// NOTE(review): doPartition() declares a local of the same name, which shadows this field.
100 private TransactionStatus txStatus
;
102 //*********************** CONSTRUCTOR *************************************************/
// Runs strIdQuery right away and keeps its result set as the id cursor that nextPartition()
// advances; strRecordQuery is only stored here and is expanded per partition later.
// NOTE(review): partitionSize is not validated; a value <= 0 makes nextPartition read no rows.
// @throws SQLException declared; presumably raised by source.getResultSet(strIdQuery)
104 private ResultSetPartitioner(Source source
, String strIdQuery
, String strRecordQuery
, int partitionSize
) throws SQLException
{
105 ResultSet idResultSet
= source
.getResultSet(strIdQuery
);
106 // if (! idResultSet.isBeforeFirst()){
107 // idResultSet.beforeFirst();
109 this.source
= source
;
110 this.idResultSet
= idResultSet
;
111 this.strRecordQueryTemplate
= strRecordQuery
;
112 this.partitionSize
= partitionSize
;
115 //************************ METHODS ****************************************************/
119 * Import the whole partition of an input stream by starting a transaction, getting the related objects
120 * stored in the destination, invoking the IO's partition handling and committing the transaction.
121 * @param partitionedIO
// Handles the current partition (the id lists built by nextPartition()). The visible steps:
//  1. start a transaction via getTransaction(partitionSize, partitionedIO),
//  2. run the record query once and let partitionedIO collect the related objects,
//  3. run the record query again and hand this partitioner to partitionedIO.doPartition,
//  4. commit, reset the state's transactional reference and related objects, log the count.
// The visible error path formats a message from e.getLocalizedMessage() and the IO's plural
// string and rethrows it wrapped in a RuntimeException (the try/catch lines themselves are not
// visible in this view).
// NOTE(review): no rollback of txStatus is visible on the error path - confirm.
123 public void doPartition(IPartitionedIO
<STATE
> partitionedIO
, STATE state
) {
// NOTE(review): this local shadows the txStatus field that getTransaction() also assigns.
126 TransactionStatus txStatus
= getTransaction(partitionSize
, partitionedIO
);
128 state
.makeTransactionalSourceReference(partitionedIO
.getReferenceService());
// First run of the record query, used only for collecting related objects.
// NOTE(review): rs is never closed in the visible code.
131 ResultSet rs
= makePartitionResultSet();
133 profiler
.startRelObjects();
134 this.relatedObjects
= partitionedIO
.getRelatedObjectsForPartition(rs
, state
);
135 state
.setRelatedObjects(relatedObjects
);
// Second run of the same query; presumably because rs was consumed above. The IO reads this
// one through getResultSet().
138 partitionResultSet
= makePartitionResultSet();
140 profiler
.startDoPartition();
141 partitionedIO
.doPartition(this, state
);
143 profiler
.startDoCommit();
144 partitionedIO
.commitTransaction(txStatus
);
145 state
.resetTransactionalSourceReference();
148 state
.setRelatedObjects(null);
150 logger
.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO
.getPluralString() );
153 String message
= "Exception (%s) occurred while handling import partition for %s.";
154 message
= String
.format(message
, e
.getLocalizedMessage(), partitionedIO
.getPluralString());
155 throw new RuntimeException(message
, e
);
// Public hook that forwards to profiler.startDoSave(), so callers can mark the start of a
// save phase on this partitioner's profiler.
160 public void startDoSave(){
161 profiler
.startDoSave();
165 * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
167 * @throws SQLException
// Reads up to partitionSize rows from idResultSet. Each id column's values are stored as strings
// in currentIdLists, with the column's java.sql.Types code in currentIdListType. result becomes
// true once a row has been read.
// Not visible in this view:
//  - the declaration of i,
//  - the body of the end-of-results check,
//  - the partition counter increment,
//  - the return statement.
// NOTE(review): String.valueOf turns a SQL NULL id into the literal string "null".
169 public boolean nextPartition() throws SQLException
{
170 boolean result
= false;
171 ResultSetMetaData metaData
= idResultSet
.getMetaData();
172 int nOfIdColumns
= metaData
.getColumnCount();
// Raw array creation: Java cannot instantiate a generic List<String>[] directly (unchecked).
174 currentIdLists
= new ArrayList
[nOfIdColumns
];
175 currentIdListType
= new int[nOfIdColumns
];
177 for (int col
= 0; col
< currentIdLists
.length
; col
++){
178 currentIdLists
[col
] = new ArrayList
<String
>();
// JDBC column indices are 1-based, the arrays are 0-based.
179 currentIdListType
[col
] = metaData
.getColumnType(col
+ 1);
181 List
<String
> currentIdList
;
185 for (i
= 0; i
< partitionSize
; i
++){
186 if (idResultSet
.next() == false){
190 for (int colIndex
= 0; colIndex
< nOfIdColumns
; colIndex
++){
191 Object oNextId
= idResultSet
.getObject(colIndex
+ 1);
192 String strNextId
= String
.valueOf(oNextId
);
193 currentIdList
= currentIdLists
[colIndex
];
194 currentIdList
.add(strNextId
);
196 result
= true; //true if at least one record was read
// Number of loop iterations completed when the loop ended.
198 rowsInCurrentPartition
= i
;
206 * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
// Returns the record result set that doPartition() stored for the current partition.
// In the visible code, partitionResultSet is only assigned in doPartition(). It is therefore
// null until doPartition() has run.
209 public ResultSet
getResultSet(){
210 return partitionResultSet
;
216 * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
217 * created during {@link #nextPartition}
// Builds the record query for the current partition:
//  - for every id column, joins its ids with "," via CdmUtils.concat, quoting string-typed
//    ids through addApostropheIfNeeded;
//  - substitutes each joined list for the next occurrence of IPartitionedIO.ID_LIST_TOKEN,
//    in column order;
//  - runs the query and wraps the result in a ResultSetProxy.
// The return and any exception handling are not visible in this view.
// NOTE(review): ids are spliced into the SQL text unescaped; an id containing "'" breaks the
// query (injection risk if source ids are not trusted).
// NOTE(review): String.replaceFirst interprets '$' and '\' in the replacement string; wrapping
// it with Matcher.quoteReplacement would make the substitution literal.
// NOTE(review): for an empty id list strIdLists[col] stays null, and replaceFirst throws a
// NullPointerException for a null replacement.
220 private ResultSet
makePartitionResultSet(){
221 int nColumns
= currentIdLists
.length
;
222 String
[] strIdLists
= new String
[nColumns
];
224 String strRecordQuery
= strRecordQueryTemplate
;
225 for (int col
= 0; col
< nColumns
; col
++){
226 for (String id
: currentIdLists
[col
]){
227 id
= addApostropheIfNeeded(id
, currentIdListType
[col
]);
// Presumably CdmUtils.concat skips the initial null and joins with "," - confirm.
228 strIdLists
[col
] = CdmUtils
.concat(",", strIdLists
[col
], id
);
230 strRecordQuery
= strRecordQuery
.replaceFirst(IPartitionedIO
.ID_LIST_TOKEN
, strIdLists
[col
]);
233 ResultSet resultSet
= ResultSetProxy
.NewInstance(source
.getResultSet(strRecordQuery
));
// Wraps id in single quotes when isStringType(sqlType) is true, so it can be used as a SQL
// string literal. The declaration of result, the non-string path and the return are not
// visible in this view.
// NOTE(review): single quotes inside id are not escaped (e.g. doubled).
243 private String
addApostropheIfNeeded(String id
, int sqlType
) {
245 if (isStringType(sqlType
)){
246 result
= "'" + id
+ "'";
// Classifies a java.sql.Types code. INTEGER returns false. CHAR, CLOB, NVARCHAR, VARCHAR,
// LONGVARCHAR, NCHAR, LONGNVARCHAR and NCLOB are presumably reported as string types; that
// branch's body and the final else are not visible in this view.
// NOTE(review): other numeric codes (BIGINT, SMALLINT, DECIMAL, ...) reach the branch that is
// not shown here - confirm how they are handled.
255 private boolean isStringType(int sqlType
) {
256 if(sqlType
== Types
.INTEGER
){
257 return false; //standard case
258 }else if (sqlType
== Types
.CHAR
|| sqlType
== Types
.CLOB
|| sqlType
== Types
.NVARCHAR
||
259 sqlType
== Types
.VARCHAR
|| sqlType
== Types
.LONGVARCHAR
|| sqlType
== Types
.NCHAR
||
260 sqlType
== Types
.LONGNVARCHAR
|| sqlType
== Types
.NCLOB
){
// Looks up the inner map (String id -> CdmBase object) stored in relatedObjects under key.
// The rest of the method, including its return, is not visible in this view.
// NOTE(review): in the visible code, relatedObjects is only assigned in doPartition().
// Calling this earlier would therefore throw a NullPointerException.
267 public Map
<String
, ?
extends CdmBase
> getObjectMap(Object key
){
268 Map
<String
, ?
extends CdmBase
> objectMap
= this.relatedObjects
.get(key
);
// Number of id rows handled so far: rows of all earlier partitions plus the rows of the current
// one. The formula assumes currentPartition counts from 1 and that every earlier partition held
// exactly partitionSize rows.
275 private int getCurrentNumberOfRows() {
276 return ((currentPartition
- 1) * partitionSize
+ rowsInCurrentPartition
);
281 * @param recordsPerTransaction
282 * @param partitionedIO
// Starts a transaction through partitionedIO, stores its status in the txStatus field and logs
// the current partition number. The rest of the method (including the return) is not visible.
// NOTE(review): recordsPerTransaction is used by no visible statement, only by the
// commented-out condition below. partitionedIO is also a raw IPartitionedIO.
// NOTE(review): the guard checks isInfoEnabled() but the message is logged with debug().
// With INFO enabled and DEBUG disabled, the string is built and then discarded.
// Use isDebugEnabled() instead.
285 protected TransactionStatus
getTransaction(int recordsPerTransaction
, IPartitionedIO partitionedIO
) {
286 //if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
287 txStatus
= partitionedIO
.startTransaction();
288 if(logger
.isInfoEnabled()) {
289 logger
.debug("currentPartitionNumber = " + currentPartition
+ " - Transaction started");
295 // ************************** Not Needed ?? **************************************************
297 // protected void doLogPerLoop(int recordsPerLog, String pluralString){
298 // int count = getAbsoluteRow() - 1;
299 // if ((count % recordsPerLog ) == 0 && count!= 0 ){
300 // logger.info(pluralString + " handled: " + (count));
308 // public boolean nextRow() throws SQLException{
309 // if (currentRowInPartition >= partitionSize ){
312 // currentRowInPartition++;
313 // return resultSet.next();