Coverage Report - org.webslinger.containers.AjaxContainer
 
Classes in this File Line Coverage Branch Coverage Complexity
AjaxContainer
0%
0/24
0%
0/1
0
AjaxContainer$1
0%
0/4
N/A
0
AjaxContainer$2
0%
0/1
N/A
0
AjaxContainer$AcceptorThread
0%
0/3
N/A
0
AjaxContainer$HttpState
0%
0/121
0%
0/34
0
AjaxContainer$OpenHandler
0%
0/23
0%
0/1
0
AjaxContainer$PendingWork
0%
0/1
N/A
0
AjaxContainer$ReadResult
0%
0/1
N/A
0
AjaxContainer$SelectorThread
0%
0/48
0%
0/9
0
 
 1  
 package org.webslinger.containers;
 2  
 
 3  
 import java.io.IOException;
 4  
 import java.net.InetAddress;
 5  
 import java.net.InetSocketAddress;
 6  
 import java.net.ServerSocket;
 7  
 import java.net.Socket;
 8  
 import java.net.SocketAddress;
 9  
 import java.nio.ByteBuffer;
 10  
 import java.nio.channels.SelectionKey;
 11  
 import static java.nio.channels.SelectionKey.OP_ACCEPT;
 12  
 import static java.nio.channels.SelectionKey.OP_CONNECT;
 13  
 import static java.nio.channels.SelectionKey.OP_READ;
 14  
 import static java.nio.channels.SelectionKey.OP_WRITE;
 15  
 import java.nio.channels.SelectableChannel;
 16  
 import java.nio.channels.Selector;
 17  
 import java.nio.channels.ServerSocketChannel;
 18  
 import java.nio.channels.SocketChannel;
 19  
 import java.util.concurrent.ConcurrentLinkedQueue;
 20  
 import java.util.Iterator;
 21  
 import java.util.LinkedList;
 22  
 import java.util.HashMap;
 23  
 import java.util.HashSet;
 24  
 import java.util.List;
 25  
 import java.util.Map;
 26  
 import java.util.Queue;
 27  
 import java.util.Set;
 28  
 
 29  0
 public class AjaxContainer {
 30  0
     private final AcceptorThread acceptorThread = new AcceptorThread();
 31  
     private volatile boolean isRunning;
 32  
 
 33  0
     public AjaxContainer() throws IOException {
 34  0
     }
 35  
 
 36  
     public void start() {
 37  0
         isRunning = true;
 38  0
         acceptorThread.start();
 39  0
     }
 40  
 
 41  
     public void listenOn(SocketAddress address, final Queue<OpenHandler> handlers) throws IOException {
 42  0
         final ServerSocketChannel ssc = ServerSocketChannel.open();
 43  0
         ssc.socket().bind(address);
 44  0
         ssc.socket().setReuseAddress(true);
 45  0
         ssc.socket().setSoTimeout(60000);
 46  0
         ssc.configureBlocking(false);
 47  0
         acceptorThread.addWork(new PendingWork() {
 48  
             protected void run(SelectorThread thread, Selector selector) throws IOException {
 49  0
                 thread.handlers.put(ssc, handlers);
 50  0
                 ssc.register(selector, OP_ACCEPT, null);
 51  0
             }
 52  
         });
 53  0
     }
 54  
 
 55  
     public void stop() {
 56  0
         isRunning = false;
 57  0
         acceptorThread.interrupt();
 58  0
     }
 59  
 
 60  0
     protected abstract static class PendingWork {
 61  
         protected abstract void run(SelectorThread thread, Selector selector) throws IOException;
 62  
     }
 63  
 
 64  0
     protected abstract class SelectorThread extends Thread {
 65  0
         private final ConcurrentLinkedQueue<PendingWork> pendingWork = new ConcurrentLinkedQueue<PendingWork>();
 66  0
         private final Map<SelectableChannel, Queue<OpenHandler>> handlers = new HashMap<SelectableChannel, Queue<OpenHandler>>();
 67  
         private final Selector selector;
 68  0
         private final HashSet<SelectionKey> pausedAcceptKeys = new HashSet<SelectionKey>();
 69  
 
 70  0
         protected SelectorThread() throws IOException {
 71  0
             selector = Selector.open();
 72  0
         }
 73  
 
 74  
         protected final void addWork(PendingWork work) {
 75  0
             pendingWork.add(work);
 76  0
             selector.wakeup();
 77  0
         }
 78  
 
 79  
         public void run() {
 80  0
             while (isRunning) {
 81  
                 try {
 82  
                     while (true) {
 83  0
                         PendingWork work = pendingWork.poll();
 84  0
                         if (work == null) break;
 85  0
                         work.run(this, selector);
 86  0
                     }
 87  0
                     selector.select();
 88  0
                     Set<SelectionKey> selected = selector.selectedKeys();
 89  0
                     Iterator<SelectionKey> it = selected.iterator();
 90  0
                     while (it.hasNext()) {
 91  0
                         SelectionKey key = it.next();
 92  0
                         if (!key.isValid()) continue;
 93  0
                         it.remove();
 94  0
                         if (key.isAcceptable()) {
 95  0
                             accept(key);
 96  0
                         } else if (key.isReadable()) {
 97  0
                             OpenHandler handler = (OpenHandler) key.attachment();
 98  0
                             handler.read(key);
 99  0
                         } else if (key.isWritable()) {
 100  0
                             write(key);
 101  
                         }
 102  0
                     }
 103  0
                 } catch (IOException e) {
 104  0
                     e.printStackTrace();
 105  0
                     break;
 106  0
                 }
 107  
             }
 108  0
         }
 109  
 
 110  
         protected void accept(SelectionKey key) throws IOException {
 111  0
             Queue<OpenHandler> acceptHandlers = handlers.get(key.channel());
 112  0
             OpenHandler handler = acceptHandlers.poll();
 113  0
             if (handler == null) {
 114  0
                 System.err.println("no buffer, can't accept");
 115  0
                 key.interestOps(0);
 116  0
                 pausedAcceptKeys.add(key);
 117  
             } else {
 118  0
                 SocketChannel acceptedChannel = ((ServerSocketChannel) key.channel()).accept();
 119  0
                 if (acceptedChannel != null) {
 120  0
                     acceptedChannel.configureBlocking(false);
 121  0
                     acceptedChannel.register(selector, OP_READ, handler);
 122  
                 } else {
 123  0
                     throw new IOException("no acceptedChannel");
 124  
                 }
 125  
             }
 126  0
         }
 127  
 
 128  
         protected void write(SelectionKey key) {
 129  0
         }
 130  
     }
 131  
 
 132  
     protected final class AcceptorThread extends SelectorThread {
 133  0
         protected AcceptorThread() throws IOException {
 134  0
             super();
 135  0
         }
 136  
     }
 137  
 
 138  0
     public enum ReadResult { NEED_MORE, CLOSE, STOP };
 139  
 
 140  
     public static abstract class OpenHandler {
 141  
         protected final ByteBuffer buffer;
 142  
 
 143  0
         protected OpenHandler(int bufSize) throws IOException {
 144  0
             buffer = ByteBuffer.allocate(bufSize);
 145  0
         }
 146  
 
 147  
         public void read(SelectionKey key) throws IOException {
 148  0
             SocketChannel channel = (SocketChannel) key.channel();
 149  0
             int numRead = channel.read(buffer);
 150  0
             System.err.println("numRead=" + numRead);
 151  0
             switch (numRead) {
 152  
                 case -1:
 153  0
                     System.err.println("EOF");
 154  0
                     channel.close();
 155  0
                     key.cancel();
 156  0
                     break;
 157  
                 case 0:
 158  0
                     System.err.println("no bytes");
 159  0
                     break;
 160  
                 default:
 161  0
                     int position = buffer.position();
 162  0
                     int limit = buffer.limit();
 163  0
                     buffer.flip();
 164  0
                     switch (processRead(channel, buffer)) {
 165  
                         case NEED_MORE:
 166  0
                             break;
 167  
                         case CLOSE:
 168  0
                             channel.close();
 169  0
                             break;
 170  
                         case STOP:
 171  
                             break;
 172  
                     }
 173  0
                     buffer.position(position);
 174  0
                     buffer.limit(limit);
 175  
                     break;
 176  
             }
 177  0
         }
 178  
 
 179  
         protected abstract ReadResult processRead(SocketChannel channel, ByteBuffer buffer) throws IOException;
 180  
     }
 181  
 
 182  
     protected static class HttpState extends OpenHandler {
 183  0
         protected final LinkedList<String> headerNameValues = new LinkedList<String>();
 184  
 
 185  
         protected HttpState(int bufSize) throws IOException {
 186  0
             super(bufSize);
 187  0
         }
 188  
 
 189  
         public ReadResult processRead(SocketChannel channel, ByteBuffer buffer) throws IOException {
 190  0
             int remaining = buffer.remaining();
 191  0
             int i = buffer.position();
 192  0
             int limit = buffer.limit();
 193  0
             System.err.println("i=" + i + ", limit=" + limit + ", remaining=" + remaining);
 194  0
             byte[] bytes = buffer.array();
 195  0
             int mode = 0;
 196  0
             int methodEnd = -1;
 197  0
             int uriStart = -1, uriEnd = -1;
 198  0
             int protocolStart = -1, protocolEnd = -1;
 199  0
             int headerNameStart = -1;
 200  0
             int headerValueStart = -1, headerValueEnd = -1;
 201  0
             headerNameValues.clear();
 202  0
             while (i < limit) {
 203  0
                 byte b = bytes[i++];
 204  0
                 System.err.printf("mode=%d b=%d\n", mode, b);
 205  0
                 switch (mode) {
 206  
                     case 0:     // find space
 207  0
                         if (b == ' ') {
 208  0
                             uriStart = i;
 209  0
                             methodEnd = uriStart - 1;
 210  0
                             mode = 1;
 211  0
                         } else if (b == '\r') {
 212  0
                             return ReadResult.CLOSE;
 213  0
                         } else if (b == '\n') {
 214  0
                             return ReadResult.CLOSE;
 215  
                         }
 216  
                         break;
 217  
                     case 1:     // skip space
 218  0
                         if (b == ' ') {
 219  0
                             uriStart = i;
 220  0
                         } else if (b == '\r') {
 221  0
                             return ReadResult.CLOSE;
 222  0
                         } else if (b == '\n') {
 223  0
                             return ReadResult.CLOSE;
 224  
                         } else {
 225  0
                             mode = 2;
 226  
                         }
 227  0
                         break;
 228  
                     case 2:     // find space
 229  0
                         if (b == ' ') {
 230  0
                             protocolStart = i;
 231  0
                             uriEnd = protocolStart - 1;
 232  0
                             mode = 3;
 233  0
                         } else if (b == '\r') {
 234  0
                             return ReadResult.CLOSE;
 235  0
                         } else if (b == '\n') {
 236  0
                             return ReadResult.CLOSE;
 237  
                         }
 238  
                         break;
 239  
                     case 3:     // skip space
 240  0
                         if (b == ' ') {
 241  0
                             protocolStart = i;
 242  0
                         } else if (b == '\r') {
 243  0
                             return ReadResult.CLOSE;
 244  0
                         } else if (b == '\n') {
 245  0
                             return ReadResult.CLOSE;
 246  
                         } else {
 247  0
                             mode = 4;
 248  
                         }
 249  0
                         break;
 250  
                     case 4:     // find space
 251  0
                         if (b == '\r') {
 252  0
                             protocolEnd = i - 1;
 253  0
                             mode = 5;
 254  0
                         } else if (b == '\n') {
 255  0
                             protocolEnd = i - 1;
 256  0
                             mode = 10;
 257  
                         }
 258  
                         break;
 259  
                     case 5:
 260  0
                         mode = 10;
 261  0
                         if (b != '\n') i--;
 262  0
                         headerNameStart = i;
 263  0
                         break;
 264  
 
 265  
                     case 10:
 266  0
                         if (b < 32 || b == ' ') return ReadResult.CLOSE;
 267  0
                         if (b == ':') {
 268  0
                             System.err.println("add");
 269  0
                             headerNameValues.add(new String(bytes, headerNameStart, i - headerNameStart - 1));
 270  0
                             mode = 11;
 271  
                         }
 272  
                         break;
 273  
                     case 11:
 274  0
                         if (b == ' ' || b == '\t') {
 275  
                             // ignore
 276  0
                             break;
 277  
                         }
 278  0
                         if (b < 32) return ReadResult.CLOSE;
 279  0
                         headerValueStart = i - 1;
 280  0
                         if (b == '\r') {
 281  0
                             mode = 12;
 282  0
                         } else if (b == '\n') {
 283  0
                             mode = 13;
 284  
                         } else {
 285  0
                             mode = 14;
 286  
                         }
 287  0
                         break;
 288  
                     case 12:
 289  0
                         if (b != '\n') i--;
 290  0
                         mode = 13;
 291  0
                         break;
 292  
                     case 13:
 293  0
                         if (b == ' ' || b == '\t') {
 294  
                             // continuation
 295  0
                             mode = 14;
 296  0
                         } else if (b == '\r') {
 297  0
                             System.err.println("add");
 298  0
                             headerNameValues.add("<" + new String(bytes, headerValueStart, headerValueEnd - headerValueStart) + ">");
 299  0
                             mode = 15;
 300  0
                         } else if (b == '\n') {
 301  0
                             System.err.println("add");
 302  0
                             headerNameValues.add("(" + new String(bytes, headerValueStart, headerValueEnd - headerValueStart) + ")");
 303  0
                             mode = 20;
 304  0
                         } else if (b < 32) {
 305  0
                             return ReadResult.CLOSE;
 306  
                         } else {
 307  0
                             headerNameValues.add("{" + new String(bytes, headerValueStart, i - headerValueStart - 1) + "}");
 308  0
                             i--;
 309  0
                             mode = 10;
 310  0
                             headerNameStart = i;
 311  
                         }
 312  0
                         break;
 313  
                     case 14:
 314  0
                         if (b == '\r') {
 315  0
                             headerValueEnd = i;
 316  0
                             mode = 12;
 317  0
                         } else if (b == '\n') {
 318  0
                             headerValueEnd = i;
 319  0
                             mode = 13;
 320  0
                         } else if (b == '\t') {
 321  0
                         } else if (b < 32) {
 322  0
                             return ReadResult.CLOSE;
 323  
                         } else {
 324  
                         }
 325  
                         break;
 326  
                     case 15:
 327  0
                         if (b != '\n') i--;
 328  0
                         mode = 20;
 329  0
                         break;
 330  
                     case 20:
 331  
                         break;
 332  
                 }
 333  0
                 System.err.printf("mode=%d method[0 %d] uri[%d %d] protocol[%d %d]\n", mode, methodEnd, uriStart, uriEnd, protocolStart, protocolEnd);
 334  0
             }
 335  0
             if (mode >= 10) {
 336  0
                 System.err.printf("method[0, %d], uri[%d, %d], protocol[%d, %d]\n", methodEnd, uriStart, uriEnd, protocolStart, protocolEnd);
 337  0
                 System.err.println("method[" + new String(bytes, 0, methodEnd) + "]");
 338  0
                 System.err.println("uri[" + new String(bytes, uriStart, uriEnd - uriStart) + "]");
 339  0
                 System.err.println("protocol[" + new String(bytes, protocolStart, protocolEnd - protocolStart) + "]");
 340  0
                 for (String v: headerNameValues) {
 341  0
                     System.err.printf("\t[%s]\n", v);
 342  
                 }
 343  
             }
 344  0
             String header = new String(bytes, 0, limit);
 345  0
             System.err.println("header[" + header + "]");
 346  0
             buffer.position(i);
 347  0
             return ReadResult.NEED_MORE;
 348  
         }
 349  
     }
 350  
 
 351  
     public static void main(String[] args) throws Exception {
 352  0
         AjaxContainer ac = new AjaxContainer();
 353  0
         LinkedList<OpenHandler> handlers = new LinkedList<OpenHandler>();
 354  0
         for (int i = 0; i < 100; i++) {
 355  0
             handlers.add(new HttpState(2048));
 356  
         }
 357  0
         ac.listenOn(new InetSocketAddress(InetAddress.getLocalHost(), 8111), handlers);
 358  0
         ac.start();
 359  0
     }
 360  
     // remove()
 361  
     // offer()
 362  
 }