Project

General

Profile

Download (11.5 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2009 EDIT
3
* European Distributed Institute of Taxonomy 
4
* http://www.e-taxonomy.eu
5
* 
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.io.dwca.in;
10

    
11
import java.net.URI;
12
import java.util.UUID;
13

    
14
import org.apache.log4j.Logger;
15
import org.springframework.stereotype.Component;
16
import org.springframework.transaction.TransactionStatus;
17

    
18
import eu.etaxonomy.cdm.api.service.IIdentifiableEntityService;
19
import eu.etaxonomy.cdm.io.common.CdmImportBase;
20
import eu.etaxonomy.cdm.io.dwca.TermUri;
21
import eu.etaxonomy.cdm.model.common.CdmBase;
22
import eu.etaxonomy.cdm.model.common.IdentifiableEntity;
23
import eu.etaxonomy.cdm.model.common.TermVocabulary;
24
import eu.etaxonomy.cdm.model.name.TaxonNameBase;
25
import eu.etaxonomy.cdm.model.reference.Reference;
26
import eu.etaxonomy.cdm.model.taxon.Classification;
27
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
28
import eu.etaxonomy.cdm.model.common.DefinedTermBase;
29
import eu.etaxonomy.cdm.model.description.DescriptionBase;
30
import eu.etaxonomy.cdm.model.description.Feature;
31
/**
32
 * 
33
 * @author a.mueller
34
 *
35
 */
36

    
37
@Component
38
public class DwcaImport extends CdmImportBase<DwcaImportConfigurator, DwcaImportState>{
39
	private static final Logger logger = Logger.getLogger(DwcaImport.class);
40

    
41
	@Override
42
	protected void doInvoke(DwcaImportState state) {
43
		URI source = state.getConfig().getSource();
44
		makeSourceRef(state);
45
		
46
		DwcaZipToStreamConverter<DwcaImportState> dwcaStreamConverter = DwcaZipToStreamConverter.NewInstance(source);
47
		IReader<CsvStream> zipEntryStream = dwcaStreamConverter.getEntriesStream(state);
48
		while (zipEntryStream.hasNext()){
49
			CsvStream csvStream = zipEntryStream.read();
50
			try {
51
				handleSingleZipEntry(state, csvStream);
52
			} catch (Exception e) {
53
				String message = "Exception (%s) occurred while handling zip entry %s";
54
				message = String.format(message, e.getMessage(), csvStream.toString());
55
				fireWarningEvent (message, csvStream.toString(), 14);
56
			}
57
		}
58
		state.finish();
59
		return;
60
	}
61

    
62
	/**
63
	 * @param state
64
	 * @param zipEntryStream
65
	 */
66
	private void handleSingleZipEntry(DwcaImportState state, CsvStream csvStream) {
67
		csvStream.addObservers(state.getConfig().getObservers());
68
		
69
		if (state.getConfig().isUsePartitions()){
70
			IPartitionableConverter<CsvStreamItem, IReader<CdmBase>, String> partitionConverter = getConverter(csvStream.getTerm(), state);
71
			if (partitionConverter == null){
72
				String warning = "No converter available for %s. Continue with next stream.";
73
				warning = String.format(warning, csvStream.getTerm());
74
				fireWarningEvent (warning, csvStream.toString(), 12);
75
				return;
76
			}
77
			
78
			int partitionSize = state.getConfig().getDefaultPartitionSize();
79
			StreamPartitioner<CsvStreamItem> partitionStream = new StreamPartitioner<CsvStreamItem>(csvStream, 
80
					partitionConverter, state, partitionSize);//   (csvStream, streamConverter,state 1000);
81
			
82
			int i = 1;
83
			while (partitionStream.hasNext()){
84
				//FIXME more generic handling of transactions
85
				TransactionStatus tx = startTransaction();
86
				
87
				try {
88
					IReader<MappedCdmBase> partStream = partitionStream.read();
89

    
90
					fireProgressEvent("Handel " + i + ". partition", i + ". partition");
91
					logger.info("Handel " + i++ + ". partition");
92
					String location = "Location: partition stream (TODO)";
93
					handleResults(state, partStream, location);
94
					commitTransaction(tx);
95
				} catch (Exception e) {
96
					String message = "An exception occurred while handling partition: " + e;
97
					String codeLocation;
98
					if (e.getStackTrace().length > 0){
99
						StackTraceElement el = e.getStackTrace()[0];
100
						codeLocation = el.getClassName()+ "." + el.getMethodName() + "(" + el.getLineNumber() + ")";
101
					}else{
102
						codeLocation = "No stacktrace";
103
					}
104
					message = message + " in: " +  codeLocation;
105
					fireWarningEvent(message , String.valueOf(csvStream.getLine()) , 12);
106
					this.rollbackTransaction(tx);
107
				}
108
				
109
			}
110
			logger.debug("Partition stream is empty");
111
		}else {
112
				
113
			while (csvStream.hasNext()){
114
					TransactionStatus tx = startTransaction();
115
					
116
					CsvStreamItem item = csvStream.read();
117
					handleCsvStreamItem(state, item);
118
					
119
					commitTransaction(tx);
120
			}
121
		}
122

    
123
		finalizeStream(csvStream, state);
124
	}
125

    
126
	private void makeSourceRef(DwcaImportState state) {
127
		Reference<?> sourceRef = state.getConfig().getSourceReference();
128
		getReferenceService().saveOrUpdate(sourceRef);
129
	}
130

    
131
//	private void handlePartitionedStreamItem(DwcaImportState state,  StreamPartitioner<CsvStreamItem> partitionStream) {
132
//		IPartitionableConverter<CsvStreamItem, IReader<CdmBase>, String> converter = getConverter(partitionStream.getTerm(), state);
133
//		if (converter == null){
134
//			state.setSuccess(false);
135
//			return;
136
//		}
137
//		
138
//		IReader<CsvStreamItem> lookaheadStream = partitionStream.getLookaheadReader();
139
//		Map<String, Set<String>> foreignKeys = converter.getPartitionForeignKeys(lookaheadStream);
140
//		IImportMapping mapping = state.getMapping();
141
//		IImportMapping partialMapping = mapping.getPartialMapping(foreignKeys);
142
//		state.loadRelatedObjects(partialMapping);
143
//		
144
//		ConcatenatingReader<MappedCdmBase> reader = new ConcatenatingReader<MappedCdmBase>();
145
// 		
146
//		IReader<CsvStreamItem> inputStream = partitionStream.read();
147
//		while (inputStream.hasNext()){
148
//			IReader<MappedCdmBase> resultReader = converter.map(inputStream.read());
149
//			reader.add(resultReader);
150
//		}
151
//			
152
//		while (reader.hasNext()){
153
//			MappedCdmBase mappedCdmBase = (reader.read());
154
//			CdmBase cdmBase = mappedCdmBase.getCdmBase();
155
//			//locate
156
//			//TODO find a way to define the location
157
//			String location = "partitionStream";
158
//			//save
159
//			save(cdmBase, state, location);
160
//			//store in mapping
161
//			if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
162
//				IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
163
//				String namespace = mappedCdmBase.getNamespace();
164
//				//TODO also store in partition mapping
165
//				state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
166
//			}
167
//		}
168
//		return;
169
//	}
170

    
171
	/**
172
	 * @param state
173
	 * @param item
174
	 * @return
175
	 */
176
	private void handleCsvStreamItem(DwcaImportState state, CsvStreamItem item) {
177
		IConverter<CsvStreamItem, IReader<CdmBase>, String> converter = getConverter(item.term, state);
178
		if (converter == null){
179
			state.setSuccess(false);
180
			return;
181
		}
182
		IReader<MappedCdmBase> resultReader = converter.map(item);
183
		handleResults(state, resultReader, item.getLocation());
184
		return;
185
	}
186

    
187
	/**
188
	 * @param state
189
	 * @param item
190
	 * @param resultReader
191
	 */
192
	private void handleResults(DwcaImportState state, IReader<MappedCdmBase> resultReader, String location) {
193
		while (resultReader.hasNext()){
194
			
195
			MappedCdmBase mappedCdmBase = resultReader.read();
196
			CdmBase cdmBase = mappedCdmBase.getCdmBase();
197
			save(cdmBase, state, location);
198
			if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
199
				IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
200
				
201
				String namespace = mappedCdmBase.getNamespace();
202
				state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
203
			}
204
		}
205
	}
206

    
207
	private void finalizeStream(CsvStream csvStream, DwcaImportState state) {
208
		fireWarningEvent("Stream finished", csvStream.getFilesLocation(), 0);
209
		if (csvStream.getTerm().equals(TermUri.DWC_TAXON)){
210
			if (state.isTaxaCreated() == false){
211
				state.setTaxaCreated(true);
212
			}
213
		}
214
		
215
	}
216

    
217
	private void save(CdmBase cdmBase, DwcaImportState state, String location) {
218
		if (state.isCheck()){
219
			//do nothing
220
		}else{
221
			if (cdmBase == null){
222
				logger.warn("cdmBase is null");
223
			}
224
			//start preliminary for testing
225
			IIdentifiableEntityService service;
226
			try {
227
				if (cdmBase.isInstanceOf(IdentifiableEntity.class)){
228
					service = getServiceByClass(cdmBase.getClass());
229
					if (service != null){
230
						IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
231
						service.saveOrUpdate(entity);
232
					}
233
				}
234
			} catch (IllegalArgumentException e) {
235
				fireWarningEvent(e.getMessage(), location, 12);
236
			}
237
			
238
//			System.out.println(cdmBase.toString());
239
			//end preliminary
240
			
241
			//TODO
242
		}
243
	}
244

    
245
	private IPartitionableConverter<CsvStreamItem,IReader<CdmBase>, String> getConverter(TermUri namespace, DwcaImportState state) {
246
		if (namespace.equals(TermUri.DWC_TAXON)){
247
			if (! state.isTaxaCreated()){
248
				return new DwcTaxonCsv2CdmTaxonConverter(state);
249
			}else{
250
				return new DwcTaxonCsv2CdmTaxonRelationConverter(state);
251
			}
252
		}else if (namespace.equals(TermUri.GBIF_VERNACULAR_NAMES)){
253
			return new GbifVernacularNameCsv2CdmConverter(state);
254
		}else if (namespace.equals(TermUri.GBIF_DESCRIPTION)){
255
			return new GbifDescriptionCsv2CdmConverter(state);
256
		}else if (namespace.equals(TermUri.GBIF_DISTRIBUTION)){
257
			return new GbifDistributionCsv2CdmConverter(state);
258
		}else if (namespace.equals(TermUri.GBIF_REFERENCE)){
259
			return new GbifReferenceCsv2CdmConverter(state);
260
		}else{
261
			String message = "No converter available for %s";
262
			logger.error(String.format(message, namespace));
263
			return null;
264
		}
265
	}
266
	
267
	
268
	/**
269
	 * Returns an appropriate service to persist data of a certain class.
270
	 * If an appropriate service can't be found an {@link IllegalArgumentException} is thrown.
271
	 * 
272
	 * TODO move to a more general place to make it available to everyone.
273
	 * 
274
	 * @param app
275
	 * @param clazz
276
	 * @return
277
	 */
278
	protected IIdentifiableEntityService getServiceByClass(Class<?> clazz)  throws IllegalArgumentException {
279
		if (clazz == null){
280
			//throw exception below
281
		}else if (TaxonBase.class.isAssignableFrom(clazz)){
282
			return this.getTaxonService();
283
		}else if (Classification.class.isAssignableFrom(clazz)){
284
			return this.getClassificationService();
285
		}else if (Reference.class.isAssignableFrom(clazz)){
286
			return this.getReferenceService();
287
		}else if (TaxonNameBase.class.isAssignableFrom(clazz)){
288
			return this.getNameService();
289
		}else if (DefinedTermBase.class.isAssignableFrom(clazz)){
290
			return this.getTermService();
291
		}else if (DescriptionBase.class.isAssignableFrom(clazz)){
292
			return this.getDescriptionService();
293
		}
294
		String warning = "Can't map class to API service: %s";
295
		warning = String.format(warning, (clazz == null ? "-" : clazz.getName()));
296
		throw new IllegalArgumentException(warning);
297
	}
298
	
299
	
300
	
301

    
302
	/* (non-Javadoc)
303
	 * @see eu.etaxonomy.cdm.io.common.CdmImportBase#getFeature(eu.etaxonomy.cdm.io.common.ImportStateBase, java.util.UUID, java.lang.String, java.lang.String, java.lang.String, eu.etaxonomy.cdm.model.common.TermVocabulary)
304
	 */
305
	//Make public to allow to use by converters
306
	@Override
307
	public Feature getFeature(DwcaImportState state, UUID uuid, String label, String description, String labelAbbrev, TermVocabulary<Feature> voc) {
308
		return super.getFeature(state, uuid, label, description, labelAbbrev, voc);
309
	}
310

    
311
	/**
312
	 * Saves a new term. Immediate saving is required to avoid by Transient-Object-Exceptions.
313
	 * @param newTerm
314
	 */
315
	public void saveNewTerm(DefinedTermBase newTerm) {
316
		getTermService().save(newTerm);
317
	}
318

    
319
	@Override
320
	protected boolean doCheck(DwcaImportState state) {
321
		return state.isCheck();
322
	}
323

    
324
	@Override
325
	protected boolean isIgnore(DwcaImportState state) {
326
		return false;
327
	}
328

    
329
	
330
}
(7-7/30)