-// $Id$\r
-/**\r
-* Copyright (C) 2007 EDIT\r
-* European Distributed Institute of Taxonomy \r
-* http://www.e-taxonomy.eu\r
-* \r
-* The contents of this file are subject to the Mozilla Public License Version 1.1\r
-* See LICENSE.TXT at the top of this package for the full license terms.\r
-*/\r
-\r
-package eu.etaxonomy.cdm.io.common;\r
-\r
-import java.sql.ResultSet;\r
-import java.sql.ResultSetMetaData;\r
-import java.sql.SQLException;\r
-import java.sql.Types;\r
-import java.util.ArrayList;\r
-import java.util.List;\r
-import java.util.Map;\r
-\r
-import org.apache.log4j.Logger;\r
-import org.springframework.transaction.TransactionStatus;\r
-\r
-import eu.etaxonomy.cdm.common.CdmUtils;\r
-import eu.etaxonomy.cdm.model.common.CdmBase;\r
-\r
-/**\r
- * @author a.mueller\r
- * @created 16.02.2010\r
- * @version 1.0\r
- */\r
-public class ResultSetPartitioner<STATE extends IPartitionedState> {\r
- private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);\r
- private PartitionerProfiler profiler = new PartitionerProfiler();\r
-\r
-//************************* STATIC ***************************************************/\r
- \r
- public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{\r
- ResultSetPartitioner resultSetPartitioner = new ResultSetPartitioner(source, strIdQuery, strRecordQuery, partitionSize);\r
- return resultSetPartitioner;\r
- }\r
-\r
-//*********************** VARIABLES *************************************************/\r
- \r
- /**\r
- * The database\r
- */\r
- private Source source;\r
- \r
- /**\r
- * The result set containing all records and at least the ids as a field. This result set\r
- * will be used for partitioning\r
- */\r
- private ResultSet idResultSet;\r
- \r
- /**\r
- * A template for a SQL Query returning all records and all values needed for a partition\r
- * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token\r
- */\r
- private String strRecordQueryTemplate;\r
- \r
- /**\r
- * The resultset returned for the strRecordQueryTemplate \r
- */\r
- private ResultSet partitionResultSet;\r
- \r
- /**\r
- * A 2-key map holding all related objects needed during the handling of a partition (e.g. when \r
- * creating a taxon partition the map holds all taxon names.\r
- * The key is a combination of a namespace and the id in the original source\r
- */\r
- private Map<Object, Map<String, CdmBase>> relatedObjects;\r
- \r
- /**\r
- * Number of records handled in the partition\r
- */\r
- private int partitionSize;\r
- \r
- /**\r
- * Lists of ids handled in this partition (must be an array of lists because sometimes \r
- * we have non-single keys \r
- */\r
- private List<String>[] currentIdLists;\r
- \r
- /**\r
- * The sql type of the id columns.\r
- * @see Types\r
- */\r
- private int[] currentIdListType;\r
- \r
- /**\r
- * counter for the partitions\r
- */\r
- private int currentPartition;\r
- \r
- /**\r
- * number of records in the current partition\r
- */\r
- private int rowsInCurrentPartition;\r
- \r
- private TransactionStatus txStatus;\r
- \r
-//*********************** CONSTRUCTOR *************************************************/\r
-\r
- private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{\r
- ResultSet idResultSet = source.getResultSet(strIdQuery);\r
-// if (! idResultSet.isBeforeFirst()){\r
-// idResultSet.beforeFirst();\r
-// }\r
- this.source = source;\r
- this.idResultSet = idResultSet;\r
- this.strRecordQueryTemplate = strRecordQuery;\r
- this.partitionSize = partitionSize;\r
- }\r
- \r
-//************************ METHODS ****************************************************/\r
- \r
-\r
- /**\r
- * Import the whole partition of an input stream by starting a transaction, getting the related objects\r
- * stored in the destination, invoke the IOs partition handling and commit the transaction\r
- * @param partitionedIO\r
- */\r
- public void doPartition(IPartitionedIO partitionedIO, STATE state) {\r
- try{\r
- profiler.startTx();\r
- TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);\r
- \r
- profiler.startRs();\r
- ResultSet rs = makePartitionResultSet();\r
-\r
- profiler.startRelObjects();\r
- this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs);\r
- state.setRelatedObjects(relatedObjects);\r
- \r
- profiler.startRs2();\r
- partitionResultSet = makePartitionResultSet();\r
- \r
- profiler.startDoPartition(); \r
- partitionedIO.doPartition(this, state);\r
- \r
- profiler.startDoCommit();\r
- partitionedIO.commitTransaction(txStatus);\r
- \r
- profiler.end();\r
- state.setRelatedObjects(null);\r
- \r
- logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );\r
- profiler.print();\r
- }catch(Exception e){\r
- String message = "Exception (%s) occurred while handling import partition.";\r
- message = String.format(message, e.getLocalizedMessage());\r
- throw new RuntimeException(message, e);\r
- }\r
- }\r
- \r
- \r
- public void startDoSave(){\r
- profiler.startDoSave();\r
- }\r
- \r
- /**\r
- * Increases the partition counter and generates the <code>currentIdLists</code> for this partition\r
- * @return\r
- * @throws SQLException\r
- */\r
- public boolean nextPartition() throws SQLException{\r
- boolean result = false;\r
- ResultSetMetaData metaData = idResultSet.getMetaData();\r
- int nOfIdColumns = metaData.getColumnCount();\r
- currentPartition++;\r
- currentIdLists = new ArrayList[nOfIdColumns];\r
- currentIdListType = new int[nOfIdColumns];\r
- \r
- for (int col = 0; col< currentIdLists.length; col++){\r
- currentIdLists[col] = new ArrayList<String>();\r
- currentIdListType[col] = metaData.getColumnType(col + 1);\r
- }\r
- List<String> currentIdList;\r
- \r
- int i = 0;\r
- //for each record\r
- for (i = 0; i < partitionSize; i++){\r
- if (idResultSet.next() == false){\r
- break; \r
- }\r
- //for each column\r
- for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){\r
- Object oNextId = idResultSet.getObject(colIndex + 1);\r
- String strNextId = String.valueOf(oNextId); \r
- currentIdList = currentIdLists[colIndex];\r
- currentIdList.add(strNextId);\r
- }\r
- result = true; //true if at least one record was read\r
- }\r
- rowsInCurrentPartition = i;\r
- \r
- return result;\r
- }\r
-\r
-\r
-\r
- /**\r
- * Returns the underlying resultSet holding all records needed to handle the partition.<BR>\r
- * @return\r
- */\r
- public ResultSet getResultSet(){\r
- return partitionResultSet;\r
- }\r
-\r
- \r
- \r
- /**\r
- * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>\r
- * created during {@link #nextPartition}\r
- * @return ResultSet\r
- */\r
- private ResultSet makePartitionResultSet(){\r
- int nColumns = currentIdLists.length;\r
- String[] strIdLists = new String[nColumns];\r
- \r
- String strRecordQuery = strRecordQueryTemplate;\r
- for (int col = 0; col < nColumns; col++){\r
- for (String id: currentIdLists[col]){\r
- id = addApostropheIfNeeded(id, currentIdListType[col]);\r
- strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);\r
- }\r
- strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);\r
- }\r
- \r
- ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));\r
- \r
- return resultSet;\r
- }\r
- \r
- /**\r
- * @param id\r
- * @param i\r
- * @return\r
- */\r
- private String addApostropheIfNeeded(String id, int sqlType) {\r
- String result = id;\r
- if (isStringType(sqlType)){\r
- result = "'" + id + "'";\r
- }\r
- return result;\r
- }\r
-\r
- /**\r
- * @param sqlType\r
- * @return\r
- */\r
- private boolean isStringType(int sqlType) {\r
- if(sqlType == Types.INTEGER){\r
- return false; //standard case\r
- }else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR || \r
- sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||\r
- sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){\r
- return true;\r
- }else{\r
- return false;\r
- }\r
- }\r
-\r
- public Map<String, ? extends CdmBase> getObjectMap(Object key){\r
- Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);\r
- return objectMap;\r
- }\r
- \r
- /**\r
- * \r
- */\r
- private int getCurrentNumberOfRows() {\r
- return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);\r
- }\r
- \r
-\r
- /**\r
- * @param recordsPerTransaction\r
- * @param partitionedIO \r
- * @param i\r
- */\r
- protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {\r
- //if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {\r
- txStatus = partitionedIO.startTransaction();\r
- if(logger.isInfoEnabled()) {\r
- logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started"); \r
- }\r
- //}\r
- return txStatus;\r
- }\r
- \r
-// ************************** Not Needed ?? **************************************************\r
- \r
-// protected void doLogPerLoop(int recordsPerLog, String pluralString){\r
-// int count = getAbsoluteRow() - 1;\r
-// if ((count % recordsPerLog ) == 0 && count!= 0 ){ \r
-// logger.info(pluralString + " handled: " + (count));\r
-// }\r
-// }\r
-//\r
-// \r
-\r
- \r
-\r
-// public boolean nextRow() throws SQLException{\r
-// if (currentRowInPartition >= partitionSize ){\r
-// return false;\r
-// }else{\r
-// currentRowInPartition++;\r
-// return resultSet.next();\r
-// }\r
-// }\r
-// \r
- \r
- \r
- \r
-}\r
+// $Id$
+/**
+* Copyright (C) 2007 EDIT
+* European Distributed Institute of Taxonomy
+* http://www.e-taxonomy.eu
+*
+* The contents of this file are subject to the Mozilla Public License Version 1.1
+* See LICENSE.TXT at the top of this package for the full license terms.
+*/
+
+package eu.etaxonomy.cdm.io.common;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.springframework.transaction.TransactionStatus;
+
+import eu.etaxonomy.cdm.common.CdmUtils;
+import eu.etaxonomy.cdm.model.common.CdmBase;
+
+/**
+ * @author a.mueller
+ * @created 16.02.2010
+ */
+public class ResultSetPartitioner<STATE extends IPartitionedState> {
+ private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
+ private final PartitionerProfiler profiler = new PartitionerProfiler();
+
+//************************* STATIC ***************************************************/
+
+ public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
+ ResultSetPartitioner<IPartitionedState> resultSetPartitioner = new ResultSetPartitioner<IPartitionedState>(source, strIdQuery, strRecordQuery, partitionSize);
+ return resultSetPartitioner;
+ }
+
+//*********************** VARIABLES *************************************************/
+
+ /**
+ * The database
+ */
+ private final Source source;
+
+ /**
+ * The result set containing all records and at least the ids as a field. This result set
+ * will be used for partitioning
+ */
+ private final ResultSet idResultSet;
+
+ /**
+ * A template for a SQL Query returning all records and all values needed for a partition
+ * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token
+ */
+ private final String strRecordQueryTemplate;
+
+ /**
+ * The resultset returned for the strRecordQueryTemplate
+ */
+ private ResultSet partitionResultSet;
+
+ /**
+ * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
+ * creating a taxon partition the map holds all taxon names.
+ * The key is a combination of a namespace and the id in the original source
+ */
+ private Map<Object, Map<String, ? extends CdmBase>> relatedObjects;
+
+ /**
+ * Number of records handled in the partition
+ */
+ private final int partitionSize;
+
+ /**
+ * Lists of ids handled in this partition (must be an array of lists because sometimes
+ * we have non-single keys
+ */
+ private List<String>[] currentIdLists;
+
+ /**
+ * The sql type of the id columns.
+ * @see Types
+ */
+ private int[] currentIdListType;
+
+ /**
+ * counter for the partitions
+ */
+ private int currentPartition;
+
+ /**
+ * number of records in the current partition
+ */
+ private int rowsInCurrentPartition;
+
+ private TransactionStatus txStatus;
+
+//*********************** CONSTRUCTOR *************************************************/
+
+ private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
+ ResultSet idResultSet = source.getResultSet(strIdQuery);
+// if (! idResultSet.isBeforeFirst()){
+// idResultSet.beforeFirst();
+// }
+ this.source = source;
+ this.idResultSet = idResultSet;
+ this.strRecordQueryTemplate = strRecordQuery;
+ this.partitionSize = partitionSize;
+ }
+
+//************************ METHODS ****************************************************/
+
+
+ /**
+ * Import the whole partition of an input stream by starting a transaction, getting the related objects
+ * stored in the destination, invoke the IOs partition handling and commit the transaction
+ * @param partitionedIO
+ */
+ public void doPartition(IPartitionedIO<STATE> partitionedIO, STATE state) {
+ try{
+ profiler.startTx();
+ TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
+
+ state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
+
+ profiler.startRs();
+ ResultSet rs = makePartitionResultSet();
+
+ profiler.startRelObjects();
+ this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs, state);
+ state.setRelatedObjects(relatedObjects);
+
+ profiler.startRs2();
+ partitionResultSet = makePartitionResultSet();
+
+ profiler.startDoPartition();
+ partitionedIO.doPartition(this, state);
+
+ profiler.startDoCommit();
+ partitionedIO.commitTransaction(txStatus);
+ state.resetTransactionalSourceReference();
+
+ profiler.end();
+ state.setRelatedObjects(null);
+
+ logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
+ profiler.print();
+ }catch(Exception e){
+ String message = "Exception (%s) occurred while handling import partition for %s.";
+ message = String.format(message, e.getLocalizedMessage(), partitionedIO.getPluralString());
+ throw new RuntimeException(message, e);
+ }
+ }
+
+
+ public void startDoSave(){
+ profiler.startDoSave();
+ }
+
+ /**
+ * Increases the partition counter and generates the <code>currentIdLists</code> for this partition
+ * @return
+ * @throws SQLException
+ */
+ public boolean nextPartition() throws SQLException{
+ boolean result = false;
+ ResultSetMetaData metaData = idResultSet.getMetaData();
+ int nOfIdColumns = metaData.getColumnCount();
+ currentPartition++;
+ currentIdLists = new ArrayList[nOfIdColumns];
+ currentIdListType = new int[nOfIdColumns];
+
+ for (int col = 0; col< currentIdLists.length; col++){
+ currentIdLists[col] = new ArrayList<String>();
+ currentIdListType[col] = metaData.getColumnType(col + 1);
+ }
+ List<String> currentIdList;
+
+ int i = 0;
+ //for each record
+ for (i = 0; i < partitionSize; i++){
+ if (idResultSet.next() == false){
+ break;
+ }
+ //for each column
+ for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){
+ Object oNextId = idResultSet.getObject(colIndex + 1);
+ String strNextId = String.valueOf(oNextId);
+ currentIdList = currentIdLists[colIndex];
+ currentIdList.add(strNextId);
+ }
+ result = true; //true if at least one record was read
+ }
+ rowsInCurrentPartition = i;
+
+ return result;
+ }
+
+
+
+ /**
+ * Returns the underlying resultSet holding all records needed to handle the partition.<BR>
+ * @return
+ */
+ public ResultSet getResultSet(){
+ return partitionResultSet;
+ }
+
+
+
+ /**
+ * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>
+ * created during {@link #nextPartition}
+ * @return ResultSet
+ */
+ private ResultSet makePartitionResultSet(){
+ int nColumns = currentIdLists.length;
+ String[] strIdLists = new String[nColumns];
+
+ String strRecordQuery = strRecordQueryTemplate;
+ for (int col = 0; col < nColumns; col++){
+ for (String id: currentIdLists[col]){
+ id = addApostropheIfNeeded(id, currentIdListType[col]);
+ strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);
+ }
+ strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);
+ }
+
+ ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
+
+ return resultSet;
+ }
+
+ /**
+ * @param id
+ * @param i
+ * @return
+ */
+ private String addApostropheIfNeeded(String id, int sqlType) {
+ String result = id;
+ if (isStringType(sqlType)){
+ result = "'" + id + "'";
+ }
+ return result;
+ }
+
+ /**
+ * @param sqlType
+ * @return
+ */
+ private boolean isStringType(int sqlType) {
+ if(sqlType == Types.INTEGER){
+ return false; //standard case
+ }else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR ||
+ sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||
+ sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){
+ return true;
+ }else{
+ return false;
+ }
+ }
+
+ public Map<String, ? extends CdmBase> getObjectMap(Object key){
+ Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);
+ return objectMap;
+ }
+
+ /**
+ *
+ */
+ private int getCurrentNumberOfRows() {
+ return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);
+ }
+
+
+ /**
+ * @param recordsPerTransaction
+ * @param partitionedIO
+ * @param i
+ */
+ protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
+ //if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {
+ txStatus = partitionedIO.startTransaction();
+ if(logger.isInfoEnabled()) {
+ logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
+ }
+ //}
+ return txStatus;
+ }
+
+// ************************** Not Needed ?? **************************************************
+
+// protected void doLogPerLoop(int recordsPerLog, String pluralString){
+// int count = getAbsoluteRow() - 1;
+// if ((count % recordsPerLog ) == 0 && count!= 0 ){
+// logger.info(pluralString + " handled: " + (count));
+// }
+// }
+//
+//
+
+
+
+// public boolean nextRow() throws SQLException{
+// if (currentRowInPartition >= partitionSize ){
+// return false;
+// }else{
+// currentRowInPartition++;
+// return resultSet.next();
+// }
+// }
+//
+
+
+
+}