| 1 | |
package org.webslinger.util; |
| 2 | |
|
| 3 | |
import java.io.IOException; |
| 4 | |
import java.lang.management.ManagementFactory; |
| 5 | |
import java.util.Iterator; |
| 6 | |
import java.util.concurrent.Callable; |
| 7 | |
import java.util.concurrent.ConcurrentHashMap; |
| 8 | |
import java.util.concurrent.ExecutionException; |
| 9 | |
import java.util.concurrent.CompletionService; |
| 10 | |
import java.util.concurrent.Delayed; |
| 11 | |
import java.util.concurrent.DelayQueue; |
| 12 | |
import java.util.concurrent.Future; |
| 13 | |
import java.util.concurrent.FutureTask; |
| 14 | |
import java.util.concurrent.ScheduledExecutorService; |
| 15 | |
import java.util.concurrent.TimeUnit; |
| 16 | |
import java.util.concurrent.atomic.AtomicBoolean; |
| 17 | |
import java.util.concurrent.atomic.AtomicInteger; |
| 18 | |
import java.util.concurrent.atomic.AtomicReference; |
| 19 | |
|
| 20 | |
import org.webslinger.lang.ExecutionPool; |
| 21 | |
import org.webslinger.lang.ObjectWrapper; |
| 22 | |
import org.webslinger.io.IOUtil; |
| 23 | |
|
| 24 | 9716 | public abstract class TTLObject<T> implements ObjectWrapper<T> { |
/*
 * Starts the pulse workers: one daemon thread per available processor. Each
 * worker blocks on the delay queue and runs every pulse as it expires.
 */
static {
    int workerCount = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
    for (int i = 0; i < workerCount; i++) {
        Thread worker = new Thread() {
            public void run() {
                while (true) {
                    try {
                        delayQueue.take().run();
                    } catch (InterruptedException e) {
                        // Interruption ends this worker; keep the interrupt
                        // status set instead of swallowing it.
                        Thread.currentThread().interrupt();
                        return;
                    } catch (RuntimeException e) {
                        // One failing pulse must not terminate the worker,
                        // otherwise expiry stops once every worker has died.
                        // Report through the handler that would have seen the
                        // exception anyway, then keep serving the queue.
                        getUncaughtExceptionHandler().uncaughtException(this, e);
                    }
                }
            }
        };
        worker.setDaemon(true);
        worker.setName("TTLObject(" + i + ") pulse");
        worker.start();
    }
}
| 44 | 1 | private static final ScheduledExecutorService updateExecutor = ExecutionPool.getNewOptimalExecutor("TTLObject(async-update)"); |
| 45 | 1 | private static final DelayQueue<Pulse> delayQueue = new DelayQueue<Pulse>(); |
| 46 | |
|
| 47 | 1 | private static final ConcurrentHashMap<String, Long> ttls = new ConcurrentHashMap<String, Long>(); |
| 48 | |
|
| 49 | |
public static void setDefaultTTLForClass(Class c, long ttl) { |
| 50 | 21 | ttls.putIfAbsent(c.getName(), ttl); |
| 51 | 21 | } |
| 52 | |
|
| 53 | |
public static void setTTLForClass(Class c, long ttl) { |
| 54 | 0 | ttls.put(c.getName(), ttl); |
| 55 | 0 | } |
| 56 | |
|
| 57 | |
public static long getTTLForClass(Class c) { |
| 58 | 4981 | Class ptr = c; |
| 59 | 4980 | Long ttl = null; |
| 60 | 10346 | while (ttl == null && ptr != null) { |
| 61 | 5365 | ttl = ttls.get(ptr.getName()); |
| 62 | 5364 | ptr = ptr.getSuperclass(); |
| 63 | |
} |
| 64 | 4980 | if (ttl != null) return ttl.longValue(); |
| 65 | 1 | throw new IllegalArgumentException("No TTL defined for " + c.getName()); |
| 66 | |
} |
| 67 | |
|
| 68 | 1 | private static final ConcurrentHashMap<String, Boolean> inForeground = new ConcurrentHashMap<String, Boolean>(); |
| 69 | 1 | private static final AtomicBoolean defaultForeground = new AtomicBoolean(true); |
| 70 | |
|
| 71 | |
public static boolean getDefaultForeground() { |
| 72 | 0 | return defaultForeground.get(); |
| 73 | |
} |
| 74 | |
|
| 75 | |
public static void setDefaultForeground(boolean def) { |
| 76 | 0 | defaultForeground.set(def); |
| 77 | 0 | } |
| 78 | |
|
| 79 | |
public static void setDefaultForegroundForClass(Class c, boolean foreground) { |
| 80 | 0 | inForeground.putIfAbsent(c.getName(), foreground); |
| 81 | 0 | } |
| 82 | |
|
| 83 | |
public static void setForegroundForClass(Class c, boolean foreground) { |
| 84 | 0 | inForeground.put(c.getName(), foreground); |
| 85 | 0 | } |
| 86 | |
|
| 87 | |
public static boolean getForegroundForClass(Class c) { |
| 88 | 5224 | Class ptr = c; |
| 89 | 5224 | Boolean foreground = null; |
| 90 | 25131 | while (foreground == null && ptr != null) { |
| 91 | 19906 | foreground = inForeground.get(ptr.getName()); |
| 92 | 19907 | ptr = ptr.getSuperclass(); |
| 93 | |
} |
| 94 | 5224 | if (foreground != null) return foreground.booleanValue(); |
| 95 | 5224 | return defaultForeground.get(); |
| 96 | |
} |
| 97 | |
|
| 98 | |
public static void pulseAll() { |
| 99 | 0 | Iterator<Pulse> it = delayQueue.iterator(); |
| 100 | 0 | while (it.hasNext()) { |
| 101 | 0 | Pulse pulse = it.next(); |
| 102 | 0 | it.remove(); |
| 103 | 0 | pulse.run(); |
| 104 | 0 | } |
| 105 | 0 | } |
| 106 | |
|
| 107 | 9 | private enum State { INVALID, REGEN, REGENERATING, GENERATE, GENERATING, VALID, ERROR } |
| 108 | 4489 | protected final AtomicReference<ValueAndState> object = new AtomicReference<ValueAndState>(new ValueAndState(null, null, State.INVALID, 0, null, null)); |
| 109 | 4488 | protected final AtomicInteger serial = new AtomicInteger(); |
| 110 | |
|
| 111 | |
private final class ValueAndState { |
| 112 | |
protected final T value; |
| 113 | |
protected final FutureTask<T> future; |
| 114 | |
protected final State state; |
| 115 | |
protected final int serial; |
| 116 | |
protected final Throwable t; |
| 117 | |
protected final Pulse pulse; |
| 118 | |
|
| 119 | 20638 | protected ValueAndState(T value, FutureTask<T> future, State state, int serial, Throwable t, Pulse pulse) { |
| 120 | 20636 | this.value = value; |
| 121 | 20634 | this.future = future; |
| 122 | 20633 | this.state = state; |
| 123 | 20637 | this.serial = serial; |
| 124 | 20632 | this.t = t; |
| 125 | 20631 | this.pulse = pulse; |
| 126 | 20631 | } |
| 127 | |
|
| 128 | |
protected ValueAndState refresh(State nextState) { |
| 129 | 5232 | return new ValueAndState(value, future, nextState, serial, null, null); |
| 130 | |
} |
| 131 | |
|
| 132 | |
protected ValueAndState valid(T value) { |
| 133 | 5479 | return new ValueAndState(value, null, State.VALID, TTLObject.this.serial.incrementAndGet(), null, new Pulse(TTLObject.this)); |
| 134 | |
} |
| 135 | |
|
| 136 | |
protected ValueAndState submit(final T oldValue, State state) { |
| 137 | 5434 | return new ValueAndState(value, createTask(oldValue), state, serial, null, null); |
| 138 | |
} |
| 139 | |
|
| 140 | |
protected FutureTask<T> createTask(final T oldValue) { |
| 141 | 5434 | return new FutureTask<T>(new Callable<T>() { |
| 142 | |
public T call() throws Exception { |
| 143 | 5433 | return load(oldValue, serial); |
| 144 | |
} |
| 145 | |
}); |
| 146 | |
} |
| 147 | |
|
| 148 | |
protected ValueAndState error(Throwable t) { |
| 149 | 5 | return new ValueAndState(null, null, State.ERROR, TTLObject.this.serial.incrementAndGet(), t, pulse); |
| 150 | |
} |
| 151 | |
} |
| 152 | |
|
| 153 | 4488 | protected final static class Pulse implements Delayed, Runnable { |
| 154 | |
protected final long expireTime; |
| 155 | |
protected final TTLObject<?> ttlObject; |
| 156 | |
|
| 157 | 5479 | protected Pulse(TTLObject<?> ttlObject) { |
| 158 | 5479 | this.ttlObject = ttlObject; |
| 159 | 5478 | expireTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(ttlObject.getTTL(), TimeUnit.MILLISECONDS); |
| 160 | 5478 | } |
| 161 | |
|
| 162 | |
public void run() { |
| 163 | 5232 | ttlObject.refresh(); |
| 164 | 5229 | } |
| 165 | |
|
| 166 | |
public long getDelay(TimeUnit unit) { |
| 167 | 12721 | return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS); |
| 168 | |
} |
| 169 | |
|
| 170 | |
public int compareTo(Delayed other) { |
| 171 | 85370 | long r = (expireTime - ((Pulse) other).expireTime); |
| 172 | 85370 | if (r < 0) return -1; |
| 173 | 61597 | if (r > 0) return 1; |
| 174 | 0 | return 0; |
| 175 | |
} |
| 176 | |
} |
| 177 | |
|
| 178 | |
protected void refresh() { |
| 179 | |
ValueAndState container; |
| 180 | 5232 | ValueAndState nextContainer = null; |
| 181 | |
do { |
| 182 | 5232 | container = object.get(); |
| 183 | 5232 | switch (container.state) { |
| 184 | |
case INVALID: |
| 185 | 0 | nextContainer = container.refresh(State.GENERATE); |
| 186 | 0 | break; |
| 187 | |
case REGENERATING: |
| 188 | 0 | nextContainer = container.refresh(State.REGEN); |
| 189 | 0 | break; |
| 190 | |
case GENERATING: |
| 191 | 0 | nextContainer = container.refresh(State.GENERATE); |
| 192 | 0 | break; |
| 193 | |
case VALID: |
| 194 | 5232 | nextContainer = container.refresh(getForeground() ? State.GENERATE : State.REGEN); |
| 195 | 5224 | break; |
| 196 | |
case ERROR: |
| 197 | 0 | nextContainer = container.refresh(State.GENERATE); |
| 198 | 0 | break; |
| 199 | |
case REGEN: |
| 200 | 0 | return; |
| 201 | |
case GENERATE: |
| 202 | 0 | return; |
| 203 | |
} |
| 204 | 5214 | } while (!object.compareAndSet(container, nextContainer)); |
| 205 | 5227 | cancelFuture(container); |
| 206 | 5229 | } |
| 207 | |
|
| 208 | |
public final int getSerial() { |
| 209 | 882 | return object.get().serial; |
| 210 | |
} |
| 211 | |
|
| 212 | |
public final boolean checkSerial(int serial) throws IOException { |
| 213 | 3560 | return object.get().serial != serial; |
| 214 | |
} |
| 215 | |
|
| 216 | |
protected final void setObject(final T newObject) { |
| 217 | |
ValueAndState container, nextContainer; |
| 218 | |
State nextState; |
| 219 | |
do { |
| 220 | 40 | container = object.get(); |
| 221 | 40 | nextContainer = container.valid(newObject); |
| 222 | 40 | } while (!object.compareAndSet(container, nextContainer)); |
| 223 | 40 | cancelFuture(container); |
| 224 | 40 | delayQueue.put(nextContainer.pulse); |
| 225 | 40 | } |
| 226 | |
|
| 227 | |
private void cancelFuture(ValueAndState container) { |
| 228 | 5257 | delayQueue.remove(container.pulse); |
| 229 | 5272 | switch (container.state) { |
| 230 | |
case REGENERATING: |
| 231 | |
case GENERATING: |
| 232 | 1 | container.future.cancel(false); |
| 233 | |
break; |
| 234 | |
} |
| 235 | 5271 | } |
| 236 | |
|
| 237 | |
public final T getObject() throws IOException { |
| 238 | |
do { |
| 239 | |
try { |
| 240 | 40559 | return getObjectInternal(); |
| 241 | 5 | } catch (Throwable e) { |
| 242 | 5 | return IOUtil.<T>checkException(e); |
| 243 | |
} |
| 244 | |
} while (true); |
| 245 | |
} |
| 246 | |
|
| 247 | |
private final T getObjectInternal() throws Throwable { |
| 248 | |
ValueAndState container; |
| 249 | 40568 | ValueAndState nextContainer = null; |
| 250 | |
do { |
| 251 | |
do { |
| 252 | 51455 | container = object.get(); |
| 253 | |
try { |
| 254 | |
try { |
| 255 | 51458 | switch (container.state) { |
| 256 | |
case INVALID: |
| 257 | 3483 | nextContainer = container.submit(getInitial(), State.GENERATING); |
| 258 | 3483 | break; |
| 259 | |
case REGENERATING: |
| 260 | 8 | if (!container.future.isDone()) { |
| 261 | 4 | return container.value; |
| 262 | |
} |
| 263 | 4 | nextContainer = container.valid(container.future.get()); |
| 264 | 3 | break; |
| 265 | |
case GENERATING: |
| 266 | 5439 | nextContainer = container.valid(container.future.get()); |
| 267 | 5434 | break; |
| 268 | |
case VALID: |
| 269 | 40558 | return container.value; |
| 270 | |
case ERROR: |
| 271 | 1 | nextContainer = container.submit(container.value, State.GENERATING); |
| 272 | 1 | break; |
| 273 | |
case REGEN: |
| 274 | 4 | nextContainer = container.submit(container.value, State.REGENERATING); |
| 275 | 4 | break; |
| 276 | |
case GENERATE: |
| 277 | 1946 | nextContainer = container.submit(container.value, State.GENERATING); |
| 278 | |
break; |
| 279 | |
} |
| 280 | 4 | } catch (ExecutionException e) { |
| 281 | 4 | throw e.getCause(); |
| 282 | 10873 | } |
| 283 | 5 | } catch (Throwable t) { |
| 284 | 5 | nextContainer = container.error(t); |
| 285 | 10872 | } |
| 286 | 10878 | } while (!object.compareAndSet(container, nextContainer)); |
| 287 | 10865 | switch (nextContainer.state) { |
| 288 | |
case GENERATING: |
| 289 | 5429 | nextContainer.future.run(); |
| 290 | 5429 | break; |
| 291 | |
case REGENERATING: |
| 292 | 4 | updateExecutor.submit(nextContainer.future); |
| 293 | 4 | break; |
| 294 | |
case VALID: |
| 295 | 5427 | delayQueue.remove(container.pulse); |
| 296 | 5427 | delayQueue.put(nextContainer.pulse); |
| 297 | 5427 | break; |
| 298 | |
case ERROR: |
| 299 | 5 | throw nextContainer.t; |
| 300 | |
} |
| 301 | 10860 | } while (true); |
| 302 | |
} |
| 303 | |
|
| 304 | |
protected T getInitial() throws Exception { |
| 305 | 3007 | return null; |
| 306 | |
} |
| 307 | |
|
| 308 | |
protected T load(T old) throws IOException { |
| 309 | 0 | throw new AbstractMethodError(); |
| 310 | |
} |
| 311 | |
|
| 312 | |
protected T load(T old, int serial) throws IOException { |
| 313 | 2545 | return load(old); |
| 314 | |
} |
| 315 | |
|
| 316 | |
protected boolean getForeground() { |
| 317 | 5224 | return getForegroundForClass(getClass()); |
| 318 | |
} |
| 319 | |
|
| 320 | |
protected long getTTL() { |
| 321 | 4914 | return getTTLForClass(getClass()); |
| 322 | |
} |
| 323 | |
} |