Project

General

Profile

Download (6.65 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2016 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.common;
10

    
11
import java.util.ArrayList;
12
import java.util.Iterator;
13
import java.util.List;
14

    
15
import org.apache.log4j.Logger;
16

    
17
/**
18
 * DynamicBatch: a JVM resources aware batch manager.
19
 *
20
 * @author a.kohlbecker
21
 \* @since Jul 4, 2016
22
 *
23
 */
24
public class DynamicBatch {
25

    
26
    public static final Logger logger = Logger.getLogger(DynamicBatch.class);
27

    
28
    int batchSize;
29
    int batchItemCount = -1;
30
    Long batchMinFreeHeap = null;
31
    List<Integer> items = null;
32

    
33
    int gcTimeIncreaseCount = 0;
34

    
35
    private int allowedGcIncreases = -1;
36

    
37
    private int itemWhereLimitsTouched = 0;
38

    
39

    
40
    List<Integer> unprocessedIds = new ArrayList<Integer>(batchSize);
41

    
42
    private final JvmMonitor jvmMonitor = new JvmMonitor();
43

    
44
    private final long intitialFreeHeap;
45

    
46
    public DynamicBatch(int initialSize) {
47

    
48
        this.batchSize = initialSize;
49
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
50
    }
51

    
52
    public DynamicBatch(int initialSize, long minInitialHeap) throws JvmLimitsException {
53

    
54
        this.batchSize = initialSize;
55
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
56
        if(this.intitialFreeHeap < minInitialHeap) {
57
            throw new JvmLimitsException("At least " + minInitialHeap + " byte of free Heap space required but only " + intitialFreeHeap + " byte available.");
58
        }
59
    }
60

    
61
    /**
62
     * @param requiredFreeHeap
63
     * @throws JvmLimitsException
64
     */
65
    public void setRequiredFreeHeap(double requiredFreeHeap) throws JvmLimitsException {
66

    
67
        this.batchMinFreeHeap = (long) (intitialFreeHeap * requiredFreeHeap);
68
        if(memoryLimitsExceeded()) {
69
            throw new JvmLimitsException("Not enough free heap for batch");
70
        }
71
    }
72

    
73
    /**
74
     * @param requiredFreeHeap
75
     * @throws JvmLimitsException
76
     */
77
    public void setRequiredFreeHeap(long requiredFreeHeap) throws JvmLimitsException {
78
        this.batchMinFreeHeap = requiredFreeHeap;
79
        if(memoryLimitsExceeded()) {
80
            throw new JvmLimitsException("Not enough free heap for batch");
81
        }
82
    }
83

    
84
    /**
85
     *
86
     * @param allowedGcIncreases the amount of continiously observed increases of the gc time
87
     */
88
    public void setMaxAllowedGcIncreases(int allowedGcIncreases) {
89
        this.allowedGcIncreases = allowedGcIncreases;
90
        // reset GCtime
91
        jvmMonitor.getGCtimeSiceLastCheck();
92
    }
93

    
94
    public int size() {
95
        return batchSize;
96
    }
97

    
98
    public  List<Integer> items(){
99
        return items;
100
    }
101

    
102
    public boolean hasUnprocessedItems() {
103
        return unprocessedIds.size() > 0;
104
    }
105

    
106
    /**
107
     * 1. Fills all remaining items into the new batch and pads with next items from the iterator.
108
     *
109
     * 2. Resets the internal batchItemCount!!
110
     *
111
     * @param itemIterator
112
     * @return
113
     */
114
    public List<Integer> nextItems(Iterator<Integer> itemIterator){
115

    
116
        logger.debug("new batch of items with size of " + batchSize);
117
        items = new ArrayList<Integer>(batchSize);
118
        if(unprocessedIds.size() > 0) {
119
            List<Integer> remainingUnprocessed = null;
120
            Iterator<Integer> unprocessedIt = unprocessedIds.iterator();
121
            int i = 0;
122
            while(unprocessedIt.hasNext()) {
123
                Integer nextUnprocessed = unprocessedIt.next();
124
                if(i < batchSize) {
125
                   items.add(nextUnprocessed);
126
                } else {
127
                    if(remainingUnprocessed == null) {
128
                        remainingUnprocessed = new ArrayList<Integer>(unprocessedIds.size() - i + 1);
129
                    }
130
                    remainingUnprocessed.add(nextUnprocessed);
131
                }
132
                i++;
133
            }
134
            unprocessedIds.clear();
135
            if(remainingUnprocessed != null) {
136
                unprocessedIds = remainingUnprocessed;
137
            }
138
        }
139

    
140
        while(itemIterator.hasNext() && items.size() < batchSize ) {
141
            items.add(itemIterator.next());
142
        }
143

    
144
        itemWhereLimitsTouched =  0;
145
        batchItemCount = 0;
146

    
147
        return items;
148
    }
149

    
150
    public void incementCounter() {
151
        batchItemCount++;
152
    }
153

    
154
    /**
155
     *
156
     */
157
    private void reduceSize() {
158
        manageUnprocessedItems();
159
        batchSize = itemWhereLimitsTouched;
160
        if(batchSize < 1) {
161
            batchSize = 1;
162
        }
163
    }
164

    
165
    public void reduceSize(double by) {
166
        manageUnprocessedItems();
167
        batchSize = (int) (batchSize * by);
168
        if(batchSize < 1) {
169
            batchSize = 1;
170
        }
171
    }
172

    
173
    /**
174
     *
175
     */
176
    protected void manageUnprocessedItems() {
177

    
178
        if(itemWhereLimitsTouched > 0) {
179
            int batchItemsUnprocessed = items.size() - itemWhereLimitsTouched;
180
            logger.info("batchSize reduced to " + itemWhereLimitsTouched);
181
            if(batchItemsUnprocessed > 0) {
182
                unprocessedIds.addAll(items.subList(items.size() - batchItemsUnprocessed, items.size()));
183
            }
184
        }
185
    }
186

    
187

    
188
    public boolean isWithinJvmLimits() {
189
        if(memoryLimitsExceeded()) {
190
            logger.info("memoryLimitsExceeded ==> reducing batchSize");
191
            reduceSize();
192
            return false;
193
        }
194
        if(allowedGcIncreases > 0 && gcLimitsExceeded()) {
195
            logger.info("gcIncreaseLimitExceeded ==> reducing batchSize");
196
            reduceSize();
197
            return false;
198
        }
199

    
200
        return true;
201
    }
202

    
203
    public boolean gcLimitsExceeded() {
204

    
205
        long gctimeSiceLastTime = jvmMonitor.getGCtimeSiceLastCheck();
206
        if(gctimeSiceLastTime > 0) {
207
            if(gcTimeIncreaseCount == 0) {
208
                itemWhereLimitsTouched  = batchItemCount;
209
            }
210
            gcTimeIncreaseCount++;
211
            logger.debug("gctimeSiceLastTime: " + gctimeSiceLastTime + ", gcTimeIncreaseCount: " + gcTimeIncreaseCount);
212
        } else {
213
            gcTimeIncreaseCount = 0;
214
            itemWhereLimitsTouched = 0;
215
        }
216
        return gcTimeIncreaseCount > allowedGcIncreases;
217

    
218
    }
219

    
220

    
221
    public boolean memoryLimitsExceeded() {
222

    
223
        if(!jvmMonitor.hasFreeHeap(batchMinFreeHeap)) {
224
            if(batchItemCount > -1) { // not in initial state, that it before first batch
225
                itemWhereLimitsTouched  = batchItemCount;
226
            }
227
            logger.debug("min free heap limit (" + batchMinFreeHeap + ") exceeded ");
228
            return true;
229
        } else {
230
            return false;
231
        }
232

    
233
    }
234

    
235
    public JvmMonitor getJvmMonitor() {
236
        return jvmMonitor;
237
    }
238

    
239
}
(8-8/21)