Project

General

Profile

« Previous | Next » 

Revision 0d0d2537

Added by Andreas Kohlbecker over 6 years ago

DynamicBatch: a JVM resources aware batch manager

View differences:

cdmlib-commons/src/main/java/eu/etaxonomy/cdm/common/DynamicBatch.java
1
// $Id$
2
/**
3
* Copyright (C) 2016 EDIT
4
* European Distributed Institute of Taxonomy
5
* http://www.e-taxonomy.eu
6
*
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10
package eu.etaxonomy.cdm.common;
11

  
12
import java.util.ArrayList;
13
import java.util.Iterator;
14
import java.util.List;
15

  
16
import org.apache.log4j.Logger;
17

  
18
/**
19
 * DynamicBatch: a JVM resources aware batch manager.
20
 *
21
 * @author a.kohlbecker
22
 * @date Jul 4, 2016
23
 *
24
 */
25
public class DynamicBatch {
26

  
27
    public static final Logger logger = Logger.getLogger(DynamicBatch.class);
28

  
29
    int batchSize;
30
    int batchItemCount = -1;
31
    Long batchMinFreeHeap = null;
32
    List<Integer> items = null;
33

  
34
    int gcTimeIncreaseCount = 0;
35

  
36
    private int allowedGcIncreases = -1;
37

  
38
    private int itemWhereLimitsTouched = 0;
39

  
40

  
41
    List<Integer> unprocessedIds = new ArrayList<Integer>(batchSize);
42

  
43
    private final JvmMonitor jvmMonitor = new JvmMonitor();
44

  
45
    private final long intitialFreeHeap;
46

  
47
    public DynamicBatch(int initialSize) {
48

  
49
        this.batchSize = initialSize;
50
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
51
    }
52

  
53
    public DynamicBatch(int initialSize, long minInitialHeap) throws JvmLimitsException {
54

  
55
        this.batchSize = initialSize;
56
        this.intitialFreeHeap = jvmMonitor.getFreeHeap(true);
57
        if(this.intitialFreeHeap < minInitialHeap) {
58
            throw new JvmLimitsException("At least " + minInitialHeap + " byte of free Heap space required but only " + intitialFreeHeap + " byte available.");
59
        }
60
    }
61

  
62
    /**
63
     * @param requiredFreeHeap
64
     * @throws JvmLimitsException
65
     */
66
    public void setRequiredFreeHeap(double requiredFreeHeap) throws JvmLimitsException {
67

  
68
        this.batchMinFreeHeap = (long) (intitialFreeHeap * requiredFreeHeap);
69
        if(memoryLimitsExceeded()) {
70
            throw new JvmLimitsException("Not enough free heap for batch");
71
        }
72
    }
73

  
74
    /**
75
     * @param requiredFreeHeap
76
     * @throws JvmLimitsException
77
     */
78
    public void setRequiredFreeHeap(long requiredFreeHeap) throws JvmLimitsException {
79
        this.batchMinFreeHeap = requiredFreeHeap;
80
        if(memoryLimitsExceeded()) {
81
            throw new JvmLimitsException("Not enough free heap for batch");
82
        }
83
    }
84

  
85
    /**
86
     *
87
     * @param allowedGcIncreases the amount of continiously observed increases of the gc time
88
     */
89
    public void setMaxAllowedGcIncreases(int allowedGcIncreases) {
90
        this.allowedGcIncreases = allowedGcIncreases;
91
        // reset GCtime
92
        jvmMonitor.getGCtimeSiceLastCheck();
93
    }
94

  
95
    public int size() {
96
        return batchSize;
97
    }
98

  
99
    public  List<Integer> items(){
100
        return items;
101
    }
102

  
103
    public boolean hasUnprocessedItems() {
104
        return unprocessedIds.size() > 0;
105
    }
106

  
107
    /**
108
     * 1. Fills all remaining items into the new batch and pads with next items from the iterator.
109
     *
110
     * 2. Resets the internal batchItemCount!!
111
     *
112
     * @param itemIterator
113
     * @return
114
     */
115
    public List<Integer> nextItems(Iterator<Integer> itemIterator){
116

  
117
        logger.debug("new batch of items with size of " + batchSize);
118
        items = new ArrayList<Integer>(batchSize);
119
        if(unprocessedIds.size() > 0) {
120
            List<Integer> remainingUnprocessed = null;
121
            Iterator<Integer> unprocessedIt = unprocessedIds.iterator();
122
            int i = 0;
123
            while(unprocessedIt.hasNext()) {
124
                Integer nextUnprocessed = unprocessedIt.next();
125
                if(i < batchSize) {
126
                   items.add(nextUnprocessed);
127
                } else {
128
                    if(remainingUnprocessed == null) {
129
                        remainingUnprocessed = new ArrayList<Integer>(unprocessedIds.size() - i + 1);
130
                    }
131
                    remainingUnprocessed.add(nextUnprocessed);
132
                }
133
                i++;
134
            }
135
            unprocessedIds.clear();
136
            if(remainingUnprocessed != null) {
137
                unprocessedIds = remainingUnprocessed;
138
            }
139
        }
140

  
141
        while(itemIterator.hasNext() && items.size() < batchSize ) {
142
            items.add(itemIterator.next());
143
        }
144

  
145
        itemWhereLimitsTouched =  0;
146
        batchItemCount = 0;
147

  
148
        return items;
149
    }
150

  
151
    public void incementCounter() {
152
        batchItemCount++;
153
    }
154

  
155
    /**
156
     *
157
     */
158
    private void reduceSize() {
159
        manageUnprocessedItems();
160
        batchSize = itemWhereLimitsTouched;
161
        if(batchSize < 1) {
162
            batchSize = 1;
163
        }
164
    }
165

  
166
    public void reduceSize(double by) {
167
        manageUnprocessedItems();
168
        batchSize = (int) (batchSize * by);
169
        if(batchSize < 1) {
170
            batchSize = 1;
171
        }
172
    }
173

  
174
    /**
175
     *
176
     */
177
    protected void manageUnprocessedItems() {
178

  
179
        if(itemWhereLimitsTouched > 0) {
180
            int batchItemsUnprocessed = items.size() - itemWhereLimitsTouched;
181
            logger.info("batchSize reduced to " + itemWhereLimitsTouched);
182
            if(batchItemsUnprocessed > 0) {
183
                unprocessedIds.addAll(items.subList(items.size() - batchItemsUnprocessed, items.size()));
184
            }
185
        }
186
    }
187

  
188

  
189
    public boolean isWithinJvmLimits() {
190
        if(memoryLimitsExceeded()) {
191
            logger.info("memoryLimitsExceeded ==> reducing batchSize");
192
            reduceSize();
193
            return false;
194
        }
195
        if(allowedGcIncreases > 0 && gcLimitsExceeded()) {
196
            logger.info("gcIncreaseLimitExceeded ==> reducing batchSize");
197
            reduceSize();
198
            return false;
199
        }
200

  
201
        return true;
202
    }
203

  
204
    public boolean gcLimitsExceeded() {
205

  
206
        long gctimeSiceLastTime = jvmMonitor.getGCtimeSiceLastCheck();
207
        if(gctimeSiceLastTime > 0) {
208
            if(gcTimeIncreaseCount == 0) {
209
                itemWhereLimitsTouched  = batchItemCount;
210
            }
211
            gcTimeIncreaseCount++;
212
            logger.debug("gctimeSiceLastTime: " + gctimeSiceLastTime + ", gcTimeIncreaseCount: " + gcTimeIncreaseCount);
213
        } else {
214
            gcTimeIncreaseCount = 0;
215
            itemWhereLimitsTouched = 0;
216
        }
217
        return gcTimeIncreaseCount > allowedGcIncreases;
218

  
219
    }
220

  
221

  
222
    public boolean memoryLimitsExceeded() {
223

  
224
        if(!jvmMonitor.hasFreeHeap(batchMinFreeHeap)) {
225
            if(batchItemCount > -1) { // not in initial state, that it before first batch
226
                itemWhereLimitsTouched  = batchItemCount;
227
            }
228
            logger.debug("min free heap limit (" + batchMinFreeHeap + ") exceeded ");
229
            return true;
230
        } else {
231
            return false;
232
        }
233

  
234
    }
235

  
236
    public JvmMonitor getJvmMonitor() {
237
        return jvmMonitor;
238
    }
239

  
240
}
cdmlib-commons/src/main/java/eu/etaxonomy/cdm/common/JvmLimitsException.java
1
// $Id$
2
/**
3
* Copyright (C) 2016 EDIT
4
* European Distributed Institute of Taxonomy
5
* http://www.e-taxonomy.eu
6
*
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10
package eu.etaxonomy.cdm.common;
11

  
12
/**
 * Signals that a limit of the JVM resources has been reached, for example that
 * less heap space is free than an operation requires.
 *
 * @author a.kohlbecker
 * @date Jul 4, 2016
 *
 */
public class JvmLimitsException extends Exception {

    private static final long serialVersionUID = 1L;

    /**
     * @param message the description of the limit which has been reached
     */
    public JvmLimitsException(String message) {
        super(message);
    }

    /**
     * @param message the description of the limit which has been reached
     * @param cause the exception which led to this exception, may be <code>null</code>
     */
    public JvmLimitsException(String message, Throwable cause) {
        super(message, cause);
    }

}
cdmlib-commons/src/main/java/eu/etaxonomy/cdm/common/JvmMonitor.java
1
// $Id$
2
/**
3
* Copyright (C) 2016 EDIT
4
* European Distributed Institute of Taxonomy
5
* http://www.e-taxonomy.eu
6
*
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10
package eu.etaxonomy.cdm.common;
11

  
12
import java.lang.management.GarbageCollectorMXBean;
13
import java.lang.management.ManagementFactory;
14
import java.lang.management.MemoryMXBean;
15
import java.lang.management.MemoryUsage;
16
import java.util.List;
17

  
18
import org.apache.log4j.Logger;
19

  
20
/**
21
 * @author a.kohlbecker
22
 * @date Jul 1, 2016
23
 *
24
 */
25
public class JvmMonitor {
26

  
27

  
28
    public static final Logger logger = Logger.getLogger(JvmMonitor.class);
29

  
30
    private long gcTimeLast = 0;
31

  
32
    private long lastCheckTime = 0;
33

  
34
    /**
35
     * Returns the sum of approximate accumulated collection elapsed time in milliseconds
36
     * as reported by all garbage collectors.
37
     *
38
     * This method returns -1 if the collection elapsed time is undefined.
39
     *
40
     * @return
41
     */
42
    public long gcTime() {
43
        List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
44

  
45
        //logger.setLevel(Level.DEBUG);
46

  
47
        long gcTime = -1;
48
        long collectorGcTime;
49
        for(GarbageCollectorMXBean gcMXBean : gcMXBeans){
50
                if(gcTime == -1) {
51
                    gcTime = 0;
52
                }
53
                collectorGcTime = gcMXBean.getCollectionTime();
54
                logger.debug("cgMxBean: " + gcMXBean.getName()
55
                        + " gcTime = " + collectorGcTime
56
                        + " gcCount = " + gcMXBean.getCollectionCount());
57
                if(collectorGcTime != -1) {
58
                    // only sum up if the time is defined
59
                    gcTime = gcMXBean.getCollectionTime();
60
                }
61
        }
62
        logger.debug("gcTimeSum = " + gcTime);
63
        return gcTime;
64

  
65
    }
66

  
67
    public MemoryUsage getHeapMemoryUsage(){
68

  
69
        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
70
        if(memoryMXBean != null){
71
            logger.debug("HeapMemoryUsage: " + memoryMXBean.getHeapMemoryUsage());
72
            return memoryMXBean.getHeapMemoryUsage();
73
        }
74
        return null;
75
    }
76

  
77

  
78
    public boolean hasFreeHeap(long freeHeapLimit) {
79

  
80
        if(!_hasFreeHeap(freeHeapLimit)) {
81
            Runtime.getRuntime().gc();
82
            return _hasFreeHeap(freeHeapLimit);
83
        }
84
        return true;
85
    }
86

  
87
    /**
88
     * @param maxUsedFraction
89
     * @return
90
     */
91
    private boolean _hasFreeHeap(long freeHeapLimit) {
92
        long freeHeap = getFreeHeap(false);
93
        return freeHeap > freeHeapLimit;
94
    }
95

  
96
    /**
97
     * @return
98
     */
99
    public long getFreeHeap(boolean gcBeforeMeasure) {
100
        if(gcBeforeMeasure) {
101
            Runtime.getRuntime().gc();
102
        }
103
        MemoryUsage heapUsage = getHeapMemoryUsage();
104
        long freeHeap =  heapUsage.getMax() - heapUsage.getUsed();
105
        return freeHeap;
106
    }
107

  
108
    /**
109
     * Returns the gcTime in milliseconds as obtained through {@link #gctime()} of the
110
     * time interval since this method has been called the last time and now.
111
     *
112
     * @return
113
     */
114
    public long getGCtimeSiceLastCheck() {
115
        long gcTimeNow = gcTime();
116
        long gcTimeSince = gcTimeNow - gcTimeLast;
117
        gcTimeLast = gcTimeNow;
118
        lastCheckTime  = System.currentTimeMillis();
119
        return gcTimeSince;
120
    }
121

  
122
    /**
123
     * Returns the time spend in gc as proportion (0.0 to 1.0) of the
124
     * time interval since this method has been called the last time and now.
125
     *
126
     * @return
127
     */
128
    public double getGCRateSiceLastCheck() {
129

  
130
        long gcTimeSince = getGCtimeSiceLastCheck();
131
        long timeDiff = System.currentTimeMillis() - lastCheckTime;
132
        double gcRate = gcTimeSince / (double) timeDiff;
133
        return gcRate;
134
    }
135

  
136

  
137

  
138

  
139
}
cdmlib-commons/src/test/java/eu/etaxonomy/cdm/common/JvmMonitorTest.java
1
// $Id$
2
/**
3
* Copyright (C) 2016 EDIT
4
* European Distributed Institute of Taxonomy
5
* http://www.e-taxonomy.eu
6
*
7
* The contents of this file are subject to the Mozilla Public License Version 1.1
8
* See LICENSE.TXT at the top of this package for the full license terms.
9
*/
10
package eu.etaxonomy.cdm.common;
11

  
12
import org.apache.log4j.Logger;
13
import org.junit.Assert;
14
import org.junit.Test;
15

  
16
/**
17
 * @author a.kohlbecker
18
 * @date Jul 1, 2016
19
 *
20
 */
21
public class JvmMonitorTest extends Assert{
22

  
23

  
24
    public static final Logger logger = Logger.getLogger(JvmMonitorTest.class);
25

  
26
    @Test
27
    public void testGcTime() {
28
        JvmMonitor jvmMonitor = new JvmMonitor();
29
        assertNotEquals(-1l, jvmMonitor.gcTime());
30

  
31
        Runtime.getRuntime().gc();
32
        Runtime.getRuntime().gc();
33
        Runtime.getRuntime().gc();
34
        long gcTimeLast_1 = jvmMonitor.getGCtimeSiceLastCheck();
35
        assertTrue(gcTimeLast_1 > 0);
36
        Runtime.getRuntime().gc();
37
        Runtime.getRuntime().gc();
38
        Runtime.getRuntime().gc();
39
        long gcTimeLast_2 = jvmMonitor.getGCtimeSiceLastCheck();
40
        assertTrue(gcTimeLast_1 > 0);
41
        assertTrue(jvmMonitor.gcTime() > gcTimeLast_2);
42

  
43
    }
44

  
45
    @Test
46
    public void testHeapUsage() {
47
        int MB = 1024 * 1024;
48
        int failWithMB = 300 * MB;
49
        JvmMonitor jvmMonitor = new JvmMonitor();
50

  
51
        long baseline = jvmMonitor.getHeapMemoryUsage().getUsed();
52
        logger.debug("before: " + baseline);
53
/*
54
        assertTrue(jvmMonitor.hasFreeHeap(0.9));
55

  
56
        logger.setLevel(Level.DEBUG);
57

  
58
        Object[] measure = new Object[MB]; // 1MB
59
        double bytePerObject = (jvmMonitor.getHeapMemoryUsage().getUsed() - baseline) / MB;
60
        long maxHeap = jvmMonitor.getHeapMemoryUsage().getMax();
61
        logger.debug("max: " + maxHeap);
62
        Object[] heapEater = new Object[(int)Math.round((failWithMB / bytePerObject))];
63
        logger.debug("after: " + jvmMonitor.getHeapMemoryUsage().getUsed());
64

  
65
        assertFalse(jvmMonitor.hasFreeHeap((failWithMB * 2) / (double)maxHeap));
66
*/
67

  
68
    }
69

  
70
}

Also available in: Unified diff