%line | %branch | |||||||||
---|---|---|---|---|---|---|---|---|---|---|
net.sf.infrared.collector.impl.transport.AgentListener |
|
|
1 | /* |
|
2 | * Copyright 2005 Tavant Technologies and Contributors |
|
3 | * |
|
4 | * Licensed under the Apache License, Version 2.0 (the "License") |
|
5 | * you may not use this file except in compliance with the License. |
|
6 | * You may obtain a copy of the License at |
|
7 | * |
|
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 | * |
|
10 | * Unless required by applicable law or agreed to in writing, software |
|
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 | * See the License for the specific language governing permissions and |
|
14 | * limitations under the License. |
|
15 | * |
|
16 | * |
|
17 | * |
|
18 | * Original Author: kaushal.kumar (Tavant Technologies) |
|
19 | * Contributor(s): binil.thomas (Tavant Technologies); |
|
20 | * |
|
21 | * $Id: InfraRedListener.java,v 1.1 2005/09/20 16:22:06 kamal_gs Exp $ |
|
22 | * |
|
23 | * Changes: |
|
24 | * -------- |
|
25 | * |
|
26 | */ |
|
27 | ||
28 | package net.sf.infrared.collector.impl.transport; |
|
29 | ||
30 | import org.apache.log4j.Logger; |
|
31 | ||
32 | import net.sf.infrared.base.util.LoggingFactory; |
|
33 | import net.sf.infrared.collector.CollectorConfig; |
|
34 | import net.sf.infrared.collector.StatisticsRepository; |
|
35 | ||
36 | import java.net.BindException; |
|
37 | import java.net.Socket; |
|
38 | import java.net.ServerSocket; |
|
39 | import java.util.ArrayList; |
|
40 | import java.util.Collections; |
|
41 | import java.util.Iterator; |
|
42 | import java.util.List; |
|
43 | import java.io.IOException; |
|
44 | ||
45 | /** |
|
46 | * This thread listens on a server socket for connections from remote agents. |
|
47 | * This thread is initiated by the InfraRedListenerServlet. <br> |
|
48 | * Once an agent connects to the server socket, a new thread |
|
49 | * (InfraREDSocketNode) is spawned and further communication with that agent |
|
50 | * happens on that thread. |
|
51 | */ |
|
52 | public class AgentListener extends Thread { |
|
53 | private static final int MAX_RETRIES = 5; |
|
54 | ||
55 | private static final int WAIT_IN_MINS = 10; |
|
56 | ||
57 | 2 | private static final Logger log = LoggingFactory.getLogger(AgentListener.class.getName()); |
58 | ||
59 | private int port; |
|
60 | ||
61 | private ServerSocket server; |
|
62 | ||
63 | 2 | private volatile boolean shutdownRequested = false; |
64 | ||
65 | 2 | private List agents = new ArrayList(); |
66 | ||
67 | 2 | private int failCount = 0; |
68 | ||
69 | private StatisticsRepository statsRepository; |
|
70 | ||
71 | /** |
|
72 | * Creates a new listener. The listener needs to be started using the |
|
73 | * start() method |
|
74 | * |
|
75 | * @throws IOException |
|
76 | * if there is any errors in creating a listener on the |
|
77 | * specified port |
|
78 | */ |
|
79 | public AgentListener(CollectorConfig cfg, StatisticsRepository statsRepository) |
|
80 | throws IOException { |
|
81 | 0 | this(cfg.getAgentListenPort()); |
82 | 0 | this.statsRepository = statsRepository; |
83 | 0 | setName("InfraRED-Agent-Listener"); |
84 | 0 | setDaemon(true); |
85 | 0 | } |
86 | ||
87 | 2 | AgentListener(int port) throws IOException { |
88 | 2 | server = createServerSocket(port); |
89 | 2 | } |
90 | ||
91 | /** |
|
92 | * Waits for agents to connect. When a connection is obtained, further |
|
93 | * communication with that agent is spawned on a new thread. <br> |
|
94 | * If the listener fails, the following strategy is employed to recover: |
|
95 | * <ul> |
|
96 | * <li> 1) Upto MAX_RETIES consecutive failures are ignored </li> |
|
97 | * <li> 2) On the next failure, attempt to create a new listener is made |
|
98 | * </li> |
|
99 | * <li> 3) If a new listener is created, operations resume as normal </li> |
|
100 | * <li> 4) If the new listener can't be created, the thread retries every |
|
101 | * WAIT_IN_MINS mins until it eventually can create new listener </li> |
|
102 | * </ul> |
|
103 | * The listener is stopped using the {@link #requestShutdown()} method |
|
104 | */ |
|
105 | public void run() { |
|
106 | 2 | log.debug("Starting the AgentListener for the Collector"); |
107 | 15 | while (!shutdownRequested) { |
108 | 13 | Socket socket = null; |
109 | try { |
|
110 | 13 | socket = server.accept(); |
111 | 6 | handleAgentConnection(socket); |
112 | 7 | } catch (Exception ex) { |
113 | 7 | handleListenerFailure(ex); |
114 | 19 | } |
115 | } |
|
116 | 2 | shutdown(); |
117 | 2 | } |
118 | ||
119 | public void removeAgent(AgentThread agent) { |
|
120 | 0 | if (agent.isOpen()) { |
121 | 0 | throw new IllegalStateException("The agent that is asked to be removed is still open"); |
122 | } |
|
123 | // when this thread is shutting down gracefully, the agents list is |
|
124 | // being iterated; |
|
125 | // in this case, remove() calls should be ignored (to avoid to |
|
126 | // ConcurrentModificationException) |
|
127 | // when the thread has shutdown gracefully, the agents list would be |
|
128 | // empty, so |
|
129 | // remove() calls can anyway be ignored. |
|
130 | 0 | if (!shutdownRequested) { |
131 | 0 | agents.remove(agent); |
132 | } |
|
133 | 0 | } |
134 | ||
135 | public void requestShutdown() { |
|
136 | 2 | shutdownRequested = true; |
137 | 2 | log.debug("Requested to be shutdown gracefully"); |
138 | 2 | } |
139 | ||
140 | public boolean isListenerRunning() { |
|
141 | 0 | return isAlive() && !shutdownRequested; |
142 | } |
|
143 | ||
144 | public List getAllActiveAgentConnections() { |
|
145 | 0 | return Collections.unmodifiableList(agents); |
146 | } |
|
147 | ||
148 | void shutdown() { |
|
149 | try { |
|
150 | 2 | server.close(); |
151 | 0 | } catch (IOException ignore) { |
152 | 0 | log.warn("Error closing server socket while shutting down listener; ignored", ignore); |
153 | 2 | } |
154 | 2 | closeAllAgentConnections(); |
155 | 2 | } |
156 | ||
157 | void closeAllAgentConnections() { |
|
158 | 2 | for (Iterator i = agents.iterator(); i.hasNext();) { |
159 | 0 | AgentThread agent = (AgentThread) i.next(); |
160 | 0 | agent.shutdown(); |
161 | } |
|
162 | 2 | } |
163 | ||
164 | void handleAgentConnection(Socket socket) { |
|
165 | 0 | if (log.isDebugEnabled()) { |
166 | 0 | log.debug("Received an agent connection on socket " + socket); |
167 | } |
|
168 | 0 | failCount = 0; |
169 | try { |
|
170 | 0 | AgentThread agent = new AgentThread(socket, this, statsRepository); |
171 | 0 | agent.start(); |
172 | 0 | agents.add(agent); |
173 | 0 | if (log.isDebugEnabled()) { |
174 | 0 | log.debug("Initiated communication with an agent on socket " + socket); |
175 | } |
|
176 | 0 | } catch (Exception e) { |
177 | // TODO: Think thru this |
|
178 | 0 | log.error(e); |
179 | 0 | log.error(this.getClass().getClassLoader()); |
180 | 0 | } |
181 | 0 | } |
182 | ||
183 | void handleListenerFailure(Exception ex) { |
|
184 | 7 | if (failCount < MAX_RETRIES) { |
185 | 6 | log.error("Listener failed while accepting connection; ignoring for " |
186 | + "now and continuing accepting further connections", ex); |
|
187 | 6 | failCount++; |
188 | 6 | return; |
189 | } |
|
190 | 1 | log.error("Listener failed for " + MAX_RETRIES + " consecutive times while accepting " |
191 | + "connection; resetting listener", ex); |
|
192 | 1 | resetListener(); |
193 | 1 | } |
194 | ||
195 | void resetListener() { |
|
196 | 0 | boolean stopLogging = false; |
197 | while (true) { |
|
198 | // wait a bit before recreating the socket; |
|
199 | // you know, the condition that caused the failure |
|
200 | // might need some time to get fixed - lets wait |
|
201 | 0 | waitABit(); |
202 | 0 | boolean success = tryCreateANewServerSocket(stopLogging); |
203 | 0 | if (success) { |
204 | 0 | log.error("Successfully recreated listener"); |
205 | 0 | if (stopLogging) { |
206 | 0 | log.error("Now that listener is recreated, normal logging is resumed"); |
207 | } |
|
208 | 0 | return; // successfully reset listener |
209 | } else { |
|
210 | 0 | stopLogging = true; |
211 | } |
|
212 | } |
|
213 | } |
|
214 | ||
215 | boolean tryCreateANewServerSocket(class="keyword">boolean dontLog) { |
|
216 | try { |
|
217 | 0 | server = createServerSocket(port); |
218 | 0 | return true; |
219 | 0 | } catch (BindException bindex) { |
220 | 0 | if (!dontLog) { |
221 | 0 | log.error("Attempt to recreate the listener failed; looks like the port '" + port |
222 | + "' is in use by some other application. Try to shutdown that other" |
|
223 | + "application; InfraRED will attempt to reconnect again every " |
|
224 | + WAIT_IN_MINS |
|
225 | + " mins - meanwhile, no further error logs will be produced", bindex); |
|
226 | } |
|
227 | 0 | return false; |
228 | 0 | } catch (IOException e) { |
229 | 0 | if (!dontLog) { |
230 | 0 | log.error("Attempt to recreate the listener failed. Please try to fix this; " |
231 | + "InfraRED will attempt to reconnect again in every " + WAIT_IN_MINS |
|
232 | + " mins - meanwhile, no further error logs will be produced", e); |
|
233 | } |
|
234 | 0 | return false; |
235 | } |
|
236 | } |
|
237 | ||
238 | void waitABit() { |
|
239 | try { |
|
240 | 0 | Thread.sleep(WAIT_IN_MINS * 60 * 1000); |
241 | 0 | } catch (InterruptedException ignored) { |
242 | 0 | } |
243 | 0 | } |
244 | ||
245 | ServerSocket createServerSocket(int port) throws IOException { |
|
246 | 0 | return new ServerSocket(port); |
247 | } |
|
248 | } |
This report is generated by jcoverage, Maven and Maven JCoverage Plugin. |