package edu.harvard.deas.hyperenc.vsat.server;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import org.apache.log4j.Logger;

import edu.harvard.deas.hyperenc.util.NNLookup;

/**
 * This is a central nameserver of the PSN network. It accepts incoming requests
 * consisting of a single 32-bit integer, the <i>PSN selection key</i>. This key
 * is used to choose a PSN from a list of PSNs and associated keys; the PSN
 * whose key is numerically closest to the request key is chosen. The server
 * replies to the incoming request by sending the IP address of the chosen PSN
 * as a sequence of 4 bytes (high octet first).
 * <p>
 * The server is a standalone application. When launched, it reads in a
 * configuration file.
 * <p>
 * The configuration file must be a Java Properties XML file. The only required
 * key is <code>db_file</code>, which specifies the file containing the PSN
 * list. Optional keys include:
 * <ul>
 * <li><code>port</code> - the port on which to listen for incoming connections</li>
 * <li><code>backlog</code> - the number of connections to backlog before
 * refusing additional connections</li>
 * <li><code>num_workers</code> - the number of simultaneous threads to spawn to
 * service incoming requests</li>
 * </ul>
 * <p>
 * The PSN list must be a text file. Each line contains an integer <i>key</i>,
 * followed by a comma, followed by a hostname or IP address giving the location
 * of a PSN. Incoming requests result in a PSN being chosen from this list as
 * described above.
 * <p>
 * Threading model: the thread that calls {@link #run()} accepts connections
 * and appends them to an internal queue; a fixed set of worker threads takes
 * sockets off that queue and answers them. The queue and the shutdown flag are
 * guarded by this object's monitor.
 */
public class VSatNameServer
{
  private static final Logger logger = Logger.getLogger(VSatNameServer.class);

  /**
   * Default server port to listen to.
   */
  public static final int DEFAULT_PORT = 48329;

  /**
   * Default number of connections to backlog.
   */
  private static final int DEFAULT_BACKLOG = 200;

  /**
   * Default number of worker threads to launch.
   */
  private static final int DEFAULT_NUM_WORKERS = 10;

  /**
   * A lookup of all registered nodes.
   */
  private NNLookup<InetAddress> nodeDB;

  /**
   * A list of sockets waiting to be serviced. Guarded by this object's monitor.
   */
  private final List<Socket> socketQueue;

  /**
   * Whether to shut down the server. Guarded by this object's monitor.
   */
  private boolean shutdown = false;

  /**
   * Thread that gets launched when program is terminated by Ctrl-C.
   */
  private final Thread cleanup;

  /**
   * An array of worker threads.
   */
  private final Thread[] workers;

  /**
   * The actual port the server is listening on
   */
  private final int portnum;

  /**
   * The number of pending connections to allow
   */
  private final int backlog;

  /**
   * The socket on which the server listens.
   */
  private ServerSocket serverSocket;

  /**
   * Launches the server. Reads the configuration file, then reads a PSN list
   * from the location specified by the config file. Then begins listening for
   * incoming connections.
   *
   * @param args
   *          Requires a single command line argument: the location of the
   *          configuration file.
   *
   * @see java.util.Properties
   */
  public static void main(String[] args)
  {
    if (args.length != 1) {
      System.err.println("Usage: VSatNameServer configfile");
      System.exit(1);
    }

    int port = DEFAULT_PORT;
    int backlog = DEFAULT_BACKLOG;
    int numWorkers = DEFAULT_NUM_WORKERS;
    Properties conf = new Properties();
    String dbFile = null;

    // read in the configuration file
    try {
      // Properties.loadFromXML closes the stream it is given, so no explicit
      // close is needed here.
      conf.loadFromXML(new FileInputStream(args[0]));
      if (conf.getProperty("port") != null) {
        port = Integer.parseInt(conf.getProperty("port").trim());
      }
      if (conf.getProperty("backlog") != null) {
        backlog = Integer.parseInt(conf.getProperty("backlog").trim());
      }
      if (conf.getProperty("num_workers") != null) {
        numWorkers = Integer.parseInt(conf.getProperty("num_workers").trim());
      }
      if (conf.getProperty("db_file") != null) {
        dbFile = conf.getProperty("db_file");
      }
    } catch (IOException ex) {
      // keep the cause so the underlying I/O or XML format problem is visible
      throw new RuntimeException("Could not read configuration from "
          + args[0], ex);
    } catch (NumberFormatException ex) {
      System.err.println("Configuration keys port, backlog and num_workers"
          + " must be integers: " + ex.getMessage());
      System.exit(1);
    }

    for (Object k : conf.keySet()) {
      System.err.println("KEY: " + k);
    }

    // the db_file key is required
    if (dbFile == null) {
      System.err.println("Configuration file must contain db_file key.");
      System.exit(1);
    }

    // create the server; the shutdown hook is registered inside run()
    VSatNameServer vs = new VSatNameServer(port, backlog, numWorkers, dbFile);

    System.err.println("Starting server.");
    vs.run();
  }


  /**
   * Construct a new VSatNameServer. Loads the PSN list and creates (but does
   * not start) the cleanup and worker threads. Exits the JVM with status 1 if
   * the PSN list file cannot be found.
   *
   * @param portnum Port number to run the server on
   * @param backlog The number of pending connections the listening socket may
   *          queue before refusing additional ones
   * @param numWorkers The number of worker threads to create.
   * @param dbFile The name of the file that the node database (PSN list) is
   *          read from.
   * @throws RuntimeException if the PSN list file exists but cannot be read or
   *           parsed
   */
  private VSatNameServer(int portnum, int backlog, int numWorkers, String dbFile)
  {
    this.portnum = portnum;
    this.backlog = backlog;

    logger.info("Reading PSN list");

    // load the PSN list
    try {
      nodeDB = readPSNList(new File(dbFile));
    } catch (FileNotFoundException e) {
      System.err.println("Could not find PSN list file " + dbFile
          + ". Please double-check your configuration file and try again.");
      System.exit(1);
    } catch (IOException e) {
      logger.error("Error while reading PSN list file " + dbFile, e);
      throw new RuntimeException("Error while reading PSN list file " + dbFile, e);
    }

    logger.info("Creating socket queue");
    // where all the sockets will be stored
    socketQueue = new LinkedList<Socket>();

    logger.info("Making cleanup thread");
    cleanup = new VSatServerCleanupThread();

    logger.info("Making worker threads");
    workers = new Thread[numWorkers];
    for (int i = 0; i < numWorkers; i++) {
      workers[i] = new VSatServerWorkerThread();
    }
  }


  /**
   * Reads a file containing a list of PSNs, one per line. The ID comes first,
   * followed by a comma, followed by a domain name or IP of the PSN.
   * <p>
   * Blank lines are skipped. Entries whose hostname cannot be resolved are
   * logged and skipped; the remaining entries are still loaded.
   *
   * @param f the file to read
   * @return a {@link NNLookup} containing the PSNs from the file
   * @throws FileNotFoundException if the file <code>f</code> could not be found
   * @throws IOException if an IOException occurs while reading <code>f</code>,
   *           or if a line has no comma or a non-integer ID
   */
  private static NNLookup<InetAddress> readPSNList(File f)
      throws FileNotFoundException, IOException {
    NNLookup<InetAddress> psnList = new NNLookup<InetAddress>();

    BufferedReader in = new BufferedReader(new FileReader(f));
    try {
      int lineNumber = 0;
      String line;
      // Reading in the loop condition guarantees that every path through the
      // body, including "continue", advances to the next line.
      while ((line = in.readLine()) != null) {
        lineNumber++;
        if (line.trim().length() == 0) {
          continue;
        }

        String[] tokens = line.split(",", 2);
        if (tokens.length < 2) {
          throw new IOException("Malformed PSN list entry at " + f + ":"
              + lineNumber + " (expected \"key,host\"): " + line);
        }

        int id;
        try {
          id = Integer.parseInt(tokens[0].trim());
        } catch (NumberFormatException e) {
          throw new IOException("Malformed PSN key at " + f + ":" + lineNumber
              + ": " + tokens[0].trim());
        }

        String hostname = tokens[1].trim();
        InetAddress ip;
        try {
          ip = InetAddress.getByName(hostname);
        } catch (UnknownHostException e) {
          logger.warn("Could not resolve hostname " + hostname + ", ID " + id);
          continue;
        }

        psnList.add(id, ip);
      }
    } finally {
      in.close();
    }

    return psnList;
  }


  /**
   * Start the server: bind the listening socket, register the shutdown hook,
   * start the workers, then accept connections until shut down. Exits the JVM
   * with status 2 if the listening socket cannot be opened.
   */
  private void run() {
    // listen to the socket
    try {
      serverSocket = new ServerSocket();
      SocketAddress addr = new InetSocketAddress(portnum);
      serverSocket.setReuseAddress(true);
      serverSocket.bind(addr, backlog);
    } catch (IOException e) {
      logger.error("Could not start server socket, aborting", e);
      System.err.println("Could not start server socket, aborting");
      System.exit(2);
    }

    logger.info("Registering shutdown hook");
    Runtime.getRuntime().addShutdownHook(getCleanupThread());

    logger.info("Starting workers");
    // start the server
    startWorkers();
    logger.info("Workers started");

    // loop until told to shut down. accept connections and push them
    // on the server's queue, where they are taken out by the worker
    // threads
    while (!isShutdown()) {
      try {
        logger.trace("Accepting socket");
        Socket s = serverSocket.accept();
        logger.trace("Adding socket to queue");
        addSocket(s);
      } catch (IOException ex) {
        if (isShutdown()) {
          // shutdownNow() closed the listening socket; this failure is the
          // expected way out of accept(), not an error.
          logger.info("Server socket closed, no longer accepting connections");
          break;
        }
        logger.warn("Error accepting new connection.  Continuing anyway...", ex);
      }
    }
  }

  /**
   * Return the cleanup thread.
   * @return The cleanup thread.
   */
  private Thread getCleanupThread()
  {
    return cleanup;
  }

  /**
   * Launch all the worker threads.
   */
  private void startWorkers()
  {
    for (int i = 0; i < workers.length; i++) {
      workers[i].start();
      logger.debug("Started worker " + (i + 1));
    }
  }

  /**
   * Whether or not to shut down.
   * @return Whether we have been told to shut down.
   */
  private synchronized boolean isShutdown()
  {
    return shutdown;
  }

  /**
   * Shut down the server now. Sets the shutdown flag, wakes any workers that
   * are idle in {@link #getSocket()} so they can observe the flag, and closes
   * the listening socket.
   */
  private synchronized void shutdownNow()
  {
    shutdown = true;
    // release workers blocked waiting for a socket
    notifyAll();
    if (serverSocket == null) {
      return;
    }
    try {
      serverSocket.close();
    } catch (IOException e) {
      // shutting down, so don't bother to do anything else
      logger.warn("Error closing server socket during shutdown", e);
    }
  }

  /**
   * Add a socket to the server's queue and wake one waiting worker.
   * @param s The socket to add
   */
  private synchronized void addSocket(Socket s)
  {
    socketQueue.add(s);
    notify();
  }

  /**
   * Get a socket from the server's queue, blocking while the queue is empty
   * and the server has not been shut down.
   *
   * @return The next socket to be serviced, or null if the wait ended without
   *         one: either the server was shut down, or the calling thread was
   *         interrupted (its interrupt status is restored in that case).
   */
  private synchronized Socket getSocket()
  {
    // wait() must be re-checked in a loop: it can return spuriously, and
    // another worker may take the socket between notify() and this thread
    // reacquiring the monitor.
    while (socketQueue.isEmpty() && !shutdown) {
      try {
        wait();
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        return null;
      }
    }

    if (socketQueue.isEmpty()) {
      return null;
    }
    return socketQueue.remove(0);
  }

  /**
   * Close a client socket, logging rather than propagating any failure.
   * Closing the socket also closes the streams obtained from it.
   *
   * @param s the socket to close
   */
  private static void closeSocket(Socket s)
  {
    try {
      s.close();
    } catch (IOException e) {
      logger.debug("Error closing client socket", e);
    }
  }

  /**
   * This thread is run when we are shutdown (probably via a SIGTERM). Invokes
   * the shutdown method of the enclosing class.
   */
  private class VSatServerCleanupThread extends Thread
  {
    /**
     * Just calls shutdownNow() on the enclosing VSatNameServer.
     */
    @Override
    public void run()
    {
      logger.info("Shutting down...");
      shutdownNow();
    }
  }

  /** This thread processes TCP requests. */
  private class VSatServerWorkerThread extends Thread
  {

    /**
     * Get sockets from the enclosing VSatNameServer and process them until
     * the server is shut down or this thread is interrupted. Each connection
     * is read for one 4-byte key, answered with the raw address bytes of the
     * selected PSN, and then closed.
     */
    @Override
    public void run()
    {
      while (!isShutdown() && !Thread.currentThread().isInterrupted()) {
        // blocks until we get a request socket
        Socket s = getSocket();
        if (s == null) {
          continue;
        }

        logger.debug("Got a connection from " + s.getRemoteSocketAddress());
        try {
          DataOutputStream out = new DataOutputStream(s.getOutputStream());
          DataInputStream in = new DataInputStream(s.getInputStream());

          // readInt reads the int high byte first, as we require
          int psnKey = in.readInt();
          logger.debug("Got a request: " + psnKey);

          InetAddress requestedIP = nodeDB.get(psnKey);
          logger.debug("Selected PSN: " + requestedIP);

          if (requestedIP == null) {
            // A null lookup result is treated as "no PSNs registered". Log
            // and drop the connection instead of throwing, which would kill
            // this worker thread and leak the socket.
            logger.error("PSN list is empty! Closing connection without a reply.");
          } else {
            // NOTE(review): getAddress() yields 16 bytes for an IPv6 address,
            // while the protocol describes a 4-byte reply - confirm the PSN
            // list only ever resolves to IPv4 addresses.
            out.write(requestedIP.getAddress());
            out.flush();
            logger.debug("Response sent.");
          }
        } catch (IOException e) {
          logger.warn("I/O error communicating with client", e);
        } finally {
          // always release the connection, on success and on failure
          closeSocket(s);
        }
      }
    }
  }
}