Project

General

Profile

Download (11 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2009 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.io.stream;
10

    
11
import java.util.UUID;
12

    
13
import org.apache.log4j.Logger;
14
import org.springframework.transaction.TransactionStatus;
15

    
16
import eu.etaxonomy.cdm.api.service.IIdentifiableEntityService;
17
import eu.etaxonomy.cdm.io.common.CdmImportBase;
18
import eu.etaxonomy.cdm.io.dwca.TermUri;
19
import eu.etaxonomy.cdm.io.dwca.in.FilteredStream;
20
import eu.etaxonomy.cdm.io.dwca.in.IConverter;
21
import eu.etaxonomy.cdm.io.dwca.in.IPartitionableConverter;
22
import eu.etaxonomy.cdm.io.dwca.in.IReader;
23
import eu.etaxonomy.cdm.io.dwca.in.ItemFilter;
24
import eu.etaxonomy.cdm.io.dwca.in.MappedCdmBase;
25
import eu.etaxonomy.cdm.io.dwca.in.StreamPartitioner;
26
import eu.etaxonomy.cdm.model.agent.AgentBase;
27
import eu.etaxonomy.cdm.model.common.CdmBase;
28
import eu.etaxonomy.cdm.model.common.DefinedTermBase;
29
import eu.etaxonomy.cdm.model.common.IdentifiableEntity;
30
import eu.etaxonomy.cdm.model.common.Language;
31
import eu.etaxonomy.cdm.model.common.TermVocabulary;
32
import eu.etaxonomy.cdm.model.description.DescriptionBase;
33
import eu.etaxonomy.cdm.model.description.Feature;
34
import eu.etaxonomy.cdm.model.location.NamedArea;
35
import eu.etaxonomy.cdm.model.name.TaxonName;
36
import eu.etaxonomy.cdm.model.occurrence.Collection;
37
import eu.etaxonomy.cdm.model.occurrence.SpecimenOrObservationBase;
38
import eu.etaxonomy.cdm.model.reference.Reference;
39
import eu.etaxonomy.cdm.model.taxon.Classification;
40
import eu.etaxonomy.cdm.model.taxon.TaxonBase;
41

    
42
/**
43
 * @author a.mueller
44
 *
45
 * @param <CONFIG>
46
 * @param <STATE>
47
 */
48
public abstract class StreamImportBase<CONFIG extends StreamImportConfiguratorBase, STATE extends StreamImportStateBase<CONFIG,StreamImportBase>>
49
        extends CdmImportBase<CONFIG, STATE>{
50

    
51
    private static final long serialVersionUID = -125414263689509881L;
52
    private static final Logger logger = Logger.getLogger(StreamImportBase.class);
53

    
54

    
55
	protected void makeSourceRef(STATE state) {
56
		Reference sourceRef = state.getConfig().getSourceReference();
57
		getReferenceService().saveOrUpdate(sourceRef);
58
	}
59

    
60

    
61
	/**
62
	 * @param state
63
	 * @param itemStream
64
	 */
65
	protected void handleSingleRecord(STATE state, IItemStream recordStream) {
66
		recordStream.addObservers(state.getConfig().getObservers());
67

    
68
		if (state.getConfig().isUsePartitions()){
69
			IPartitionableConverter<StreamItem, IReader<CdmBase>, String> partitionConverter = getConverter(recordStream.getTerm(), state);
70
			if (partitionConverter == null){
71
				String warning = "No converter available for %s. Continue with next stream.";
72
				warning = String.format(warning, recordStream.getTerm());
73
				fireWarningEvent (warning, recordStream.toString(), 12);
74
				return;
75
			}
76

    
77
			int partitionSize = state.getConfig().getDefaultPartitionSize();
78

    
79
			ItemFilter<StreamItem> filter = partitionConverter.getItemFilter();
80
			IItemStream filteredStream = filter == null ? recordStream : new FilteredStream(recordStream, filter);
81
			StreamPartitioner<StreamItem> partitionStream = new StreamPartitioner(filteredStream,
82
					partitionConverter, state, partitionSize);//   (csvStream, streamConverter,state 1000);
83

    
84
			int i = 1;
85
			while (partitionStream.hasNext()){
86
				//FIXME more generic handling of transactions
87
				TransactionStatus tx = startTransaction();
88

    
89
				try {
90
					IReader<MappedCdmBase<? extends CdmBase>> partStream = partitionStream.read();
91

    
92
					fireProgressEvent("Handel " + i + ". partition", i + ". partition");
93
					logger.info("Handel " + i++ + ". partition");
94
					String location = "Location: partition stream (TODO)";
95
					handleResults(state, partStream, location);
96
					commitTransaction(tx);
97
				} catch (Exception e) {
98
				    e.printStackTrace();
99
					String message = "An exception occurred while handling partition: " + e;
100
					String codeLocation;
101
					if (e.getStackTrace().length > 0){
102
						StackTraceElement el = e.getStackTrace()[0];
103
						codeLocation = el.getClassName()+ "." + el.getMethodName() + "(" + el.getLineNumber() + ")";
104
					}else{
105
						codeLocation = "No stacktrace";
106
					}
107
					message = message + " in: " +  codeLocation;
108
					fireWarningEvent(message , String.valueOf(filteredStream.getItemLocation()) , 12);
109
					this.rollbackTransaction(tx);
110
				}
111

    
112
			}
113
			logger.debug("Partition stream is empty");
114
		}else {
115

    
116
			while (recordStream.hasNext()){
117
					TransactionStatus tx = startTransaction();
118

    
119
					StreamItem item = recordStream.read();
120
					handleStreamItem(state, item);
121

    
122
					commitTransaction(tx);
123
			}
124
		}
125

    
126
		finalizeStream(recordStream, state);
127
	}
128

    
129

    
130
	/**
131
	 * @param itemStream
132
	 * @param state
133
	 */
134
	protected void finalizeStream(IItemStream itemStream, STATE state) {
135
		fireWarningEvent("Stream finished", itemStream.getItemLocation(), 0);
136
	}
137

    
138

    
139
	/**
140
	 * @param state
141
	 * @param item
142
	 * @return
143
	 */
144
	private void handleStreamItem(STATE state, StreamItem item) {
145
		IConverter<StreamItem, IReader<CdmBase>, String> converter = getConverter(item.term, state);
146
		if (converter == null){
147
			state.setSuccess(false);
148
			return;
149
		}
150
		IReader<MappedCdmBase<? extends CdmBase>> resultReader = converter.map(item);
151
		handleResults(state, resultReader, item.getLocation());
152
		return;
153
	}
154

    
155

    
156
	/**
157
	 * @param state
158
	 * @param item
159
	 * @param resultReader
160
	 */
161
	private void handleResults(STATE state, IReader<MappedCdmBase<? extends CdmBase>> resultReader, String location) {
162
		while (resultReader.hasNext()){
163

    
164
			MappedCdmBase<?> mappedCdmBase = resultReader.read();
165
			CdmBase cdmBase = mappedCdmBase.getCdmBase();
166
			save(cdmBase, state, location);
167
			if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
168
				IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
169

    
170
				String namespace = mappedCdmBase.getNamespace();
171
				state.putMapping(namespace, mappedCdmBase.getSourceId(), entity);
172
			}
173
		}
174
	}
175

    
176

    
177

    
178
//	private void handlePartitionedStreamItem(DwcaImportState state,  StreamPartitioner<CsvStreamItem> partitionStream) {
179
//		IPartitionableConverter<CsvStreamItem, IReader<CdmBase>, String> converter = getConverter(partitionStream.getTerm(), state);
180
//		if (converter == null){
181
//			state.setSuccess(false);
182
//			return;
183
//		}
184
//
185
//		IReader<CsvStreamItem> lookaheadStream = partitionStream.getLookaheadReader();
186
//		Map<String, Set<String>> foreignKeys = converter.getPartitionForeignKeys(lookaheadStream);
187
//		IImportMapping mapping = state.getMapping();
188
//		IImportMapping partialMapping = mapping.getPartialMapping(foreignKeys);
189
//		state.loadRelatedObjects(partialMapping);
190
//
191
//		ConcatenatingReader<MappedCdmBase> reader = new ConcatenatingReader<MappedCdmBase>();
192
//
193
//		IReader<CsvStreamItem> inputStream = partitionStream.read();
194
//		while (inputStream.hasNext()){
195
//			IReader<MappedCdmBase> resultReader = converter.map(inputStream.read());
196
//			reader.add(resultReader);
197
//		}
198
//
199
//		while (reader.hasNext()){
200
//			MappedCdmBase mappedCdmBase = (reader.read());
201
//			CdmBase cdmBase = mappedCdmBase.getCdmBase();
202
//			//locate
203
//			//TODO find a way to define the location
204
//			String location = "partitionStream";
205
//			//save
206
//			save(cdmBase, state, location);
207
//			//store in mapping
208
//			if (mappedCdmBase.getSourceId() != null && cdmBase.isInstanceOf(IdentifiableEntity.class)){
209
//				IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
210
//				String namespace = mappedCdmBase.getNamespace();
211
//				//TODO also store in partition mapping
212
//				state.putMapping(namespace,mappedCdmBase.getSourceId(), entity);
213
//			}
214
//		}
215
//		return;
216
//	}
217

    
218
	protected void save(CdmBase cdmBase, STATE state, String location) {
219
		if (state.isCheck()){
220
			//do nothing
221
		}else{
222
			if (cdmBase == null){
223
				logger.warn("cdmBase is null");
224
			}
225
			//start preliminary for testing
226
			IIdentifiableEntityService service;
227
			try {
228
				if (cdmBase.isInstanceOf(IdentifiableEntity.class)){
229
					service = getServiceByClass(cdmBase.getClass());
230
					if (service != null){
231
						IdentifiableEntity<?> entity = CdmBase.deproxy(cdmBase, IdentifiableEntity.class);
232
						service.saveOrUpdate(entity);
233
					}
234
				}
235
			} catch (IllegalArgumentException e) {
236
				fireWarningEvent(e.getMessage(), location, 12);
237
			}
238

    
239
//			System.out.println(cdmBase.toString());
240
			//end preliminary
241

    
242
			//TODO
243
		}
244
	}
245

    
246
	protected abstract IPartitionableConverter<StreamItem,IReader<CdmBase>, String> getConverter(TermUri namespace, STATE state);
247

    
248

    
249
	/**
250
	 * Returns an appropriate service to persist data of a certain class.
251
	 * If an appropriate service can't be found an {@link IllegalArgumentException} is thrown.
252
	 *
253
	 * TODO move to a more general place to make it available to everyone.
254
	 *
255
	 * @param app
256
	 * @param clazz
257
	 * @return
258
	 */
259
	protected IIdentifiableEntityService getServiceByClass(Class<?> clazz)  throws IllegalArgumentException {
260
		if (clazz == null){
261
			//throw exception below
262
		}else if (TaxonBase.class.isAssignableFrom(clazz)){
263
			return this.getTaxonService();
264
		}else if (Classification.class.isAssignableFrom(clazz)){
265
			return this.getClassificationService();
266
		}else if (Reference.class.isAssignableFrom(clazz)){
267
			return this.getReferenceService();
268
		}else if (TaxonName.class.isAssignableFrom(clazz)){
269
			return this.getNameService();
270
		}else if (DefinedTermBase.class.isAssignableFrom(clazz)){
271
			return this.getTermService();
272
		}else if (DescriptionBase.class.isAssignableFrom(clazz)){
273
			return this.getDescriptionService();
274
		}else if (SpecimenOrObservationBase.class.isAssignableFrom(clazz)){
275
			return this.getOccurrenceService();
276
		}else if (Collection.class.isAssignableFrom(clazz)){
277
			return this.getCollectionService();
278
		}else if (AgentBase.class.isAssignableFrom(clazz)){
279
			return this.getDescriptionService();
280
		}
281
		String warning = "Can't map class to API service: %s";
282
		warning = String.format(warning, (clazz == null ? "-" : clazz.getName()));
283
		throw new IllegalArgumentException(warning);
284
	}
285

    
286

    
287
	/**
288
	 * Saves a new term. Immediate saving is required to avoid by Transient-Object-Exceptions.
289
	 * @param newTerm
290
	 */
291
	public void saveNewTerm(DefinedTermBase newTerm) {
292
		getTermService().save(newTerm);
293
	}
294

    
295

    
296
    //Make public to allow to use by converters
297
    @Override
298
    public Feature getFeature(STATE state, UUID uuid, String label, String description, String labelAbbrev, TermVocabulary<Feature> voc) {
299
        return super.getFeature(state, uuid, label, description, labelAbbrev, voc);
300
    }
301

    
302
    /**
303
     * {@inheritDoc}
304
     *
305
     * If uuid is null a random one is created.
306
     */
307
    @Override
308
    public Language getLanguage(STATE state,
309
            UUID uuid, String label, String text, String labelAbbrev, TermVocabulary voc) {
310
        if (uuid == null){
311
            uuid = UUID.randomUUID();
312
        }
313
        return super.getLanguage(state, uuid, label, text, labelAbbrev, voc);
314
    }
315

    
316
    public NamedArea getNamedArea(STATE state, UUID namedAreaUuid,
317
            String label, String description, String abbrevLabel, TermVocabulary voc) {
318
        return super.getNamedArea(state, namedAreaUuid, label, description, abbrevLabel,
319
                null, null, voc, null);
320
    }
321

    
322

    
323

    
324
}
(2-2/5)