Minor changes to ResultSetPartitioner
[cdmlib.git] / cdmlib-io / src / main / java / eu / etaxonomy / cdm / io / common / ResultSetPartitioner.java
index 9a28592e71af2f83ffad15a8c98f6ff9efee45ad..28120309c072f6e992c510601b1208c187917ffa 100644 (file)
-// $Id$\r
-/**\r
-* Copyright (C) 2007 EDIT\r
-* European Distributed Institute of Taxonomy \r
-* http://www.e-taxonomy.eu\r
-* \r
-* The contents of this file are subject to the Mozilla Public License Version 1.1\r
-* See LICENSE.TXT at the top of this package for the full license terms.\r
-*/\r
-\r
-package eu.etaxonomy.cdm.io.common;\r
-\r
-import java.sql.ResultSet;\r
-import java.sql.ResultSetMetaData;\r
-import java.sql.SQLException;\r
-import java.sql.Types;\r
-import java.util.ArrayList;\r
-import java.util.List;\r
-import java.util.Map;\r
-\r
-import org.apache.log4j.Logger;\r
-import org.springframework.transaction.TransactionStatus;\r
-\r
-import eu.etaxonomy.cdm.common.CdmUtils;\r
-import eu.etaxonomy.cdm.model.common.CdmBase;\r
-\r
-/**\r
- * @author a.mueller\r
- * @created 16.02.2010\r
- * @version 1.0\r
- */\r
-public class ResultSetPartitioner<STATE extends IPartitionedState> {\r
-       private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);\r
-       private PartitionerProfiler profiler = new PartitionerProfiler();\r
-\r
-//************************* STATIC ***************************************************/\r
-       \r
-       public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{\r
-               ResultSetPartitioner resultSetPartitioner = new ResultSetPartitioner(source, strIdQuery, strRecordQuery, partitionSize);\r
-               return resultSetPartitioner;\r
-       }\r
-\r
-//*********************** VARIABLES *************************************************/\r
-       \r
-       /**\r
-        * The database\r
-        */\r
-       private Source source;\r
-       \r
-       /**\r
-        * The result set containing all records and at least the ids as a field. This result set\r
-        * will be used for partitioning\r
-        */\r
-       private ResultSet idResultSet;\r
-       \r
-       /**\r
-        * A template for a SQL Query returning all records and all values needed for a partition\r
-        * to be handled. The 'where' condition is filled by replacing the templates '@IdList' token\r
-        */\r
-       private String strRecordQueryTemplate;\r
-       \r
-       /**\r
-        * The resultset returned for the strRecordQueryTemplate \r
-        */\r
-       private ResultSet partitionResultSet;\r
-       \r
-       /**\r
-        * A 2-key map holding all related objects needed during the handling of a partition (e.g. when \r
-        * creating a taxon partition the map holds all taxon names.\r
-        * The key is a combination of a namespace and the id in the original source\r
-        */\r
-       private Map<Object, Map<String, CdmBase>> relatedObjects;\r
-       \r
-       /**\r
-        * Number of records handled in the partition\r
-        */\r
-       private int partitionSize;\r
-       \r
-       /**\r
-        * Lists of ids handled in this partition (must be an array of lists because sometimes \r
-        * we have non-single keys \r
-        */\r
-       private List<String>[] currentIdLists;\r
-       \r
-       /**\r
-        * The sql type of the id columns.\r
-        * @see Types\r
-        */\r
-       private int[] currentIdListType;\r
-       \r
-       /**\r
-        * counter for the partitions\r
-        */\r
-       private int currentPartition;\r
-       \r
-       /**\r
-        * number of records in the current partition\r
-        */\r
-       private int rowsInCurrentPartition;\r
-       \r
-       private TransactionStatus txStatus;\r
-       \r
-//*********************** CONSTRUCTOR *************************************************/\r
-\r
-       private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{\r
-               ResultSet idResultSet = source.getResultSet(strIdQuery);\r
-//             if (! idResultSet.isBeforeFirst()){\r
-//                     idResultSet.beforeFirst();\r
-//             }\r
-               this.source = source;\r
-               this.idResultSet = idResultSet;\r
-               this.strRecordQueryTemplate = strRecordQuery;\r
-               this.partitionSize = partitionSize;\r
-       }\r
-       \r
-//************************ METHODS ****************************************************/\r
-       \r
-\r
-       /**\r
-        * Import the whole partition of an input stream by starting a transaction, getting the related objects\r
-        * stored in the destination, invoke the IOs partition handling and commit the transaction\r
-        * @param partitionedIO\r
-        */\r
-       public void doPartition(IPartitionedIO partitionedIO, STATE state) {\r
-               try{\r
-                       profiler.startTx();\r
-                       TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);\r
-                       \r
-                       profiler.startRs();\r
-                       ResultSet rs = makePartitionResultSet();\r
-\r
-                       profiler.startRelObjects();\r
-                       this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs);\r
-                       state.setRelatedObjects(relatedObjects);\r
-                       \r
-                       profiler.startRs2();\r
-                       partitionResultSet = makePartitionResultSet();\r
-                       \r
-                       profiler.startDoPartition(); \r
-                       partitionedIO.doPartition(this, state);\r
-                       \r
-                       profiler.startDoCommit();\r
-                       partitionedIO.commitTransaction(txStatus);\r
-                       \r
-                       profiler.end();\r
-                       state.setRelatedObjects(null);\r
-                       \r
-                       logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );\r
-                       profiler.print();\r
-               }catch(Exception e){\r
-                       String message = "Exception (%s) occurred while handling import partition.";\r
-                       message = String.format(message, e.getLocalizedMessage());\r
-                       throw new RuntimeException(message, e);\r
-               }\r
-       }\r
-       \r
-       \r
-       public void startDoSave(){\r
-               profiler.startDoSave();\r
-       }\r
-       \r
-       /**\r
-        * Increases the partition counter and generates the <code>currentIdLists</code> for this partition\r
-        * @return\r
-        * @throws SQLException\r
-        */\r
-       public boolean nextPartition() throws SQLException{\r
-               boolean result = false;\r
-               ResultSetMetaData metaData = idResultSet.getMetaData();\r
-               int nOfIdColumns = metaData.getColumnCount();\r
-               currentPartition++;\r
-               currentIdLists = new ArrayList[nOfIdColumns];\r
-               currentIdListType = new int[nOfIdColumns];\r
-               \r
-               for (int col = 0; col< currentIdLists.length; col++){\r
-                       currentIdLists[col] = new ArrayList<String>();\r
-                       currentIdListType[col] = metaData.getColumnType(col + 1);\r
-               }\r
-               List<String> currentIdList;\r
-               \r
-               int i = 0;\r
-               //for each record\r
-               for (i = 0; i < partitionSize; i++){\r
-                       if (idResultSet.next() == false){\r
-                               break; \r
-                       }\r
-                       //for each column\r
-                       for (int colIndex = 0; colIndex < nOfIdColumns; colIndex++){\r
-                               Object oNextId = idResultSet.getObject(colIndex + 1);\r
-                               String strNextId = String.valueOf(oNextId); \r
-                               currentIdList = currentIdLists[colIndex];\r
-                               currentIdList.add(strNextId);\r
-                       }\r
-                       result = true; //true if at least one record was read\r
-               }\r
-               rowsInCurrentPartition = i;\r
-               \r
-               return result;\r
-       }\r
-\r
-\r
-\r
-       /**\r
-        * Returns the underlying resultSet holding all records needed to handle the partition.<BR>\r
-        * @return\r
-        */\r
-       public ResultSet getResultSet(){\r
-               return partitionResultSet;\r
-       }\r
-\r
-       \r
-       \r
-       /**\r
-        * Computes the value result set needed to handle a partition by using the <code>currentIdList</code>\r
-        * created during {@link #nextPartition}\r
-        * @return ResultSet\r
-        */\r
-       private ResultSet makePartitionResultSet(){\r
-               int nColumns = currentIdLists.length;\r
-               String[] strIdLists = new String[nColumns];\r
-               \r
-               String strRecordQuery = strRecordQueryTemplate;\r
-               for (int col = 0; col < nColumns; col++){\r
-                       for (String id: currentIdLists[col]){\r
-                               id = addApostropheIfNeeded(id, currentIdListType[col]);\r
-                               strIdLists[col] = CdmUtils.concat(",", strIdLists[col], id);\r
-                       }\r
-                       strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, strIdLists[col]);\r
-               }\r
-               \r
-               ResultSet resultSet = ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));\r
-               \r
-               return resultSet;\r
-       }\r
-       \r
-       /**\r
-        * @param id\r
-        * @param i\r
-        * @return\r
-        */\r
-       private String addApostropheIfNeeded(String id, int sqlType) {\r
-               String result = id;\r
-               if (isStringType(sqlType)){\r
-                       result = "'" + id + "'";\r
-               }\r
-               return result;\r
-       }\r
-\r
-       /**\r
-        * @param sqlType\r
-        * @return\r
-        */\r
-       private boolean isStringType(int sqlType) {\r
-               if(sqlType == Types.INTEGER){\r
-                       return false;  //standard case\r
-               }else if (sqlType == Types.CHAR || sqlType == Types.CLOB || sqlType == Types.NVARCHAR || \r
-                               sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR || sqlType == Types.NCHAR ||\r
-                               sqlType == Types.LONGNVARCHAR || sqlType == Types.NCLOB){\r
-                       return true;\r
-               }else{\r
-                       return false;\r
-               }\r
-       }\r
-\r
-       public Map<String, ? extends CdmBase> getObjectMap(Object key){\r
-               Map<String, ? extends CdmBase> objectMap = this.relatedObjects.get(key);\r
-               return objectMap;\r
-       }\r
-       \r
-       /**\r
-        * \r
-        */\r
-       private int getCurrentNumberOfRows() {\r
-               return ((currentPartition - 1) * partitionSize + rowsInCurrentPartition);\r
-       }\r
-       \r
-\r
-       /**\r
-        * @param recordsPerTransaction\r
-        * @param partitionedIO \r
-        * @param i\r
-        */\r
-       protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {\r
-               //if (loopNeedsHandling (i, recordsPerTransaction) || txStatus == null) {\r
-                       txStatus = partitionedIO.startTransaction();\r
-                       if(logger.isInfoEnabled()) {\r
-                               logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started"); \r
-                       }\r
-               //}\r
-               return txStatus;\r
-       }\r
-       \r
-// ************************** Not Needed ?? **************************************************\r
-       \r
-//     protected void doLogPerLoop(int recordsPerLog, String pluralString){\r
-//             int count = getAbsoluteRow() - 1;\r
-//             if ((count % recordsPerLog ) == 0 && count!= 0 ){ \r
-//                     logger.info(pluralString + " handled: " + (count));\r
-//             }\r
-//     }\r
-//\r
-//     \r
-\r
-       \r
-\r
-//     public boolean nextRow() throws SQLException{\r
-//             if (currentRowInPartition >= partitionSize ){\r
-//                     return false;\r
-//             }else{\r
-//                     currentRowInPartition++;\r
-//                     return resultSet.next();\r
-//             }\r
-//     }\r
-//     \r
-       \r
-       \r
-       \r
-}\r
+// $Id$
+/**
+* Copyright (C) 2007 EDIT
+* European Distributed Institute of Taxonomy
+* http://www.e-taxonomy.eu
+*
+* The contents of this file are subject to the Mozilla Public License Version 1.1
+* See LICENSE.TXT at the top of this package for the full license terms.
+*/
+
+package eu.etaxonomy.cdm.io.common;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.springframework.transaction.TransactionStatus;
+
+import eu.etaxonomy.cdm.common.CdmUtils;
+import eu.etaxonomy.cdm.model.common.CdmBase;
+
+/**
+ * @author a.mueller
+ * @created 16.02.2010
+ */
+public class ResultSetPartitioner<STATE extends IPartitionedState> {
+       private static final Logger logger = Logger.getLogger(ResultSetPartitioner.class);
+       private final PartitionerProfiler profiler = new PartitionerProfiler();
+
+//************************* STATIC ***************************************************/
+
+       /**
+        * Factory method creating a partitioner for the given source and queries.
+        * @param source the database the queries are run against
+        * @param strIdQuery the query returning the ids used for partitioning
+        * @param strRecordQuery the record query template used for each partition
+        * @param partitionSize the number of id records per partition
+        * @return the newly created partitioner
+        * @throws SQLException
+        */
+       public static ResultSetPartitioner NewInstance(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
+               return new ResultSetPartitioner<IPartitionedState>(source, strIdQuery, strRecordQuery, partitionSize);
+       }
+
+//*********************** VARIABLES *************************************************/
+
+       /**
+        * The database
+        */
+       private final Source source;
+
+       /**
+        * The result set containing all records and at least the ids as a field. This result set
+        * will be used for partitioning
+        */
+       private final ResultSet idResultSet;
+
+       /**
+        * A template for a SQL Query returning all records and all values needed for a partition
+        * to be handled. The 'where' condition is filled by replacing the template's '@IdList' token
+        */
+       private final String strRecordQueryTemplate;
+
+       /**
+        * The resultset returned for the strRecordQueryTemplate
+        */
+       private ResultSet partitionResultSet;
+
+       /**
+        * A 2-key map holding all related objects needed during the handling of a partition (e.g. when
+        * creating a taxon partition the map holds all taxon names).
+        * The key is a combination of a namespace and the id in the original source
+        */
+       private Map<Object, Map<String, ? extends CdmBase>> relatedObjects;
+
+       /**
+        * Number of records handled in the partition
+        */
+       private final int partitionSize;
+
+       /**
+        * Lists of ids handled in this partition (must be an array of lists because sometimes
+        * we have non-single keys)
+        */
+       private List<String>[] currentIdLists;
+
+       /**
+        * The sql type of the id columns.
+        * @see Types
+        */
+       private int[] currentIdListType;
+
+       /**
+        * counter for the partitions
+        */
+       private int currentPartition;
+
+       /**
+        * number of records in the current partition
+        */
+       private int rowsInCurrentPartition;
+
+       private TransactionStatus txStatus;
+
+//*********************** CONSTRUCTOR *************************************************/
+
+       /**
+        * Runs the id query against the source and stores the source, the record query template
+        * and the partition size.
+        * @param source the database to read from
+        * @param strIdQuery the query whose result set is iterated by {@link #nextPartition()}
+        * @param strRecordQuery the template for the record query of a single partition
+        * @param partitionSize the maximum number of id records per partition
+        * @throws SQLException
+        */
+       private ResultSetPartitioner(Source source, String strIdQuery, String strRecordQuery, int partitionSize) throws SQLException{
+               this.source = source;
+               this.strRecordQueryTemplate = strRecordQuery;
+               this.partitionSize = partitionSize;
+               //the id result set is used as delivered, its cursor is not repositioned here
+               this.idResultSet = source.getResultSet(strIdQuery);
+       }
+
+//************************ METHODS ****************************************************/
+
+
+       /**
+        * Imports the current partition: starts a transaction, creates the transactional source
+        * reference on the state, loads the related objects for the partition, invokes the IO's
+        * partition handling and commits the transaction. The profiler is notified before each step.
+        * <BR>
+        * Any exception raised on the way is wrapped into a {@link RuntimeException} that keeps the
+        * original exception as its cause.
+        * @param partitionedIO the IO class handling the records of the partition
+        * @param state the import state receiving the related objects and the transactional source reference
+        */
+       public void doPartition(IPartitionedIO<STATE> partitionedIO, STATE state) {
+               try{
+                       profiler.startTx();
+                       //this local variable shadows the field txStatus; getTransaction() assigns the field as well
+                       TransactionStatus txStatus = getTransaction(partitionSize, partitionedIO);
+
+                       state.makeTransactionalSourceReference(partitionedIO.getReferenceService());
+
+                       profiler.startRs();
+                       //first execution of the record query: this result set is only handed to getRelatedObjectsForPartition()
+                       //NOTE(review): rs is not closed in this method - confirm that it is released elsewhere
+                       ResultSet rs = makePartitionResultSet();
+
+                       profiler.startRelObjects();
+                       this.relatedObjects = partitionedIO.getRelatedObjectsForPartition(rs, state);
+                       state.setRelatedObjects(relatedObjects);
+
+                       profiler.startRs2();
+                       //second execution of the same query: this result set is the one exposed via getResultSet()
+                       partitionResultSet = makePartitionResultSet();
+
+                       profiler.startDoPartition();
+                       partitionedIO.doPartition(this, state);
+
+                       profiler.startDoCommit();
+                       partitionedIO.commitTransaction(txStatus);
+                       state.resetTransactionalSourceReference();
+
+                       profiler.end();
+                       state.setRelatedObjects(null);
+
+                       logger.info("Saved " + getCurrentNumberOfRows() + " " + partitionedIO.getPluralString() );
+                       profiler.print();
+               }catch(Exception e){
+                       //NOTE(review): no rollback is issued here and the state is not reset
+                       //(resetTransactionalSourceReference / setRelatedObjects(null) are skipped) - confirm this is intended
+                       String message = "Exception (%s) occurred while handling import partition for %s.";
+                       message = String.format(message, e.getLocalizedMessage(), partitionedIO.getPluralString());
+                       throw new RuntimeException(message, e);
+               }
+       }
+
+
+       /**
+        * Forwards to <code>startDoSave()</code> of this partitioner's profiler.
+        */
+       public void startDoSave(){
+               profiler.startDoSave();
+       }
+
+       /**
+        * Moves on to the next partition: increases the partition counter, creates fresh
+        * <code>currentIdLists</code> (one list per column of the id result set) together with the
+        * sql types of these columns, and fills the lists with up to <code>partitionSize</code>
+        * records read from the id result set. Each value is stored as its
+        * {@link String#valueOf(Object)} representation.
+        * @return <code>true</code> if at least one record was read for the new partition
+        * @throws SQLException
+        */
+       public boolean nextPartition() throws SQLException{
+               ResultSetMetaData idMetaData = idResultSet.getMetaData();
+               int columnCount = idMetaData.getColumnCount();
+               currentPartition++;
+
+               //one id list and one sql type per id column
+               currentIdLists = new ArrayList[columnCount];
+               currentIdListType = new int[columnCount];
+               for (int col = 0; col < columnCount; col++){
+                       currentIdLists[col] = new ArrayList<String>();
+                       currentIdListType[col] = idMetaData.getColumnType(col + 1);
+               }
+
+               //read records until the partition is full or the id result set is exhausted
+               int rowsRead = 0;
+               while (rowsRead < partitionSize && idResultSet.next()){
+                       for (int col = 0; col < columnCount; col++){
+                               Object idValue = idResultSet.getObject(col + 1);
+                               currentIdLists[col].add(String.valueOf(idValue));
+                       }
+                       rowsRead++;
+               }
+               rowsInCurrentPartition = rowsRead;
+
+               return rowsRead > 0;
+       }
+
+
+
+       /**
+        * Returns the result set holding all records needed to handle the current partition.<BR>
+        * The underlying field is assigned in {@link #doPartition} right before the partitioned IO is
+        * invoked and is <code>null</code> until then.
+        * @return the partition result set, possibly <code>null</code>
+        */
+       public ResultSet getResultSet(){
+               return partitionResultSet;
+       }
+
+
+
+       /**
+        * Computes the value result set needed to handle a partition by using the <code>currentIdLists</code>
+        * created during {@link #nextPartition}.<BR>
+        * For each id column the ids are joined to a comma separated list (string typed ids are put in
+        * apostrophes) and the next occurrence of {@link IPartitionedIO#ID_LIST_TOKEN} in the record query
+        * template is replaced by this list. The resulting query is run against the source.
+        * @return ResultSet the result set of the record query, wrapped by {@link ResultSetProxy}
+        */
+       private ResultSet makePartitionResultSet(){
+               String strRecordQuery = strRecordQueryTemplate;
+               for (int col = 0; col < currentIdLists.length; col++){
+                       String strIdList = null;
+                       for (String id: currentIdLists[col]){
+                               id = addApostropheIfNeeded(id, currentIdListType[col]);
+                               strIdList = CdmUtils.concat(",", strIdList, id);
+                       }
+                       //replaceFirst interprets '$' and '\' in the replacement string, therefore the id list
+                       //must be quoted, otherwise ids containing these characters corrupt the query or raise
+                       //an IllegalArgumentException
+                       //NOTE(review): for an empty id list the replacement stays null as before; presumably this
+                       //method is only reached after nextPartition() returned true - verify against callers
+                       String replacement = (strIdList == null) ? null : java.util.regex.Matcher.quoteReplacement(strIdList);
+                       strRecordQuery = strRecordQuery.replaceFirst(IPartitionedIO.ID_LIST_TOKEN, replacement);
+               }
+
+               return ResultSetProxy.NewInstance(source.getResultSet(strRecordQuery));
+       }
+
+       /**
+        * Turns an id into a token usable in the id list of the record query. Ids of a string sql type
+        * are enclosed in apostrophes, and apostrophes inside the id are doubled so that the result
+        * remains a single valid SQL string literal. Ids of all other types are returned unchanged.
+        * @param id the id as string, must not be <code>null</code> for string sql types
+        * @param sqlType the sql type of the id column, see {@link Types}
+        * @return the id, enclosed in apostrophes if the sql type is a string type
+        */
+       private String addApostropheIfNeeded(String id, int sqlType) {
+               if (! isStringType(sqlType)){
+                       return id;
+               }
+               //an unescaped apostrophe inside the id would terminate the literal too early
+               return "'" + id.replace("'", "''") + "'";
+       }
+
+       /**
+        * Checks if the given sql type is one of the character based types
+        * (CHAR, VARCHAR, LONGVARCHAR, NCHAR, NVARCHAR, LONGNVARCHAR, CLOB, NCLOB).
+        * @param sqlType the sql type, see {@link Types}
+        * @return <code>true</code> for the character based types listed above, <code>false</code>
+        * for every other type
+        */
+       private boolean isStringType(int sqlType) {
+               switch (sqlType){
+                       case Types.CHAR:
+                       case Types.VARCHAR:
+                       case Types.LONGVARCHAR:
+                       case Types.NCHAR:
+                       case Types.NVARCHAR:
+                       case Types.LONGNVARCHAR:
+                       case Types.CLOB:
+                       case Types.NCLOB:
+                               return true;
+                       default:
+                               //includes Types.INTEGER, the standard case
+                               return false;
+               }
+       }
+
+       /**
+        * Returns the map of related objects stored under the given key.<BR>
+        * The related objects are only assigned within {@link #doPartition}; calling this method
+        * before that raises a {@link NullPointerException}.
+        * @param key the key of the requested map
+        * @return the map registered for the key, <code>null</code> if there is none
+        */
+       public Map<String, ? extends CdmBase> getObjectMap(Object key){
+               return this.relatedObjects.get(key);
+       }
+
+       /**
+        * Computes the number of records handled so far, counting <code>partitionSize</code> records
+        * for each earlier partition plus the records of the current partition.
+        * @return the number of records handled up to and including the current partition
+        */
+       private int getCurrentNumberOfRows() {
+               int rowsInEarlierPartitions = (currentPartition - 1) * partitionSize;
+               return rowsInEarlierPartitions + rowsInCurrentPartition;
+       }
+
+
+       /**
+        * Starts a new transaction on the given IO, stores its status in <code>txStatus</code>
+        * and returns it.
+        * @param recordsPerTransaction currently not evaluated by this method
+        * @param partitionedIO the IO on which the transaction is started
+        * @return the status of the transaction that was just started
+        */
+       protected TransactionStatus getTransaction(int recordsPerTransaction, IPartitionedIO partitionedIO) {
+               txStatus = partitionedIO.startTransaction();
+               //the guard must match the level of the log call, otherwise the message is
+               //built for nothing whenever the logger runs on info level
+               if(logger.isDebugEnabled()) {
+                       logger.debug("currentPartitionNumber = " + currentPartition + " - Transaction started");
+               }
+               return txStatus;
+       }
+
+// ************************** Not Needed ?? **************************************************
+
+//     protected void doLogPerLoop(int recordsPerLog, String pluralString){
+//             int count = getAbsoluteRow() - 1;
+//             if ((count % recordsPerLog ) == 0 && count!= 0 ){
+//                     logger.info(pluralString + " handled: " + (count));
+//             }
+//     }
+//
+//
+
+
+
+//     public boolean nextRow() throws SQLException{
+//             if (currentRowInPartition >= partitionSize ){
+//                     return false;
+//             }else{
+//                     currentRowInPartition++;
+//                     return resultSet.next();
+//             }
+//     }
+//
+
+
+
+}