001    package edu.harvard.deas.hyperenc.vsat.server;
002    
003    import java.io.BufferedReader;
004    import java.io.DataInputStream;
005    import java.io.DataOutputStream;
006    import java.io.File;
007    import java.io.FileInputStream;
008    import java.io.FileNotFoundException;
009    import java.io.FileReader;
010    import java.io.IOException;
011    import java.net.InetAddress;
012    import java.net.InetSocketAddress;
013    import java.net.ServerSocket;
014    import java.net.Socket;
015    import java.net.SocketAddress;
016    import java.net.UnknownHostException;
017    import java.util.LinkedList;
018    import java.util.List;
019    import java.util.Properties;
020    
021    import org.apache.log4j.Logger;
022    
023    import edu.harvard.deas.hyperenc.util.NNLookup;
024    
025    /**
026     * This is a central nameserver of the PSN network. It accepts incoming requests
027     * consisting of a single 32-bit integer, the <i>PSN selection key</i>. This key
028     * is used to choose a PSN from a list of PSNs and associated keys; the PSN
029     * whose key is numerically closest to the request key is chosen. The server
030     * replies to the incoming request by sending the IP address of the chosen PSN
031     * as a sequence of 4 bytes (high octet first).
032     * <p>
033     * The server is a standalone application. When launched, it reads in a
034     * configuration file.
035     * <p>
036     * The configuration file must be a Java Properties XML file. The only required
037     * key is <code>db_file</code>, which specifies the file containing the PSN
038     * list. Optional keys include:
039     * <ul>
040     * <li><code>port</code> - the port on which to listen for incoming connections</li>
041     * <li><code>backlog</code> - the number of connections to backlog before
042     * refusing additional connections</li>
043     * <li><code>num_workers</code> - the number of simultaneous threads to spawn to
044     * service incoming requests</li>
045     * </ul>
046     * <p>
047     * The PSN list must be a text file. Each line contains an integer <i>key</i>,
048     * followed by a comma, followed by a hostname or IP address giving the location
049     * of a PSN. Incoming requests result in a PSN being chosen from this list as
050     * described above.
051     */
052    public class VSatNameServer
053    {
054      private static final Logger logger = Logger.getLogger(VSatNameServer.class);
055      
056      /**
057       * Default server port to listen to.
058       */
059      public static final int DEFAULT_PORT = 48329;
060    
061      /**
062       * Default number of connections to backlog.
063       */
064      private static final int DEFAULT_BACKLOG = 200;
065    
066      /**
067       * Default number of worker threads to launch.
068       */
069      private static final int DEFAULT_NUM_WORKERS = 10;
070    
071      /**
072       * A lookup of all registered nodes.
073       */
074      private NNLookup<InetAddress> nodeDB;
075    
076      /**
077       * A list of sockets waiting to be serviced.
078       */
079      private List<Socket> socketQueue;
080    
081      /**
082       * Whether to shut down the server.
083       */
084      private boolean shutdown = false;
085    
086      /**
087       * Thread that gets launched when program is terminated by Ctrl-C.
088       */
089      private Thread cleanup;
090    
091      /**
092       * An array of worker threads.
093       */
094      private Thread[] workers;
095    
096      /**
097       * The actual port the server is listening on
098       */
099      private int portnum;
100    
101      /**
102            * The number of pending connections to allow
103            */
104      private int backlog;
105      
106      /**
107       * The socket on which the server listens.
108       */
109      private ServerSocket serverSocket;
110    
111      /**
112       * Launches the server. Reads the configuration file, then reads a PSN list
113       * from the location specified by the config file. Then begins listening for
114       * incoming connections.
115       * 
116       * @param args
117       *        Requires a single command line argument: the location of the
118       *        configuration file.
119       * 
120       * @see java.util.Properties
121       */
122      public static void main(String[] args)
123      {
124        if (args.length != 1) {
125          System.err.println("Usage: VSatNameServer configfile");
126          System.exit(1);
127        }
128        
129        int port = DEFAULT_PORT;
130        int backlog = DEFAULT_BACKLOG;
131        int numWorkers = DEFAULT_NUM_WORKERS;
132        Properties conf = new Properties();
133        String dbFile = null;
134    
135        // read in the configuration file
136        try {
137          conf.loadFromXML(new FileInputStream(args[0]));
138          if (conf.getProperty("port") != null)
139            port = Integer.parseInt(conf.getProperty("port"));
140          if (conf.getProperty("backlog") != null)
141            backlog = Integer.parseInt(conf.getProperty("backlog"));
142          if (conf.getProperty("num_workers") != null)
143            numWorkers = Integer.parseInt(conf.getProperty("num_workers"));
144          if (conf.getProperty("db_file") != null)
145            dbFile = conf.getProperty("db_file");
146        } catch (IOException ex) {
147          throw new RuntimeException("Could not read configuration from "
148              + args[0]);
149        }
150        
151        for (Object k : conf.keySet()) {
152          System.err.println("KEY: " + k);
153        }
154            
155        // the db_file key is required
156        if (dbFile == null) {
157          System.err.println("Configuration file must contain db_file key.");
158          System.exit(1);
159        }
160    
161        // create the server and register the shutdown thread
162        VSatNameServer vs = new VSatNameServer(port, backlog, numWorkers, dbFile);
163    
164            System.err.println("Starting server.");
165            vs.run();
166      }
167              
168    
169      /**
170       * Construct a new VSatNameServer.
171       * @param portnum Port number to run the server on
172       * @param backlog The number of parallel connections to maintain
173       * @param numWorkers The number of worker threads to create.
174       * @param dbFile The name of the file that we write the node database to.
175       */
176      private VSatNameServer(int portnum, int backlog, int numWorkers, String dbFile)
177      {
178        this.portnum = portnum;
179        this.backlog = backlog;
180    
181        logger.info("Reading PSN list");
182        
183        // load the PSN list
184            try {
185              nodeDB = readPSNList(new File(dbFile));
186        } catch (FileNotFoundException e) {
187          System.err.println("Could not find PSN list file " + dbFile
188              + ". Please double-check your configuration file and try again.");
189          System.exit(1);
190        } catch (IOException e) {
191          logger.error(e);
192          throw new RuntimeException("Error while reading PSN list file " + dbFile);
193        }
194    
195        logger.info("Creating socket queue");
196        // where all the sockets will be stored
197        socketQueue = new LinkedList<Socket>();
198    
199        logger.info("Making cleanup thread");
200        cleanup = new VSatServerCleanupThread();
201    
202        logger.info("Making worker threads");
203        workers = new Thread[numWorkers];
204        for(int i = 0; i < numWorkers; i++)
205        {
206          workers[i] = new VSatServerWorkerThread();
207        }
208      }
209      
210      
211      /**
212       * Reads a file containing a list of PSNs, one per line. The ID comes first,
213       * followed by a comma, followed by a domain name or IP of the PSN.
214       * 
215       * @param f the file to read
216       * @return a {@link NNLookup} containing the PSNs from the file
217       * @throws FileNotFoundException if the file <code>f</code> could not be found
218       * @throws IOException if an IOException occurs while reading <code>f</code>
219       */
220      private static NNLookup<InetAddress> readPSNList(File f)
221          throws FileNotFoundException, IOException {
222        NNLookup<InetAddress> psnList = new NNLookup<InetAddress>();
223        
224        BufferedReader in = new BufferedReader(new FileReader(f));
225        String line = in.readLine();
226        while (line != null) {   // not EOF
227          String[] tokens = line.split(",", 2);
228          int id = Integer.parseInt(tokens[0].trim());
229          
230          String hostname = tokens[1].trim();
231          InetAddress ip;
232          try {
233            ip = InetAddress.getByName(hostname);
234          } catch (UnknownHostException e) {
235            logger.warn("Could not resolve hostname " + hostname + ", ID " + id);
236            continue;
237          }
238          
239          psnList.add(id, ip);
240          line = in.readLine();  // read next line...
241        }
242        
243        return psnList;
244      }
245      
246    
247      /** Start the server */
248      private void run() {
249        // listen to the socket
250        try
251        {
252          serverSocket = new ServerSocket();
253          SocketAddress addr = new InetSocketAddress(portnum);
254          serverSocket.setReuseAddress(true);
255          serverSocket.bind(addr, backlog);
256        }
257        catch(IOException e)
258        {
259          logger.error("Could not start server socket, aborting", e);
260          System.err.println("Could not start server socket, aborting");
261          System.exit(2);
262        }
263    
264        logger.info("Registering shutdown hook");
265        Runtime.getRuntime().addShutdownHook(getCleanupThread());
266    
267        logger.info("Starting workers");
268        // start the server
269        startWorkers();
270        logger.info("Workers started");
271    
272        // loop until told to shut down.  accept connections and push them
273        // on the server's queue, where they are taken out by the worker
274        // threads
275        while(!isShutdown())
276        {
277          try
278          {
279            logger.trace("Accepting socket");
280            Socket s = serverSocket.accept();
281            logger.trace("Adding socket to queue");
282            addSocket(s);
283          } 
284          catch(IOException ex)
285          { 
286            logger.warn("Error accepting new connection. Continuing anyway...", ex);
287            continue;
288          }
289        }
290      }       
291    
292      /**
293       * Return the cleanup thread.
294       * @return The cleanup thread.
295       */
296      private Thread getCleanupThread()
297      {
298        return cleanup;
299      }
300    
301      /**
302       * Launch all the worker threads.
303       */
304      private void startWorkers()
305      {
306        for(int i = 0; i < workers.length; i++)
307        {
308          workers[i].start();
309          logger.debug("Started worker " + (i+1));
310        }
311      }
312    
313      /**
314       * Whether or not to shut down.
315       * @return Whether we have been told to shut down.
316       */
317      private synchronized boolean isShutdown()
318      {
319        return shutdown;
320      }
321    
322      /**
323       * Shut down the server now. Only involves closing the socket.
324       */
325      private synchronized void shutdownNow()
326      {
327        shutdown = true;
328        try {
329          serverSocket.close();
330        } catch (IOException e) {
331          e.printStackTrace();
332          // shutting down, so don't bother to do anything else
333        }
334      }
335    
336      /**
337       * Add a socket to the server's queue.
338       * @param s The socket to add
339       */
340      private synchronized void addSocket(Socket s)
341      {
342        socketQueue.add(s);
343        notify();
344      }
345    
346      /**
347       * Get a socket from the server's queue.
348       * @return The next socket to be serviced, or null if none in the queue.
349       */
350      private synchronized Socket getSocket()
351      {
352        if (socketQueue.size() == 0)
353        {
354          try
355          {
356            wait();
357          }
358          catch(InterruptedException ex)
359          {
360            return null;
361          }
362        }
363    
364        return socketQueue.remove(0);
365      }
366    
367      /**
368       * This thread is run when we are shutdown (probably via a SIGTERM). Invokes
369       * the shutdown method of the enclosing class.
370       */
371      private class VSatServerCleanupThread extends Thread
372      {
373        /**
374         * Just calls shutdownNow() on the enclosing VSatNameServer.
375         */
376        public void run()
377        {
378          logger.info("Shutting down...");
379          shutdownNow();
380        }
381      }
382    
383      /** This thread processes TCP requests. */
384      private class VSatServerWorkerThread extends Thread
385      {
386    
387        /**
388         * Get sockets from the enclosing VSatNameServer and process them.
389         */
390        public void run()
391        {
392          while(!isShutdown())
393          {
394            Socket s;
395            // blocks until we get a request socket
396            s = getSocket();
397            if (s == null)
398              continue;
399    
400            logger.debug("Got a connection from " + s.getRemoteSocketAddress());
401            try
402            {
403              DataOutputStream out = new DataOutputStream(s.getOutputStream());
404              DataInputStream in = new DataInputStream(s.getInputStream());
405              
406              // readInt reads the int high byte first, as we require
407              int psnKey = in.readInt();
408              logger.debug("Got a request: " + psnKey);
409    
410              InetAddress requestedIP = nodeDB.get(psnKey);
411              logger.debug("Selected PSN: " + requestedIP);
412                            
413              if (requestedIP == null) {
414                throw new IllegalStateException("PSN list is empty!");
415              }
416              
417              out.write(requestedIP.getAddress());
418              logger.debug("Response sent.");
419    
420              out.close();
421              in.close();
422              s.close();
423            } catch (IOException e) {
424              logger.warn("I/O error communicating with client", e);
425            }
426          }
427        }
428      }
429    }