1
1
package org .datadog .jmxfetch ;
2
2
3
+ import java .io .ByteArrayInputStream ;
3
4
import java .io .File ;
4
5
import java .io .FileInputStream ;
6
+ import java .io .FileFilter ;
5
7
import java .io .FileNotFoundException ;
8
+ import java .io .InputStream ;
6
9
import java .io .IOException ;
10
+ import java .io .UnsupportedEncodingException ;
11
+ import java .util .Arrays ;
7
12
import java .util .ArrayList ;
8
13
import java .util .Enumeration ;
9
14
import java .util .HashMap ;
15
+ import java .util .concurrent .ConcurrentHashMap ;
16
+ import java .util .concurrent .atomic .AtomicBoolean ;
17
+ import java .util .regex .Pattern ;
18
+ import java .util .regex .Matcher ;
10
19
import java .util .Iterator ;
11
20
import java .util .LinkedHashMap ;
12
21
import java .util .LinkedList ;
19
28
import org .apache .log4j .Appender ;
20
29
import org .apache .log4j .Level ;
21
30
import org .apache .log4j .Logger ;
31
+ import org .apache .commons .lang3 .CharEncoding ;
22
32
import org .datadog .jmxfetch .reporter .Reporter ;
23
33
import org .datadog .jmxfetch .util .CustomLogger ;
24
34
29
39
@ SuppressWarnings ("unchecked" )
30
40
public class App {
31
41
private final static Logger LOGGER = Logger .getLogger (App .class .getName ());
42
+ private final static String SERVICE_DISCOVERY_PREFIX = "SD-" ;
32
43
public static final String CANNOT_CONNECT_TO_INSTANCE = "Cannot connect to instance " ;
44
+ private static final String SD_CONFIG_SEP = "#### SERVICE-DISCOVERY ####" ;
33
45
private static int loopCounter ;
34
- private HashMap <String , YamlParser > configs ;
46
+ private AtomicBoolean reinit = new AtomicBoolean (false );
47
+ private ConcurrentHashMap <String , YamlParser > configs ;
48
+ private ConcurrentHashMap <String , YamlParser > sdConfigs = new ConcurrentHashMap <String , YamlParser >();
35
49
private ArrayList <Instance > instances = new ArrayList <Instance >();
36
50
private LinkedList <Instance > brokenInstances = new LinkedList <Instance >();
37
51
private AppConfig appConfig ;
@@ -132,6 +146,10 @@ public void run() {
132
146
new ShutdownHook ().attachShutDownHook ();
133
147
}
134
148
149
+ public void setReinit (boolean reinit ) {
150
+ this .reinit .set (reinit );
151
+ }
152
+
135
153
public static int getLoopCounter () {
136
154
return loopCounter ;
137
155
}
@@ -145,16 +163,84 @@ private static void clearInstances(List<Instance> instances) {
145
163
}
146
164
}
147
165
166
+ private String getSDName (String config ){
167
+ String [] splitted = config .split (System .getProperty ("line.separator" ), 2 );
168
+
169
+ return SERVICE_DISCOVERY_PREFIX + splitted [0 ].substring (2 , splitted [0 ].length ());
170
+ }
171
+
172
+ public boolean processServiceDiscovery (byte [] buffer ) {
173
+ boolean reinit = false ;
174
+ String [] discovered ;
175
+
176
+ try {
177
+ String configs = new String (buffer , CharEncoding .UTF_8 );
178
+ discovered = configs .split (App .SD_CONFIG_SEP + System .getProperty ("line.separator" ));
179
+ } catch (UnsupportedEncodingException e ) {
180
+ LOGGER .debug ("Unable to parse byte buffer to UTF-8 String." );
181
+ return false ;
182
+ }
183
+
184
+ for (String config : discovered ) {
185
+ if (config == null || config .isEmpty ()) {
186
+ continue ;
187
+ }
188
+
189
+ try {
190
+ String name = getSDName (config );
191
+ LOGGER .debug ("Attempting to apply config. Name: " + name + "\n config: \n " + config );
192
+ InputStream stream = new ByteArrayInputStream (config .getBytes (CharEncoding .UTF_8 ));
193
+ YamlParser yaml = new YamlParser (stream );
194
+
195
+ if (this .addConfig (name , yaml )){
196
+ reinit = true ;
197
+ LOGGER .debug ("Configuration added succesfully reinit in order" );
198
+ }
199
+ } catch (UnsupportedEncodingException e ) {
200
+ LOGGER .debug ("Unable to parse byte buffer to UTF-8 String." );
201
+ }
202
+ }
203
+
204
+ return reinit ;
205
+ }
206
+
148
207
void start () {
149
208
// Main Loop that will periodically collect metrics from the JMX Server
209
+ long start_ms = System .currentTimeMillis ();
210
+ long delta_s = 0 ;
211
+ FileInputStream sdPipe = null ;
212
+
213
+ try {
214
+ sdPipe = new FileInputStream (appConfig .getServiceDiscoveryPipe ()); //Should we use RandomAccessFile?
215
+ } catch (FileNotFoundException e ) {
216
+ LOGGER .warn ("Unable to open named pipe - Service Discovery disabled." );
217
+ sdPipe = null ;
218
+ }
219
+
150
220
while (true ) {
151
- // Exit on exit file trigger
221
+ // Exit on exit file trigger...
152
222
if (appConfig .getExitWatcher ().shouldExit ()){
153
223
LOGGER .info ("Exit file detected: stopping JMXFetch." );
154
224
System .exit (0 );
155
225
}
156
226
227
+ // any SD configs waiting in pipe?
228
+ try {
229
+ if (sdPipe != null && sdPipe .available () > 0 ) {
230
+ int len = sdPipe .available ();
231
+ byte [] buffer = new byte [len ];
232
+ sdPipe .read (buffer );
233
+ setReinit (processServiceDiscovery (buffer ));
234
+ }
235
+ } catch (IOException e ) {
236
+ LOGGER .warn ("Unable to read from pipe - Service Discovery configuration may have been skipped." );
237
+ }
238
+
157
239
long start = System .currentTimeMillis ();
240
+ if (this .reinit .get ()) {
241
+ init (true );
242
+ }
243
+
158
244
if (instances .size () > 0 ) {
159
245
doIteration ();
160
246
} else {
@@ -279,10 +365,39 @@ public void doIteration() {
279
365
}
280
366
}
281
367
282
- private HashMap <String , YamlParser > getConfigs (AppConfig config ) {
283
- HashMap <String , YamlParser > configs = new HashMap <String , YamlParser >();
368
+ public boolean addConfig (String name , YamlParser config ) {
369
+ // named groups not supported with Java6: "(?<check>.{1,30})_(?<version>\\d{0,30})"
370
+ Pattern pattern = Pattern .compile (SERVICE_DISCOVERY_PREFIX +"(.{1,30})_(\\ d{0,30})" );
371
+
372
+ Matcher matcher = pattern .matcher (name );
373
+ if (!matcher .find ()) {
374
+ // bad name.
375
+ return false ;
376
+ }
377
+
378
+ // Java 6 doesn't allow name matching - group 1 is "check"
379
+ String check = matcher .group (1 );
380
+ if (this .configs .containsKey (check )) {
381
+ // there was already a file config for the check.
382
+ return false ;
383
+ }
384
+
385
+ this .sdConfigs .put (name , config );
386
+ this .setReinit (true );
387
+
388
+ return true ;
389
+ }
390
+
391
+ private ConcurrentHashMap <String , YamlParser > getConfigs (AppConfig config ) {
392
+ ConcurrentHashMap <String , YamlParser > configs = new ConcurrentHashMap <String , YamlParser >();
284
393
YamlParser fileConfig ;
285
- for (String fileName : config .getYamlFileList ()) {
394
+
395
+ List <String > fileList = config .getYamlFileList ();
396
+ if (fileList == null ) {
397
+ return configs ;
398
+ }
399
+
400
+ for (String fileName : fileList ) {
286
401
File f = new File (config .getConfdDirectory (), fileName );
287
402
String name = f .getName ().replace (".yaml" , "" );
288
403
FileInputStream yamlInputStream = null ;
@@ -306,6 +421,7 @@ private HashMap<String, YamlParser> getConfigs(AppConfig config) {
306
421
}
307
422
}
308
423
}
424
+
309
425
LOGGER .info ("Found " + configs .size () + " config files" );
310
426
return configs ;
311
427
}
@@ -336,11 +452,23 @@ public void init(boolean forceNewConnection) {
336
452
337
453
338
454
Iterator <Entry <String , YamlParser >> it = configs .entrySet ().iterator ();
339
- while (it .hasNext ()) {
340
- Map .Entry <String , YamlParser > entry = it .next ();
455
+ // SD config cache doesn't remove configs - it just overwrites.
456
+ Iterator <Entry <String , YamlParser >> itSD = sdConfigs .entrySet ().iterator ();
457
+ while (it .hasNext () || itSD .hasNext ()) {
458
+ Map .Entry <String , YamlParser > entry ;
459
+ boolean sdIterator = false ;
460
+ if (it .hasNext ()) {
461
+ entry = it .next ();
462
+ } else {
463
+ entry = itSD .next ();
464
+ sdIterator = true ;
465
+ }
466
+
341
467
String name = entry .getKey ();
342
468
YamlParser yamlConfig = entry .getValue ();
343
- it .remove ();
469
+ if (!sdIterator ) {
470
+ it .remove ();
471
+ }
344
472
345
473
ArrayList <LinkedHashMap <String , Object >> configInstances = ((ArrayList <LinkedHashMap <String , Object >>) yamlConfig .getYamlInstances ());
346
474
if (configInstances == null || configInstances .size () == 0 ) {
0 commit comments