15
15
*/
16
16
package com .linecorp .armeria .client ;
17
17
18
+ import static com .linecorp .armeria .internal .common .util .IpAddrUtil .isCreatedWithIpAddressOnly ;
18
19
import static java .util .Objects .requireNonNull ;
19
20
20
21
import java .net .InetAddress ;
@@ -88,24 +89,36 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
88
89
}
89
90
90
91
final SessionProtocol protocol = ctx .sessionProtocol ();
91
- final ProxyConfig proxyConfig ;
92
+
93
+ final Endpoint endpointWithPort = endpoint .withDefaultPort (ctx .sessionProtocol ());
94
+ final EventLoop eventLoop = ctx .eventLoop ().withoutContext ();
95
+ // TODO(ikhoon) Use ctx.exchangeType() to create an optimized HttpResponse for non-streaming response.
96
+ final DecodedHttpResponse res = new DecodedHttpResponse (eventLoop );
97
+ updateCancellationTask (ctx , req , res );
98
+
92
99
try {
93
- proxyConfig = getProxyConfig (protocol , endpoint );
100
+ resolveProxyConfig (protocol , endpoint , ctx , (proxyConfig , thrown ) -> {
101
+ if (thrown != null ) {
102
+ earlyFailedResponse (thrown , ctx , res );
103
+ } else {
104
+ assert proxyConfig != null ;
105
+ execute0 (ctx , endpointWithPort , req , res , proxyConfig );
106
+ }
107
+ });
94
108
} catch (Throwable t ) {
95
109
return earlyFailedResponse (t , ctx );
96
110
}
111
+ return res ;
112
+ }
97
113
114
+ private void execute0 (ClientRequestContext ctx , Endpoint endpointWithPort , HttpRequest req ,
115
+ DecodedHttpResponse res , ProxyConfig proxyConfig ) {
98
116
final Throwable cancellationCause = ctx .cancellationCause ();
99
117
if (cancellationCause != null ) {
100
- return earlyFailedResponse (cancellationCause , ctx );
118
+ earlyFailedResponse (cancellationCause , ctx , res );
119
+ return ;
101
120
}
102
121
103
- final Endpoint endpointWithPort = endpoint .withDefaultPort (ctx .sessionProtocol ());
104
- final EventLoop eventLoop = ctx .eventLoop ().withoutContext ();
105
- // TODO(ikhoon) Use ctx.exchangeType() to create an optimized HttpResponse for non-streaming response.
106
- final DecodedHttpResponse res = new DecodedHttpResponse (eventLoop );
107
- updateCancellationTask (ctx , req , res );
108
-
109
122
final ClientConnectionTimingsBuilder timingsBuilder = ClientConnectionTimings .builder ();
110
123
111
124
if (endpointWithPort .hasIpAddr () ||
@@ -125,8 +138,6 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
125
138
}
126
139
});
127
140
}
128
-
129
- return res ;
130
141
}
131
142
132
143
private static void updateCancellationTask (ClientRequestContext ctx , HttpRequest req ,
@@ -215,24 +226,52 @@ private void acquireConnectionAndExecute0(ClientRequestContext ctx, Endpoint end
215
226
}
216
227
}
217
228
218
- private ProxyConfig getProxyConfig (SessionProtocol protocol , Endpoint endpoint ) {
219
- final ProxyConfig proxyConfig = factory .proxyConfigSelector ().select (protocol , endpoint );
220
- requireNonNull (proxyConfig , "proxyConfig" );
229
+ private void resolveProxyConfig (SessionProtocol protocol , Endpoint endpoint , ClientRequestContext ctx ,
230
+ BiConsumer <@ Nullable ProxyConfig , @ Nullable Throwable > onComplete ) {
231
+ final ProxyConfig unresolvedProxyConfig = factory .proxyConfigSelector ().select (protocol , endpoint );
232
+ requireNonNull (unresolvedProxyConfig , "unresolvedProxyConfig" );
233
+ final ProxyConfig proxyConfig = maybeSetHAProxySourceAddress (unresolvedProxyConfig );
221
234
222
- // special behavior for haproxy when sourceAddress is null
223
- if (proxyConfig .proxyType () == ProxyType .HAPROXY &&
224
- ((HAProxyConfig ) proxyConfig ).sourceAddress () == null ) {
225
- final InetSocketAddress proxyAddress = proxyConfig .proxyAddress ();
235
+ final InetSocketAddress proxyAddress = proxyConfig .proxyAddress ();
236
+ final boolean needsDnsResolution = proxyAddress != null && !isCreatedWithIpAddressOnly (proxyAddress );
237
+ if (needsDnsResolution ) {
226
238
assert proxyAddress != null ;
239
+ final Future <InetSocketAddress > resolveFuture = addressResolverGroup
240
+ .getResolver (ctx .eventLoop ().withoutContext ())
241
+ .resolve (createUnresolvedAddressForRefreshing (proxyAddress ));
227
242
228
- // use proxy information in context if available
229
- final ServiceRequestContext serviceCtx = ServiceRequestContext .currentOrNull ();
230
- if (serviceCtx != null ) {
231
- final ProxiedAddresses proxiedAddresses = serviceCtx .proxiedAddresses ();
232
- return ProxyConfig .haproxy (proxyAddress , proxiedAddresses .sourceAddress ());
233
- }
243
+ resolveFuture .addListener (future -> {
244
+ if (future .isSuccess ()) {
245
+ final InetSocketAddress resolvedAddress = (InetSocketAddress ) future .getNow ();
246
+ final ProxyConfig newProxyConfig = proxyConfig .withProxyAddress (resolvedAddress );
247
+ onComplete .accept (newProxyConfig , null );
248
+ } else {
249
+ final Throwable cause = future .cause ();
250
+ onComplete .accept (null , cause );
251
+ }
252
+ });
253
+ } else {
254
+ onComplete .accept (proxyConfig , null );
234
255
}
256
+ }
235
257
258
+ private static ProxyConfig maybeSetHAProxySourceAddress (ProxyConfig proxyConfig ) {
259
+ if (proxyConfig .proxyType () != ProxyType .HAPROXY ) {
260
+ return proxyConfig ;
261
+ }
262
+ if (((HAProxyConfig ) proxyConfig ).sourceAddress () != null ) {
263
+ return proxyConfig ;
264
+ }
265
+
266
+ final ServiceRequestContext sctx = ServiceRequestContext .currentOrNull ();
267
+ final ProxiedAddresses serviceProxiedAddresses = sctx == null ? null : sctx .proxiedAddresses ();
268
+ if (serviceProxiedAddresses != null ) {
269
+ // A special behavior for haproxy when sourceAddress is null.
270
+ // Use proxy information in the service context if available.
271
+ final InetSocketAddress proxyAddress = proxyConfig .proxyAddress ();
272
+ assert proxyAddress != null ;
273
+ return ProxyConfig .haproxy (proxyAddress , serviceProxiedAddresses .sourceAddress ());
274
+ }
236
275
return proxyConfig ;
237
276
}
238
277
@@ -253,11 +292,24 @@ private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContex
253
292
return HttpResponse .ofFailure (cause );
254
293
}
255
294
295
+ private static HttpResponse earlyFailedResponse (Throwable t ,
296
+ ClientRequestContext ctx ,
297
+ DecodedHttpResponse response ) {
298
+ final UnprocessedRequestException cause = UnprocessedRequestException .of (t );
299
+ ctx .cancel (cause );
300
+ response .close (cause );
301
+ return response ;
302
+ }
303
+
256
304
private static void doExecute (PooledChannel pooledChannel , ClientRequestContext ctx ,
257
305
HttpRequest req , DecodedHttpResponse res ) {
258
306
final Channel channel = pooledChannel .get ();
259
307
final HttpSession session = HttpSession .get (channel );
260
308
res .init (session .inboundTrafficController ());
261
309
session .invoke (pooledChannel , ctx , req , res );
262
310
}
311
+
312
+ private static InetSocketAddress createUnresolvedAddressForRefreshing (InetSocketAddress previousAddress ) {
313
+ return InetSocketAddress .createUnresolved (previousAddress .getHostString (), previousAddress .getPort ());
314
+ }
263
315
}
0 commit comments