Project

General

Profile

Download (6.37 KB) Statistics
| Branch: | Tag: | Revision:
1
/**
2
* Copyright (C) 2016 EDIT
3
* European Distributed Institute of Taxonomy
4
* http://www.e-taxonomy.eu
5
*
6
* The contents of this file are subject to the Mozilla Public License Version 1.1
7
* See LICENSE.TXT at the top of this package for the full license terms.
8
*/
9
package eu.etaxonomy.cdm.common;
10

    
11
import java.util.ArrayList;
12
import java.util.Iterator;
13
import java.util.List;
14

    
15
import org.apache.log4j.Logger;
16

    
17
/**
18
 * DynamicBatch: a JVM resources aware batch manager.
19
 *
20
 * @author a.kohlbecker
21
 * @since Jul 4, 2016
22
 */
23
public class DynamicBatch {
24

    
25
    public static final Logger logger = Logger.getLogger(DynamicBatch.class);
26

    
27
    int batchSize;
28
    int batchItemCount = -1;
29
    Long batchMinFreeHeap = null;
30
    List<Integer> items = null;
31

    
32
    int gcTimeIncreaseCount = 0;
33

    
34
    private int allowedGcIncreases = -1;
35

    
36
    private int itemWhereLimitsTouched = 0;
37

    
38

    
39
    List<Integer> unprocessedIds = new ArrayList<>(batchSize);
40

    
41
    private final JvmMonitor jvmMonitor = new JvmMonitor();
42

    
43
    private final long intitialFreeHeap;
44

    
45
    public DynamicBatch(int initialSize) {
46

    
47
        this.batchSize = initialSize;
48
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
49
    }
50

    
51
    public DynamicBatch(int initialSize, long minInitialHeap) throws JvmLimitsException {
52

    
53
        this.batchSize = initialSize;
54
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
55
        if(this.intitialFreeHeap < minInitialHeap) {
56
            throw new JvmLimitsException("At least " + minInitialHeap + " byte of free Heap space required but only " + intitialFreeHeap + " byte available.");
57
        }
58
    }
59

    
60
    public void setRequiredFreeHeap(double requiredFreeHeap) throws JvmLimitsException {
61

    
62
        this.batchMinFreeHeap = (long)(intitialFreeHeap * requiredFreeHeap);
63
        if(memoryLimitsExceeded()) {
64
            throw new JvmLimitsException("Not enough free heap for batch");
65
        }
66
    }
67

    
68
    public void setRequiredFreeHeap(long requiredFreeHeap) throws JvmLimitsException {
69
        this.batchMinFreeHeap = requiredFreeHeap;
70
        if(memoryLimitsExceeded()) {
71
            throw new JvmLimitsException("Not enough free heap for batch");
72
        }
73
    }
74

    
75
    /**
76
     *
77
     * @param allowedGcIncreases the amount of continuously observed increases of the gc time
78
     */
79
    public void setMaxAllowedGcIncreases(int allowedGcIncreases) {
80
        this.allowedGcIncreases = allowedGcIncreases;
81
        // reset GCtime
82
        jvmMonitor.getGCtimeSiceLastCheck();
83
    }
84

    
85
    public int size() {
86
        return batchSize;
87
    }
88

    
89
    public  List<Integer> items(){
90
        return items;
91
    }
92

    
93
    public boolean hasUnprocessedItems() {
94
        return unprocessedIds.size() > 0;
95
    }
96

    
97
    /**
98
     * 1. Fills all remaining items into the new batch and pads with next items from the iterator.<BR>
99
     * 2. Resets the internal batchItemCount!!
100
     */
101
    public List<Integer> nextItems(Iterator<Integer> itemIterator){
102

    
103
        logger.debug("new batch of items with size of " + batchSize);
104
        items = new ArrayList<>(batchSize);
105
        if(unprocessedIds.size() > 0) {
106
            List<Integer> remainingUnprocessed = null;
107
            Iterator<Integer> unprocessedIt = unprocessedIds.iterator();
108
            int i = 0;
109
            while(unprocessedIt.hasNext()) {
110
                Integer nextUnprocessed = unprocessedIt.next();
111
                if(i < batchSize) {
112
                   items.add(nextUnprocessed);
113
                } else {
114
                    if(remainingUnprocessed == null) {
115
                        remainingUnprocessed = new ArrayList<>(unprocessedIds.size() - i + 1);
116
                    }
117
                    remainingUnprocessed.add(nextUnprocessed);
118
                }
119
                i++;
120
            }
121
            unprocessedIds.clear();
122
            if(remainingUnprocessed != null) {
123
                unprocessedIds = remainingUnprocessed;
124
            }
125
        }
126

    
127
        while(itemIterator.hasNext() && items.size() < batchSize ) {
128
            items.add(itemIterator.next());
129
        }
130

    
131
        itemWhereLimitsTouched =  0;
132
        batchItemCount = 0;
133

    
134
        return items;
135
    }
136

    
137
    public void incrementCounter() {
138
        batchItemCount++;
139
    }
140

    
141
    private void reduceSize() {
142
        manageUnprocessedItems();
143
        batchSize = itemWhereLimitsTouched;
144
        if(batchSize < 1) {
145
            batchSize = 1;
146
        }
147
    }
148

    
149
    public void reduceSize(double by) {
150
        manageUnprocessedItems();
151
        batchSize = (int) (batchSize * by);
152
        if(batchSize < 1) {
153
            batchSize = 1;
154
        }
155
    }
156

    
157
    protected void manageUnprocessedItems() {
158
        if(itemWhereLimitsTouched > 0) {
159
            int batchItemsUnprocessed = items.size() - itemWhereLimitsTouched;
160
            logger.info("batchSize reduced to " + itemWhereLimitsTouched);
161
            if(batchItemsUnprocessed > 0) {
162
                unprocessedIds.addAll(items.subList(items.size() - batchItemsUnprocessed, items.size()));
163
            }
164
        }
165
    }
166

    
167

    
168
    public boolean isWithinJvmLimits() {
169
        if(memoryLimitsExceeded()) {
170
            logger.info("memoryLimitsExceeded ==> reducing batchSize");
171
            reduceSize();
172
            return false;
173
        }
174
        if(allowedGcIncreases > 0 && gcLimitsExceeded()) {
175
            logger.info("gcIncreaseLimitExceeded ==> reducing batchSize");
176
            reduceSize();
177
            return false;
178
        }
179
        return true;
180
    }
181

    
182
    public boolean gcLimitsExceeded() {
183

    
184
        long gctimeSiceLastTime = jvmMonitor.getGCtimeSiceLastCheck();
185
        if(gctimeSiceLastTime > 0) {
186
            if(gcTimeIncreaseCount == 0) {
187
                itemWhereLimitsTouched  = batchItemCount;
188
            }
189
            gcTimeIncreaseCount++;
190
            logger.debug("gctimeSiceLastTime: " + gctimeSiceLastTime + ", gcTimeIncreaseCount: " + gcTimeIncreaseCount);
191
        } else {
192
            gcTimeIncreaseCount = 0;
193
            itemWhereLimitsTouched = 0;
194
        }
195
        return gcTimeIncreaseCount > allowedGcIncreases;
196

    
197
    }
198

    
199
    public boolean memoryLimitsExceeded() {
200

    
201
        if(!jvmMonitor.hasFreeHeap(batchMinFreeHeap)) {
202
            if(batchItemCount > -1) { // not in initial state, that it before first batch
203
                itemWhereLimitsTouched  = batchItemCount;
204
            }
205
            logger.debug("min free heap limit (" + batchMinFreeHeap + ") exceeded ");
206
            return true;
207
        } else {
208
            return false;
209
        }
210
    }
211

    
212
    public JvmMonitor getJvmMonitor() {
213
        return jvmMonitor;
214
    }
215
}
(8-8/25)