Project

General

Profile

Download (2.81 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2007 EDIT
3
* European Distributed Institute of Taxonomy 
4
* http://www.e-taxonomy.eu
5
* 
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.io.dwca.in;
10

    
11
import java.util.ArrayList;
12
import java.util.List;
13
import java.util.Map;
14
import java.util.Set;
15

    
16
import org.apache.log4j.Logger;
17

    
18
import eu.etaxonomy.cdm.io.dwca.TermUri;
19

    
20

    
21
/**
22
 * @author a.mueller
23
 *
24
 */
25
public class StreamPartitioner<ITEM extends IConverterInput>  implements INamespaceReader<IReader<MappedCdmBase>>{
26
	@SuppressWarnings("unused")
27
	private static final Logger logger = Logger.getLogger(StreamPartitioner.class);
28
	
29
	private int partitionSize;
30
	private LookAheadStream<ITEM> inStream;
31
	private IPartitionableConverter converter;
32
	private DwcaImportState state;
33
	private ConcatenatingReader<MappedCdmBase> outStream;
34
	
35
	public StreamPartitioner(INamespaceReader<ITEM> input, IPartitionableConverter converter, DwcaImportState state, Integer size){
36
		 this.inStream = new LookAheadStream<ITEM>(input);
37
		 this.converter = converter;
38
		 this.partitionSize = size;
39
		 this.state = state;
40
		 initNewOutStream();
41
	}
42
	
43

    
44
	private void initNewOutStream(){
45
		outStream = new ConcatenatingReader<MappedCdmBase>();
46
	}
47
	
48
	/* (non-Javadoc)
49
	 * @see eu.etaxonomy.cdm.io.dwca.in.IReader#hasNext()
50
	 */
51
	public boolean hasNext() {
52
		if (this.outStream.hasNext()){
53
			return true;
54
		}else{
55
			return inStream.hasNext();  //TODO what, if converter returns no ouput for inStream.hasNext() ??
56
			//but be aware that requesting the next object from the next partition crosses the transactional borders 
57
		}
58
	}
59
	
60
	@Override
61
	public IReader<MappedCdmBase> read() {
62
		logger.debug("Start partitioner read");
63
		handleNextPartition();
64
		IReader<MappedCdmBase> result = this.outStream;
65
		
66
		initNewOutStream();
67
		logger.debug("End partitioner read");
68
		return result;
69
	}
70
	
71
	private void handleNextPartition(){
72

    
73
		List<ITEM> lookaheadArray = new ArrayList<ITEM>();
74
		while (this.inStream.hasNextLookAhead(partitionSize)){
75
			lookaheadArray.add(this.inStream.readLookAhead());
76
		}
77
		
78
		IReader<ITEM> lookaheadStream = new ListReader<ITEM>(lookaheadArray);
79
		
80
		Map<String, Set<String>> foreignKeys = converter.getPartitionForeignKeys(lookaheadStream);
81
		IImportMapping mapping = state.getMapping();
82
		IImportMapping partialMapping = mapping.getPartialMapping(foreignKeys);
83
		state.loadRelatedObjects(partialMapping);
84
		
85
		
86
		while (inStream.isLookingAhead() && inStream.hasNext()){
87
			IReader<MappedCdmBase> resultReader = converter.map(inStream.read());
88
			outStream.add(resultReader);
89
		}
90
			
91
		return;
92

    
93
	}
94

    
95
	
96
	@Override
97
	public TermUri getTerm() {
98
		return inStream.getTerm();
99
	}
100
	
101

    
102
}
(27-27/27)