Coverage Report - org.webslinger.util.TTLObject
 
Classes in this File Line Coverage Branch Coverage Complexity
TTLObject
77%
92/119
92%
11/12
0
TTLObject$1
40%
2/5
N/A
0
TTLObject$2
100%
1/1
N/A
0
TTLObject$Pulse
92%
11/12
100%
2/2
0
TTLObject$State
100%
1/1
N/A
0
TTLObject$ValueAndState
100%
13/13
N/A
0
TTLObject$ValueAndState$1
100%
2/2
N/A
0
 
 1  
 package org.webslinger.util;
 2  
 
 3  
 import java.io.IOException;
 4  
 import java.lang.management.ManagementFactory;
 5  
 import java.util.Iterator;
 6  
 import java.util.concurrent.Callable;
 7  
 import java.util.concurrent.ConcurrentHashMap;
 8  
 import java.util.concurrent.ExecutionException;
 9  
 import java.util.concurrent.CompletionService;
 10  
 import java.util.concurrent.Delayed;
 11  
 import java.util.concurrent.DelayQueue;
 12  
 import java.util.concurrent.Future;
 13  
 import java.util.concurrent.FutureTask;
 14  
 import java.util.concurrent.ScheduledExecutorService;
 15  
 import java.util.concurrent.TimeUnit;
 16  
 import java.util.concurrent.atomic.AtomicBoolean;
 17  
 import java.util.concurrent.atomic.AtomicInteger;
 18  
 import java.util.concurrent.atomic.AtomicReference;
 19  
 
 20  
 import org.webslinger.lang.ExecutionPool;
 21  
 import org.webslinger.lang.ObjectWrapper;
 22  
 import org.webslinger.io.IOUtil;
 23  
 
 24  9716
 public abstract class TTLObject<T> implements ObjectWrapper<T> {
 25  
     static {
 26  1
         int workerCount = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
 27  5
         for (int i = 0; i < workerCount; i++) {
 28  4
             Thread t = new Thread() {
 29  
                 public void run() {
 30  
                     try {
 31  
                         while (true) {
 32  5230
                             delayQueue.take().run();
 33  
                         }
 34  0
                     } catch (InterruptedException e) {
 35  0
                         e.printStackTrace();
 36  
                     }
 37  0
                 }
 38  
             };
 39  4
             t.setDaemon(true);
 40  4
             t.setName("TTLObject(" + i + ") pulse");
 41  4
             t.start();
 42  
         }
 43  
     }
 44  1
     private static final ScheduledExecutorService updateExecutor = ExecutionPool.getNewOptimalExecutor("TTLObject(async-update)");
 45  1
     private static final DelayQueue<Pulse> delayQueue = new DelayQueue<Pulse>();
 46  
 
 47  1
     private static final ConcurrentHashMap<String, Long> ttls = new ConcurrentHashMap<String, Long>();
 48  
 
 49  
     public static void setDefaultTTLForClass(Class c, long ttl) {
 50  21
         ttls.putIfAbsent(c.getName(), ttl);
 51  21
     }
 52  
 
 53  
     public static void setTTLForClass(Class c, long ttl) {
 54  0
         ttls.put(c.getName(), ttl);
 55  0
     }
 56  
 
 57  
     public static long getTTLForClass(Class c) {
 58  4981
         Class ptr = c;
 59  4980
         Long ttl = null;
 60  10346
         while (ttl == null && ptr != null) {
 61  5365
             ttl = ttls.get(ptr.getName());
 62  5364
             ptr = ptr.getSuperclass();
 63  
         }
 64  4980
         if (ttl != null) return ttl.longValue();
 65  1
         throw new IllegalArgumentException("No TTL defined for " + c.getName());
 66  
     }
 67  
 
 68  1
     private static final ConcurrentHashMap<String, Boolean> inForeground = new ConcurrentHashMap<String, Boolean>();
 69  1
     private static final AtomicBoolean defaultForeground = new AtomicBoolean(true);
 70  
 
 71  
     public static boolean getDefaultForeground() {
 72  0
         return defaultForeground.get();
 73  
     }
 74  
 
 75  
     public static void setDefaultForeground(boolean def) {
 76  0
         defaultForeground.set(def);
 77  0
     }
 78  
 
 79  
     public static void setDefaultForegroundForClass(Class c, boolean foreground) {
 80  0
         inForeground.putIfAbsent(c.getName(), foreground);
 81  0
     }
 82  
 
 83  
     public static void setForegroundForClass(Class c, boolean foreground) {
 84  0
         inForeground.put(c.getName(), foreground);
 85  0
     }
 86  
 
 87  
     public static boolean getForegroundForClass(Class c) {
 88  5224
         Class ptr = c;
 89  5224
         Boolean foreground = null;
 90  25131
         while (foreground == null && ptr != null) {
 91  19906
             foreground = inForeground.get(ptr.getName());
 92  19907
             ptr = ptr.getSuperclass();
 93  
         }
 94  5224
         if (foreground != null) return foreground.booleanValue();
 95  5224
         return defaultForeground.get();
 96  
     }
 97  
 
 98  
     public static void pulseAll() {
 99  0
         Iterator<Pulse> it = delayQueue.iterator();
 100  0
         while (it.hasNext()) {
 101  0
             Pulse pulse = it.next();
 102  0
             it.remove();
 103  0
             pulse.run();
 104  0
         }
 105  0
     }
 106  
 
 107  9
     private enum State { INVALID, REGEN, REGENERATING, GENERATE, GENERATING, VALID, ERROR }
 108  4489
     protected final AtomicReference<ValueAndState> object = new AtomicReference<ValueAndState>(new ValueAndState(null, null, State.INVALID, 0, null, null));
 109  4488
     protected final AtomicInteger serial = new AtomicInteger();
 110  
 
 111  
     private final class ValueAndState {
 112  
         protected final T value;
 113  
         protected final FutureTask<T> future;
 114  
         protected final State state;
 115  
         protected final int serial;
 116  
         protected final Throwable t;
 117  
         protected final Pulse pulse;
 118  
 
 119  20638
         protected ValueAndState(T value, FutureTask<T> future, State state, int serial, Throwable t, Pulse pulse) {
 120  20636
             this.value = value;
 121  20634
             this.future = future;
 122  20633
             this.state = state;
 123  20637
             this.serial = serial;
 124  20632
             this.t = t;
 125  20631
             this.pulse = pulse;
 126  20631
         }
 127  
 
 128  
         protected ValueAndState refresh(State nextState) {
 129  5232
             return new ValueAndState(value, future, nextState, serial, null, null);
 130  
         }
 131  
 
 132  
         protected ValueAndState valid(T value) {
 133  5479
             return new ValueAndState(value, null, State.VALID, TTLObject.this.serial.incrementAndGet(), null, new Pulse(TTLObject.this));
 134  
         }
 135  
 
 136  
         protected ValueAndState submit(final T oldValue, State state) {
 137  5434
             return new ValueAndState(value, createTask(oldValue), state, serial, null, null);
 138  
         }
 139  
 
 140  
         protected FutureTask<T> createTask(final T oldValue) {
 141  5434
             return new FutureTask<T>(new Callable<T>() {
 142  
                 public T call() throws Exception {
 143  5433
                     return load(oldValue, serial);
 144  
                 }
 145  
             });
 146  
         }
 147  
 
 148  
         protected ValueAndState error(Throwable t) {
 149  5
             return new ValueAndState(null, null, State.ERROR, TTLObject.this.serial.incrementAndGet(), t, pulse);
 150  
         }
 151  
     }
 152  
 
 153  4488
     protected final static class Pulse implements Delayed, Runnable {
 154  
         protected final long expireTime;
 155  
         protected final TTLObject<?> ttlObject;
 156  
 
 157  5479
         protected Pulse(TTLObject<?> ttlObject) {
 158  5479
             this.ttlObject = ttlObject;
 159  5478
             expireTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(ttlObject.getTTL(), TimeUnit.MILLISECONDS);
 160  5478
         }
 161  
 
 162  
         public void run() {
 163  5232
             ttlObject.refresh();
 164  5229
         }
 165  
 
 166  
         public long getDelay(TimeUnit unit) {
 167  12721
             return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
 168  
         }
 169  
 
 170  
         public int compareTo(Delayed other) {
 171  85370
             long r = (expireTime - ((Pulse) other).expireTime);
 172  85370
             if (r < 0) return -1;
 173  61597
             if (r > 0) return 1;
 174  0
             return 0;
 175  
         }
 176  
     }
 177  
 
 178  
     protected void refresh() {
 179  
         ValueAndState container;
 180  5232
         ValueAndState nextContainer = null;
 181  
         do {
 182  5232
             container = object.get();
 183  5232
             switch (container.state) {
 184  
                 case INVALID:
 185  0
                     nextContainer = container.refresh(State.GENERATE);
 186  0
                     break;
 187  
                 case REGENERATING:
 188  0
                     nextContainer = container.refresh(State.REGEN);
 189  0
                     break;
 190  
                 case GENERATING:
 191  0
                     nextContainer = container.refresh(State.GENERATE);
 192  0
                     break;
 193  
                 case VALID:
 194  5232
                     nextContainer = container.refresh(getForeground() ? State.GENERATE : State.REGEN);
 195  5224
                     break;
 196  
                 case ERROR:
 197  0
                     nextContainer = container.refresh(State.GENERATE);
 198  0
                     break;
 199  
                 case REGEN:
 200  0
                     return;
 201  
                 case GENERATE:
 202  0
                     return;
 203  
             }
 204  5214
         } while (!object.compareAndSet(container, nextContainer));
 205  5227
         cancelFuture(container);
 206  5229
     }
 207  
 
 208  
     public final int getSerial() {
 209  882
         return object.get().serial;
 210  
     }
 211  
 
 212  
     public final boolean checkSerial(int serial) throws IOException {
 213  3560
         return object.get().serial != serial;
 214  
     }
 215  
 
 216  
     protected final void setObject(final T newObject) {
 217  
         ValueAndState container, nextContainer;
 218  
         State nextState;
 219  
         do {
 220  40
             container = object.get();
 221  40
             nextContainer = container.valid(newObject);
 222  40
         } while (!object.compareAndSet(container, nextContainer));
 223  40
         cancelFuture(container);
 224  40
         delayQueue.put(nextContainer.pulse);
 225  40
     }
 226  
 
 227  
     private void cancelFuture(ValueAndState container) {
 228  5257
         delayQueue.remove(container.pulse);
 229  5272
         switch (container.state) {
 230  
             case REGENERATING:
 231  
             case GENERATING:
 232  1
                 container.future.cancel(false);
 233  
                 break;
 234  
         }
 235  5271
     }
 236  
 
 237  
     public final T getObject() throws IOException {
 238  
         do {
 239  
             try {
 240  40559
                 return getObjectInternal();
 241  5
             } catch (Throwable e) {
 242  5
                 return IOUtil.<T>checkException(e);
 243  
             }
 244  
         } while (true);
 245  
     }
 246  
 
 247  
     private final T getObjectInternal() throws Throwable {
 248  
         ValueAndState container;
 249  40568
         ValueAndState nextContainer = null;
 250  
         do {
 251  
             do {
 252  51455
                 container = object.get();
 253  
                 try {
 254  
                     try {
 255  51458
                         switch (container.state) {
 256  
                             case INVALID:
 257  3483
                                 nextContainer = container.submit(getInitial(), State.GENERATING);
 258  3483
                                 break;
 259  
                             case REGENERATING:
 260  8
                                 if (!container.future.isDone()) {
 261  4
                                     return container.value;
 262  
                                 }
 263  4
                                 nextContainer = container.valid(container.future.get());
 264  3
                                 break;
 265  
                             case GENERATING:
 266  5439
                                 nextContainer = container.valid(container.future.get());
 267  5434
                                 break;
 268  
                             case VALID:
 269  40558
                                 return container.value;
 270  
                             case ERROR:
 271  1
                                 nextContainer = container.submit(container.value, State.GENERATING);
 272  1
                                 break;
 273  
                             case REGEN:
 274  4
                                 nextContainer = container.submit(container.value, State.REGENERATING);
 275  4
                                 break;
 276  
                             case GENERATE:
 277  1946
                                 nextContainer = container.submit(container.value, State.GENERATING);
 278  
                                 break;
 279  
                         }
 280  4
                     } catch (ExecutionException e) {
 281  4
                         throw e.getCause();
 282  10873
                     }
 283  5
                 } catch (Throwable t) {
 284  5
                     nextContainer = container.error(t);
 285  10872
                 }
 286  10878
             } while (!object.compareAndSet(container, nextContainer));
 287  10865
             switch (nextContainer.state) {
 288  
                 case GENERATING:
 289  5429
                     nextContainer.future.run();
 290  5429
                     break;
 291  
                 case REGENERATING:
 292  4
                     updateExecutor.submit(nextContainer.future);
 293  4
                     break;
 294  
                 case VALID:
 295  5427
                     delayQueue.remove(container.pulse);
 296  5427
                     delayQueue.put(nextContainer.pulse);
 297  5427
                     break;
 298  
                 case ERROR:
 299  5
                     throw nextContainer.t;
 300  
             }
 301  10860
         } while (true);
 302  
     }
 303  
 
 304  
     protected T getInitial() throws Exception {
 305  3007
         return null;
 306  
     }
 307  
 
 308  
     protected T load(T old) throws IOException {
 309  0
         throw new AbstractMethodError();
 310  
     }
 311  
 
 312  
     protected T load(T old, int serial) throws IOException {
 313  2545
         return load(old);
 314  
     }
 315  
 
 316  
     protected boolean getForeground() {
 317  5224
         return getForegroundForClass(getClass());
 318  
     }
 319  
 
 320  
     protected long getTTL() {
 321  4914
         return getTTLForClass(getClass());
 322  
     }
 323  
 }