Project

General

Profile

Download (6.6 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2016 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.common;
10

    
11
import java.util.ArrayList;
12
import java.util.Iterator;
13
import java.util.List;
14

    
15
import org.apache.log4j.Logger;
16

    
17
/**
18
 * DynamicBatch: a JVM resources aware batch manager.
19
 *
20
 * @author a.kohlbecker
21
 * @since Jul 4, 2016
22
 *
23
 */
24
public class DynamicBatch {
25

    
26
    public static final Logger logger = Logger.getLogger(DynamicBatch.class);
27

    
28
    int batchSize;
29
    int batchItemCount = -1;
30
    Long batchMinFreeHeap = null;
31
    List<Integer> items = null;
32

    
33
    int gcTimeIncreaseCount = 0;
34

    
35
    private int allowedGcIncreases = -1;
36

    
37
    private int itemWhereLimitsTouched = 0;
38

    
39

    
40
    List<Integer> unprocessedIds = new ArrayList<Integer>(batchSize);
41

    
42
    private final JvmMonitor jvmMonitor = new JvmMonitor();
43

    
44
    private final long intitialFreeHeap;
45

    
46
    public DynamicBatch(int initialSize) {
47

    
48
        this.batchSize = initialSize;
49
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
50
    }
51

    
52
    public DynamicBatch(int initialSize, long minInitialHeap) throws JvmLimitsException {
53

    
54
        this.batchSize = initialSize;
55
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
56
        if(this.intitialFreeHeap < minInitialHeap) {
57
            throw new JvmLimitsException("At least " + minInitialHeap + " byte of free Heap space required but only " + intitialFreeHeap + " byte available.");
58
        }
59
    }
60

    
61
    /**
62
     * @param requiredFreeHeap
63
     * @throws JvmLimitsException
64
     */
65
    public void setRequiredFreeHeap(double requiredFreeHeap) throws JvmLimitsException {
66

    
67
        this.batchMinFreeHeap = (long) (intitialFreeHeap * requiredFreeHeap);
68
        if(memoryLimitsExceeded()) {
69
            throw new JvmLimitsException("Not enough free heap for batch");
70
        }
71
    }
72

    
73
    /**
74
     * @param requiredFreeHeap
75
     * @throws JvmLimitsException
76
     */
77
    public void setRequiredFreeHeap(long requiredFreeHeap) throws JvmLimitsException {
78
        this.batchMinFreeHeap = requiredFreeHeap;
79
        if(memoryLimitsExceeded()) {
80
            throw new JvmLimitsException("Not enough free heap for batch");
81
        }
82
    }
83

    
84
    /**
85
     *
86
     * @param allowedGcIncreases the amount of continuously observed increases of the gc time
87
     */
88
    public void setMaxAllowedGcIncreases(int allowedGcIncreases) {
89
        this.allowedGcIncreases = allowedGcIncreases;
90
        // reset GCtime
91
        jvmMonitor.getGCtimeSiceLastCheck();
92
    }
93

    
94
    public int size() {
95
        return batchSize;
96
    }
97

    
98
    public  List<Integer> items(){
99
        return items;
100
    }
101

    
102
    public boolean hasUnprocessedItems() {
103
        return unprocessedIds.size() > 0;
104
    }
105

    
106
    /**
107
     * 1. Fills all remaining items into the new batch and pads with next items from the iterator.
108
     *
109
     * 2. Resets the internal batchItemCount!!
110
     *
111
     * @param itemIterator
112
     * @return
113
     */
114
    public List<Integer> nextItems(Iterator<Integer> itemIterator){
115

    
116
        logger.debug("new batch of items with size of " + batchSize);
117
        items = new ArrayList<Integer>(batchSize);
118
        if(unprocessedIds.size() > 0) {
119
            List<Integer> remainingUnprocessed = null;
120
            Iterator<Integer> unprocessedIt = unprocessedIds.iterator();
121
            int i = 0;
122
            while(unprocessedIt.hasNext()) {
123
                Integer nextUnprocessed = unprocessedIt.next();
124
                if(i < batchSize) {
125
                   items.add(nextUnprocessed);
126
                } else {
127
                    if(remainingUnprocessed == null) {
128
                        remainingUnprocessed = new ArrayList<>(unprocessedIds.size() - i + 1);
129
                    }
130
                    remainingUnprocessed.add(nextUnprocessed);
131
                }
132
                i++;
133
            }
134
            unprocessedIds.clear();
135
            if(remainingUnprocessed != null) {
136
                unprocessedIds = remainingUnprocessed;
137
            }
138
        }
139

    
140
        while(itemIterator.hasNext() && items.size() < batchSize ) {
141
            items.add(itemIterator.next());
142
        }
143

    
144
        itemWhereLimitsTouched =  0;
145
        batchItemCount = 0;
146

    
147
        return items;
148
    }
149

    
150
    public void incrementCounter() {
151
        batchItemCount++;
152
    }
153

    
154
    private void reduceSize() {
155
        manageUnprocessedItems();
156
        batchSize = itemWhereLimitsTouched;
157
        if(batchSize < 1) {
158
            batchSize = 1;
159
        }
160
    }
161

    
162
    public void reduceSize(double by) {
163
        manageUnprocessedItems();
164
        batchSize = (int) (batchSize * by);
165
        if(batchSize < 1) {
166
            batchSize = 1;
167
        }
168
    }
169

    
170
    protected void manageUnprocessedItems() {
171

    
172
        if(itemWhereLimitsTouched > 0) {
173
            int batchItemsUnprocessed = items.size() - itemWhereLimitsTouched;
174
            logger.info("batchSize reduced to " + itemWhereLimitsTouched);
175
            if(batchItemsUnprocessed > 0) {
176
                unprocessedIds.addAll(items.subList(items.size() - batchItemsUnprocessed, items.size()));
177
            }
178
        }
179
    }
180

    
181

    
182
    public boolean isWithinJvmLimits() {
183
        if(memoryLimitsExceeded()) {
184
            logger.info("memoryLimitsExceeded ==> reducing batchSize");
185
            reduceSize();
186
            return false;
187
        }
188
        if(allowedGcIncreases > 0 && gcLimitsExceeded()) {
189
            logger.info("gcIncreaseLimitExceeded ==> reducing batchSize");
190
            reduceSize();
191
            return false;
192
        }
193

    
194
        return true;
195
    }
196

    
197
    public boolean gcLimitsExceeded() {
198

    
199
        long gctimeSiceLastTime = jvmMonitor.getGCtimeSiceLastCheck();
200
        if(gctimeSiceLastTime > 0) {
201
            if(gcTimeIncreaseCount == 0) {
202
                itemWhereLimitsTouched  = batchItemCount;
203
            }
204
            gcTimeIncreaseCount++;
205
            logger.debug("gctimeSiceLastTime: " + gctimeSiceLastTime + ", gcTimeIncreaseCount: " + gcTimeIncreaseCount);
206
        } else {
207
            gcTimeIncreaseCount = 0;
208
            itemWhereLimitsTouched = 0;
209
        }
210
        return gcTimeIncreaseCount > allowedGcIncreases;
211

    
212
    }
213

    
214

    
215
    public boolean memoryLimitsExceeded() {
216

    
217
        if(!jvmMonitor.hasFreeHeap(batchMinFreeHeap)) {
218
            if(batchItemCount > -1) { // not in initial state, that it before first batch
219
                itemWhereLimitsTouched  = batchItemCount;
220
            }
221
            logger.debug("min free heap limit (" + batchMinFreeHeap + ") exceeded ");
222
            return true;
223
        } else {
224
            return false;
225
        }
226

    
227
    }
228

    
229
    public JvmMonitor getJvmMonitor() {
230
        return jvmMonitor;
231
    }
232

    
233
}
(8-8/23)