updates to dwca-in
[cdmlib.git] / cdmlib-io / src / main / java / eu / etaxonomy / cdm / io / dwca / in / DwcaImport.java
1 /**
2 * Copyright (C) 2009 EDIT
3 * European Distributed Institute of Taxonomy
4 * http://www.e-taxonomy.eu
5 *
6 * The contents of this file are subject to the Mozilla Public License Version 1.1
7 * See LICENSE.TXT at the top of this package for the full license terms.
8 */
9 package eu.etaxonomy.cdm.io.dwca.in;
10
11 import java.net.URI;
12
13 import org.apache.log4j.Logger;
14 import org.springframework.stereotype.Component;
15 import org.springframework.transaction.TransactionStatus;
16
17 import eu.etaxonomy.cdm.api.service.IIdentifiableEntityService;
18 import eu.etaxonomy.cdm.io.common.CdmImportBase;
19 import eu.etaxonomy.cdm.io.dwca.TermUri;
20 import eu.etaxonomy.cdm.model.common.CdmBase;
21 import eu.etaxonomy.cdm.model.common.IdentifiableEntity;
22 import eu.etaxonomy.cdm.model.taxon.Classification;
23 import eu.etaxonomy.cdm.model.taxon.TaxonBase;
24
25 /**
26 *
27 * @author a.mueller
28 *
29 */
30
31 @Component
32 public class DwcaImport extends CdmImportBase<DwcaImportConfigurator, DwcaImportState>{
33 private static final Logger logger = Logger.getLogger(DwcaImport.class);
34
35 @Override
36 protected void doInvoke(DwcaImportState state) {
37 URI source = state.getConfig().getSource();
38 DwcaZipToStreamConverter<DwcaImportState> dwcaStreamConverter = DwcaZipToStreamConverter.NewInstance(source);
39 IReader<CsvStream> stream = dwcaStreamConverter.getStreamStream(state);
40 while (stream.hasNext()){
41 CsvStream csvStream = stream.read();
42
43 if (state.getConfig().isUsePartitions()){
44 IPartitionableConverter<CsvStreamItem, IReader<CdmBase>, String> partitionConverter = getConverter(csvStream.getTerm(), state);
45 int partitionSize = state.getConfig().getDefaultPartitionSize();
46 StreamPartitioner<CsvStreamItem> partitionStream = new StreamPartitioner<CsvStreamItem>(csvStream, partitionConverter, state, partitionSize);// (csvStream, streamConverter,state 1000);
47
48 int i = 1;
49 while (partitionStream.hasNext()){
50 //FIXME more generic handling of transactions
51 TransactionStatus tx = startTransaction();
52
53 IReader<MappedCdmBase> partStream = partitionStream.read();
54 logger.warn("Handel " + i + ". partition");
55 String location = "Location: partition stream (TODO)";
56 handleResults(state, partStream, location);
57 commitTransaction(tx);
58 }
59 }else {
60
61 while (csvStream.hasNext()){
62 TransactionStatus tx = startTransaction();
63
64 CsvStreamItem item = csvStream.read();
65 handleCsvStreamItem(state, item);
66
67 commitTransaction(tx);
68 }
69 }
70
71 finalizeStream(csvStream, state);
72 }
73 return;
74 }
75
76 // private void handlePartitionedStreamItem(DwcaImportState state, StreamPartitioner<CsvStreamItem> partitionStream) {
77 // IPartitionableConverter<CsvStreamItem, IReader<CdmBase>, String> converter = getConverter(partitionStream.getTerm(), state);
78 // if (converter == null){
79 // state.setSuccess(false);
80 // return;
81 // }
82 //
83 // IReader<CsvStreamItem> lookaheadStream = partitionStream.getLookaheadReader();
84 // Map<String, Set<String>> foreignKeys = converter.getPartitionForeignKeys(lookaheadStream);
85 // IImportMapping mapping = state.getMapping();
86 // IImportMapping partialMapping = mapping.getPartialMapping(foreignKeys);
87 // state.loadRelatedObjects(partialMapping);
88 //
89 // ConcatenatingReader<MappedCdmBase> reader = new ConcatenatingReader<MappedCdmBase>();
90 //
91 // IReader<CsvStreamItem> inputStream = partitionStream.read();
92 // while (inputStream.hasNext()){
93 // IReader<MappedCdmBase> resultReader = converter.map(inputStream.read());
94 // reader.add(resultReader);
95 // }
96 //
97 // while (reader.hasNext()){
98 // MappedCdmBase mappedCdmBase = (reader.read());
99 // CdmBase cdmBase = mappedCdmBase.getCdmBase();
100 // //locate
101 // //TODO find a way to define the location
102 // String location = "partitionStream";
103 // //save
104 // save(cdmBase, state, location);
105 // //store in mapping
106 // if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
107 // IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
108 // String namespace = mappedCdmBase.getNamespace();
109 // //TODO also store in partition mapping
110 // state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
111 // }
112 // }
113 // return;
114 // }
115
116 /**
117 * @param state
118 * @param item
119 * @return
120 */
121 private void handleCsvStreamItem(DwcaImportState state, CsvStreamItem item) {
122 IConverter<CsvStreamItem, IReader<CdmBase>, String> converter = getConverter(item.term, state);
123 if (converter == null){
124 state.setSuccess(false);
125 return;
126 }
127 IReader<MappedCdmBase> resultReader = converter.map(item);
128 handleResults(state, resultReader, item.getLocation());
129 return;
130 }
131
132 /**
133 * @param state
134 * @param item
135 * @param resultReader
136 */
137 private void handleResults(DwcaImportState state, IReader<MappedCdmBase> resultReader, String location) {
138 while (resultReader.hasNext()){
139
140 MappedCdmBase mappedCdmBase = (resultReader.read());
141 CdmBase cdmBase = mappedCdmBase.getCdmBase();
142 save(cdmBase, state, location);
143 if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
144 IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
145
146 String namespace = mappedCdmBase.getNamespace();
147 state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
148 }
149 }
150 }
151
152 private void finalizeStream(CsvStream csvStream, DwcaImportState state) {
153 if (csvStream.getTerm().equals(TermUri.DWC_TAXON)){
154 if (state.isTaxaCreated() == false){
155 state.setTaxaCreated(true);
156 }
157 }
158
159 }
160
161 private void save(CdmBase cdmBase, DwcaImportState state, String location) {
162 if (state.isCheck()){
163 //do nothing
164 }else{
165 if (cdmBase == null){
166 logger.warn("cdmBase is null");
167 }
168 //start preliminary for testing
169 IIdentifiableEntityService service;
170 try {
171 if (cdmBase.isInstanceOf(IdentifiableEntity.class)){
172 service = getServiceByClass(cdmBase.getClass());
173 if (service != null){
174 IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
175 service.saveOrUpdate(entity);
176 }
177 }
178 } catch (IllegalArgumentException e) {
179 fireWarningEvent(e.getMessage(), location, 12);
180 }
181
182 // System.out.println(cdmBase.toString());
183 //end preliminary
184
185 //TODO
186 }
187 }
188
189 private IPartitionableConverter<CsvStreamItem,IReader<CdmBase>, String> getConverter(TermUri namespace, DwcaImportState state) {
190 if (namespace.equals(TermUri.DWC_TAXON)){
191 if (! state.isTaxaCreated()){
192 return new DwcTaxonCsv2CdmTaxonConverter(state);
193 }else{
194 return new DwcTaxonCsv2CdmTaxonRelationConverter(state);
195 }
196 }else if (namespace.equals(TermUri.GBIF_VERNACULAR_NAMES)){
197 return new GbifVernacularNameCsv2CdmConverter(state);
198 }else if (namespace.equals(TermUri.GBIF_DESCRIPTION)){
199 return new GbifDescriptionCsv2CdmConverter(state);
200 }else{
201 String message = "Now converter available for %s";
202 logger.error(String.format(message, namespace));
203 return null;
204 }
205 }
206
207
208 /**
209 * Returns an appropriate service to persist data of a certain class.
210 * If an appropriate service can't be found an {@link IllegalArgumentException} is thrown.
211 *
212 * TODO move to a more general place to make it available to everyone.
213 *
214 * @param app
215 * @param clazz
216 * @return
217 */
218 protected IIdentifiableEntityService getServiceByClass(Class<?> clazz) throws IllegalArgumentException {
219 if (clazz == null){
220 //throw exception below
221 }else if (TaxonBase.class.isAssignableFrom(clazz)){
222 return this.getTaxonService();
223 }else if (Classification.class.isAssignableFrom(clazz)){
224 return this.getClassificationService();
225 }
226 String warning = "Can't map class to cdmService: %s";
227 warning = String.format(warning, (clazz == null ? "-" : clazz.getName()));
228 throw new IllegalArgumentException(warning);
229 }
230
231
232 @Override
233 protected boolean doCheck(DwcaImportState state) {
234 return state.isCheck();
235 }
236
237 @Override
238 protected boolean isIgnore(DwcaImportState state) {
239 return false;
240 }
241
242 }