From 09b9d4c2e8d02ee0c1eba5a5518d3937530bdc57 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Andreas=20M=C3=BCller?= Date: Mon, 26 Mar 2012 08:58:39 +0000 Subject: [PATCH] updates to dwca-in --- .../etaxonomy/cdm/io/dwca/in/DwcaImport.java | 13 +++++---- .../io/dwca/in/DwcaImportConfigurator.java | 9 +++++++ .../cdm/io/dwca/in/DwcaImportState.java | 2 +- .../cdm/io/dwca/in/IImportMapping.java | 15 ++++++++++- .../cdm/io/dwca/in/InMemoryMapping.java | 16 ----------- .../cdm/io/dwca/in/StreamPartitioner.java | 27 ++++++++++++------- 6 files changed, 49 insertions(+), 33 deletions(-) diff --git a/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/DwcaImport.java b/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/DwcaImport.java index bf0ea7a5ce..b888dff58b 100644 --- a/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/DwcaImport.java +++ b/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/DwcaImport.java @@ -9,8 +9,6 @@ package eu.etaxonomy.cdm.io.dwca.in; import java.net.URI; -import java.util.Map; -import java.util.Set; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; @@ -44,13 +42,18 @@ public class DwcaImport extends CdmImportBase, String> partitionConverter = getConverter(csvStream.getTerm(), state); - StreamPartitioner partitionStream = new StreamPartitioner(csvStream, partitionConverter, state, 1000);// (csvStream, streamConverter,state 1000); + int partitionSize = state.getConfig().getDefaultPartitionSize(); + StreamPartitioner partitionStream = new StreamPartitioner(csvStream, partitionConverter, state, partitionSize);// (csvStream, streamConverter,state 1000); + int i = 1; while (partitionStream.hasNext()){ + //FIXME more generic handling of transactions TransactionStatus tx = startTransaction(); + IReader partStream = partitionStream.read(); + logger.warn("Handel " + i + ". partition"); String location = "Location: partition stream (TODO)"; - handleResults(state, partitionStream, location); + handleResults(state, partStream, location); commitTransaction(tx); } }else { @@ -176,7 +179,7 @@ public class DwcaImport extends CdmImportBase> getEntryList(); + public class CdmKey{ + Class clazz; + int id; + + public CdmKey(IdentifiableEntity object){ + this.clazz = (Class)object.getClass(); + this.id = object.getId(); + } + + @Override + public String toString(){ + return id + "@" + clazz.getSimpleName(); + } + } } diff --git a/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/InMemoryMapping.java b/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/InMemoryMapping.java index 6a87705e9a..515eb4f168 100644 --- a/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/InMemoryMapping.java +++ b/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/InMemoryMapping.java @@ -16,7 +16,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import eu.etaxonomy.cdm.io.dwca.in.InMemoryMapping.CdmKey; import eu.etaxonomy.cdm.model.common.IdentifiableEntity; /** @@ -31,21 +30,6 @@ public class InMemoryMapping implements IImportMapping { */ private Map>> mapping = new HashMap>>(); - public class CdmKey{ - Class clazz; - int id; - - private CdmKey(IdentifiableEntity object){ - this.clazz = (Class)object.getClass(); - this.id = object.getId(); - } - - @Override - public String toString(){ - return id + "@" + clazz.getSimpleName(); - } - } - @Override public void putMapping(String namespace, Integer sourceKey, IdentifiableEntity destinationObject){ putMapping(namespace, String.valueOf(sourceKey), destinationObject); diff --git a/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/StreamPartitioner.java b/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/StreamPartitioner.java index 23bc2baa8b..b5e675ae7b 100644 --- a/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/StreamPartitioner.java +++ b/cdmlib-io/src/main/java/eu/etaxonomy/cdm/io/dwca/in/StreamPartitioner.java @@ -16,32 +16,35 @@ import java.util.Set; import org.apache.log4j.Logger; import eu.etaxonomy.cdm.io.dwca.TermUri; -import eu.etaxonomy.cdm.model.common.CdmBase; -import eu.etaxonomy.cdm.model.common.IdentifiableEntity; /** * @author a.mueller * */ -public class StreamPartitioner implements INamespaceReader{ +public class StreamPartitioner implements INamespaceReader>{ + @SuppressWarnings("unused") private static final Logger logger = Logger.getLogger(StreamPartitioner.class); private int partitionSize; private LookAheadStream inStream; private IPartitionableConverter converter; private DwcaImportState state; - private ConcatenatingReader outStream = new ConcatenatingReader(); - + private ConcatenatingReader outStream; public StreamPartitioner(INamespaceReader input, IPartitionableConverter converter, DwcaImportState state, Integer size){ this.inStream = new LookAheadStream(input); this.converter = converter; this.partitionSize = size; this.state = state; + initNewOutStream(); } + private void initNewOutStream(){ + outStream = new ConcatenatingReader(); + } + /* (non-Javadoc) * @see eu.etaxonomy.cdm.io.dwca.in.IReader#hasNext() */ @@ -50,15 +53,19 @@ public class StreamPartitioner implements INamesp return true; }else{ return inStream.hasNext(); //TODO what, if converter returns no ouput for inStream.hasNext() ?? + //but be aware that requesting the next object from the next partition crosses the transactional borders } } @Override - public MappedCdmBase read() { - if (! this.outStream.hasNext()){ - handleNextPartition(); - } - return outStream.read(); + public IReader read() { + logger.debug("Start partitioner read"); + handleNextPartition(); + IReader result = this.outStream; + + initNewOutStream(); + logger.debug("End partitioner read"); + return result; } private void handleNextPartition(){ -- 2.34.1