/**
 * Copyright (C) 2009 EDIT
 * European Distributed Institute of Taxonomy
 * http://www.e-taxonomy.eu
 *
 * The contents of this file are subject to the Mozilla Public License Version 1.1
 * See LICENSE.TXT at the top of this package for the full license terms.
 */
9 package eu
.etaxonomy
.cdm
.io
.dwca
.in
;
13 import org
.apache
.log4j
.Logger
;
14 import org
.springframework
.stereotype
.Component
;
15 import org
.springframework
.transaction
.TransactionStatus
;
17 import eu
.etaxonomy
.cdm
.api
.service
.IIdentifiableEntityService
;
18 import eu
.etaxonomy
.cdm
.io
.common
.CdmImportBase
;
19 import eu
.etaxonomy
.cdm
.io
.dwca
.TermUri
;
20 import eu
.etaxonomy
.cdm
.model
.common
.CdmBase
;
21 import eu
.etaxonomy
.cdm
.model
.common
.IdentifiableEntity
;
22 import eu
.etaxonomy
.cdm
.model
.taxon
.Classification
;
23 import eu
.etaxonomy
.cdm
.model
.taxon
.TaxonBase
;
32 public class DwcaImport
extends CdmImportBase
<DwcaImportConfigurator
, DwcaImportState
>{
33 private static final Logger logger
= Logger
.getLogger(DwcaImport
.class);
36 protected void doInvoke(DwcaImportState state
) {
37 URI source
= state
.getConfig().getSource();
38 DwcaZipToStreamConverter
<DwcaImportState
> dwcaStreamConverter
= DwcaZipToStreamConverter
.NewInstance(source
);
39 IReader
<CsvStream
> stream
= dwcaStreamConverter
.getStreamStream(state
);
40 while (stream
.hasNext()){
41 CsvStream csvStream
= stream
.read();
43 if (state
.getConfig().isUsePartitions()){
44 IPartitionableConverter
<CsvStreamItem
, IReader
<CdmBase
>, String
> partitionConverter
= getConverter(csvStream
.getTerm(), state
);
45 int partitionSize
= state
.getConfig().getDefaultPartitionSize();
46 StreamPartitioner
<CsvStreamItem
> partitionStream
= new StreamPartitioner
<CsvStreamItem
>(csvStream
, partitionConverter
, state
, partitionSize
);// (csvStream, streamConverter,state 1000);
49 while (partitionStream
.hasNext()){
50 //FIXME more generic handling of transactions
51 TransactionStatus tx
= startTransaction();
53 IReader
<MappedCdmBase
> partStream
= partitionStream
.read();
54 logger
.warn("Handel " + i
+ ". partition");
55 String location
= "Location: partition stream (TODO)";
56 handleResults(state
, partStream
, location
);
57 commitTransaction(tx
);
61 while (csvStream
.hasNext()){
62 TransactionStatus tx
= startTransaction();
64 CsvStreamItem item
= csvStream
.read();
65 handleCsvStreamItem(state
, item
);
67 commitTransaction(tx
);
71 finalizeStream(csvStream
, state
);
76 // private void handlePartitionedStreamItem(DwcaImportState state, StreamPartitioner<CsvStreamItem> partitionStream) {
77 // IPartitionableConverter<CsvStreamItem, IReader<CdmBase>, String> converter = getConverter(partitionStream.getTerm(), state);
78 // if (converter == null){
79 // state.setSuccess(false);
83 // IReader<CsvStreamItem> lookaheadStream = partitionStream.getLookaheadReader();
84 // Map<String, Set<String>> foreignKeys = converter.getPartitionForeignKeys(lookaheadStream);
85 // IImportMapping mapping = state.getMapping();
86 // IImportMapping partialMapping = mapping.getPartialMapping(foreignKeys);
87 // state.loadRelatedObjects(partialMapping);
89 // ConcatenatingReader<MappedCdmBase> reader = new ConcatenatingReader<MappedCdmBase>();
91 // IReader<CsvStreamItem> inputStream = partitionStream.read();
92 // while (inputStream.hasNext()){
93 // IReader<MappedCdmBase> resultReader = converter.map(inputStream.read());
94 // reader.add(resultReader);
97 // while (reader.hasNext()){
98 // MappedCdmBase mappedCdmBase = (reader.read());
99 // CdmBase cdmBase = mappedCdmBase.getCdmBase();
101 // //TODO find a way to define the location
102 // String location = "partitionStream";
104 // save(cdmBase, state, location);
105 // //store in mapping
106 // if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
107 // IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
108 // String namespace = mappedCdmBase.getNamespace();
109 // //TODO also store in partition mapping
110 // state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
121 private void handleCsvStreamItem(DwcaImportState state
, CsvStreamItem item
) {
122 IConverter
<CsvStreamItem
, IReader
<CdmBase
>, String
> converter
= getConverter(item
.term
, state
);
123 if (converter
== null){
124 state
.setSuccess(false);
127 IReader
<MappedCdmBase
> resultReader
= converter
.map(item
);
128 handleResults(state
, resultReader
, item
.getLocation());
135 * @param resultReader
137 private void handleResults(DwcaImportState state
, IReader
<MappedCdmBase
> resultReader
, String location
) {
138 while (resultReader
.hasNext()){
140 MappedCdmBase mappedCdmBase
= (resultReader
.read());
141 CdmBase cdmBase
= mappedCdmBase
.getCdmBase();
142 save(cdmBase
, state
, location
);
143 if (mappedCdmBase
.getSourceId() != null && cdmBase
.isInstanceOf(IdentifiableEntity
.class)){
144 IdentifiableEntity
<?
> entity
= CdmBase
.deproxy(cdmBase
, IdentifiableEntity
.class);
146 String namespace
= mappedCdmBase
.getNamespace();
147 state
.putMapping(namespace
,mappedCdmBase
.getSourceId(), entity
);
152 private void finalizeStream(CsvStream csvStream
, DwcaImportState state
) {
153 if (csvStream
.getTerm().equals(TermUri
.DWC_TAXON
)){
154 if (state
.isTaxaCreated() == false){
155 state
.setTaxaCreated(true);
161 private void save(CdmBase cdmBase
, DwcaImportState state
, String location
) {
162 if (state
.isCheck()){
165 if (cdmBase
== null){
166 logger
.warn("cdmBase is null");
168 //start preliminary for testing
169 IIdentifiableEntityService service
;
171 if (cdmBase
.isInstanceOf(IdentifiableEntity
.class)){
172 service
= getServiceByClass(cdmBase
.getClass());
173 if (service
!= null){
174 IdentifiableEntity
<?
> entity
= CdmBase
.deproxy(cdmBase
, IdentifiableEntity
.class);
175 service
.saveOrUpdate(entity
);
178 } catch (IllegalArgumentException e
) {
179 fireWarningEvent(e
.getMessage(), location
, 12);
182 // System.out.println(cdmBase.toString());
189 private IPartitionableConverter
<CsvStreamItem
,IReader
<CdmBase
>, String
> getConverter(TermUri namespace
, DwcaImportState state
) {
190 if (namespace
.equals(TermUri
.DWC_TAXON
)){
191 if (! state
.isTaxaCreated()){
192 return new DwcTaxonCsv2CdmTaxonConverter(state
);
194 return new DwcTaxonCsv2CdmTaxonRelationConverter(state
);
196 }else if (namespace
.equals(TermUri
.GBIF_VERNACULAR_NAMES
)){
197 return new GbifVernacularNameCsv2CdmConverter(state
);
198 }else if (namespace
.equals(TermUri
.GBIF_DESCRIPTION
)){
199 return new GbifDescriptionCsv2CdmConverter(state
);
201 String message
= "Now converter available for %s";
202 logger
.error(String
.format(message
, namespace
));
209 * Returns an appropriate service to persist data of a certain class.
210 * If an appropriate service can't be found an {@link IllegalArgumentException} is thrown.
212 * TODO move to a more general place to make it available to everyone.
218 protected IIdentifiableEntityService
getServiceByClass(Class
<?
> clazz
) throws IllegalArgumentException
{
220 //throw exception below
221 }else if (TaxonBase
.class.isAssignableFrom(clazz
)){
222 return this.getTaxonService();
223 }else if (Classification
.class.isAssignableFrom(clazz
)){
224 return this.getClassificationService();
226 String warning
= "Can't map class to cdmService: %s";
227 warning
= String
.format(warning
, (clazz
== null ?
"-" : clazz
.getName()));
228 throw new IllegalArgumentException(warning
);
233 protected boolean doCheck(DwcaImportState state
) {
234 return state
.isCheck();
238 protected boolean isIgnore(DwcaImportState state
) {