Skip to content

Commit

Permalink
Rewind on --remote_download_minimal upload FileNotFounds
Browse files Browse the repository at this point in the history
Currently, when building with `--remote_download_minimal`, if the remote
storage has evicted an output of a previous action, Bazel will throw a
FileNotFoundException for it.

This change detects that scenario and re-throws the error as a
LostInputsExecException, tracking the metadata needed to construct it.

If rewinding is enabled for a build, this will cause rewinding to
automatically kick in.
  • Loading branch information
illicitonion committed Apr 26, 2022
1 parent 4e6153e commit 0670bc4
Show file tree
Hide file tree
Showing 27 changed files with 327 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,26 @@ protected FailureDetail getFailureDetail(String message) {
.build();
}

public void combineAndThrow(LostInputsExecException other) throws LostInputsExecException {
public static LostInputsExecException combine(LostInputsExecException... es) {
// This uses a HashMap when merging the two lostInputs maps because key collisions are expected.
// In contrast, ImmutableMap.Builder treats collisions as errors. Collisions will happen when
// the two sources of the original exceptions shared knowledge of what was lost. For example,
// a SpawnRunner may discover a lost input and look it up in an action filesystem in which it's
// also lost. The SpawnRunner and the filesystem may then each throw a LostInputsExecException
// with the same information.
Map<String, ActionInput> map = new HashMap<>();
map.putAll(lostInputs);
map.putAll(other.lostInputs);
LostInputsExecException combined =
new LostInputsExecException(
ImmutableMap.copyOf(map), new MergedActionInputDepOwners(owners, other.owners), this);
ActionInputDepOwners mergedOwners = new ActionInputDepOwnerMap(ImmutableSet.of());

for (LostInputsExecException other : es) {
map.putAll(other.lostInputs);
mergedOwners = new MergedActionInputDepOwners(mergedOwners, other.owners);
}
return new LostInputsExecException(ImmutableMap.copyOf(map), mergedOwners);
}

/**
 * Merges this exception with {@code other} via {@code combine} and throws the merged exception.
 *
 * <p>Both this exception and {@code other} are attached to the thrown exception as suppressed
 * exceptions, so neither original is lost.
 *
 * @param other the second exception to fold into the result
 * @throws LostInputsExecException always; this method never returns normally
 */
public void combineAndThrow(LostInputsExecException other) throws LostInputsExecException {
  LostInputsExecException merged = combine(this, other);
  merged.addSuppressed(this);
  merged.addSuppressed(other);
  throw merged;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ java_library(
deps = [
"//src/main/java/com/google/devtools/build/docgen/annot",
"//src/main/java/com/google/devtools/build/lib:runtime",
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/analysis:analysis_cluster",
"//src/main/java/com/google/devtools/build/lib/analysis:blaze_directories",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.FileValue;
import com.google.devtools.build.lib.bazel.debug.WorkspaceRuleEvent;
import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
Expand Down Expand Up @@ -284,7 +285,7 @@ private StarlarkExecutionResult executeRemote(
}

return new StarlarkExecutionResult(result.exitCode(), stdout, stderr);
} catch (IOException e) {
} catch (ExecException | IOException e) {
throw Starlark.errorf("remote_execute failed: %s", e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ public ImmutableList<SpawnResult> exec(
}
} catch (InterruptedIOException e) {
throw new InterruptedException(e.getMessage());
} catch (LostInputsExecException e) {
throw e;
} catch (IOException e) {
throw new EnvironmentalExecException(
e,
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ java_library(
],
deps = [
":ExecutionStatusException",
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//third_party:guava",
"//third_party:jsr305",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
Expand Down Expand Up @@ -114,7 +115,7 @@ private static class Execution {
this.waitExecutionFunction = waitExecutionFunction;
}

ExecuteResponse start() throws IOException, InterruptedException {
ExecuteResponse start() throws ExecException, IOException, InterruptedException {
// Execute has two components: the Execute call and (optionally) the WaitExecution call.
// This is the simple flow without any errors:
//
Expand Down Expand Up @@ -321,7 +322,7 @@ static ExecuteResponse extractResponseOrThrowIfError(Operation operation) throws
@Override
public ExecuteResponse executeRemotely(
RemoteActionExecutionContext context, ExecuteRequest request, OperationObserver observer)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
Execution execution =
new Execution(
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,19 @@
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputDepOwnerMap;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.Artifact.DerivedArtifact;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.FileArtifactValue.RemoteFileArtifactValue;
import com.google.devtools.build.lib.actions.LostInputsExecException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
Expand All @@ -40,6 +50,7 @@
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -73,8 +84,9 @@ public void ensureInputsPresent(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
String actionId,
boolean force)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
ImmutableSet<Digest> allDigests =
ImmutableSet.<Digest>builder()
.addAll(merkleTree.getAllDigests())
Expand All @@ -91,14 +103,34 @@ public void ensureInputsPresent(
.flatMapSingle(
digest ->
uploadBlobIfMissing(
context, merkleTree, additionalInputs, force, missingDigestFinder, digest));
context, merkleTree, additionalInputs, force, missingDigestFinder, digest, actionId)
.onErrorResumeNext(e -> {
if (e instanceof FileNotFoundException) {
PathOrBytes file = merkleTree.getFileByDigest(digest);
if (file.getArtifact() != null) {
ActionInputDepOwnerMap owners = new ActionInputDepOwnerMap(
ImmutableList.of(file.getArtifact()));
RemoteFileArtifactValue artifactValue = new RemoteFileArtifactValue(
DigestUtil.toBinaryDigest(digest),
digest.getSizeBytes(),
/*locationIndex=*/ 1,
actionId);
owners.put(file.getArtifact(), artifactValue, file.getArtifact());
return Single.error(new LostInputsExecException(
ImmutableMap.of(digest.getHash(), file.getArtifact()),
owners));
}
}
return Single.error(e);
}));

try {
mergeBulkTransfer(uploads).blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
Throwables.throwIfInstanceOf(cause, LostInputsExecException.class);
Throwables.throwIfInstanceOf(cause, IOException.class);
}
throw e;
Expand All @@ -111,7 +143,8 @@ private Single<TransferResult> uploadBlobIfMissing(
Map<Digest, Message> additionalInputs,
boolean force,
MissingDigestFinder missingDigestFinder,
Digest digest) {
Digest digest,
String actionId) {
Completable upload =
casUploadCache.execute(
digest,
Expand All @@ -124,7 +157,7 @@ private Single<TransferResult> uploadBlobIfMissing(
missingDigests -> {
if (missingDigests.contains(digest)) {
return toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
() -> uploadBlob(context, digest, merkleTree, additionalInputs, actionId),
directExecutor());
} else {
return Completable.complete();
Expand All @@ -139,7 +172,8 @@ private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs) {
Map<Digest, Message> additionalInputs,
String actionId) {
Directory node = merkleTree.getDirectoryByDigest(digest);
if (node != null) {
return cacheProtocol.uploadBlob(context, digest, node.toByteString());
Expand All @@ -150,7 +184,28 @@ private ListenableFuture<Void> uploadBlob(
if (file.getBytes() != null) {
return cacheProtocol.uploadBlob(context, digest, file.getBytes());
}
return cacheProtocol.uploadFile(context, digest, file.getPath());
return Futures.catchingAsync(cacheProtocol.uploadFile(context, digest, file.getPath()),
// When we avoid downloads (e.g. with --remote_download_minimal), we end up not populating
// paths which may be read from when doing uploads.
// If this happens, the remote is missing a file we expect it to have. If we can identify
// which action produced the file, report that we lost inputs, so that rewinding may be
// able to regenerate them.
FileNotFoundException.class, e -> {
if (file.getArtifact() != null) {
ActionInputDepOwnerMap owners = new ActionInputDepOwnerMap(
ImmutableList.of(file.getArtifact()));
RemoteFileArtifactValue artifactValue = new RemoteFileArtifactValue(
DigestUtil.toBinaryDigest(digest),
digest.getSizeBytes(),
/*locationIndex=*/ 1,
actionId);
owners.put(file.getArtifact(), artifactValue, file.getArtifact());
return Futures.immediateFailedFuture(new LostInputsExecException(
ImmutableMap.of(digest.getHash(), file.getArtifact()),
owners));
}
return Futures.immediateFailedFuture(e);
}, MoreExecutors.directExecutor());
}

Message message = additionalInputs.get(digest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ private void reportUploadError(Throwable error) {
* <p>Must be called before calling {@link #executeRemotely}.
*/
public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
checkState(!shutdown.get(), "shutdown");
checkState(mayBeExecutedRemotely(action.getSpawn()), "spawn can't be executed remotely");

Expand All @@ -1219,7 +1219,7 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
additionalInputs.put(action.getActionKey().getDigest(), action.getAction());
additionalInputs.put(action.getCommandHash(), action.getCommand());
remoteExecutionCache.ensureInputsPresent(
action.getRemoteActionExecutionContext(), action.getMerkleTree(), additionalInputs, force);
action.getRemoteActionExecutionContext(), action.getMerkleTree(), additionalInputs, action.getActionKey().getDigest().toString(), force);
}

/**
Expand All @@ -1230,7 +1230,7 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
*/
public RemoteActionResult executeRemotely(
RemoteAction action, boolean acceptCachedResult, OperationObserver observer)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
checkState(!shutdown.get(), "shutdown");
checkState(mayBeExecutedRemotely(action.getSpawn()), "spawn can't be executed remotely");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.analysis.platform.PlatformUtils;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
Expand Down Expand Up @@ -108,7 +109,7 @@ public ExecutionResult execute(
ImmutableMap<String, String> environment,
String workingDirectory,
Duration timeout)
throws IOException, InterruptedException {
throws ExecException, IOException, InterruptedException {
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "repository_rule", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
Expand Down Expand Up @@ -158,7 +159,7 @@ public ExecutionResult execute(
additionalInputs.put(actionDigest, action);
additionalInputs.put(commandHash, command);

remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, /*force=*/ true);
remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, "", /*force=*/ true);
}

try (SilentCloseable c =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -131,6 +132,28 @@ public <T> T execute(Callable<T> call, Backoff backoff) throws IOException, Inte
}
}

/**
 * Variant of {@link #execute(Callable)} that also lets {@link ExecException}s propagate to the
 * caller. Runs {@code call} with a backoff obtained from {@code newBackoff()}.
 */
public <T> T executeWithExecException(Callable<T> call)
    throws ExecException, IOException, InterruptedException {
  Backoff freshBackoff = newBackoff();
  return executeWithExecException(call, freshBackoff);
}

/**
 * Variant of {@link #execute(Callable, Backoff)} that also lets {@link ExecException}s propagate
 * to the caller.
 *
 * <p>Delegates to the superclass {@code execute} and rethrows whatever it fails with:
 * {@link IOException}, {@link InterruptedException}, {@link ExecException} and unchecked
 * exceptions are rethrown unchanged; any other checked exception is wrapped in a
 * {@link RuntimeException}.
 */
public <T> T executeWithExecException(Callable<T> call, Backoff backoff)
    throws ExecException, IOException, InterruptedException {
  try {
    return super.execute(call, backoff);
  } catch (Exception failure) {
    if (failure instanceof IOException) {
      throw (IOException) failure;
    }
    if (failure instanceof InterruptedException) {
      throw (InterruptedException) failure;
    }
    if (failure instanceof ExecException) {
      throw (ExecException) failure;
    }
    // Anything left is either unchecked (rethrown unchanged) or an unexpected checked exception.
    Throwables.throwIfUnchecked(failure);
    throw new RuntimeException(failure);
  }
}

/** Backoff strategy that backs off exponentially. */
public static class ExponentialBackoff implements Backoff {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.devtools.build.lib.actions.CommandLines.ParamFileActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.ForbiddenActionInputException;
import com.google.devtools.build.lib.actions.LostInputsExecException;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnMetrics;
import com.google.devtools.build.lib.actions.SpawnResult;
Expand Down Expand Up @@ -235,15 +236,25 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
AtomicBoolean useCachedResult = new AtomicBoolean(acceptCachedResult);
AtomicBoolean forceUploadInput = new AtomicBoolean(false);
try {
return retrier.execute(
return retrier.executeWithExecException(
() -> {
// Upload the command and all the inputs into the remote cache.
try (SilentCloseable c = prof.profile(UPLOAD_TIME, "upload missing inputs")) {
Duration networkTimeStart = action.getNetworkTime().getDuration();
Stopwatch uploadTime = Stopwatch.createStarted();
// Upon retry, we force upload inputs
remoteExecutionService.uploadInputsIfNotPresent(
action, forceUploadInput.getAndSet(true));
try {
remoteExecutionService.uploadInputsIfNotPresent(
action, forceUploadInput.getAndSet(true));
} catch (BulkTransferException e) {
// If all of the issues were lost inputs, propagate that exception so we can maybe rewind.
LostInputsExecException lostInputsExecException = e.asLostInputsExecException();
if (lostInputsExecException != null) {
throw lostInputsExecException;
} else {
throw e;
}
}

// subtract network time consumed here to ensure wall clock during upload is not
// double
Expand Down
Loading

0 comments on commit 0670bc4

Please sign in to comment.