Project

General

Profile

Download (6.42 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2016 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.common;
10

    
11
import java.util.ArrayList;
12
import java.util.Iterator;
13
import java.util.List;
14

    
15
import org.apache.log4j.Logger;
16

    
17
/**
18
 * DynamicBatch: a JVM resources aware batch manager.
19
 *
20
 * @author a.kohlbecker
21
 * @since Jul 4, 2016
22
 */
23
public class DynamicBatch {
24

    
25
    public static final Logger logger = Logger.getLogger(DynamicBatch.class);
26

    
27
    int batchSize;
28
    int batchItemCount = -1;
29
    Long batchMinFreeHeap = null;
30
    List<Integer> items = null;
31

    
32
    int gcTimeIncreaseCount = 0;
33

    
34
    private int allowedGcIncreases = -1;
35

    
36
    private int itemWhereLimitsTouched = 0;
37

    
38

    
39
    List<Integer> unprocessedIds = new ArrayList<>(batchSize);
40

    
41
    private final JvmMonitor jvmMonitor = new JvmMonitor();
42

    
43
    private final long intitialFreeHeap;
44

    
45
    public DynamicBatch(int initialSize) {
46

    
47
        this.batchSize = initialSize;
48
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
49
    }
50

    
51
    public DynamicBatch(int initialSize, long minInitialHeap) throws JvmLimitsException {
52

    
53
        this.batchSize = initialSize;
54
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
55
        if(this.intitialFreeHeap < minInitialHeap) {
56
            throw new JvmLimitsException("At least " + minInitialHeap + " byte of free Heap space required but only " + intitialFreeHeap + " byte available.");
57
        }
58
    }
59

    
60
    public void setRequiredFreeHeap(double requiredFreeHeap) throws JvmLimitsException {
61

    
62
        this.batchMinFreeHeap = (long) (intitialFreeHeap * requiredFreeHeap);
63
        if(memoryLimitsExceeded()) {
64
            throw new JvmLimitsException("Not enough free heap for batch");
65
        }
66
    }
67

    
68
    public void setRequiredFreeHeap(long requiredFreeHeap) throws JvmLimitsException {
69
        this.batchMinFreeHeap = requiredFreeHeap;
70
        if(memoryLimitsExceeded()) {
71
            throw new JvmLimitsException("Not enough free heap for batch");
72
        }
73
    }
74

    
75
    /**
76
     *
77
     * @param allowedGcIncreases the amount of continuously observed increases of the gc time
78
     */
79
    public void setMaxAllowedGcIncreases(int allowedGcIncreases) {
80
        this.allowedGcIncreases = allowedGcIncreases;
81
        // reset GCtime
82
        jvmMonitor.getGCtimeSiceLastCheck();
83
    }
84

    
85
    public int size() {
86
        return batchSize;
87
    }
88

    
89
    public  List<Integer> items(){
90
        return items;
91
    }
92

    
93
    public boolean hasUnprocessedItems() {
94
        return unprocessedIds.size() > 0;
95
    }
96

    
97
    /**
98
     * 1. Fills all remaining items into the new batch and pads with next items from the iterator.
99
     *
100
     * 2. Resets the internal batchItemCount!!
101
     *
102
     * @param itemIterator
103
     * @return
104
     */
105
    public List<Integer> nextItems(Iterator<Integer> itemIterator){
106

    
107
        logger.debug("new batch of items with size of " + batchSize);
108
        items = new ArrayList<>(batchSize);
109
        if(unprocessedIds.size() > 0) {
110
            List<Integer> remainingUnprocessed = null;
111
            Iterator<Integer> unprocessedIt = unprocessedIds.iterator();
112
            int i = 0;
113
            while(unprocessedIt.hasNext()) {
114
                Integer nextUnprocessed = unprocessedIt.next();
115
                if(i < batchSize) {
116
                   items.add(nextUnprocessed);
117
                } else {
118
                    if(remainingUnprocessed == null) {
119
                        remainingUnprocessed = new ArrayList<>(unprocessedIds.size() - i + 1);
120
                    }
121
                    remainingUnprocessed.add(nextUnprocessed);
122
                }
123
                i++;
124
            }
125
            unprocessedIds.clear();
126
            if(remainingUnprocessed != null) {
127
                unprocessedIds = remainingUnprocessed;
128
            }
129
        }
130

    
131
        while(itemIterator.hasNext() && items.size() < batchSize ) {
132
            items.add(itemIterator.next());
133
        }
134

    
135
        itemWhereLimitsTouched =  0;
136
        batchItemCount = 0;
137

    
138
        return items;
139
    }
140

    
141
    public void incrementCounter() {
142
        batchItemCount++;
143
    }
144

    
145
    private void reduceSize() {
146
        manageUnprocessedItems();
147
        batchSize = itemWhereLimitsTouched;
148
        if(batchSize < 1) {
149
            batchSize = 1;
150
        }
151
    }
152

    
153
    public void reduceSize(double by) {
154
        manageUnprocessedItems();
155
        batchSize = (int) (batchSize * by);
156
        if(batchSize < 1) {
157
            batchSize = 1;
158
        }
159
    }
160

    
161
    protected void manageUnprocessedItems() {
162
        if(itemWhereLimitsTouched > 0) {
163
            int batchItemsUnprocessed = items.size() - itemWhereLimitsTouched;
164
            logger.info("batchSize reduced to " + itemWhereLimitsTouched);
165
            if(batchItemsUnprocessed > 0) {
166
                unprocessedIds.addAll(items.subList(items.size() - batchItemsUnprocessed, items.size()));
167
            }
168
        }
169
    }
170

    
171

    
172
    public boolean isWithinJvmLimits() {
173
        if(memoryLimitsExceeded()) {
174
            logger.info("memoryLimitsExceeded ==> reducing batchSize");
175
            reduceSize();
176
            return false;
177
        }
178
        if(allowedGcIncreases > 0 && gcLimitsExceeded()) {
179
            logger.info("gcIncreaseLimitExceeded ==> reducing batchSize");
180
            reduceSize();
181
            return false;
182
        }
183
        return true;
184
    }
185

    
186
    public boolean gcLimitsExceeded() {
187

    
188
        long gctimeSiceLastTime = jvmMonitor.getGCtimeSiceLastCheck();
189
        if(gctimeSiceLastTime > 0) {
190
            if(gcTimeIncreaseCount == 0) {
191
                itemWhereLimitsTouched  = batchItemCount;
192
            }
193
            gcTimeIncreaseCount++;
194
            logger.debug("gctimeSiceLastTime: " + gctimeSiceLastTime + ", gcTimeIncreaseCount: " + gcTimeIncreaseCount);
195
        } else {
196
            gcTimeIncreaseCount = 0;
197
            itemWhereLimitsTouched = 0;
198
        }
199
        return gcTimeIncreaseCount > allowedGcIncreases;
200

    
201
    }
202

    
203
    public boolean memoryLimitsExceeded() {
204

    
205
        if(!jvmMonitor.hasFreeHeap(batchMinFreeHeap)) {
206
            if(batchItemCount > -1) { // not in initial state, that it before first batch
207
                itemWhereLimitsTouched  = batchItemCount;
208
            }
209
            logger.debug("min free heap limit (" + batchMinFreeHeap + ") exceeded ");
210
            return true;
211
        } else {
212
            return false;
213
        }
214
    }
215

    
216
    public JvmMonitor getJvmMonitor() {
217
        return jvmMonitor;
218
    }
219
}
(8-8/25)