Project

General

Profile

Download (6.14 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2007 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9

    
10
package eu.etaxonomy.cdm.io.common;
11

    
12
import java.util.List;
13
import java.util.concurrent.Callable;
14
import java.util.concurrent.ExecutionException;
15
import java.util.concurrent.ExecutorService;
16
import java.util.concurrent.Executors;
17
import java.util.concurrent.Future;
18

    
19
import org.apache.log4j.Logger;
20

    
21
import eu.etaxonomy.cdm.api.application.CdmApplicationController;
22
import eu.etaxonomy.cdm.api.application.ICdmRepository;
23
import eu.etaxonomy.cdm.common.concurrent.ConcurrentQueue;
24
import eu.etaxonomy.cdm.common.monitor.IProgressMonitor;
25
import eu.etaxonomy.cdm.database.DbSchemaValidation;
26
import eu.etaxonomy.cdm.database.ICdmDataSource;
27
import eu.etaxonomy.cdm.database.ICdmImportSource;
28
import eu.etaxonomy.cdm.filter.TaxonNodeFilter;
29
import eu.etaxonomy.cdm.model.taxon.TaxonNode;
30

    
31
/**
32
 * @author a.mueller
33
 * @since 01.07.2017
34
 */
35
public class TaxonNodeOutStreamPartitionerConcurrent implements ITaxonNodeOutStreamPartitioner  {
36

    
37
    @SuppressWarnings("unused")
38
    private static final Logger logger = Logger.getLogger(TaxonNodeOutStreamPartitionerConcurrent.class);
39

    
40
  //*********************** VARIABLES *************************************************/
41

    
42
    private ICdmRepository repository;
43

    
44
    private ICdmImportSource source;
45

    
46
    private ConcurrentQueue<TaxonNode> queue;
47

    
48
    private boolean readOnly = true;
49

    
50
    private TaxonNodeOutStreamPartitioner<?> innerPartitioner;
51

    
52
    private Future<ICdmRepository> repoFuture;
53

    
54
    private Integer partitionSize;
55
    private IProgressMonitor parentMonitor;
56
    private Integer parentTicks;
57
    private List<String> propertyPaths;
58
    private TaxonNodeFilter filter;
59

    
60
//************************* STATIC ***************************************************/
61

    
62
	public static TaxonNodeOutStreamPartitionerConcurrent NewInstance(
63
	        ICdmImportSource source, TaxonNodeFilter filter, Integer partitionSize,
64
            IProgressMonitor parentMonitor, Integer parentTicks){
65

    
66
	    TaxonNodeOutStreamPartitionerConcurrent taxonNodePartitionerThread
67
		        = new TaxonNodeOutStreamPartitionerConcurrent(source, filter, partitionSize,
68
		                parentMonitor, parentTicks, null);
69
		return taxonNodePartitionerThread;
70
	}
71

    
72
    public static ITaxonNodeOutStreamPartitioner NewInstance(ICdmImportSource source, TaxonNodeFilter filter,
73
            int partitionSize, IProgressMonitor parentMonitor, Integer parentTicks, List<String> fullpropertypaths) {
74

    
75
        TaxonNodeOutStreamPartitionerConcurrent taxonNodePartitionerThread
76
            = new TaxonNodeOutStreamPartitionerConcurrent(source, filter, partitionSize,
77
                parentMonitor, parentTicks, fullpropertypaths);
78
        return taxonNodePartitionerThread;
79
    }
80

    
81
//*********************** CONSTRUCTOR *************************************************/
82

    
83
	private TaxonNodeOutStreamPartitionerConcurrent(ICdmImportSource source,
84
	        TaxonNodeFilter filter, Integer partitionSize,
85
	        IProgressMonitor parentMonitor, Integer parentTicks, List<String> propertyPaths){
86

    
87
	    this.source = source;
88
	    this.queue = new ConcurrentQueue<>(10);
89
	    this.repoFuture = getExecutorService().submit(repoCall);
90
	    this.partitionSize = partitionSize;
91
	    this.parentMonitor = parentMonitor;
92
	    this.parentTicks = parentTicks;
93
	    this.propertyPaths = propertyPaths;
94
	    this.filter = filter;
95

    
96
	}
97

    
98
//************************ METHODS ****************************************************/
99
	boolean isStarted = false;
100
	private ExecutorService es;
101

    
102

    
103
    public void initialize(){
104
	    if (isStarted){
105
	        return;
106
	    }
107
	    getExecutorService().submit(()->{
108
            try {
109
                ICdmRepository repo = repoFuture.get();
110
                innerPartitioner = TaxonNodeOutStreamPartitioner.NewInstance(
111
                        repo, null, filter, partitionSize, parentMonitor, parentTicks, propertyPaths);
112

    
113
                //state = null
114
                TaxonNodeOutStreamPartitioner<?> partitioner = innerPartitioner;
115
                partitioner.setReadOnly(readOnly);
116
                 try {
117
                     TaxonNode node = partitioner.next();
118
                     while (node!= null) {
119
                        this.queue.enqueue(node);
120
                        node = partitioner.next();
121
                    }
122
                } catch (InterruptedException ex) {
123
                    System.out.println(Thread.currentThread().getName() +
124
                            " interrupted");
125
                }
126
            } catch (ExecutionException | InterruptedException e) {
127
                // TODO Auto-generated catch block
128
                e.printStackTrace();
129
            }
130
        });
131
	    isStarted = true;
132
	}
133

    
134

    
135
    @Override
136
    public TaxonNode next(){
137
	    initialize();
138
	    try {
139
            return queue.dequeue();
140
        } catch (InterruptedException e) {
141
            // TODO Auto-generated catch block
142
            e.printStackTrace();
143
            return null;
144
        }
145
	}
146

    
147
    private Callable<ICdmRepository> repoCall = ()->{
148
        if (repository == null){
149
            if (source instanceof ICdmRepository){
150
                repository = (ICdmRepository)source;
151
            }else if (source instanceof ICdmDataSource){
152
                System.out.println("start source repo");
153
                boolean omitTermLoading = true;
154
                repository = CdmApplicationController.NewInstance((ICdmDataSource)source,
155
                        DbSchemaValidation.VALIDATE, omitTermLoading);
156
                System.out.println("end source repo");
157
            }else{
158
                throw new IllegalStateException("Unsupported ICdmImportSource type");
159
            }
160
        }
161
        return repository;
162
    };
163

    
164

    
165
    @Override
166
    public void setReadOnly(boolean readOnly) {
167
        this.readOnly = readOnly;
168
    }
169

    
170

    
171
    private ExecutorService getExecutorService() {
172
        if (es == null){
173
            es = Executors.newSingleThreadExecutor();
174
        }
175
        return es;
176
    }
177

    
178
    @Override
179
    public void close() {
180
        // TODO Auto-generated method stub
181
    }
182

    
183

    
184
}
(58-58/65)