1 |
|
|
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
|
6 |
|
|
7 |
|
|
8 |
|
|
9 |
|
|
10 |
|
|
11 |
|
|
12 |
|
|
13 |
|
|
14 |
|
|
15 |
|
|
16 |
|
|
17 |
|
|
18 |
|
|
19 |
|
|
20 |
|
|
21 |
|
|
22 |
|
package net.sf.infrared.agent.transport.impl; |
23 |
|
|
24 |
|
import org.apache.log4j.Logger; |
25 |
|
|
26 |
|
import net.sf.infrared.agent.MonitorConfig; |
27 |
|
import net.sf.infrared.agent.transport.Aggregator; |
28 |
|
import net.sf.infrared.agent.transport.CollectionStrategy; |
29 |
|
import net.sf.infrared.agent.transport.Forwarder; |
30 |
|
import net.sf.infrared.base.model.OperationStatistics; |
31 |
|
import net.sf.infrared.base.util.LoggingFactory; |
32 |
|
|
33 |
|
|
34 |
|
|
35 |
|
|
36 |
|
|
37 |
|
|
38 |
|
|
39 |
|
|
40 |
|
public class CentralizedCollectionStrategy implements CollectionStrategy { |
41 |
|
private static final String KEY_FLUSH_FREQUENCY = "collection-strategy.ccs.flush-frequency"; |
42 |
|
|
43 |
|
private static final String KEY_POOL = "collection-strategy.ccs.pool"; |
44 |
|
|
45 |
|
private static final String KEY_POOL_MAX_THREADS = "collection-strategy.ccs.pool.maxThreads"; |
46 |
|
|
47 |
|
private static final String KEY_POOL_BUFFER_LENGTH = "collection-strategy.ccs.pool.buffer-length"; |
48 |
|
|
49 |
|
private static final String KEY_REMOTE_HOST = "collection-strategy.ccs.remotehost"; |
50 |
|
|
51 |
|
private static final String KEY_REMOTE_PORT = "collection-strategy.ccs.port"; |
52 |
|
|
53 |
|
private static final String DEFAULT_REMOTE_HOST = "localhost"; |
54 |
|
|
55 |
|
private static final int DEFAULT_REMOTE_PORT = 7777; |
56 |
|
|
57 |
|
private static final int DEFAULT_POOL_MAX_THREADS = 1; |
58 |
|
|
59 |
|
private static final int DEFAULT_POOL_BUFFER_LENGTH = 100; |
60 |
|
|
61 |
0 |
private static final Logger log = LoggingFactory.getLogger(CentralizedCollectionStrategy.class); |
62 |
|
|
63 |
0 |
private Aggregator aggregator = null; |
64 |
|
|
65 |
0 |
private Forwarder forwarder = null; |
66 |
|
|
67 |
0 |
private PeriodicFlushPolicy flushPolicy = null; |
68 |
|
|
69 |
0 |
private boolean suspended = false; |
70 |
|
|
71 |
0 |
public CentralizedCollectionStrategy() { |
72 |
0 |
} |
73 |
|
|
74 |
|
public boolean init(MonitorConfig configuration) { |
75 |
0 |
String hostName = configuration.getProperty(KEY_REMOTE_HOST, DEFAULT_REMOTE_HOST); |
76 |
0 |
int portNo = configuration.getProperty(KEY_REMOTE_PORT, DEFAULT_REMOTE_PORT); |
77 |
0 |
forwarder = new SocketForwarder(hostName, portNo); |
78 |
0 |
if (configuration.getProperty(KEY_POOL, true)) { |
79 |
0 |
log.debug("Using PooledAggregator for CentralCollectionStrategy"); |
80 |
0 |
int length = configuration.getProperty(KEY_POOL_BUFFER_LENGTH, DEFAULT_POOL_BUFFER_LENGTH); |
81 |
0 |
int maxThreads = |
82 |
|
configuration.getProperty(KEY_POOL_MAX_THREADS, DEFAULT_POOL_MAX_THREADS); |
83 |
0 |
aggregator = new PooledAggregator(class="keyword">new BufferedAggregator(), length, maxThreads); |
84 |
|
} else { |
85 |
0 |
log.debug("Using BufferedAggregator (not pooled) for CentralCollectionStrategy"); |
86 |
0 |
aggregator = new BufferedAggregator(); |
87 |
|
} |
88 |
0 |
aggregator.setForwarder(forwarder); |
89 |
0 |
flushPolicy = new PeriodicFlushPolicy(); |
90 |
0 |
long flushFrequency = configuration.getProperty(KEY_FLUSH_FREQUENCY, |
91 |
|
PeriodicFlushPolicy.DEFAULT_FREQUENCY); |
92 |
0 |
flushPolicy.setFrequency(flushFrequency); |
93 |
0 |
flushPolicy.setAggregator(aggregator); |
94 |
0 |
forwarder.init(false); |
95 |
|
|
96 |
0 |
if (log.isDebugEnabled()) { |
97 |
0 |
log.debug("Initialized CentralCollectionStrategy remote host = " + hostName |
98 |
|
+ " remote port = " + portNo); |
99 |
|
} |
100 |
|
|
101 |
0 |
return flushPolicy.activate(); |
102 |
|
} |
103 |
|
|
104 |
|
public boolean collect(OperationStatistics stats) { |
105 |
0 |
if (!suspended) { |
106 |
0 |
aggregator.aggregate(stats); |
107 |
0 |
return true; |
108 |
|
} else { |
109 |
0 |
log.info("Ignoring - as stats collection suspended"); |
110 |
|
} |
111 |
0 |
return false; |
112 |
|
} |
113 |
|
|
114 |
|
public void suspend() { |
115 |
0 |
suspended = true; |
116 |
0 |
forwarder.suspend(); |
117 |
0 |
} |
118 |
|
|
119 |
|
public void resume() { |
120 |
0 |
suspended = false; |
121 |
0 |
forwarder.resume(); |
122 |
0 |
} |
123 |
|
|
124 |
|
public Aggregator getAggregator() { |
125 |
0 |
return aggregator; |
126 |
|
} |
127 |
|
|
128 |
|
public boolean destroy() { |
129 |
0 |
flushPolicy.shutDown(); |
130 |
0 |
aggregator.setForwarder(null); |
131 |
0 |
aggregator.shutdown(); |
132 |
0 |
forwarder.destroy(); |
133 |
0 |
aggregator = null; |
134 |
0 |
forwarder = null; |
135 |
0 |
return false; |
136 |
|
} |
137 |
|
} |