Skip to content

Commit 4f3c530

Browse files
committed
Add JMX/RMI socket factory with timeout
1 parent ed87e76 commit 4f3c530

File tree

3 files changed

+156
-93
lines changed

3 files changed

+156
-93
lines changed

src/main/java/org/datadog/jmxfetch/Connection.java

+1-66
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,10 @@
3131
public class Connection {
3232
private static final long CONNECTION_TIMEOUT = 10000;
3333
public static final String CLOSED_CLIENT_CAUSE = "The client has been closed";
34-
private static final ThreadFactory daemonThreadFactory = new DaemonThreadFactory();
3534
private JMXConnector connector;
3635
protected MBeanServerConnection mbs;
3736
protected Map<String, Object> env;
3837
protected JMXServiceURL address;
39-
protected long jmxTimeout = 20;
40-
41-
private static <T extends Throwable> T initCause(T wrapper, Throwable wrapped) {
42-
wrapper.initCause(wrapped);
43-
return wrapper;
44-
}
4538

4639
/** Gets attributes for matching bean name. */
4740
public MBeanAttributeInfo[] getAttributesForBean(ObjectName beanName)
@@ -68,7 +61,7 @@ protected void createConnection() throws IOException {
6861
this.env.put("attribute.remote.x.request.waiting.timeout", CONNECTION_TIMEOUT);
6962
closeConnector();
7063
log.info("Connecting to: " + this.address);
71-
connector = connectWithTimeout(this.address, this.env);
64+
connector = JMXConnectorFactory.connect(this.address, this.env);
7265
mbs = connector.getMBeanServerConnection();
7366
}
7467

@@ -83,56 +76,6 @@ public Object getAttribute(ObjectName objectName, String attributeName)
8376
return attr;
8477
}
8578

86-
/**
87-
* Connect to a MBean Server with a timeout This code comes from this blog post:
88-
* https://weblogs.java.net/blog/emcmanus/archive/2007/05/making_a_jmx_co.html.
89-
*/
90-
JMXConnector connectWithTimeout(final JMXServiceURL url, final Map<String, Object> env)
91-
throws IOException {
92-
93-
final BlockingQueue<Object> mailbox = new ArrayBlockingQueue<Object>(1);
94-
95-
ExecutorService executor = Executors.newSingleThreadExecutor(daemonThreadFactory);
96-
executor.submit(
97-
new Runnable() {
98-
public void run() {
99-
try {
100-
JMXConnector connector = JMXConnectorFactory.connect(url, env);
101-
if (!mailbox.offer(connector)) {
102-
connector.close();
103-
}
104-
} catch (Throwable t) {
105-
mailbox.offer(t);
106-
}
107-
}
108-
});
109-
Object result;
110-
try {
111-
result = mailbox.poll(jmxTimeout, TimeUnit.SECONDS);
112-
if (result == null) {
113-
if (!mailbox.offer("")) {
114-
result = mailbox.take();
115-
}
116-
}
117-
} catch (InterruptedException e) {
118-
throw initCause(new InterruptedIOException(e.getMessage()), e);
119-
} finally {
120-
executor.shutdown();
121-
}
122-
if (result == null) {
123-
log.warn("Connection timed out: " + url);
124-
throw new SocketTimeoutException("Connection timed out: " + url);
125-
}
126-
if (result instanceof JMXConnector) {
127-
return (JMXConnector) result;
128-
}
129-
try {
130-
throw (Throwable) result;
131-
} catch (Throwable e) {
132-
throw new IOException(e.toString(), e);
133-
}
134-
}
135-
13679
/** Closes the connector. */
13780
public void closeConnector() {
13881
if (connector != null) {
@@ -156,12 +99,4 @@ public boolean isAlive() {
15699
}
157100
return true;
158101
}
159-
160-
private static class DaemonThreadFactory implements ThreadFactory {
161-
public Thread newThread(Runnable run) {
162-
Thread thread = Executors.defaultThreadFactory().newThread(run);
163-
thread.setDaemon(true);
164-
return thread;
165-
}
166-
}
167102
}

src/main/java/org/datadog/jmxfetch/RemoteConnection.java

+22-27
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.datadog.jmxfetch;
22

33
import lombok.extern.slf4j.Slf4j;
4+
import org.datadog.jmxfetch.util.JmxfetchRmiClientSocketFactory;
45

56
import java.io.IOException;
67
import java.net.MalformedURLException;
@@ -20,13 +21,15 @@ public class RemoteConnection extends Connection {
2021
private String password;
2122
private String path = "jmxrmi";
2223
private String jmxUrl;
23-
private String rmiTimeout;
24+
private Integer rmiTimeout;
25+
private Integer rmiConnectionTimeout;
2426
private static final String TRUST_STORE_PATH_KEY = "trust_store_path";
2527
private static final String TRUST_STORE_PASSWORD_KEY = "trust_store_password";
2628
private static final String KEY_STORE_PATH_KEY = "key_store_path";
2729
private static final String KEY_STORE_PASSWORD_KEY = "key_store_password";
28-
private static final String DEFAULT_RMI_RESPONSE_TIMEOUT =
29-
"15000"; // Match the collection period default
30+
private static final int DEFAULT_RMI_CONNECTION_TIMEOUT = 20000;
31+
private static final int DEFAULT_RMI_TIMEOUT =
32+
15000; // Match the collection period default
3033

3134
/** RemoteConnection constructor for specified remote connection parameters. */
3235
public RemoteConnection(Map<String, Object> connectionParams) throws IOException {
@@ -38,24 +41,22 @@ public RemoteConnection(Map<String, Object> connectionParams) throws IOException
3841
}
3942

4043
try {
41-
rmiTimeout = (String) connectionParams.get("rmi_client_timeout");
42-
} catch (ClassCastException e) {
43-
rmiTimeout = Integer.toString((Integer) connectionParams.get("rmi_client_timeout"));
44+
rmiTimeout = (Integer) connectionParams.get("rmi_client_timeout");
45+
} catch (final ClassCastException e) {
46+
rmiTimeout = Integer.parseInt((String) connectionParams.get("rmi_client_timeout"));
4447
}
45-
4648
if (rmiTimeout == null) {
47-
rmiTimeout = DEFAULT_RMI_RESPONSE_TIMEOUT;
49+
rmiTimeout = DEFAULT_RMI_TIMEOUT;
4850
}
4951

50-
Integer connectionTimeout;
5152
try {
52-
connectionTimeout = (Integer) connectionParams.get("rmi_connection_timeout");
53+
rmiConnectionTimeout = (Integer) connectionParams.get("rmi_connection_timeout");
5354
} catch (final ClassCastException e) {
54-
connectionTimeout =
55+
rmiConnectionTimeout =
5556
Integer.parseInt((String) connectionParams.get("rmi_connection_timeout"));
5657
}
57-
if (connectionTimeout != null) {
58-
jmxTimeout = connectionTimeout;
58+
if (rmiConnectionTimeout == null) {
59+
rmiConnectionTimeout = DEFAULT_RMI_CONNECTION_TIMEOUT;
5960
}
6061

6162
user = (String) connectionParams.get("user");
@@ -65,6 +66,7 @@ public RemoteConnection(Map<String, Object> connectionParams) throws IOException
6566
if (connectionParams.containsKey("path")) {
6667
path = (String) connectionParams.get("path");
6768
}
69+
6870
env = getEnv(connectionParams);
6971
address = getAddress();
7072

@@ -96,25 +98,18 @@ public RemoteConnection(Map<String, Object> connectionParams) throws IOException
9698
log.info("Setting keyStore path: " + keyStorePath + " and keyStorePassword");
9799
}
98100
}
99-
100-
// Set an RMI timeout so we don't get stuck waiting for a bean to report a value
101-
System.setProperty("sun.rmi.transport.tcp.responseTimeout", rmiTimeout);
102-
103101
createConnection();
104102
}
105103

106104
private Map<String, Object> getEnv(Map<String, Object> connectionParams) {
107-
108105
Map<String, Object> environment = new HashMap<String, Object>();
109-
110-
if (connectionParams.containsKey("rmi_registry_ssl")
111-
&& (Boolean) connectionParams.get("rmi_registry_ssl")) {
112-
SslRMIClientSocketFactory csf = new SslRMIClientSocketFactory();
113-
environment.put("com.sun.jndi.rmi.factory.socket", csf);
114-
environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, csf);
115-
}
116-
117-
environment.put(JMXConnector.CREDENTIALS, new String[] {user, password});
106+
boolean useSsl = (connectionParams.containsKey("rmi_registry_ssl")
107+
&& (Boolean) connectionParams.get("rmi_registry_ssl"));
108+
JmxfetchRmiClientSocketFactory csf =
109+
new JmxfetchRmiClientSocketFactory(rmiTimeout, rmiConnectionTimeout, useSsl);
110+
environment.put("com.sun.jndi.rmi.factory.socket", csf);
111+
environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, csf);
112+
environment.put(JMXConnector.CREDENTIALS, new String[] { user, password });
118113
return environment;
119114
}
120115

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package org.datadog.jmxfetch.util;
2+
3+
import java.io.IOException;
4+
import java.io.InterruptedIOException;
5+
import java.net.Socket;
6+
import java.rmi.server.RMIClientSocketFactory;
7+
import java.rmi.server.RMISocketFactory;
8+
import javax.rmi.ssl.SslRMIClientSocketFactory;
9+
10+
public class JmxfetchRmiClientSocketFactory implements RMIClientSocketFactory {
11+
private final int timeoutMs;
12+
private final int connectionTimeoutMs;
13+
private final RMIClientSocketFactory factory;
14+
15+
/**
16+
* JmxfetchRmiClientSocketFactory constructor with socket timeout (milliseconds), a socket
17+
* connection timeout (millisecondes) and a flag to enable/disable SSL.
18+
*/
19+
public JmxfetchRmiClientSocketFactory(
20+
final int timeoutMs, final int connectionTimeoutMs,final boolean ssl) {
21+
this.timeoutMs = timeoutMs;
22+
this.connectionTimeoutMs = connectionTimeoutMs;
23+
this.factory =
24+
ssl ? new SslRMIClientSocketFactory() : RMISocketFactory.getDefaultSocketFactory();
25+
}
26+
27+
@Override
28+
public Socket createSocket(final String host, final int port) throws IOException {
29+
Socket socket = null;
30+
final AsyncSocketFactory f = new AsyncSocketFactory(factory, host, port);
31+
final Thread t = new Thread(f);
32+
try {
33+
synchronized (f) {
34+
t.start();
35+
try {
36+
long now = System.currentTimeMillis();
37+
final long until = now + connectionTimeoutMs;
38+
do {
39+
f.wait(until - now);
40+
socket = getSocketFromFactory(f);
41+
if (socket != null) {
42+
break;
43+
}
44+
now = System.currentTimeMillis();
45+
} while (now < until);
46+
} catch (final InterruptedException e) {
47+
throw new InterruptedIOException(
48+
"interrupted during socket connection attempt");
49+
}
50+
}
51+
} catch (IOException e) {
52+
/* will close socket if it ever connects */
53+
f.clean();
54+
throw e;
55+
}
56+
if (socket == null) {
57+
throw new IOException("connect timed out: " + host + ":" + port);
58+
}
59+
socket.setSoTimeout(timeoutMs);
60+
socket.setSoLinger(false, 0);
61+
return socket;
62+
}
63+
64+
Socket getSocketFromFactory(final AsyncSocketFactory factory) throws IOException {
65+
final Exception e = factory.getException();
66+
if (e != null) {
67+
e.fillInStackTrace();
68+
if (e instanceof RuntimeException) {
69+
throw (RuntimeException) e;
70+
} else if (e instanceof IOException) {
71+
throw (IOException) e;
72+
} else {
73+
throw new Error("unforeseen checked exception" + e.toString());
74+
}
75+
}
76+
return factory.getSocket();
77+
}
78+
79+
private class AsyncSocketFactory implements Runnable {
80+
private final RMIClientSocketFactory factory;
81+
private final String host;
82+
private final int port;
83+
private Exception exception = null;
84+
private Socket socket = null;
85+
private boolean shouldClose = false;
86+
87+
AsyncSocketFactory(
88+
final RMIClientSocketFactory factory,final String host, final int port) {
89+
this.factory = factory;
90+
this.host = host;
91+
this.port = port;
92+
}
93+
94+
public void run() {
95+
try {
96+
final Socket s = factory.createSocket(host, port);
97+
synchronized (this) {
98+
socket = s;
99+
notify();
100+
}
101+
synchronized (this) {
102+
if (shouldClose) {
103+
try {
104+
s.close();
105+
} catch (final IOException e) { /* empty on purpose */ }
106+
}
107+
}
108+
} catch (final Exception e) {
109+
synchronized (this) {
110+
exception = e;
111+
notify();
112+
}
113+
}
114+
}
115+
116+
synchronized void clean() {
117+
if (socket != null) {
118+
try {
119+
socket.close();
120+
} catch (final IOException e) { /* empty on purpose */ }
121+
}
122+
shouldClose = true;
123+
}
124+
125+
private synchronized Exception getException() {
126+
return exception;
127+
}
128+
129+
private synchronized Socket getSocket() {
130+
return socket;
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)