Project

General

Profile

Download (10.3 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2009 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.io.stream;
10

    
11
import java.util.UUID;
12

    
13
import org.apache.log4j.Logger;
14
import org.springframework.transaction.TransactionStatus;
15

    
16
import eu.etaxonomy.cdm.api.service.IIdentifiableEntityService;
17
import eu.etaxonomy.cdm.io.common.CdmImportBase;
18
import eu.etaxonomy.cdm.io.dwca.TermUri;
19
import eu.etaxonomy.cdm.io.dwca.in.FilteredStream;
20
import eu.etaxonomy.cdm.io.dwca.in.IConverter;
21
import eu.etaxonomy.cdm.io.dwca.in.IPartitionableConverter;
22
import eu.etaxonomy.cdm.io.dwca.in.IReader;
23
import eu.etaxonomy.cdm.io.dwca.in.ItemFilter;
24
import eu.etaxonomy.cdm.io.dwca.in.MappedCdmBase;
25
import eu.etaxonomy.cdm.io.dwca.in.StreamPartitioner;
26
import eu.etaxonomy.cdm.model.agent.AgentBase;
27
import eu.etaxonomy.cdm.model.common.CdmBase;
28
import eu.etaxonomy.cdm.model.common.DefinedTermBase;
29
import eu.etaxonomy.cdm.model.common.IdentifiableEntity;
30
import eu.etaxonomy.cdm.model.common.TermVocabulary;
31
import eu.etaxonomy.cdm.model.description.DescriptionBase;
32
import eu.etaxonomy.cdm.model.description.Feature;
33
import eu.etaxonomy.cdm.model.name.TaxonNameBase;
34
import eu.etaxonomy.cdm.model.occurrence.Collection;
35
import eu.etaxonomy.cdm.model.occurrence.SpecimenOrObservationBase;
36
import eu.etaxonomy.cdm.model.reference.Reference;
37
import eu.etaxonomy.cdm.model.taxon.Classification;
38
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
39

    
40
/**
41
 *
42
 * @author a.mueller
43
 *
44
 */
45
public abstract class StreamImportBase<CONFIG extends StreamImportConfiguratorBase, STATE extends StreamImportStateBase<CONFIG,StreamImportBase>> extends CdmImportBase<CONFIG, STATE>{
46
	private static final Logger logger = Logger.getLogger(StreamImportBase.class);
47

    
48

    
49
	protected void makeSourceRef(STATE state) {
50
		Reference sourceRef = state.getConfig().getSourceReference();
51
		getReferenceService().saveOrUpdate(sourceRef);
52
	}
53

    
54

    
55
	/**
56
	 * @param state
57
	 * @param itemStream
58
	 */
59
	protected void handleSingleRecord(STATE state, IItemStream recordStream) {
60
		recordStream.addObservers(state.getConfig().getObservers());
61

    
62
		if (state.getConfig().isUsePartitions()){
63
			IPartitionableConverter<StreamItem, IReader<CdmBase>, String> partitionConverter = getConverter(recordStream.getTerm(), state);
64
			if (partitionConverter == null){
65
				String warning = "No converter available for %s. Continue with next stream.";
66
				warning = String.format(warning, recordStream.getTerm());
67
				fireWarningEvent (warning, recordStream.toString(), 12);
68
				return;
69
			}
70

    
71
			int partitionSize = state.getConfig().getDefaultPartitionSize();
72

    
73
			ItemFilter<StreamItem> filter = partitionConverter.getItemFilter();
74
			IItemStream filteredStream = filter == null ? recordStream : new FilteredStream(recordStream, filter);
75
			StreamPartitioner<StreamItem> partitionStream = new StreamPartitioner<StreamItem>(filteredStream,
76
					partitionConverter, state, partitionSize);//   (csvStream, streamConverter,state 1000);
77

    
78
			int i = 1;
79
			while (partitionStream.hasNext()){
80
				//FIXME more generic handling of transactions
81
				TransactionStatus tx = startTransaction();
82

    
83
				try {
84
					IReader<MappedCdmBase> partStream = partitionStream.read();
85

    
86
					fireProgressEvent("Handel " + i + ". partition", i + ". partition");
87
					logger.info("Handel " + i++ + ". partition");
88
					String location = "Location: partition stream (TODO)";
89
					handleResults(state, partStream, location);
90
					commitTransaction(tx);
91
				} catch (Exception e) {
92
					String message = "An exception occurred while handling partition: " + e;
93
					String codeLocation;
94
					if (e.getStackTrace().length > 0){
95
						StackTraceElement el = e.getStackTrace()[0];
96
						codeLocation = el.getClassName()+ "." + el.getMethodName() + "(" + el.getLineNumber() + ")";
97
					}else{
98
						codeLocation = "No stacktrace";
99
					}
100
					message = message + " in: " +  codeLocation;
101
					fireWarningEvent(message , String.valueOf(filteredStream.getItemLocation()) , 12);
102
					this.rollbackTransaction(tx);
103
				}
104

    
105
			}
106
			logger.debug("Partition stream is empty");
107
		}else {
108

    
109
			while (recordStream.hasNext()){
110
					TransactionStatus tx = startTransaction();
111

    
112
					StreamItem item = recordStream.read();
113
					handleStreamItem(state, item);
114

    
115
					commitTransaction(tx);
116
			}
117
		}
118

    
119
		finalizeStream(recordStream, state);
120
	}
121

    
122

    
123
	/**
124
	 * @param itemStream
125
	 * @param state
126
	 */
127
	protected void finalizeStream(IItemStream itemStream, STATE state) {
128
		fireWarningEvent("Stream finished", itemStream.getItemLocation(), 0);
129
	}
130

    
131

    
132
	/**
133
	 * @param state
134
	 * @param item
135
	 * @return
136
	 */
137
	private void handleStreamItem(STATE state, StreamItem item) {
138
		IConverter<StreamItem, IReader<CdmBase>, String> converter = getConverter(item.term, state);
139
		if (converter == null){
140
			state.setSuccess(false);
141
			return;
142
		}
143
		IReader<MappedCdmBase> resultReader = converter.map(item);
144
		handleResults(state, resultReader, item.getLocation());
145
		return;
146
	}
147

    
148

    
149
	/**
150
	 * @param state
151
	 * @param item
152
	 * @param resultReader
153
	 */
154
	private void handleResults(STATE state, IReader<MappedCdmBase> resultReader, String location) {
155
		while (resultReader.hasNext()){
156

    
157
			MappedCdmBase<?> mappedCdmBase = resultReader.read();
158
			CdmBase cdmBase = mappedCdmBase.getCdmBase();
159
			save(cdmBase, state, location);
160
			if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
161
				IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
162

    
163
				String namespace = mappedCdmBase.getNamespace();
164
				state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
165
			}
166
		}
167
	}
168

    
169

    
170

    
171
//	private void handlePartitionedStreamItem(DwcaImportState state,  StreamPartitioner<CsvStreamItem> partitionStream) {
172
//		IPartitionableConverter<CsvStreamItem, IReader<CdmBase>, String> converter = getConverter(partitionStream.getTerm(), state);
173
//		if (converter == null){
174
//			state.setSuccess(false);
175
//			return;
176
//		}
177
//
178
//		IReader<CsvStreamItem> lookaheadStream = partitionStream.getLookaheadReader();
179
//		Map<String, Set<String>> foreignKeys = converter.getPartitionForeignKeys(lookaheadStream);
180
//		IImportMapping mapping = state.getMapping();
181
//		IImportMapping partialMapping = mapping.getPartialMapping(foreignKeys);
182
//		state.loadRelatedObjects(partialMapping);
183
//
184
//		ConcatenatingReader<MappedCdmBase> reader = new ConcatenatingReader<MappedCdmBase>();
185
//
186
//		IReader<CsvStreamItem> inputStream = partitionStream.read();
187
//		while (inputStream.hasNext()){
188
//			IReader<MappedCdmBase> resultReader = converter.map(inputStream.read());
189
//			reader.add(resultReader);
190
//		}
191
//
192
//		while (reader.hasNext()){
193
//			MappedCdmBase mappedCdmBase = (reader.read());
194
//			CdmBase cdmBase = mappedCdmBase.getCdmBase();
195
//			//locate
196
//			//TODO find a way to define the location
197
//			String location = "partitionStream";
198
//			//save
199
//			save(cdmBase, state, location);
200
//			//store in mapping
201
//			if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
202
//				IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
203
//				String namespace = mappedCdmBase.getNamespace();
204
//				//TODO also store in partition mapping
205
//				state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
206
//			}
207
//		}
208
//		return;
209
//	}
210

    
211
	protected void save(CdmBase cdmBase, STATE state, String location) {
212
		if (state.isCheck()){
213
			//do nothing
214
		}else{
215
			if (cdmBase == null){
216
				logger.warn("cdmBase is null");
217
			}
218
			//start preliminary for testing
219
			IIdentifiableEntityService service;
220
			try {
221
				if (cdmBase.isInstanceOf(IdentifiableEntity.class)){
222
					service = getServiceByClass(cdmBase.getClass());
223
					if (service != null){
224
						IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
225
						service.saveOrUpdate(entity);
226
					}
227
				}
228
			} catch (IllegalArgumentException e) {
229
				fireWarningEvent(e.getMessage(), location, 12);
230
			}
231

    
232
//			System.out.println(cdmBase.toString());
233
			//end preliminary
234

    
235
			//TODO
236
		}
237
	}
238

    
239
	protected abstract IPartitionableConverter<StreamItem,IReader<CdmBase>, String> getConverter(TermUri namespace, STATE state);
240

    
241

    
242
	/**
243
	 * Returns an appropriate service to persist data of a certain class.
244
	 * If an appropriate service can't be found an {@link IllegalArgumentException} is thrown.
245
	 *
246
	 * TODO move to a more general place to make it available to everyone.
247
	 *
248
	 * @param app
249
	 * @param clazz
250
	 * @return
251
	 */
252
	protected IIdentifiableEntityService getServiceByClass(Class<?> clazz)  throws IllegalArgumentException {
253
		if (clazz == null){
254
			//throw exception below
255
		}else if (TaxonBase.class.isAssignableFrom(clazz)){
256
			return this.getTaxonService();
257
		}else if (Classification.class.isAssignableFrom(clazz)){
258
			return this.getClassificationService();
259
		}else if (Reference.class.isAssignableFrom(clazz)){
260
			return this.getReferenceService();
261
		}else if (TaxonNameBase.class.isAssignableFrom(clazz)){
262
			return this.getNameService();
263
		}else if (DefinedTermBase.class.isAssignableFrom(clazz)){
264
			return this.getTermService();
265
		}else if (DescriptionBase.class.isAssignableFrom(clazz)){
266
			return this.getDescriptionService();
267
		}else if (SpecimenOrObservationBase.class.isAssignableFrom(clazz)){
268
			return this.getOccurrenceService();
269
		}else if (Collection.class.isAssignableFrom(clazz)){
270
			return this.getCollectionService();
271
		}else if (AgentBase.class.isAssignableFrom(clazz)){
272
			return this.getDescriptionService();
273
		}
274
		String warning = "Can't map class to API service: %s";
275
		warning = String.format(warning, (clazz == null ? "-" : clazz.getName()));
276
		throw new IllegalArgumentException(warning);
277
	}
278

    
279

    
280

    
281

    
282
	/* (non-Javadoc)
283
	 * @see eu.etaxonomy.cdm.io.common.CdmImportBase#getFeature(eu.etaxonomy.cdm.io.common.ImportStateBase, java.util.UUID, java.lang.String, java.lang.String, java.lang.String, eu.etaxonomy.cdm.model.common.TermVocabulary)
284
	 */
285
	//Make public to allow to use by converters
286
	@Override
287
	public Feature getFeature(STATE state, UUID uuid, String label, String description, String labelAbbrev, TermVocabulary<Feature> voc) {
288
		return super.getFeature(state, uuid, label, description, labelAbbrev, voc);
289
	}
290

    
291
	/**
292
	 * Saves a new term. Immediate saving is required to avoid by Transient-Object-Exceptions.
293
	 * @param newTerm
294
	 */
295
	public void saveNewTerm(DefinedTermBase newTerm) {
296
		getTermService().save(newTerm);
297
	}
298

    
299

    
300

    
301
}
(2-2/5)