Coverage report

  %line %branch
net.sf.infrared.collector.impl.transport.AgentListener
38% 
76% 

 1  
 /* 
 2  
  * Copyright 2005 Tavant Technologies and Contributors
 3  
  * 
 4  
  * Licensed under the Apache License, Version 2.0 (the "License")
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  *     http://www.apache.org/licenses/LICENSE-2.0
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.                                                                                               
 15  
  * 
 16  
  *
 17  
  *
 18  
  * Original Author:  kaushal.kumar (Tavant Technologies)
 19  
  * Contributor(s):   binil.thomas (Tavant Technologies);
 20  
  *
 21  
  * $Id: InfraRedListener.java,v 1.1 2005/09/20 16:22:06 kamal_gs Exp $
 22  
  *
 23  
  * Changes:
 24  
  * --------
 25  
  *
 26  
  */
 27  
 
 28  
 package net.sf.infrared.collector.impl.transport;
 29  
 
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import net.sf.infrared.base.util.LoggingFactory;
 33  
 import net.sf.infrared.collector.CollectorConfig;
 34  
 import net.sf.infrared.collector.StatisticsRepository;
 35  
 
 36  
 import java.net.BindException;
 37  
 import java.net.Socket;
 38  
 import java.net.ServerSocket;
 39  
 import java.util.ArrayList;
 40  
 import java.util.Collections;
 41  
 import java.util.Iterator;
 42  
 import java.util.List;
 43  
 import java.io.IOException;
 44  
 
 45  
 /**
 46  
  * This thread listens on a server socket for connections from remote agents.
 47  
  * This thread is initiated by the InfraRedListenerServlet. <br>
 48  
  * Once an agent connects to the server socket, a new thread
 49  
  * (InfraREDSocketNode) is spawned and further communication with that agent
 50  
  * happens on that thread.
 51  
  */
 52  
 public class AgentListener extends Thread {
 53  
     private static final int MAX_RETRIES = 5;
 54  
 
 55  
     private static final int WAIT_IN_MINS = 10;
 56  
 
 57  2
     private static final Logger log = LoggingFactory.getLogger(AgentListener.class.getName());
 58  
 
 59  
     private int port;
 60  
 
 61  
     private ServerSocket server;
 62  
 
 63  2
     private volatile boolean shutdownRequested = false;
 64  
 
 65  2
     private List agents = new ArrayList();
 66  
 
 67  2
     private int failCount = 0;
 68  
 
 69  
     private StatisticsRepository statsRepository;
 70  
 
 71  
     /**
 72  
      * Creates a new listener. The listener needs to be started using the
 73  
      * start() method
 74  
      * 
 75  
      * @throws IOException
 76  
      *             if there are any errors in creating a listener on the
 77  
      *             specified port
 78  
      */
 79  
     public AgentListener(CollectorConfig cfg, StatisticsRepository statsRepository)
 80  
             throws IOException {
 81  0
         this(cfg.getAgentListenPort());
 82  0
         this.statsRepository = statsRepository;
 83  0
         setName("InfraRED-Agent-Listener");
 84  0
         setDaemon(true);
 85  0
     }
 86  
     
 87  2
     AgentListener(int port) throws IOException {
 88  2
         server = createServerSocket(port);
 89  2
     }
 90  
 
 91  
     /**
 92  
      * Waits for agents to connect. When a connection is obtained, further
 93  
      * communication with that agent is spawned on a new thread. <br>
 94  
      * If the listener fails, the following strategy is employed to recover:
 95  
      * <ul>
 96  
      * <li> 1) Up to MAX_RETRIES consecutive failures are ignored </li>
 97  
      * <li> 2) On the next failure, attempt to create a new listener is made
 98  
      * </li>
 99  
      * <li> 3) If a new listener is created, operations resume as normal </li>
 100  
      * <li> 4) If the new listener can't be created, the thread retries every
 101  
      * WAIT_IN_MINS mins until it eventually can create new listener </li>
 102  
      * </ul>
 103  
      * The listener is stopped using the {@link #requestShutdown()} method
 104  
      */
 105  
     public void run() {
 106  2
         log.debug("Starting the AgentListener for the Collector");
 107  15
         while (!shutdownRequested) {
 108  13
             Socket socket = null;
 109  
             try {
 110  13
                 socket = server.accept();
 111  6
                 handleAgentConnection(socket);
 112  7
             } catch (Exception ex) {
 113  7
                 handleListenerFailure(ex);
 114  19
             }
 115  
         }
 116  2
         shutdown();
 117  2
     }
 118  
 
 119  
     public void removeAgent(AgentThread agent) {
 120  0
         if (agent.isOpen()) {
 121  0
             throw new IllegalStateException("The agent that is asked to be removed is still open");
 122  
         }
 123  
         // when this thread is shutting down gracefully, the agents list is
 124  
         // being iterated;
 125  
         // in this case, remove() calls should be ignored (to avoid a
 126  
         // ConcurrentModificationException)
 127  
         // when the thread has shutdown gracefully, the agents list would be
 128  
         // empty, so
 129  
         // remove() calls can anyway be ignored.
 130  0
         if (!shutdownRequested) {
 131  0
             agents.remove(agent);
 132  
         }
 133  0
     }
 134  
 
 135  
     public void requestShutdown() {
 136  2
         shutdownRequested = true;
 137  2
         log.debug("Requested to be shutdown gracefully");
 138  2
     }
 139  
 
 140  
     public boolean isListenerRunning() {
 141  0
         return isAlive() && !shutdownRequested;
 142  
     }
 143  
 
 144  
     public List getAllActiveAgentConnections() {
 145  0
         return Collections.unmodifiableList(agents);
 146  
     }
 147  
 
 148  
     void shutdown() {
 149  
         try {
 150  2
             server.close();
 151  0
         } catch (IOException ignore) {
 152  0
             log.warn("Error closing server socket while shutting down listener; ignored", ignore);
 153  2
         }
 154  2
         closeAllAgentConnections();
 155  2
     }
 156  
 
 157  
     void closeAllAgentConnections() {
 158  2
         for (Iterator i = agents.iterator(); i.hasNext();) {
 159  0
             AgentThread agent = (AgentThread) i.next();
 160  0
             agent.shutdown();
 161  
         }
 162  2
     }
 163  
 
 164  
     void handleAgentConnection(Socket socket) {
 165  0
         if (log.isDebugEnabled()) {
 166  0
             log.debug("Received an agent connection on socket " + socket);
 167  
         }
 168  0
         failCount = 0;
 169  
         try {
 170  0
             AgentThread agent = new AgentThread(socket, this, statsRepository);
 171  0
             agent.start();
 172  0
             agents.add(agent);
 173  0
             if (log.isDebugEnabled()) {
 174  0
                 log.debug("Initiated communication with an agent on socket " + socket);
 175  
             }
 176  0
         } catch (Exception e) {
 177  
             // TODO: Think thru this
 178  0
             log.error(e);
 179  0
             log.error(this.getClass().getClassLoader());
 180  0
         }
 181  0
     }
 182  
 
 183  
     void handleListenerFailure(Exception ex) {
 184  7
         if (failCount < MAX_RETRIES) {
 185  6
             log.error("Listener failed while accepting connection; ignoring for "
 186  
                     + "now and continuing accepting further connections", ex);
 187  6
             failCount++;
 188  6
             return;
 189  
         }
 190  1
         log.error("Listener failed for " + MAX_RETRIES + " consecutive times while accepting "
 191  
                 + "connection; resetting listener", ex);
 192  1
         resetListener();
 193  1
     }
 194  
 
 195  
     void resetListener() {
 196  0
         boolean stopLogging = false;
 197  
         while (true) {
 198  
             // wait a bit before recreating the socket;
 199  
             // you know, the condition that caused the failure
 200  
             // might need some time to get fixed - lets wait
 201  0
             waitABit();
 202  0
             boolean success = tryCreateANewServerSocket(stopLogging);
 203  0
             if (success) {
 204  0
                 log.error("Successfully recreated listener");
 205  0
                 if (stopLogging) {
 206  0
                     log.error("Now that listener is recreated, normal logging is resumed");
 207  
                 }
 208  0
                 return; // successfully reset listener
 209  
             } else {
 210  0
                 stopLogging = true;
 211  
             }
 212  
         }
 213  
     }
 214  
 
 215  
     boolean tryCreateANewServerSocket(boolean dontLog) {
 216  
         try {
 217  0
             server = createServerSocket(port);
 218  0
             return true;
 219  0
         } catch (BindException bindex) {
 220  0
             if (!dontLog) {
 221  0
                 log.error("Attempt to recreate the listener failed; looks like the port '" + port
 222  
                         + "' is in use by some other application. Try to shutdown that other"
 223  
                         + "application; InfraRED will attempt to reconnect again every "
 224  
                         + WAIT_IN_MINS
 225  
                         + " mins - meanwhile, no further error logs will be produced", bindex);
 226  
             }
 227  0
             return false;
 228  0
         } catch (IOException e) {
 229  0
             if (!dontLog) {
 230  0
                 log.error("Attempt to recreate the listener failed. Please try to fix this; "
 231  
                         + "InfraRED will attempt to reconnect again in every " + WAIT_IN_MINS
 232  
                         + " mins - meanwhile, no further error logs will be produced", e);
 233  
             }
 234  0
             return false;
 235  
         }
 236  
     }
 237  
 
 238  
     void waitABit() {
 239  
         try {
 240  0
             Thread.sleep(WAIT_IN_MINS * 60 * 1000);
 241  0
         } catch (InterruptedException ignored) {
 242  0
         }
 243  0
     }
 244  
 
 245  
     ServerSocket createServerSocket(int port) throws IOException {
 246  0
         return new ServerSocket(port);
 247  
     }
 248  
 }

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.