1 |
26658899
|
Andreas Müller
|
/**
|
2 |
|
|
* Copyright (C) 2007 EDIT
|
3 |
|
|
* European Distributed Institute of Taxonomy
|
4 |
|
|
* http://www.e-taxonomy.eu
|
5 |
|
|
*
|
6 |
|
|
* The contents of this file are subject to the Mozilla Public License Version 1.1
|
7 |
|
|
* See LICENSE.TXT at the top of this package for the full license terms.
|
8 |
|
|
*/
|
9 |
|
|
package eu.etaxonomy.cdm.io.dwca.in;
|
10 |
|
|
|
11 |
|
|
import java.util.ArrayList;
|
12 |
|
|
import java.util.List;
|
13 |
c3be32ca
|
Andreas Müller
|
import java.util.Map;
|
14 |
|
|
import java.util.Set;
|
15 |
26658899
|
Andreas Müller
|
|
16 |
|
|
import org.apache.log4j.Logger;
|
17 |
|
|
|
18 |
|
|
import eu.etaxonomy.cdm.io.dwca.TermUri;
|
19 |
202541e6
|
Alexander Oppermann
|
import eu.etaxonomy.cdm.io.stream.StreamImportBase;
|
20 |
|
|
import eu.etaxonomy.cdm.io.stream.StreamImportConfiguratorBase;
|
21 |
|
|
import eu.etaxonomy.cdm.io.stream.StreamImportStateBase;
|
22 |
0add22f5
|
Andreas Müller
|
import eu.etaxonomy.cdm.model.common.CdmBase;
|
23 |
|
|
import eu.etaxonomy.cdm.model.common.IdentifiableEntity;
|
24 |
c88bcdaa
|
Andreas Müller
|
import eu.etaxonomy.cdm.model.reference.Reference;
|
25 |
26658899
|
Andreas Müller
|
|
26 |
|
|
|
27 |
|
|
/**
|
28 |
|
|
* @author a.mueller
|
29 |
|
|
*
|
30 |
|
|
*/
|
31 |
09b9d4c2
|
Andreas Müller
|
public class StreamPartitioner<ITEM extends IConverterInput> implements INamespaceReader<IReader<MappedCdmBase>>{
|
32 |
26658899
|
Andreas Müller
|
private static final Logger logger = Logger.getLogger(StreamPartitioner.class);
|
33 |
|
|
|
34 |
|
|
private int partitionSize;
|
35 |
c3be32ca
|
Andreas Müller
|
private LookAheadStream<ITEM> inStream;
|
36 |
|
|
private IPartitionableConverter converter;
|
37 |
202541e6
|
Alexander Oppermann
|
private StreamImportStateBase<StreamImportConfiguratorBase, StreamImportBase> state;
|
38 |
09b9d4c2
|
Andreas Müller
|
private ConcatenatingReader<MappedCdmBase> outStream;
|
39 |
26658899
|
Andreas Müller
|
|
40 |
202541e6
|
Alexander Oppermann
|
public StreamPartitioner(INamespaceReader<ITEM> input, IPartitionableConverter converter, StreamImportStateBase state, Integer size){
|
41 |
c3be32ca
|
Andreas Müller
|
this.inStream = new LookAheadStream<ITEM>(input);
|
42 |
26658899
|
Andreas Müller
|
this.converter = converter;
|
43 |
|
|
this.partitionSize = size;
|
44 |
c3be32ca
|
Andreas Müller
|
this.state = state;
|
45 |
09b9d4c2
|
Andreas Müller
|
initNewOutStream();
|
46 |
26658899
|
Andreas Müller
|
}
|
47 |
|
|
|
48 |
|
|
|
49 |
09b9d4c2
|
Andreas Müller
|
private void initNewOutStream(){
|
50 |
|
|
outStream = new ConcatenatingReader<MappedCdmBase>();
|
51 |
|
|
}
|
52 |
|
|
|
53 |
c3be32ca
|
Andreas Müller
|
/* (non-Javadoc)
|
54 |
|
|
* @see eu.etaxonomy.cdm.io.dwca.in.IReader#hasNext()
|
55 |
|
|
*/
|
56 |
26658899
|
Andreas Müller
|
public boolean hasNext() {
|
57 |
c3be32ca
|
Andreas Müller
|
if (this.outStream.hasNext()){
|
58 |
|
|
return true;
|
59 |
|
|
}else{
|
60 |
|
|
return inStream.hasNext(); //TODO what, if converter returns no ouput for inStream.hasNext() ??
|
61 |
09b9d4c2
|
Andreas Müller
|
//but be aware that requesting the next object from the next partition crosses the transactional borders
|
62 |
c3be32ca
|
Andreas Müller
|
}
|
63 |
26658899
|
Andreas Müller
|
}
|
64 |
c3be32ca
|
Andreas Müller
|
|
65 |
26658899
|
Andreas Müller
|
@Override
|
66 |
09b9d4c2
|
Andreas Müller
|
public IReader<MappedCdmBase> read() {
|
67 |
|
|
logger.debug("Start partitioner read");
|
68 |
|
|
handleNextPartition();
|
69 |
|
|
IReader<MappedCdmBase> result = this.outStream;
|
70 |
|
|
|
71 |
|
|
initNewOutStream();
|
72 |
|
|
logger.debug("End partitioner read");
|
73 |
|
|
return result;
|
74 |
c3be32ca
|
Andreas Müller
|
}
|
75 |
|
|
|
76 |
|
|
private void handleNextPartition(){
|
77 |
|
|
|
78 |
|
|
List<ITEM> lookaheadArray = new ArrayList<ITEM>();
|
79 |
|
|
while (this.inStream.hasNextLookAhead(partitionSize)){
|
80 |
|
|
lookaheadArray.add(this.inStream.readLookAhead());
|
81 |
26658899
|
Andreas Müller
|
}
|
82 |
|
|
|
83 |
c3be32ca
|
Andreas Müller
|
IReader<ITEM> lookaheadStream = new ListReader<ITEM>(lookaheadArray);
|
84 |
|
|
|
85 |
|
|
Map<String, Set<String>> foreignKeys = converter.getPartitionForeignKeys(lookaheadStream);
|
86 |
|
|
IImportMapping mapping = state.getMapping();
|
87 |
60837fa6
|
Andreas Müller
|
InMemoryMapping partialMapping = mapping.getPartialMapping(foreignKeys);
|
88 |
c88bcdaa
|
Andreas Müller
|
Reference<?> sourceRef = state.getCurrentIO().getReferenceService().find(state.getConfig().getSourceRefUuid());
|
89 |
|
|
partialMapping.putMapping(TermUri.CDM_SOURCE_REFERENCE.toString(), state.getConfig().getSourceRefUuid().toString(), sourceRef);
|
90 |
0add22f5
|
Andreas Müller
|
|
91 |
c3be32ca
|
Andreas Müller
|
state.loadRelatedObjects(partialMapping);
|
92 |
c88bcdaa
|
Andreas Müller
|
|
93 |
c3be32ca
|
Andreas Müller
|
while (inStream.isLookingAhead() && inStream.hasNext()){
|
94 |
|
|
IReader<MappedCdmBase> resultReader = converter.map(inStream.read());
|
95 |
0add22f5
|
Andreas Müller
|
List<MappedCdmBase> resultList = new ArrayList<MappedCdmBase>(); //maybe better let converter return list from the beginning
|
96 |
|
|
while (resultReader.hasNext()){
|
97 |
|
|
MappedCdmBase item = resultReader.read();
|
98 |
|
|
resultList.add(item);
|
99 |
|
|
addItemToRelatedObjects(item);
|
100 |
|
|
}
|
101 |
|
|
outStream.add(new ListReader<MappedCdmBase>(resultList));
|
102 |
c3be32ca
|
Andreas Müller
|
}
|
103 |
|
|
|
104 |
|
|
return;
|
105 |
|
|
|
106 |
26658899
|
Andreas Müller
|
}
|
107 |
c3be32ca
|
Andreas Müller
|
|
108 |
26658899
|
Andreas Müller
|
|
109 |
0add22f5
|
Andreas Müller
|
/**
|
110 |
|
|
* Add new items to the local mapping
|
111 |
|
|
* @param item
|
112 |
|
|
*/
|
113 |
|
|
private void addItemToRelatedObjects(MappedCdmBase<IdentifiableEntity> item) {
|
114 |
|
|
CdmBase cdmBase = item.getCdmBase();
|
115 |
|
|
if (cdmBase.getId() == 0){
|
116 |
|
|
if (cdmBase.isInstanceOf(IdentifiableEntity.class)){
|
117 |
|
|
if (converter.requiredSourceNamespaces().contains(item.getNamespace())){
|
118 |
|
|
state.addRelatedObject(item.getNamespace(), item.getSourceId(), item.getCdmBase());
|
119 |
|
|
}
|
120 |
|
|
}
|
121 |
|
|
}
|
122 |
|
|
}
|
123 |
|
|
|
124 |
|
|
|
125 |
26658899
|
Andreas Müller
|
@Override
|
126 |
|
|
public TermUri getTerm() {
|
127 |
c3be32ca
|
Andreas Müller
|
return inStream.getTerm();
|
128 |
26658899
|
Andreas Müller
|
}
|
129 |
|
|
|
130 |
|
|
|
131 |
|
|
}
|