Project

General

Profile

Download (10.2 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2009 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.io.stream;
10

    
11
import java.util.UUID;
12

    
13
import org.apache.log4j.Logger;
14
import org.springframework.transaction.TransactionStatus;
15

    
16
import eu.etaxonomy.cdm.api.service.IIdentifiableEntityService;
17
import eu.etaxonomy.cdm.io.common.CdmImportBase;
18
import eu.etaxonomy.cdm.io.dwca.TermUri;
19
import eu.etaxonomy.cdm.io.dwca.in.FilteredStream;
20
import eu.etaxonomy.cdm.io.dwca.in.IConverter;
21
import eu.etaxonomy.cdm.io.dwca.in.IPartitionableConverter;
22
import eu.etaxonomy.cdm.io.dwca.in.IReader;
23
import eu.etaxonomy.cdm.io.dwca.in.ItemFilter;
24
import eu.etaxonomy.cdm.io.dwca.in.MappedCdmBase;
25
import eu.etaxonomy.cdm.io.dwca.in.StreamPartitioner;
26
import eu.etaxonomy.cdm.model.agent.AgentBase;
27
import eu.etaxonomy.cdm.model.common.CdmBase;
28
import eu.etaxonomy.cdm.model.common.DefinedTermBase;
29
import eu.etaxonomy.cdm.model.common.IdentifiableEntity;
30
import eu.etaxonomy.cdm.model.common.TermVocabulary;
31
import eu.etaxonomy.cdm.model.description.DescriptionBase;
32
import eu.etaxonomy.cdm.model.description.Feature;
33
import eu.etaxonomy.cdm.model.name.TaxonName;
34
import eu.etaxonomy.cdm.model.occurrence.Collection;
35
import eu.etaxonomy.cdm.model.occurrence.SpecimenOrObservationBase;
36
import eu.etaxonomy.cdm.model.reference.Reference;
37
import eu.etaxonomy.cdm.model.taxon.Classification;
38
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
39

    
40
/**
41
 *
42
 * @author a.mueller
43
 *
44
 */
45
public abstract class StreamImportBase<CONFIG extends StreamImportConfiguratorBase, STATE extends StreamImportStateBase<CONFIG,StreamImportBase>>
46
        extends CdmImportBase<CONFIG, STATE>{
47

    
48
    private static final long serialVersionUID = -125414263689509881L;
49
    private static final Logger logger = Logger.getLogger(StreamImportBase.class);
50

    
51

    
52
	protected void makeSourceRef(STATE state) {
53
		Reference sourceRef = state.getConfig().getSourceReference();
54
		getReferenceService().saveOrUpdate(sourceRef);
55
	}
56

    
57

    
58
	/**
59
	 * @param state
60
	 * @param itemStream
61
	 */
62
	protected void handleSingleRecord(STATE state, IItemStream recordStream) {
63
		recordStream.addObservers(state.getConfig().getObservers());
64

    
65
		if (state.getConfig().isUsePartitions()){
66
			IPartitionableConverter<StreamItem, IReader<CdmBase>, String> partitionConverter = getConverter(recordStream.getTerm(), state);
67
			if (partitionConverter == null){
68
				String warning = "No converter available for %s. Continue with next stream.";
69
				warning = String.format(warning, recordStream.getTerm());
70
				fireWarningEvent (warning, recordStream.toString(), 12);
71
				return;
72
			}
73

    
74
			int partitionSize = state.getConfig().getDefaultPartitionSize();
75

    
76
			ItemFilter<StreamItem> filter = partitionConverter.getItemFilter();
77
			IItemStream filteredStream = filter == null ? recordStream : new FilteredStream(recordStream, filter);
78
			StreamPartitioner<StreamItem> partitionStream = new StreamPartitioner<>(filteredStream,
79
					partitionConverter, state, partitionSize);//   (csvStream, streamConverter,state 1000);
80

    
81
			int i = 1;
82
			while (partitionStream.hasNext()){
83
				//FIXME more generic handling of transactions
84
				TransactionStatus tx = startTransaction();
85

    
86
				try {
87
					IReader<MappedCdmBase<? extends CdmBase>> partStream = partitionStream.read();
88

    
89
					fireProgressEvent("Handel " + i + ". partition", i + ". partition");
90
					logger.info("Handel " + i++ + ". partition");
91
					String location = "Location: partition stream (TODO)";
92
					handleResults(state, partStream, location);
93
					commitTransaction(tx);
94
				} catch (Exception e) {
95
					String message = "An exception occurred while handling partition: " + e;
96
					String codeLocation;
97
					if (e.getStackTrace().length > 0){
98
						StackTraceElement el = e.getStackTrace()[0];
99
						codeLocation = el.getClassName()+ "." + el.getMethodName() + "(" + el.getLineNumber() + ")";
100
					}else{
101
						codeLocation = "No stacktrace";
102
					}
103
					message = message + " in: " +  codeLocation;
104
					fireWarningEvent(message , String.valueOf(filteredStream.getItemLocation()) , 12);
105
					this.rollbackTransaction(tx);
106
				}
107

    
108
			}
109
			logger.debug("Partition stream is empty");
110
		}else {
111

    
112
			while (recordStream.hasNext()){
113
					TransactionStatus tx = startTransaction();
114

    
115
					StreamItem item = recordStream.read();
116
					handleStreamItem(state, item);
117

    
118
					commitTransaction(tx);
119
			}
120
		}
121

    
122
		finalizeStream(recordStream, state);
123
	}
124

    
125

    
126
	/**
127
	 * @param itemStream
128
	 * @param state
129
	 */
130
	protected void finalizeStream(IItemStream itemStream, STATE state) {
131
		fireWarningEvent("Stream finished", itemStream.getItemLocation(), 0);
132
	}
133

    
134

    
135
	/**
136
	 * @param state
137
	 * @param item
138
	 * @return
139
	 */
140
	private void handleStreamItem(STATE state, StreamItem item) {
141
		IConverter<StreamItem, IReader<CdmBase>, String> converter = getConverter(item.term, state);
142
		if (converter == null){
143
			state.setSuccess(false);
144
			return;
145
		}
146
		IReader<MappedCdmBase<? extends CdmBase>> resultReader = converter.map(item);
147
		handleResults(state, resultReader, item.getLocation());
148
		return;
149
	}
150

    
151

    
152
	/**
153
	 * @param state
154
	 * @param item
155
	 * @param resultReader
156
	 */
157
	private void handleResults(STATE state, IReader<MappedCdmBase<? extends CdmBase>> resultReader, String location) {
158
		while (resultReader.hasNext()){
159

    
160
			MappedCdmBase<?> mappedCdmBase = resultReader.read();
161
			CdmBase cdmBase = mappedCdmBase.getCdmBase();
162
			save(cdmBase, state, location);
163
			if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
164
				IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
165

    
166
				String namespace = mappedCdmBase.getNamespace();
167
				state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
168
			}
169
		}
170
	}
171

    
172

    
173

    
174
//	private void handlePartitionedStreamItem(DwcaImportState state,  StreamPartitioner<CsvStreamItem> partitionStream) {
175
//		IPartitionableConverter<CsvStreamItem, IReader<CdmBase>, String> converter = getConverter(partitionStream.getTerm(), state);
176
//		if (converter == null){
177
//			state.setSuccess(false);
178
//			return;
179
//		}
180
//
181
//		IReader<CsvStreamItem> lookaheadStream = partitionStream.getLookaheadReader();
182
//		Map<String, Set<String>> foreignKeys = converter.getPartitionForeignKeys(lookaheadStream);
183
//		IImportMapping mapping = state.getMapping();
184
//		IImportMapping partialMapping = mapping.getPartialMapping(foreignKeys);
185
//		state.loadRelatedObjects(partialMapping);
186
//
187
//		ConcatenatingReader<MappedCdmBase> reader = new ConcatenatingReader<MappedCdmBase>();
188
//
189
//		IReader<CsvStreamItem> inputStream = partitionStream.read();
190
//		while (inputStream.hasNext()){
191
//			IReader<MappedCdmBase> resultReader = converter.map(inputStream.read());
192
//			reader.add(resultReader);
193
//		}
194
//
195
//		while (reader.hasNext()){
196
//			MappedCdmBase mappedCdmBase = (reader.read());
197
//			CdmBase cdmBase = mappedCdmBase.getCdmBase();
198
//			//locate
199
//			//TODO find a way to define the location
200
//			String location = "partitionStream";
201
//			//save
202
//			save(cdmBase, state, location);
203
//			//store in mapping
204
//			if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
205
//				IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
206
//				String namespace = mappedCdmBase.getNamespace();
207
//				//TODO also store in partition mapping
208
//				state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
209
//			}
210
//		}
211
//		return;
212
//	}
213

    
214
	protected void save(CdmBase cdmBase, STATE state, String location) {
215
		if (state.isCheck()){
216
			//do nothing
217
		}else{
218
			if (cdmBase == null){
219
				logger.warn("cdmBase is null");
220
			}
221
			//start preliminary for testing
222
			IIdentifiableEntityService service;
223
			try {
224
				if (cdmBase.isInstanceOf(IdentifiableEntity.class)){
225
					service = getServiceByClass(cdmBase.getClass());
226
					if (service != null){
227
						IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
228
						service.saveOrUpdate(entity);
229
					}
230
				}
231
			} catch (IllegalArgumentException e) {
232
				fireWarningEvent(e.getMessage(), location, 12);
233
			}
234

    
235
//			System.out.println(cdmBase.toString());
236
			//end preliminary
237

    
238
			//TODO
239
		}
240
	}
241

    
242
	protected abstract IPartitionableConverter<StreamItem,IReader<CdmBase>, String> getConverter(TermUri namespace, STATE state);
243

    
244

    
245
	/**
246
	 * Returns an appropriate service to persist data of a certain class.
247
	 * If an appropriate service can't be found an {@link IllegalArgumentException} is thrown.
248
	 *
249
	 * TODO move to a more general place to make it available to everyone.
250
	 *
251
	 * @param app
252
	 * @param clazz
253
	 * @return
254
	 */
255
	protected IIdentifiableEntityService getServiceByClass(Class<?> clazz)  throws IllegalArgumentException {
256
		if (clazz == null){
257
			//throw exception below
258
		}else if (TaxonBase.class.isAssignableFrom(clazz)){
259
			return this.getTaxonService();
260
		}else if (Classification.class.isAssignableFrom(clazz)){
261
			return this.getClassificationService();
262
		}else if (Reference.class.isAssignableFrom(clazz)){
263
			return this.getReferenceService();
264
		}else if (TaxonName.class.isAssignableFrom(clazz)){
265
			return this.getNameService();
266
		}else if (DefinedTermBase.class.isAssignableFrom(clazz)){
267
			return this.getTermService();
268
		}else if (DescriptionBase.class.isAssignableFrom(clazz)){
269
			return this.getDescriptionService();
270
		}else if (SpecimenOrObservationBase.class.isAssignableFrom(clazz)){
271
			return this.getOccurrenceService();
272
		}else if (Collection.class.isAssignableFrom(clazz)){
273
			return this.getCollectionService();
274
		}else if (AgentBase.class.isAssignableFrom(clazz)){
275
			return this.getDescriptionService();
276
		}
277
		String warning = "Can't map class to API service: %s";
278
		warning = String.format(warning, (clazz == null ? "-" : clazz.getName()));
279
		throw new IllegalArgumentException(warning);
280
	}
281

    
282

    
283
	//Make public to allow to use by converters
284
	@Override
285
	public Feature getFeature(STATE state, UUID uuid, String label, String description, String labelAbbrev, TermVocabulary<Feature> voc) {
286
		return super.getFeature(state, uuid, label, description, labelAbbrev, voc);
287
	}
288

    
289
	/**
290
	 * Saves a new term. Immediate saving is required to avoid by Transient-Object-Exceptions.
291
	 * @param newTerm
292
	 */
293
	public void saveNewTerm(DefinedTermBase newTerm) {
294
		getTermService().save(newTerm);
295
	}
296

    
297

    
298

    
299
}
(2-2/5)