Skip to content

Commit

Permalink
LoadBalancer2 and reprocess() in delayed transport.
Browse files Browse the repository at this point in the history
This is the second step of a major refactor for the LoadBalancer-related
part of Channel impl (grpc#1600)
  • Loading branch information
zhangkun83 committed Nov 23, 2016
1 parent 6a04022 commit 652a0ef
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 16 deletions.
134 changes: 134 additions & 0 deletions core/src/main/java/io/grpc/LoadBalancer2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.grpc;

import com.google.common.base.Preconditions;

import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;

@NotThreadSafe
public abstract class LoadBalancer2 {
// All methods are run in the LoadBalancer executor, a serializing
// executor.
public abstract void handleResolvedAddresses(
List<ResolvedServerInfoGroup> servers, Attributes attributes);
public abstract void handleNameResolutionError(Status error);
// Supersedes handleTransportReady() and handleTransportShutdown()
public abstract void handleSubchannelState(
Subchannel subchannel, ConnectivityStateInfo stateInfo);
public abstract void shutdown();


// Where the LoadBalancer implement the main routing logic
@ThreadSafe
public static abstract class SubchannelPicker {
public abstract PickResult pickSubchannel(Attributes affinity, Metadata headers);
}

@Immutable
public static final class PickResult {
// A READY channel, or null
@Nullable private final Subchannel subchannel;
// An error to be propagated to the application if subchannel == null
// Or OK if there is no error.
// subchannel being null and error being OK means RPC needs to wait
private final Status status;

private PickResult(Subchannel subchannel, Status status) {
this.subchannel = subchannel;
this.status = Preconditions.checkNotNull(status, "status");
}

public static PickResult withSubchannel(Subchannel subchannel) {
return new PickResult(Preconditions.checkNotNull(subchannel, "subchannel"), Status.OK);
}

public static PickResult withError(Status error) {
Preconditions.checkArgument(!error.isOk(), "error status shouldn't be OK");
return new PickResult(null, error);
}

public static PickResult withNothingYet() {
return new PickResult(null, Status.OK);
}

@Nullable
public Subchannel getSubchannel() {
return subchannel;
}

public Status getStatus() {
return status;
}
}

// Implemented by Channel, used by LoadBalancer implementations.
@ThreadSafe
public abstract static class Helper {
// Subchannel for making RPCs. Wraps a TransportSet
public abstract Subchannel createSubChannel(EquivalentAddressGroup addrs, Attributes attrs);
// Out-of-band channel for LoadBalancer’s own RPC needs, e.g.,
// talking to an external load-balancer. Wraps a TransportSet
public abstract ManagedChannel createOobChannel(
EquivalentAddressGroup eag, String authority);
// LoadBalancer calls this whenever its connectivity state changes
// Channel will run the new picker against all pending RPCs
public abstract void updatePicker(SubchannelPicker picker);
// The serializing executor where the LoadBalancer methods are called
public abstract Executor getExecutor();
// For GRPCLB which needs to resolve the address for delegation
public abstract NameResolver.Factory getNameResolverFactory();
public abstract String getServiceName();
}

// Represents the logical connection to a certain server represented by an EquivalentAddressGroup
@ThreadSafe
public static abstract class Subchannel {
public abstract void shutdown();
public abstract void requestConnection();
public abstract EquivalentAddressGroup getAddresses();
// The same Attributes passed to createSubChannel.
// LoadBalancer can use it to attach additional information here, e.g.,
// the shard this Subchannel belongs to.
public abstract Attributes getAttributes();
}

@ThreadSafe
public static abstract class Factory {
public abstract LoadBalancer2 newLoadBalancer(Helper helper);
}
}
114 changes: 99 additions & 15 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@

import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.LoadBalancer2;
import io.grpc.LoadBalancer2.PickResult;
import io.grpc.LoadBalancer2.Subchannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
Expand Down Expand Up @@ -70,8 +73,12 @@ class DelayedClientTransport implements ManagedClientTransport {

@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<PendingStream>();

// TODO(zhangkun83): remove it once LBv2 refactor is done. In practice ping() is only called on
// real transports.
@GuardedBy("lock")
private Collection<PendingPing> pendingPings = new ArrayList<PendingPing>();

/**
* When shutdown == true and pendingStreams == null, then the transport is considered terminated.
*/
Expand All @@ -96,7 +103,7 @@ class DelayedClientTransport implements ManagedClientTransport {
}

@Override
public Runnable start(Listener listener) {
public final Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
return null;
}
Expand All @@ -113,7 +120,7 @@ public Runnable start(Listener listener) {
* {@link FailingClientStream} is returned.
*/
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers,
public final ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers,
CallOptions callOptions, StatsTraceContext statsTraceCtx) {
Supplier<ClientTransport> supplier = transportSupplier;
if (supplier == null) {
Expand Down Expand Up @@ -141,12 +148,12 @@ public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers,
}

@Override
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) {
public final ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) {
return newStream(method, headers, CallOptions.DEFAULT, StatsTraceContext.NOOP);
}

@Override
public void ping(final PingCallback callback, Executor executor) {
public final void ping(final PingCallback callback, Executor executor) {
Supplier<ClientTransport> supplier = transportSupplier;
if (supplier == null) {
synchronized (lock) {
Expand Down Expand Up @@ -177,7 +184,7 @@ public void ping(final PingCallback callback, Executor executor) {
* you still need to call {@link #setTransport} to make this transport terminated.
*/
@Override
public void shutdown() {
public final void shutdown() {
synchronized (lock) {
if (shutdown) {
return;
Expand All @@ -197,7 +204,7 @@ public void shutdown() {
* this transport.
*/
@Override
public void shutdownNow(Status status) {
public final void shutdownNow(Status status) {
shutdown();
Collection<PendingStream> savedPendingStreams = null;
synchronized (lock) {
Expand All @@ -223,7 +230,7 @@ public void shutdownNow(Status status) {
* <p>{@code transport} will be used for all future calls to {@link #newStream}, even if this
* transport is {@link #shutdown}.
*/
public void setTransport(ClientTransport transport) {
public final void setTransport(ClientTransport transport) {
Preconditions.checkArgument(this != transport,
"delayed transport calling setTransport on itself");
setTransportSupplier(Suppliers.ofInstance(transport));
Expand All @@ -239,7 +246,7 @@ public void setTransport(ClientTransport transport) {
* will be used for all future calls to {@link #newStream}, even if this transport is {@link
* #shutdown}.
*/
public void setTransportSupplier(final Supplier<ClientTransport> supplier) {
public final void setTransportSupplier(final Supplier<ClientTransport> supplier) {
synchronized (lock) {
if (transportSupplier != null) {
return;
Expand Down Expand Up @@ -282,14 +289,14 @@ public void setTransportSupplier(final Supplier<ClientTransport> supplier) {
}
}

public boolean hasPendingStreams() {
public final boolean hasPendingStreams() {
synchronized (lock) {
return pendingStreams != null && !pendingStreams.isEmpty();
}
}

@VisibleForTesting
int getPendingStreamsCount() {
final int getPendingStreamsCount() {
synchronized (lock) {
return pendingStreams == null ? 0 : pendingStreams.size();
}
Expand All @@ -301,8 +308,9 @@ int getPendingStreamsCount() {
* fail immediately, and that non-fail fast streams can be created as {@link PendingStream} and
* should keep pending during this back-off period.
*/
// TODO(zhangkun83): remove it once the LBv2 refactor is done.
@VisibleForTesting
boolean isInBackoffPeriod() {
final boolean isInBackoffPeriod() {
synchronized (lock) {
return backoffStatus != null;
}
Expand All @@ -321,7 +329,8 @@ boolean isInBackoffPeriod() {
*
* @param status the causal status for triggering back-off.
*/
void startBackoff(final Status status) {
// TODO(zhangkun83): remove it once the LBv2 refactor is done.
final void startBackoff(final Status status) {
synchronized (lock) {
if (shutdown) {
return;
Expand Down Expand Up @@ -359,22 +368,97 @@ public void run() {
* Is only called at the beginning of the callback function of {@code endOfCurrentBackoff} in the
* {@link TransportSet#scheduleBackoff} method.
*/
void endBackoff() {
// TODO(zhangkun83): remove it once the LBv2 refactor is done.
final void endBackoff() {
synchronized (lock) {
Preconditions.checkState(backoffStatus != null,
"Error when calling endBackoff: transport is not in backoff period");
backoffStatus = null;
}
}

/**
* Use the picker to try picking a transport for every pending stream and pending ping, proceed the
* stream or ping if the pick is successful, otherwise keep it pending.
*
* <p>If new pending streams are created while reprocess() is in progress, there is no guarantee
* that the pending streams will or will not be processed by this picker.
*/
final void reprocess(LoadBalancer2.SubchannelPicker picker) {
ArrayList<PendingStream> toProcess;
ArrayList<PendingStream> toRemove = new ArrayList<PendingStream>();
synchronized (lock) {
if (pendingStreams == null || pendingStreams.isEmpty()) {
return;
}
toProcess = new ArrayList<PendingStream>(pendingStreams);
}

for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(
stream.callOptions.getAffinity(), stream.headers);
final ClientTransport realTransport;
Subchannel subchannel = pickResult.getSubchannel();
if (subchannel != null) {
realTransport = obtainActiveTransport(subchannel);
} else {
realTransport = null;
}
if (realTransport != null) {
Executor executor = streamCreationExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
// we are now on transport thread, we need to offload the work to an executor.
if (stream.callOptions.getExecutor() != null) {
executor = stream.callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(realTransport);
}
});
toRemove.add(stream);
} else if (!pickResult.getStatus().isOk() && !stream.callOptions.isWaitForReady()) {
stream.setStream(new FailingClientStream(pickResult.getStatus()));
toRemove.add(stream);
} // other cases: stay pending
}

synchronized (lock) {
if (pendingStreams == null || pendingStreams.isEmpty()) {
return;
}
boolean wasEmpty = pendingStreams.isEmpty();
pendingStreams.removeAll(toRemove);
if (pendingStreams.isEmpty()) {
pendingStreams = null;
if (!wasEmpty) {
// There may be a brief gap between delayed transport clearing in-use state, and first
// real transport starting streams and setting in-use state. During the gap the whole
// channel's in-use state may be false. However, it shouldn't cause spurious switching to
// idleness (which would shutdown the transports and LoadBalancer) because the gap should
// be shorter than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
listener.transportInUse(false);
}
}
}
}

// To be implemented by ManagedChannelImpl2 and unit test
@Nullable
ClientTransport obtainActiveTransport(Subchannel subchannel) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public String getLogId() {
public final String getLogId() {
return GrpcUtil.getLogId(this);
}

@VisibleForTesting
@Nullable
Supplier<ClientTransport> getTransportSupplier() {
final Supplier<ClientTransport> getTransportSupplier() {
return transportSupplier;
}

Expand Down
Loading

0 comments on commit 652a0ef

Please sign in to comment.