@@ -201,6 +201,12 @@ class AwsS3ClientWrapper {
201
201
return outcome;
202
202
}
203
203
204
+ void GetCloudObjectAsync (
205
+ const Aws::S3Crt::Model::GetObjectRequest& request,
206
+ const Aws::S3Crt::GetObjectResponseReceivedHandler& handler) {
207
+ client_->GetObjectAsync (request, handler, nullptr );
208
+ }
209
+
204
210
template <class ... Args>
205
211
std::shared_ptr<Aws::Transfer::TransferHandle> DownloadFile (Args... args) {
206
212
CloudRequestCallbackGuard guard (cloud_request_callback_.get (),
@@ -434,6 +440,11 @@ class S3StorageProvider : public CloudStorageProviderImpl {
434
440
IODebugContext* dbg) override ;
435
441
Status PrepareOptions (const ConfigOptions& options) override ;
436
442
protected:
443
+ IOStatus DoGetCloudObjectAsync (
444
+ const std::string& bucket_name, const std::string& object_path,
445
+ const std::string& local_path,
446
+ std::shared_ptr<std::promise<bool >> prom_ptr) override ;
447
+
437
448
IOStatus DoGetCloudObject (const std::string& bucket_name,
438
449
const std::string& object_path,
439
450
const std::string& destination,
@@ -729,7 +740,7 @@ IOStatus S3StorageProvider::ExistsCloudObject(const std::string& bucket_name,
729
740
IOStatus S3StorageProvider::GetCloudObjectSize (const std::string& bucket_name,
730
741
const std::string& object_path,
731
742
uint64_t * filesize) {
732
- HeadObjectResult result;
743
+ HeadObjectResult result;
733
744
result.size = filesize;
734
745
return HeadObject (bucket_name, object_path, &result);
735
746
}
@@ -900,6 +911,15 @@ class WritableFileStreamBuf : public std::streambuf {
900
911
*fileCloseStatus_ = fileWriter_->Close ();
901
912
}
902
913
914
+ // Flushes any buffered data
915
+ // move sync() from protected region to public
916
+ // because Close() is called after async handler been executed
917
+ // sync() must be called in async handler, or the file is incomplete
918
+ int sync () override {
919
+ auto st = fileWriter_->Flush ();
920
+ return st.ok () ? 0 : -1 ;
921
+ }
922
+
903
923
protected:
904
924
// Appends a block of data to the stream. Must always write n if possible
905
925
std::streamsize xsputn (const char * s, std::streamsize n) override {
@@ -924,17 +944,26 @@ class WritableFileStreamBuf : public std::streambuf {
924
944
return ch;
925
945
}
926
946
927
- // Flushes any buffered data
928
- int sync () override {
929
- auto st = fileWriter_->Flush ();
930
- return st.ok () ? 0 : -1 ;
931
- }
932
-
933
947
private:
934
948
IOStatus *fileCloseStatus_;
935
949
std::unique_ptr<WritableFileWriter> fileWriter_;
936
950
};
937
951
952
+ // shared_ptr version of IOStreamWithOwnedBuf
953
+ // because Writablefile pointer must be reachable in async callback
954
+ // std::iostream takes a raw pointer to std::streambuf. This subclass
955
+ // takes a shared_ptr to the streambuf, tying the std::streambuf's
956
+ // lifetime to the iostream's.
957
+ template <class T >
958
+ class IOStreamWithOwnedBufSPtr : public std ::iostream {
959
+ public:
960
+ IOStreamWithOwnedBufSPtr (std::shared_ptr<T> s)
961
+ : std::iostream(s.get()), s_(s) {}
962
+
963
+ private:
964
+ std::shared_ptr<T> s_;
965
+ };
966
+
938
967
// std::iostream takes a raw pointer to std::streambuf. This subclass
939
968
// takes a unique_ptr to the streambuf, tying the std::streambuf's
940
969
// lifetime to the iostream's.
@@ -950,6 +979,78 @@ class IOStreamWithOwnedBuf : public std::iostream {
950
979
951
980
} // namespace
952
981
982
+ IOStatus S3StorageProvider::DoGetCloudObjectAsync (
983
+ const std::string& bucket_name, const std::string& object_path,
984
+ const std::string& local_path,
985
+ std::shared_ptr<std::promise<bool >> prom_ptr) {
986
+ std::string tmp_destination =
987
+ local_path + " .tmp-" + std::to_string (rng_.Next ());
988
+
989
+ std::shared_ptr<IOStatus> fileCloseStatus = std::make_shared<IOStatus>();
990
+
991
+ FileOptions foptions;
992
+ foptions.use_direct_writes =
993
+ cfs_->GetCloudFileSystemOptions ().use_direct_io_for_cloud_download ;
994
+ std::unique_ptr<FSWritableFile> file;
995
+ auto st = NewWritableFile (cfs_->GetBaseFileSystem ().get (), tmp_destination,
996
+ &file, foptions);
997
+ if (!st.ok ()) {
998
+ Log (InfoLogLevel::ERROR_LEVEL, cfs_->GetLogger (),
999
+ " create writeablefile for async download failed, msg: %s" ,
1000
+ st.ToString ().c_str ());
1001
+ prom_ptr->set_value (false );
1002
+ return st;
1003
+ }
1004
+ std::shared_ptr<WritableFileStreamBuf> file_stream_buf =
1005
+ std::make_shared<WritableFileStreamBuf>(
1006
+ fileCloseStatus.get (),
1007
+ std::unique_ptr<WritableFileWriter>(new WritableFileWriter (
1008
+ std::move (file), tmp_destination, foptions)));
1009
+
1010
+ auto ioStreamFactory = [file_stream_buf,
1011
+ fileCloseStatus]() -> Aws::IOStream* {
1012
+ return Aws::New<IOStreamWithOwnedBufSPtr<WritableFileStreamBuf>>(
1013
+ Aws::Utils::ARRAY_ALLOCATION_TAG, file_stream_buf);
1014
+ };
1015
+
1016
+ Aws::S3Crt::Model::GetObjectRequest request;
1017
+ request.SetBucket (ToAwsString (bucket_name));
1018
+ request.SetKey (ToAwsString (object_path));
1019
+ request.SetResponseStreamFactory (std::move (ioStreamFactory));
1020
+
1021
+ auto handler = Aws::S3Crt::GetObjectResponseReceivedHandler{
1022
+ [this , tmp_destination, local_path, prom_ptr, file_stream_buf](
1023
+ const Aws::S3Crt::S3CrtClient*,
1024
+ const Aws::S3Crt::Model::GetObjectRequest&,
1025
+ Aws::S3Crt::Model::GetObjectOutcome outcome,
1026
+ const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
1027
+ file_stream_buf->sync ();
1028
+ const auto & local_fs = cfs_->GetBaseFileSystem ();
1029
+ const IOOptions io_opts;
1030
+ IODebugContext* dbg = nullptr ;
1031
+ auto remote_size = outcome.GetResult ().GetContentLength ();
1032
+ uint64_t local_size{0 };
1033
+ auto s =
1034
+ local_fs->GetFileSize (tmp_destination, io_opts, &local_size, dbg);
1035
+ if (!outcome.IsSuccess () || !s.ok () ||
1036
+ local_size != uint64_t (remote_size)) {
1037
+ local_fs->DeleteFile (tmp_destination, io_opts, dbg);
1038
+ Log (InfoLogLevel::ERROR_LEVEL, cfs_->GetLogger (),
1039
+ " async download error, outcome: %d, local_size: %lu, "
1040
+ " remote_size: %lu" ,
1041
+ outcome.IsSuccess (), local_size, uint64_t (remote_size));
1042
+ prom_ptr->set_value (false );
1043
+ return ;
1044
+ }
1045
+ local_fs->RenameFile (tmp_destination, local_path, io_opts, dbg);
1046
+ cfs_->FileCacheInsert (local_path, local_size);
1047
+ prom_ptr->set_value (true );
1048
+ }};
1049
+
1050
+ s3client_->GetCloudObjectAsync (request, handler);
1051
+ return IOStatus::OK ();
1052
+ }
1053
+
953
1054
IOStatus S3StorageProvider::DoGetCloudObject (const std::string& bucket_name,
954
1055
const std::string& object_path,
955
1056
const std::string& destination,
0 commit comments