Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libstore S3: fix progress bar and make file transfers interruptible #12538

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/libstore/filetransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ struct curlFileTransfer : public FileTransfer
: fileTransfer(fileTransfer)
, request(request)
, act(*logger, lvlTalkative, actFileTransfer,
request.post ? "" : fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
fmt("%sing '%s'", request.verb(), request.uri),
{request.uri}, request.parentAct)
, callback(std::move(callback))
, finalSink([this](std::string_view data) {
Expand Down Expand Up @@ -278,7 +278,9 @@ struct curlFileTransfer : public FileTransfer

/* Trampoline passed to libcurl's CURLOPT_PROGRESSFUNCTION: curl hands us
   all four counters, and we forward only the pair that is relevant for
   this transfer's direction (upload counters when a request body is
   present, download counters otherwise). */
static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
{
    auto * item = static_cast<TransferItem *>(userp);
    if (item->request.data.has_value())
        return item->progressCallback(ultotal, ulnow);
    return item->progressCallback(dltotal, dlnow);
}

static int silentProgressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
Expand Down Expand Up @@ -788,10 +790,6 @@ struct curlFileTransfer : public FileTransfer

S3Helper s3Helper(profile, region, scheme, endpoint);

Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("downloading '%s'", request.uri),
{request.uri}, request.parentAct);

// FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key);
FileTransferResult res;
Expand Down
2 changes: 1 addition & 1 deletion src/libstore/filetransfer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ struct FileTransferRequest
FileTransferRequest(std::string_view uri)
: uri(uri), parentAct(getCurActivity()) { }

std::string verb()
std::string verb() const
{
return data ? "upload" : "download";
}
Expand Down
120 changes: 105 additions & 15 deletions src/libstore/s3-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(
S3Helper::FileTransferResult S3Helper::getObject(
const std::string & bucketName, const std::string & key)
{
debug("fetching 's3://%s/%s'...", bucketName, key);
std::string uri = "s3://" + bucketName + "/" + key;
Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("downloading '%s'", uri),
Logger::Fields{uri}, getCurActivity());
debug("fetching '%s'...", uri);

auto request =
Aws::S3::Model::GetObjectRequest()
Expand All @@ -170,6 +174,27 @@ S3Helper::FileTransferResult S3Helper::getObject(
return Aws::New<std::stringstream>("STRINGSTREAM");
});

size_t bytesDone = 0;
size_t bytesExpected = 0;
request.SetDataReceivedEventHandler([&](const Aws::Http::HttpRequest * req, Aws::Http::HttpResponse * resp, long long l) {
if (!bytesExpected && resp->HasHeader("Content-Length")) {
if (auto length = string2Int<size_t>(resp->GetHeader("Content-Length"))) {
bytesExpected = *length;
}
}
bytesDone += l;
act.progress(bytesDone, bytesExpected);
});

request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
bool cont = false;
try {
checkInterrupt();
cont = true;
} catch(...) {}
return cont;
});

FileTransferResult res;

auto now1 = std::chrono::steady_clock::now();
Expand All @@ -179,6 +204,8 @@ S3Helper::FileTransferResult S3Helper::getObject(
auto result = checkAws(fmt("AWS error fetching '%s'", key),
client->GetObject(request));

act.progress(result.GetContentLength(), result.GetContentLength());

res.data = decompress(result.GetContentEncoding(),
dynamic_cast<std::stringstream &>(result.GetBody()).str());

Expand Down Expand Up @@ -306,11 +333,35 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
std::shared_ptr<TransferManager> transferManager;
std::once_flag transferManagerCreated;

struct AsyncContext : public Aws::Client::AsyncCallerContext
{
mutable std::mutex mutex;
mutable std::condition_variable cv;
const Activity & act;

void notify() const
{
cv.notify_one();
}

void wait() const
{
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk);
}

AsyncContext(const Activity & act) : act(act) {}
};

void uploadFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
const std::string & contentEncoding)
{
std::string uri = "s3://" + bucketName + "/" + path;
Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("uploading '%s'", uri),
Logger::Fields{uri}, getCurActivity());
istream->seekg(0, istream->end);
auto size = istream->tellg();
istream->seekg(0, istream->beg);
Expand All @@ -329,16 +380,25 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
transferConfig.bufferSize = bufferSize;

transferConfig.uploadProgressCallback =
[](const TransferManager *transferManager,
const std::shared_ptr<const TransferHandle>
&transferHandle)
[](const TransferManager * transferManager,
const std::shared_ptr<const TransferHandle> & transferHandle)
{
//FIXME: find a way to properly abort the multipart upload.
//checkInterrupt();
debug("upload progress ('%s'): '%d' of '%d' bytes",
transferHandle->GetKey(),
transferHandle->GetBytesTransferred(),
transferHandle->GetBytesTotalSize());
auto context = std::dynamic_pointer_cast<const AsyncContext>(transferHandle->GetContext());
size_t bytesDone = transferHandle->GetBytesTransferred();
size_t bytesTotal = transferHandle->GetBytesTotalSize();
try {
checkInterrupt();
context->act.progress(bytesDone, bytesTotal);
} catch (...) {
context->notify();
}
};
transferConfig.transferStatusUpdatedCallback =
[](const TransferManager * transferManager,
const std::shared_ptr<const TransferHandle> & transferHandle)
{
auto context = std::dynamic_pointer_cast<const AsyncContext>(transferHandle->GetContext());
context->notify();
};

transferManager = TransferManager::Create(transferConfig);
Expand All @@ -352,29 +412,57 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
if (contentEncoding != "")
throw Error("setting a content encoding is not supported with S3 multi-part uploads");

auto context = std::make_shared<AsyncContext>(act);
std::shared_ptr<TransferHandle> transferHandle =
transferManager->UploadFile(
istream, bucketName, path, mimeType,
Aws::Map<Aws::String, Aws::String>(),
nullptr /*, contentEncoding */);

transferHandle->WaitUntilFinished();
context /*, contentEncoding */);

TransferStatus status = transferHandle->GetStatus();
while (status == TransferStatus::IN_PROGRESS || status == TransferStatus::NOT_STARTED) {
try {
checkInterrupt();
context->wait();
} catch (...) {
transferHandle->Cancel();
transferHandle->WaitUntilFinished();
}
status = transferHandle->GetStatus();
}
act.progress(transferHandle->GetBytesTransferred(), transferHandle->GetBytesTotalSize());

if (transferHandle->GetStatus() == TransferStatus::FAILED)
if (status == TransferStatus::FAILED)
throw Error("AWS error: failed to upload 's3://%s/%s': %s",
bucketName, path, transferHandle->GetLastError().GetMessage());

if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
if (status != TransferStatus::COMPLETED)
throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
bucketName, path);

} else {
act.progress(0, size);

auto request =
Aws::S3::Model::PutObjectRequest()
.WithBucket(bucketName)
.WithKey(path);

size_t bytesSent = 0;
request.SetDataSentEventHandler([&](const Aws::Http::HttpRequest * req, long long l) {
bytesSent += l;
act.progress(bytesSent, size);
});

request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
bool cont = false;
try {
checkInterrupt();
cont = true;
} catch(...) {}
return cont;
});

request.SetContentType(mimeType);

if (contentEncoding != "")
Expand All @@ -384,6 +472,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

auto result = checkAws(fmt("AWS error uploading '%s'", path),
s3Helper.client->PutObject(request));

act.progress(size, size);
}

auto now2 = std::chrono::steady_clock::now();
Expand Down
Loading