TokenBucketImpl.java
5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.zhaoonline.coupen.concurrent.tokenbucket;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A token bucket implementation that is of a leaky bucket in the sense that it has a finite capacity and any added
* tokens that would exceed this capacity will "overflow" out of the bucket and are lost forever.
* <p>
* In this implementation the rules for refilling the bucket are encapsulated in a provided {@code RefillStrategy}
* instance. Prior to attempting to consume any tokens the refill strategy will be consulted to see how many tokens
* should be added to the bucket.
* <p>
* In addition in this implementation the method of yielding CPU control is encapsulated in the provided
* {@code SleepStrategy} instance. For high performance applications where tokens are being refilled incredibly quickly
* and an accurate bucket implementation is required, it may be useful to never yield control of the CPU and to instead
* busy wait. This strategy allows the caller to make this decision for themselves instead of the library forcing a
* decision.
*/
/**
* Created by ZhaoOnline<br/>
* User: yangyoupeng<br/>
* Date: 2016/12/16<br/>
* Time: 17:42<br/>
* Description:please descript you class
*/
public class TokenBucketImpl implements TokenBucket {
private final long capacity;
private final RefillStrategy refillStrategy;
private final SleepStrategy sleepStrategy;
private long size;
TokenBucketImpl(long capacity, long initialTokens, RefillStrategy refillStrategy, SleepStrategy sleepStrategy) {
checkArgument(capacity > 0);
checkArgument(initialTokens <= capacity);
this.capacity = capacity;
this.refillStrategy = checkNotNull(refillStrategy);
this.sleepStrategy = checkNotNull(sleepStrategy);
this.size = initialTokens;
}
/**
* Returns the capacity of this token bucket. This is the maximum number of tokens that the bucket can hold at
* any one time.
*
* @return The capacity of the bucket.
*/
@Override
public long getCapacity() {
return capacity;
}
/**
* Returns the current number of tokens in the bucket. If the bucket is empty then this method will return 0.
*
* @return The current number of tokens in the bucket.
*/
@Override
public synchronized long getNumTokens() {
// Give the refill strategy a chance to add tokens if it needs to so that we have an accurate
// count.
refill(refillStrategy.refill());
return size;
}
/**
* Returns the amount of time in the specified time unit until the next group of tokens can be added to the token
* bucket.
*
* @see org.isomorphism.util.TokenBucket.RefillStrategy#getDurationUntilNextRefill(java.util.concurrent.TimeUnit)
* @param unit The time unit to express the return value in.
* @return The amount of time until the next group of tokens can be added to the token bucket.
*/
@Override
public long getDurationUntilNextRefill(TimeUnit unit) throws UnsupportedOperationException {
return refillStrategy.getDurationUntilNextRefill(unit);
}
/**
* Attempt to consume a single token from the bucket. If it was consumed then {@code true} is returned, otherwise
* {@code false} is returned.
*
* @return {@code true} if a token was consumed, {@code false} otherwise.
*/
public boolean tryConsume() {
return tryConsume(1);
}
/**
* Attempt to consume a specified number of tokens from the bucket. If the tokens were consumed then {@code true}
* is returned, otherwise {@code false} is returned.
*
* @param numTokens The number of tokens to consume from the bucket, must be a positive number.
* @return {@code true} if the tokens were consumed, {@code false} otherwise.
*/
public synchronized boolean tryConsume(long numTokens) {
checkArgument(numTokens > 0, "Number of tokens to consume must be positive");
checkArgument(numTokens <= capacity, "Number of tokens to consume must be less than the capacity of the bucket.");
refill(refillStrategy.refill());
// Now try to consume some tokens
if (numTokens <= size) {
size -= numTokens;
return true;
}
return false;
}
/**
* Consume a single token from the bucket. If no token is currently available then this method will block until a
* token becomes available.
*/
public void consume() {
consume(1);
}
/**
* Consumes multiple tokens from the bucket. If enough tokens are not currently available then this method will block
* until
*
* @param numTokens The number of tokens to consume from teh bucket, must be a positive number.
*/
public void consume(long numTokens) {
while (true) {
if (tryConsume(numTokens)) {
break;
}
sleepStrategy.sleep();
}
}
/**
* Refills the bucket with the specified number of tokens. If the bucket is currently full or near capacity then
* fewer than {@code numTokens} may be added.
*
* @param numTokens The number of tokens to add to the bucket.
*/
public synchronized void refill(long numTokens) {
long newTokens = Math.min(capacity, Math.max(0, numTokens));
size = Math.max(0, Math.min(size + newTokens, capacity));
}
}