From b2848a573a9ea0df71b1456bab9e6983c07bd833 Mon Sep 17 00:00:00 2001 From: jomayeri Date: Fri, 23 Aug 2024 20:26:29 +0000 Subject: [PATCH 01/17] file offset for gds --- csrc/aio/py_lib/deepspeed_aio_op_desc.cpp | 4 ++- csrc/aio/py_lib/deepspeed_aio_op_desc.h | 2 ++ csrc/aio/py_lib/deepspeed_cpu_op.cpp | 4 ++- csrc/aio/py_lib/deepspeed_cpu_op.h | 1 + csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 26 ++++++++++--------- csrc/aio/py_lib/deepspeed_py_io_handle.h | 6 +++-- csrc/aio/py_lib/py_ds_aio.cpp | 7 +++-- csrc/gds/py_lib/deepspeed_gds_op.cpp | 14 +++++++--- csrc/gds/py_lib/deepspeed_gds_op.h | 1 + csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 5 ++-- csrc/gds/py_lib/deepspeed_py_gds_handle.h | 1 + csrc/gds/py_lib/py_ds_gds.cpp | 7 +++-- .../swap_tensor/partitioned_param_swapper.py | 1 + 13 files changed, 53 insertions(+), 26 deletions(-) diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp index dc820be528d0..392ec61a7a62 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp @@ -12,6 +12,7 @@ io_op_desc_t::io_op_desc_t(const bool read_op, const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const int num_threads, const bool validate) : _read_op(read_op), @@ -19,8 +20,9 @@ io_op_desc_t::io_op_desc_t(const bool read_op, _fd(fd), _filename(filename), _file_num_bytes(file_num_bytes), + _file_offset(file_offset), _num_threads(num_threads), - _num_bytes_per_thread(file_num_bytes / num_threads), + _num_bytes_per_thread( static_cast(buffer.nbytes())/ num_threads), _validate(validate) { } diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.h b/csrc/aio/py_lib/deepspeed_aio_op_desc.h index 7305f6920c91..4ff06d27f5a4 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.h +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.h @@ -15,6 +15,7 @@ struct io_op_desc_t { int _fd; const std::string _filename; const long long int _file_num_bytes; + const long long int _file_offset; const int _num_threads; const int _num_bytes_per_thread; torch::Tensor _contiguous_buffer; @@ -25,6 +26,7 @@ struct io_op_desc_t { const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const int num_threads, const bool validate); diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index 41790b99bb88..583ad9d41eaf 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -12,9 +12,10 @@ cpu_op_desc_t::cpu_op_desc_t(const bool read_op, const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const int num_threads, const bool validate) - : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, num_threads, validate), + : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, file_offset, num_threads, validate), _cpu_buffer(buffer) { // Need to use CPU bounce buffer if buffer is not a page-locked DRAM memory. 
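Usage sketch: this series threads a new file_offset argument through io_op_desc_t, cpu_op_desc_t and gds_op_desc_t, and the pybind changes later in the series expose it as an extra positional argument on pread/pwrite and their sync/async variants (the updated tests call e.g. h.sync_pread(buffer, file, 0)). The Python snippet below is a minimal read-side sketch of the resulting API and is not part of the patch: the AsyncIOBuilder import mirrors the existing aio tests, while the aio_handle constructor values, the file path, and the sizes are illustrative assumptions, and the target file is assumed to hold at least file_offset + buffer.nbytes() bytes. Offset-aware writes only become available in the later patches of the series.

    import torch
    from deepspeed.ops.op_builder import AsyncIOBuilder

    aio = AsyncIOBuilder().load()
    # aio_handle(block_size, queue_depth, single_submit, overlap_events, num_threads)
    h = aio.aio_handle(1024 * 1024, 8, False, False, 1)

    # Page-locked destination buffer allocated through the handle's pinned-tensor helper.
    buffer = h.new_cpu_locked_tensor(4096, torch.empty(0, dtype=torch.uint8))

    # Read buffer.nbytes() bytes starting at byte offset 8192 of the (hypothetical) file.
    ret = h.sync_pread(buffer, '/tmp/swap_slice.bin', 8192)
    assert ret == 1  # count of completed read ops

    h.free_cpu_locked_tensor(buffer)
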
@@ -58,6 +59,7 @@ void cpu_op_desc_t::run(const int tid, std::unique_ptr& aio_ctxt, deepspeed_aio_config_t* aio_config) { + // TODO: add file_offset assert(tid < _num_threads); const auto base_offset = _num_bytes_per_thread * tid; diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.h b/csrc/aio/py_lib/deepspeed_cpu_op.h index da96dd2b1d50..b1650c4b4d75 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.h +++ b/csrc/aio/py_lib/deepspeed_cpu_op.h @@ -16,6 +16,7 @@ struct cpu_op_desc_t : io_op_desc_t { const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const int num_threads, const bool validate); diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index bdf2a858d797..a1fd529d0822 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -207,15 +207,17 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const bool validate) { return std::make_shared( - read_op, buffer, fd, filename, file_num_bytes, _num_threads, validate); + read_op, buffer, fd, filename, file_num_bytes, file_offset, _num_threads, validate); } int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, const char* filename, const bool validate, + const long long int file_offset, const bool async) { long long num_file_bytes; @@ -225,19 +227,19 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, return -1; } const auto buffer_bytes = static_cast(buffer.nbytes()); - if (buffer_bytes != num_file_bytes) { - std::cout << filename << ": buffer nbytes != file bytes " << buffer_bytes - << " != " << num_file_bytes << std::endl; + if ((buffer_bytes+file_offset) > num_file_bytes) { + std::cout << filename << ": buffer + file offset > file bytes " << buffer_bytes + << "+ " << file_offset << " > " << num_file_bytes << std::endl; + assert(0); } - assert(static_cast(buffer.nbytes()) == num_file_bytes); - assert((num_file_bytes % _num_threads) == 0); + assert((buffer_bytes % _num_threads) == 0); if (!_is_valid_parallel_aio_op(true, num_file_bytes)) { return -1; } const auto fd = open_file(filename, true); if (fd == -1) { return -1; } - auto scheduled_op = _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, validate); + auto scheduled_op = _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, file_offset, validate); _schedule_aio_work(scheduled_op); @@ -259,7 +261,7 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, const auto fd = open_file(filename, false); if (fd == -1) { return -1; } - auto scheduled_op = _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, validate); + auto scheduled_op = _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, 0, validate); _schedule_aio_work(scheduled_op); @@ -268,9 +270,9 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, return wait(); } -int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, const char* filename) +int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, const char* filename, const long long int file_offset) { - return pread(buffer, filename, false, false); + return pread(buffer, filename, false, file_offset, false); } int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, const char* filename) @@ -278,9 +280,9 @@ int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, const char* return pwrite(buffer, 
filename, false, false); } -int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, const char* filename) +int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, const char* filename, const long long int file_offset) { - return pread(buffer, filename, false, true); + return pread(buffer, filename, false, file_offset, true); } int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer, const char* filename) diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index 2974ebe87bfc..7e684f96cccc 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -45,6 +45,7 @@ struct deepspeed_io_handle_t { int pread(const torch::Tensor& buffer, const char* filename, const bool validate, + const long long int file_offset, const bool async); int pwrite(const torch::Tensor& buffer, @@ -52,11 +53,11 @@ struct deepspeed_io_handle_t { const bool validate, const bool async); - int sync_pread(torch::Tensor& buffer, const char* filename); + int sync_pread(torch::Tensor& buffer, const char* filename, const long long int file_offset); int sync_pwrite(const torch::Tensor& buffer, const char* filename); - int async_pread(torch::Tensor& buffer, const char* filename); + int async_pread(torch::Tensor& buffer, const char* filename, const long long int file_offset); int async_pwrite(const torch::Tensor& buffer, const char* filename); @@ -81,5 +82,6 @@ struct deepspeed_io_handle_t { const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const bool validate); }; diff --git a/csrc/aio/py_lib/py_ds_aio.cpp b/csrc/aio/py_lib/py_ds_aio.cpp index 3171d0c6bf3c..720bbf1faa57 100644 --- a/csrc/aio/py_lib/py_ds_aio.cpp +++ b/csrc/aio/py_lib/py_ds_aio.cpp @@ -55,6 +55,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, + "file_offset"_a, "async"_a) .def("pwrite", @@ -69,7 +70,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) &deepspeed_aio_handle_t::sync_pread, "Synchrononous parallel file read. Returns count of completed read ops", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a) .def("sync_pwrite", &deepspeed_aio_handle_t::sync_pwrite, @@ -82,7 +84,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Asynchronous parallel file read. Returns 0 on success. 
Returns 0 on success, and " "following wait() returns count of completed ops.", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a) .def("async_pwrite", &deepspeed_aio_handle_t::async_pwrite, diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp index c370a448e5a2..cdc9ad29abc7 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.cpp +++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp @@ -95,9 +95,10 @@ gds_op_desc_t::gds_op_desc_t(const bool read_op, const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const int num_threads, const bool validate) - : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, num_threads, validate) + : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, file_offset, num_threads, validate) { _contiguous_buffer = _buffer.contiguous(); const int64_t device = _buffer.get_device(); @@ -126,15 +127,20 @@ void gds_op_desc_t::run(const int tid, assert(tid < _num_threads); check_cudaruntimecall(cudaSetDevice(_buffer.get_device())); int64_t buf_offset = data_ptr() + (_num_bytes_per_thread * tid) - (char*)_base_ptr; - const auto file_offset = _num_bytes_per_thread * tid; + const auto tid_file_offset = _file_offset + (_num_bytes_per_thread * tid); + + // std::cout << "FILE OFFSET: " << _file_offset << std::endl; + // std::cout << "TID FILE OFFSET: " << tid_file_offset << std::endl; + // std::cout << "NUM FILE BYTES: " << _file_num_bytes << std::endl; + // std::cout << "TID BYTES: " << _num_bytes_per_thread << std::endl; if (_read_op) { auto ret = - cuFileRead(_cf_handle, _base_ptr, _num_bytes_per_thread, file_offset, buf_offset); + cuFileRead(_cf_handle, _base_ptr, _num_bytes_per_thread, tid_file_offset, buf_offset); if (ret < 0) { _report_error(ret, errno, buf_offset); } } else { auto ret = - cuFileWrite(_cf_handle, _base_ptr, _num_bytes_per_thread, file_offset, buf_offset); + cuFileWrite(_cf_handle, _base_ptr, _num_bytes_per_thread, tid_file_offset, buf_offset); if (ret < 0) { _report_error(ret, errno, buf_offset); } } } diff --git a/csrc/gds/py_lib/deepspeed_gds_op.h b/csrc/gds/py_lib/deepspeed_gds_op.h index b7fab64d4054..b71133d5723e 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.h +++ b/csrc/gds/py_lib/deepspeed_gds_op.h @@ -23,6 +23,7 @@ struct gds_op_desc_t : io_op_desc_t { const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const int num_threads, const bool validate); diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index 3a35ad3145a0..4e353fe9e122 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -103,12 +103,13 @@ std::shared_ptr deepspeed_gds_handle_t::_create_io_op_desc( const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const bool validate) { if (buffer.is_cuda()) { return std::make_shared( - read_op, buffer, fd, filename, file_num_bytes, _num_threads, validate); + read_op, buffer, fd, filename, file_num_bytes, file_offset, _num_threads, validate); } return deepspeed_io_handle_t::_create_io_op_desc( - read_op, buffer, fd, filename, file_num_bytes, validate); + read_op, buffer, fd, filename, file_num_bytes, file_offset, validate); } diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.h b/csrc/gds/py_lib/deepspeed_py_gds_handle.h index f324e6b65e80..8e268a230259 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.h +++ 
b/csrc/gds/py_lib/deepspeed_py_gds_handle.h @@ -38,6 +38,7 @@ struct deepspeed_gds_handle_t : deepspeed_io_handle_t { const int fd, const char* filename, const long long int file_num_bytes, + const long long int file_offset, const bool validate); static int s_cuFile_init; diff --git a/csrc/gds/py_lib/py_ds_gds.cpp b/csrc/gds/py_lib/py_ds_gds.cpp index 66eb34d4ea8c..60e03db7c1e5 100644 --- a/csrc/gds/py_lib/py_ds_gds.cpp +++ b/csrc/gds/py_lib/py_ds_gds.cpp @@ -48,6 +48,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, + "file_offset"_a, "async"_a) .def("pwrite", @@ -62,7 +63,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) &deepspeed_gds_handle_t::sync_pread, "Synchrononous parallel file read. Returns count of completed read ops", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a) .def("sync_pwrite", &deepspeed_gds_handle_t::sync_pwrite, @@ -75,7 +77,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Asynchronous parallel file read. Returns 0 on success. Returns 0 on success, and " "following wait() returns count of completed ops.", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a) .def("async_pwrite", &deepspeed_gds_handle_t::async_pwrite, diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index 26fbf6164d54..af0519625d3b 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -121,6 +121,7 @@ def _configure_aio(self, ds_config): dtype=self.dtype, device=get_accelerator().device_name(), requires_grad=False) + import pdb; pdb.set_trace() self.aio_read_handle.pin_device_tensor(self.buffers) else: self.buffers = get_accelerator().pin_memory(torch.empty(int(self.aligned_elements_per_buffer * From 63a278f2f585df01c5630dd40eb312ac8c13d685 Mon Sep 17 00:00:00 2001 From: jomayeri Date: Fri, 23 Aug 2024 21:59:02 +0000 Subject: [PATCH 02/17] update thread cxt for gds handle --- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 2 +- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 2 +- csrc/gds/py_lib/deepspeed_gds_op.cpp | 5 ----- csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 2 +- 4 files changed, 3 insertions(+), 8 deletions(-) diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index 583ad9d41eaf..cc349be83223 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -61,7 +61,7 @@ void cpu_op_desc_t::run(const int tid, { // TODO: add file_offset assert(tid < _num_threads); - const auto base_offset = _num_bytes_per_thread * tid; + const auto base_offset = _file_offset + (_num_bytes_per_thread * tid); std::unique_ptr xfer_ctxt( new io_xfer_ctxt(_fd, base_offset, _num_bytes_per_thread, data_ptr())); diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index a1fd529d0822..11813d5b9ebc 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -234,7 +234,7 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, } assert((buffer_bytes % _num_threads) == 0); - if (!_is_valid_parallel_aio_op(true, num_file_bytes)) { return -1; } + if (!_is_valid_parallel_aio_op(true, buffer_bytes)) { return -1; } const auto fd = open_file(filename, true); if (fd == -1) { return -1; } diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp index cdc9ad29abc7..3a25e0fd56f1 100644 --- 
a/csrc/gds/py_lib/deepspeed_gds_op.cpp +++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp @@ -129,11 +129,6 @@ void gds_op_desc_t::run(const int tid, int64_t buf_offset = data_ptr() + (_num_bytes_per_thread * tid) - (char*)_base_ptr; const auto tid_file_offset = _file_offset + (_num_bytes_per_thread * tid); - // std::cout << "FILE OFFSET: " << _file_offset << std::endl; - // std::cout << "TID FILE OFFSET: " << tid_file_offset << std::endl; - // std::cout << "NUM FILE BYTES: " << _file_num_bytes << std::endl; - // std::cout << "TID BYTES: " << _num_bytes_per_thread << std::endl; - if (_read_op) { auto ret = cuFileRead(_cf_handle, _base_ptr, _num_bytes_per_thread, tid_file_offset, buf_offset); diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index 4e353fe9e122..721737579151 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -20,7 +20,7 @@ deepspeed_gds_handle_t::deepspeed_gds_handle_t(const int block_size, const bool single_submit, const bool overlap_events, const int num_threads) - : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, num_threads) + : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, 1) { _init_cuFile(block_size, queue_depth, num_threads); } From 99896b639b7917a41da663a78ba64c817e98c8c9 Mon Sep 17 00:00:00 2001 From: jomayeri Date: Wed, 28 Aug 2024 02:21:28 +0000 Subject: [PATCH 03/17] toward cpu opp w/offset --- csrc/aio/common/deepspeed_aio_utils.cpp | 17 +++++++++++------ csrc/aio/common/deepspeed_aio_utils.h | 4 +++- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 10 ++++++++-- csrc/aio/py_lib/deepspeed_py_aio.cpp | 4 ++-- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 4 ++-- 5 files changed, 26 insertions(+), 13 deletions(-) diff --git a/csrc/aio/common/deepspeed_aio_utils.cpp b/csrc/aio/common/deepspeed_aio_utils.cpp index 763b2c253a34..74a9eced17dc 100644 --- a/csrc/aio/common/deepspeed_aio_utils.cpp +++ b/csrc/aio/common/deepspeed_aio_utils.cpp @@ -19,9 +19,14 @@ const int c_io_queue_depth = 8; io_xfer_ctxt::io_xfer_ctxt(const int fd, const long long int file_offset, + const long long int buffer_offset, const long long int num_bytes, const void* buffer) - : _fd(fd), _base_offset(file_offset), _mem_buffer(buffer), _num_bytes(num_bytes) + : _fd(fd), + _file_base_offset(file_offset), + _buffer_base_offset(buffer_offset), + _mem_buffer(buffer), + _num_bytes(num_bytes) { } @@ -41,8 +46,8 @@ void io_prep_context::prep_iocbs(const int n_iocbs, assert(static_cast(n_iocbs) <= _iocbs->size()); for (auto i = 0; i < n_iocbs; ++i) { const auto shift = i * _block_size; - const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_base_offset + shift; - const auto xfer_offset = _xfer_ctxt->_base_offset + start_offset + shift; + const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_buffer_base_offset + shift; + const auto xfer_offset = _xfer_ctxt->_file_base_offset + start_offset + shift; auto byte_count = _block_size; if ((shift + _block_size) > num_bytes) { byte_count = num_bytes - shift; } @@ -79,10 +84,10 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector* auto actual_n_iocbs = min(static_cast(n_iocbs), _remaining_io_blocks); for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) { - const auto xfer_offset = _xfer_ctxt->_base_offset + (_next_iocb_index * _block_size); - const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + xfer_offset; - const auto num_bytes = min(static_cast(_block_size), 
_remaining_bytes); + const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + _xfer_ctxt->_buffer_base_offset + (_next_iocb_index * _block_size); + const auto xfer_offset = _xfer_ctxt->_file_base_offset + (_next_iocb_index * _block_size); + const auto num_bytes = min(static_cast(_block_size), _remaining_bytes); if (_read_op) { io_prep_pread(iocbs->at(i), _xfer_ctxt->_fd, xfer_buffer, num_bytes, xfer_offset); } else { diff --git a/csrc/aio/common/deepspeed_aio_utils.h b/csrc/aio/common/deepspeed_aio_utils.h index 9c58c2286610..b2f170c34fc5 100644 --- a/csrc/aio/common/deepspeed_aio_utils.h +++ b/csrc/aio/common/deepspeed_aio_utils.h @@ -30,12 +30,14 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices. struct io_xfer_ctxt { const int _fd; - const long long int _base_offset; + const long long int _file_base_offset; + const long long int _buffer_base_offset; const void* _mem_buffer; const long long int _num_bytes; io_xfer_ctxt(const int fd, const long long int file_offset, + const long long int buffer_offset, const long long int num_bytes, const void* buffer); }; diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index cc349be83223..b8ccc236b683 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -61,10 +61,16 @@ void cpu_op_desc_t::run(const int tid, { // TODO: add file_offset assert(tid < _num_threads); - const auto base_offset = _file_offset + (_num_bytes_per_thread * tid); + const auto buffer_base_offset = _num_bytes_per_thread * tid; + const auto file_base_offset = _file_offset + (_num_bytes_per_thread * tid); + + std::cout << "FILE OFFSET: " << _file_offset << std::endl; + std::cout << "NUM BTYES : " << _num_bytes_per_thread << std::endl; + std::cout << "FIlE BASE OFFSET : " << file_base_offset << std::endl; + std::cout << "BUFFER BASE OFFSET : " << buffer_base_offset << std::endl; std::unique_ptr xfer_ctxt( - new io_xfer_ctxt(_fd, base_offset, _num_bytes_per_thread, data_ptr())); + new io_xfer_ctxt(_fd, file_base_offset, buffer_base_offset, _num_bytes_per_thread, data_ptr())); if (aio_config->_overlap_events) { do_aio_operation_overlap(_read_op, aio_ctxt, xfer_ctxt, aio_config, nullptr); diff --git a/csrc/aio/py_lib/deepspeed_py_aio.cpp b/csrc/aio/py_lib/deepspeed_py_aio.cpp index eac268d33433..125759216930 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio.cpp +++ b/csrc/aio/py_lib/deepspeed_py_aio.cpp @@ -52,7 +52,7 @@ int deepspeed_py_aio_write(const torch::Tensor& buffer, auto write_buffer = (char*)buffer.data_ptr(); const auto num_write_bytes = static_cast(buffer.nbytes()); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer)); + std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer)); std::unique_ptr aio_ctxt(new aio_context(config._block_size, config._queue_depth)); if (config._overlap_events) { @@ -97,7 +97,7 @@ int deepspeed_py_aio_read(torch::Tensor& buffer, auto read_buffer = (char*)buffer.data_ptr(); assert(static_cast(buffer.nbytes()) == num_file_bytes); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer)); + std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_file_bytes, read_buffer)); std::unique_ptr aio_ctxt(new aio_context(config._block_size, config._queue_depth)); if (config._overlap_events) { diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index 11813d5b9ebc..cf18424ab842 100644 --- 
a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -76,7 +76,7 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con if (fd == -1) { return -1; } auto read_buffer = (char*)buffer.data_ptr(); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer)); + std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_file_bytes, read_buffer)); if (_aio_config._overlap_events) { do_aio_operation_overlap(true, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr); @@ -109,7 +109,7 @@ int deepspeed_io_handle_t::write(const torch::Tensor& buffer, auto write_buffer = (char*)buffer.data_ptr(); const auto num_write_bytes = static_cast(buffer.nbytes()); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer)); + std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer)); if (_aio_config._overlap_events) { do_aio_operation_overlap(false, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr); From 7b9d26b1801bef5a9b9ba48c116cabb38b38b3b5 Mon Sep 17 00:00:00 2001 From: jomayeri Date: Thu, 12 Sep 2024 17:12:39 +0000 Subject: [PATCH 04/17] offset updates --- csrc/aio/common/deepspeed_aio_utils.cpp | 2 ++ csrc/aio/py_lib/deepspeed_aio_op_desc.h | 2 +- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 6 ------ csrc/gds/py_lib/deepspeed_gds_op.cpp | 6 +++--- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/csrc/aio/common/deepspeed_aio_utils.cpp b/csrc/aio/common/deepspeed_aio_utils.cpp index 74a9eced17dc..f35aabf052d2 100644 --- a/csrc/aio/common/deepspeed_aio_utils.cpp +++ b/csrc/aio/common/deepspeed_aio_utils.cpp @@ -45,10 +45,12 @@ void io_prep_context::prep_iocbs(const int n_iocbs, { assert(static_cast(n_iocbs) <= _iocbs->size()); for (auto i = 0; i < n_iocbs; ++i) { + const auto shift = i * _block_size; const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_buffer_base_offset + shift; const auto xfer_offset = _xfer_ctxt->_file_base_offset + start_offset + shift; auto byte_count = _block_size; + if ((shift + _block_size) > num_bytes) { byte_count = num_bytes - shift; } if (_read_op) { diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.h b/csrc/aio/py_lib/deepspeed_aio_op_desc.h index 4ff06d27f5a4..2adb8f1040a1 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.h +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.h @@ -17,7 +17,7 @@ struct io_op_desc_t { const long long int _file_num_bytes; const long long int _file_offset; const int _num_threads; - const int _num_bytes_per_thread; + const long long int _num_bytes_per_thread; torch::Tensor _contiguous_buffer; const bool _validate; diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index b8ccc236b683..1d8d87db0545 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -59,16 +59,10 @@ void cpu_op_desc_t::run(const int tid, std::unique_ptr& aio_ctxt, deepspeed_aio_config_t* aio_config) { - // TODO: add file_offset assert(tid < _num_threads); const auto buffer_base_offset = _num_bytes_per_thread * tid; const auto file_base_offset = _file_offset + (_num_bytes_per_thread * tid); - std::cout << "FILE OFFSET: " << _file_offset << std::endl; - std::cout << "NUM BTYES : " << _num_bytes_per_thread << std::endl; - std::cout << "FIlE BASE OFFSET : " << file_base_offset << std::endl; - std::cout << "BUFFER BASE OFFSET : " << buffer_base_offset << std::endl; - std::unique_ptr xfer_ctxt( new io_xfer_ctxt(_fd, file_base_offset, buffer_base_offset, 
_num_bytes_per_thread, data_ptr())); diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp index 3a25e0fd56f1..fe83086d0533 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.cpp +++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp @@ -126,17 +126,17 @@ void gds_op_desc_t::run(const int tid, { assert(tid < _num_threads); check_cudaruntimecall(cudaSetDevice(_buffer.get_device())); - int64_t buf_offset = data_ptr() + (_num_bytes_per_thread * tid) - (char*)_base_ptr; + const auto buf_offset = data_ptr() + (_num_bytes_per_thread * tid) - (char*)_base_ptr; const auto tid_file_offset = _file_offset + (_num_bytes_per_thread * tid); if (_read_op) { auto ret = cuFileRead(_cf_handle, _base_ptr, _num_bytes_per_thread, tid_file_offset, buf_offset); - if (ret < 0) { _report_error(ret, errno, buf_offset); } + if (ret < 0) { _report_error(ret, errno, tid_file_offset); } } else { auto ret = cuFileWrite(_cf_handle, _base_ptr, _num_bytes_per_thread, tid_file_offset, buf_offset); - if (ret < 0) { _report_error(ret, errno, buf_offset); } + if (ret < 0) { _report_error(ret, errno, tid_file_offset); } } } From 33fe0ddb0dfff6716053fd29c062b4af7c5d6e9d Mon Sep 17 00:00:00 2001 From: jomayeri Date: Mon, 23 Sep 2024 19:14:07 +0000 Subject: [PATCH 05/17] update test api --- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 11 ++++++----- csrc/aio/py_test/ds_aio_handle.py | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index cf18424ab842..ac396d17fbf0 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -227,11 +227,12 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, return -1; } const auto buffer_bytes = static_cast(buffer.nbytes()); - if ((buffer_bytes+file_offset) > num_file_bytes) { - std::cout << filename << ": buffer + file offset > file bytes " << buffer_bytes - << "+ " << file_offset << " > " << num_file_bytes << std::endl; - assert(0); - } + // No longer check, b/c buffer can exceed file size to enable 4k alignment + //if ((buffer_bytes+file_offset) > num_file_bytes) { + // std::cout << filename << ": buffer + file offset > file bytes " << buffer_bytes + // << "+ " << file_offset << " > " << num_file_bytes << std::endl; + // //assert(0); + //} assert((buffer_bytes % _num_threads) == 0); if (!_is_valid_parallel_aio_op(true, buffer_bytes)) { return -1; } diff --git a/csrc/aio/py_test/ds_aio_handle.py b/csrc/aio/py_test/ds_aio_handle.py index f4a179deb9ec..6913e9090bf5 100755 --- a/csrc/aio/py_test/ds_aio_handle.py +++ b/csrc/aio/py_test/ds_aio_handle.py @@ -92,7 +92,7 @@ def main_parallel_read(pool_params): start_time = time.time() dest_buffer = BOUNCE_BUFFER if ctxt[BOUNCE_BUFFER] is not None else BUFFER - ret = handle.pread(ctxt[dest_buffer], ctxt['file'], args.validate, True) + ret = handle.pread(ctxt[dest_buffer], ctxt['file'], args.validate, 0, True) assert ret != -1 handle.wait() if dest_buffer == BOUNCE_BUFFER: From f872b539e4d785f45b13a393061801ef00a0fa41 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 10 Oct 2024 23:07:50 +0000 Subject: [PATCH 06/17] mid compile test --- csrc/aio/common/deepspeed_aio_utils.cpp | 8 ++++---- csrc/aio/common/deepspeed_aio_utils.h | 10 +++++----- csrc/aio/py_lib/deepspeed_aio_op_desc.cpp | 6 +++--- csrc/aio/py_lib/deepspeed_aio_op_desc.h | 10 +++++----- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 4 ++-- csrc/aio/py_lib/deepspeed_cpu_op.h | 4 ++-- csrc/aio/py_lib/deepspeed_py_aio.cpp | 
2 +- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 15 +++++++-------- csrc/aio/py_lib/deepspeed_py_io_handle.h | 10 +++++----- 9 files changed, 34 insertions(+), 35 deletions(-) diff --git a/csrc/aio/common/deepspeed_aio_utils.cpp b/csrc/aio/common/deepspeed_aio_utils.cpp index 04678b8557d3..b8f21fe84edc 100644 --- a/csrc/aio/common/deepspeed_aio_utils.cpp +++ b/csrc/aio/common/deepspeed_aio_utils.cpp @@ -18,9 +18,9 @@ const int c_block_size = 128 * 1024; const int c_io_queue_depth = 8; io_xfer_ctxt::io_xfer_ctxt(const int fd, - const long long int file_offset, - const long long int buffer_offset, - const long long int num_bytes, + const int64_t file_offset, + const int64_t buffer_offset, + const int64_t num_bytes, const void* buffer) : _fd(fd), _file_base_offset(file_offset), @@ -88,7 +88,7 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector* for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) { const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + _xfer_ctxt->_buffer_base_offset + (_next_iocb_index * _block_size); const auto xfer_offset = _xfer_ctxt->_file_base_offset + (_next_iocb_index * _block_size); - const auto num_bytes = min(static_cast(_block_size), _remaining_bytes); + const auto num_bytes = min(static_cast(_block_size), _remaining_bytes); if (_read_op) { io_prep_pread(iocbs->at(i), _xfer_ctxt->_fd, xfer_buffer, num_bytes, xfer_offset); } else { diff --git a/csrc/aio/common/deepspeed_aio_utils.h b/csrc/aio/common/deepspeed_aio_utils.h index 8a8fc92087ce..6b7599acecb4 100644 --- a/csrc/aio/common/deepspeed_aio_utils.h +++ b/csrc/aio/common/deepspeed_aio_utils.h @@ -30,15 +30,15 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices. struct io_xfer_ctxt { const int _fd; - const long long int _file_base_offset; - const long long int _buffer_base_offset; + const int64_t _file_base_offset; + const int64_t _buffer_base_offset; const void* _mem_buffer; const int64_t _num_bytes; io_xfer_ctxt(const int fd, - const long long int file_offset, - const long long int buffer_offset, - const long long int num_bytes, + const int64_t file_offset, + const int64_t buffer_offset, + const int64_t num_bytes, const void* buffer); }; diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp index 4cd881a8c9e8..ae650bf22cb9 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp @@ -11,8 +11,8 @@ io_op_desc_t::io_op_desc_t(const bool read_op, const torch::Tensor& buffer, const int fd, const char* filename, - const long long int file_num_bytes, - const long long int file_offset, + const int64_t file_num_bytes, + const int64_t file_offset, const int intra_op_parallelism, const bool validate) : _read_op(read_op), @@ -22,7 +22,7 @@ io_op_desc_t::io_op_desc_t(const bool read_op, _file_num_bytes(file_num_bytes), _file_offset(file_offset), _intra_op_parallelism(intra_op_parallelism), - _num_bytes_per_thread( static_cast(buffer.nbytes())/ intra_op_parallelism), + _num_bytes_per_thread( static_cast(buffer.nbytes())/ intra_op_parallelism), _validate(validate) { } diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.h b/csrc/aio/py_lib/deepspeed_aio_op_desc.h index 4ba1d3847e12..086a562bc893 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.h +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.h @@ -14,10 +14,10 @@ struct io_op_desc_t { torch::Tensor _buffer; int _fd; const std::string _filename; - const long long int _file_num_bytes; - const long long int _file_offset; + const int64_t 
_file_num_bytes; + const int64_t _file_offset; const int _intra_op_parallelism; - const long long int _num_bytes_per_thread; + const int64_t _num_bytes_per_thread; torch::Tensor _contiguous_buffer; const bool _validate; @@ -25,8 +25,8 @@ struct io_op_desc_t { const torch::Tensor& buffer, const int fd, const char* filename, - const long long int file_num_bytes, - const long long int file_offset, + const int64_t file_num_bytes, + const int64_t file_offset, const int intra_op_parallelism, const bool validate); diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index 6f5eae6f0c8c..072bb4204d0a 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -14,8 +14,8 @@ cpu_op_desc_t::cpu_op_desc_t( const std::unique_ptr& pinned_tensor_mgr, const int fd, const char* filename, - const long long int file_num_bytes, - const long long int file_offset, + const int64_t file_num_bytes, + const int64_t file_offset, const int intra_op_parallelism, const bool validate) : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, file_offset, intra_op_parallelism, validate), diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.h b/csrc/aio/py_lib/deepspeed_cpu_op.h index 44b5237261d0..6f39c492988f 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.h +++ b/csrc/aio/py_lib/deepspeed_cpu_op.h @@ -18,8 +18,8 @@ struct cpu_op_desc_t : io_op_desc_t { const std::unique_ptr& pinned_tensor_mgr, const int fd, const char* filename, - const long long int file_num_bytes, - const long long int file_offset, + const int64_t file_num_bytes, + const int64_t file_offset, const int intra_op_parallelism, const bool validate); diff --git a/csrc/aio/py_lib/deepspeed_py_aio.cpp b/csrc/aio/py_lib/deepspeed_py_aio.cpp index a94935cb2faa..6ded0bd2e92d 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio.cpp +++ b/csrc/aio/py_lib/deepspeed_py_aio.cpp @@ -51,7 +51,7 @@ int deepspeed_py_aio_write(const torch::Tensor& buffer, if (fd == -1) { return -1; } auto write_buffer = (char*)buffer.data_ptr(); - const auto num_write_bytes = static_cast(buffer.nbytes()); + const auto num_write_bytes = static_cast(buffer.nbytes()); std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer)); std::unique_ptr aio_ctxt(new aio_context(config._block_size, config._queue_depth)); diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index cf839fa51800..f7d23384f3d2 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -108,7 +108,7 @@ int deepspeed_io_handle_t::write(const torch::Tensor& buffer, if (fd == -1) { return -1; } auto write_buffer = (char*)buffer.data_ptr(); - const auto num_write_bytes = static_cast(buffer.nbytes()); + const auto num_write_bytes = static_cast(buffer.nbytes()); std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer)); if (_aio_config._overlap_events) { @@ -205,9 +205,8 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( const torch::Tensor& buffer, const int fd, const char* filename, -<<<<<<< HEAD - const long long int file_num_bytes, - const long long int file_offset, + const int64_t file_num_bytes, + const int64_t file_offset, const bool validate) { return std::make_shared(read_op, @@ -224,7 +223,7 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, const char* filename, const bool validate, - const long long int file_offset, + const 
int64_t file_offset, const bool async) { int64_t num_file_bytes; @@ -233,7 +232,7 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, report_file_error(filename, " fstat for read", error_code); return -1; } - const auto buffer_bytes = static_cast(buffer.nbytes()); + const auto buffer_bytes = static_cast(buffer.nbytes()); // No longer check, b/c buffer can exceed file size to enable 4k alignment //if ((buffer_bytes+file_offset) > num_file_bytes) { // std::cout << filename << ": buffer + file offset > file bytes " << buffer_bytes @@ -278,7 +277,7 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, return wait(); } -int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, const char* filename, const long long int file_offset) +int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset) { return pread(buffer, filename, false, file_offset, false); } @@ -288,7 +287,7 @@ int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, const char* return pwrite(buffer, filename, false, false); } -int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, const char* filename, const long long int file_offset) +int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset) { return pread(buffer, filename, false, file_offset, true); } diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index f28a3f256323..b4fd3e485fb8 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -45,7 +45,7 @@ struct deepspeed_io_handle_t { int pread(const torch::Tensor& buffer, const char* filename, const bool validate, - const long long int file_offset, + const int64_t file_offset, const bool async); int pwrite(const torch::Tensor& buffer, @@ -53,11 +53,11 @@ struct deepspeed_io_handle_t { const bool validate, const bool async); - int sync_pread(torch::Tensor& buffer, const char* filename, const long long int file_offset); + int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset); int sync_pwrite(const torch::Tensor& buffer, const char* filename); - int async_pread(torch::Tensor& buffer, const char* filename, const long long int file_offset); + int async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset); int async_pwrite(const torch::Tensor& buffer, const char* filename); @@ -82,7 +82,7 @@ struct deepspeed_io_handle_t { const torch::Tensor& buffer, const int fd, const char* filename, - const long long int file_num_bytes, - const long long int file_offset, + const int64_t file_num_bytes, + const int64_t file_offset, const bool validate); }; From 4820710d4cdb2346d13404183db69deaf843b64f Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 10 Oct 2024 23:20:25 +0000 Subject: [PATCH 07/17] fixing compil error --- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index 072bb4204d0a..4c285fb19fb1 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -19,7 +19,7 @@ cpu_op_desc_t::cpu_op_desc_t( const int intra_op_parallelism, const bool validate) : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, file_offset, intra_op_parallelism, validate), - _cpu_buffer(buffer) + _cpu_buffer(buffer), _pinned_tensor_mgr(pinned_tensor_mgr), _is_managed_bounce_buffer(false) { From 
43bba31fe401a6a05da9079618874d55925a6bc0 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 11 Oct 2024 18:53:43 +0000 Subject: [PATCH 08/17] offset into swap_tensor --- deepspeed/runtime/swap_tensor/partitioned_param_swapper.py | 1 - deepspeed/runtime/swap_tensor/utils.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py index af0519625d3b..26fbf6164d54 100644 --- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py +++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py @@ -121,7 +121,6 @@ def _configure_aio(self, ds_config): dtype=self.dtype, device=get_accelerator().device_name(), requires_grad=False) - import pdb; pdb.set_trace() self.aio_read_handle.pin_device_tensor(self.buffers) else: self.buffers = get_accelerator().pin_memory(torch.empty(int(self.aligned_elements_per_buffer * diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index 90b2d9b8bd31..3ed5a5710aa7 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -18,7 +18,7 @@ def swap_in_tensors(swap_handle, tensor_buffers, swap_paths): for buffer, path in zip(tensor_buffers, swap_paths): - assert (swap_handle.async_pread(buffer, path) == 0) + assert (swap_handle.async_pread(buffer, path, 0) == 0) def swap_out_tensors(swap_handle, tensor_buffers, swap_paths): From 9cd8bb034e681b82b37e43ffd22c0acdaeb04572 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 18 Oct 2024 19:45:29 +0000 Subject: [PATCH 09/17] adding all the offsets --- csrc/aio/common/deepspeed_aio_utils.cpp | 12 +++--- csrc/aio/py_lib/deepspeed_aio_op_desc.cpp | 2 +- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 13 +++++-- csrc/aio/py_lib/deepspeed_py_aio.cpp | 6 ++- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 41 +++++++++++++++------ csrc/aio/py_lib/deepspeed_py_io_handle.h | 30 +++++++++------ csrc/gds/py_lib/deepspeed_gds_op.cpp | 9 ++++- csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 10 ++++- 8 files changed, 84 insertions(+), 39 deletions(-) diff --git a/csrc/aio/common/deepspeed_aio_utils.cpp b/csrc/aio/common/deepspeed_aio_utils.cpp index b8f21fe84edc..fb269b58315f 100644 --- a/csrc/aio/common/deepspeed_aio_utils.cpp +++ b/csrc/aio/common/deepspeed_aio_utils.cpp @@ -22,10 +22,10 @@ io_xfer_ctxt::io_xfer_ctxt(const int fd, const int64_t buffer_offset, const int64_t num_bytes, const void* buffer) - : _fd(fd), - _file_base_offset(file_offset), - _buffer_base_offset(buffer_offset), - _mem_buffer(buffer), + : _fd(fd), + _file_base_offset(file_offset), + _buffer_base_offset(buffer_offset), + _mem_buffer(buffer), _num_bytes(num_bytes) { } @@ -45,7 +45,6 @@ void io_prep_context::prep_iocbs(const int n_iocbs, { assert(static_cast(n_iocbs) <= _iocbs->size()); for (auto i = 0; i < n_iocbs; ++i) { - const auto shift = i * _block_size; const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_buffer_base_offset + shift; const auto xfer_offset = _xfer_ctxt->_file_base_offset + start_offset + shift; @@ -86,7 +85,8 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector* auto actual_n_iocbs = min(static_cast(n_iocbs), _remaining_io_blocks); for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) { - const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + _xfer_ctxt->_buffer_base_offset + (_next_iocb_index * _block_size); + const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + _xfer_ctxt->_buffer_base_offset + + 
(_next_iocb_index * _block_size); const auto xfer_offset = _xfer_ctxt->_file_base_offset + (_next_iocb_index * _block_size); const auto num_bytes = min(static_cast(_block_size), _remaining_bytes); if (_read_op) { diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp index ae650bf22cb9..8594eb7a2e9d 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp @@ -22,7 +22,7 @@ io_op_desc_t::io_op_desc_t(const bool read_op, _file_num_bytes(file_num_bytes), _file_offset(file_offset), _intra_op_parallelism(intra_op_parallelism), - _num_bytes_per_thread( static_cast(buffer.nbytes())/ intra_op_parallelism), + _num_bytes_per_thread(static_cast(buffer.nbytes()) / intra_op_parallelism), _validate(validate) { } diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index 28c06f9ec47c..e48c2762b010 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -18,7 +18,14 @@ cpu_op_desc_t::cpu_op_desc_t( const int64_t file_offset, const int intra_op_parallelism, const bool validate) - : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, file_offset, intra_op_parallelism, validate), + : io_op_desc_t(read_op, + buffer, + fd, + filename, + file_num_bytes, + file_offset, + intra_op_parallelism, + validate), _cpu_buffer(buffer), _pinned_tensor_mgr(pinned_tensor_mgr), _is_managed_bounce_buffer(false) @@ -70,8 +77,8 @@ void cpu_op_desc_t::run(const int tid, const auto buffer_base_offset = _num_bytes_per_thread * tid; const auto file_base_offset = _file_offset + (_num_bytes_per_thread * tid); - std::unique_ptr xfer_ctxt( - new io_xfer_ctxt(_fd, file_base_offset, buffer_base_offset, _num_bytes_per_thread, data_ptr())); + std::unique_ptr xfer_ctxt(new io_xfer_ctxt( + _fd, file_base_offset, buffer_base_offset, _num_bytes_per_thread, data_ptr())); if (aio_config->_overlap_events) { do_aio_operation_overlap(_read_op, aio_ctxt, xfer_ctxt, aio_config, nullptr); diff --git a/csrc/aio/py_lib/deepspeed_py_aio.cpp b/csrc/aio/py_lib/deepspeed_py_aio.cpp index 6ded0bd2e92d..1ff0397043fa 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio.cpp +++ b/csrc/aio/py_lib/deepspeed_py_aio.cpp @@ -53,7 +53,8 @@ int deepspeed_py_aio_write(const torch::Tensor& buffer, auto write_buffer = (char*)buffer.data_ptr(); const auto num_write_bytes = static_cast(buffer.nbytes()); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer)); + std::unique_ptr xfer_ctxt( + new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer)); std::unique_ptr aio_ctxt(new aio_context(config._block_size, config._queue_depth)); if (config._overlap_events) { @@ -98,7 +99,8 @@ int deepspeed_py_aio_read(torch::Tensor& buffer, auto read_buffer = (char*)buffer.data_ptr(); assert(static_cast(buffer.nbytes()) == num_file_bytes); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_file_bytes, read_buffer)); + std::unique_ptr xfer_ctxt( + new io_xfer_ctxt(fd, 0, 0, num_file_bytes, read_buffer)); std::unique_ptr aio_ctxt(new aio_context(config._block_size, config._queue_depth)); if (config._overlap_events) { diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index f7d23384f3d2..036e84fa1591 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -58,7 +58,10 @@ const bool deepspeed_io_handle_t::get_overlap_events() const { return _overlap_e const int 
deepspeed_io_handle_t::get_intra_op_parallelism() const { return _intra_op_parallelism; } -int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, const bool validate) +int deepspeed_io_handle_t::read(torch::Tensor& buffer, + const char* filename, + const int64_t file_offset, + const bool validate) { const auto start_time = std::chrono::high_resolution_clock::now(); @@ -76,7 +79,8 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con if (fd == -1) { return -1; } auto read_buffer = (char*)buffer.data_ptr(); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_file_bytes, read_buffer)); + std::unique_ptr xfer_ctxt( + new io_xfer_ctxt(fd, file_offset, 0, num_file_bytes, read_buffer)); if (_aio_config._overlap_events) { do_aio_operation_overlap(true, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr); @@ -98,6 +102,7 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con int deepspeed_io_handle_t::write(const torch::Tensor& buffer, const char* filename, + const int64_t file_offset, const bool validate) { assert(_aio_ctxt); @@ -109,7 +114,8 @@ int deepspeed_io_handle_t::write(const torch::Tensor& buffer, auto write_buffer = (char*)buffer.data_ptr(); const auto num_write_bytes = static_cast(buffer.nbytes()); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer)); + std::unique_ptr xfer_ctxt( + new io_xfer_ctxt(fd, file_offset, 0, num_write_bytes, write_buffer)); if (_aio_config._overlap_events) { do_aio_operation_overlap(false, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr); @@ -234,7 +240,7 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, } const auto buffer_bytes = static_cast(buffer.nbytes()); // No longer check, b/c buffer can exceed file size to enable 4k alignment - //if ((buffer_bytes+file_offset) > num_file_bytes) { + // if ((buffer_bytes+file_offset) > num_file_bytes) { // std::cout << filename << ": buffer + file offset > file bytes " << buffer_bytes // << "+ " << file_offset << " > " << num_file_bytes << std::endl; // //assert(0); @@ -246,7 +252,8 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, const auto fd = open_file(filename, true); if (fd == -1) { return -1; } - auto scheduled_op = _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, file_offset, validate); + auto scheduled_op = + _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, file_offset, validate); _schedule_aio_work(scheduled_op); @@ -258,6 +265,7 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, const char* filename, const bool validate, + const int64_t file_offset, const bool async) { const auto num_write_bytes = static_cast(buffer.nbytes()); @@ -268,7 +276,8 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, const auto fd = open_file(filename, false); if (fd == -1) { return -1; } - auto scheduled_op = _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, 0, validate); + auto scheduled_op = + _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, file_offset, validate); _schedule_aio_work(scheduled_op); @@ -277,24 +286,32 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, return wait(); } -int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset) +int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, + const char* filename, + const int64_t file_offset) { return pread(buffer, 
filename, false, file_offset, false); } -int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, const char* filename) +int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, + const char* filename, + const int64_t file_offset) { - return pwrite(buffer, filename, false, false); + return pwrite(buffer, filename, false, file_offset, false); } -int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset) +int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, + const char* filename, + const int64_t file_offset) { return pread(buffer, filename, false, file_offset, true); } -int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer, const char* filename) +int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer, + const char* filename, + const int64_t file_offset) { - return pwrite(buffer, filename, false, true); + return pwrite(buffer, filename, false, file_offset, true); } at::Tensor deepspeed_io_handle_t::new_cpu_locked_tensor(const int64_t num_elem, diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index b4fd3e485fb8..16671b7e7c05 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -38,9 +38,15 @@ struct deepspeed_io_handle_t { const bool get_overlap_events() const; const int get_intra_op_parallelism() const; - int read(torch::Tensor& buffer, const char* filename, const bool validate); + int read(torch::Tensor& buffer, + const char* filename, + const int64_t file_offset, + const bool validate); - int write(const torch::Tensor& buffer, const char* filename, const bool validate); + int write(const torch::Tensor& buffer, + const char* filename, + const int64_t file_offset, + const bool validate); int pread(const torch::Tensor& buffer, const char* filename, @@ -51,15 +57,16 @@ struct deepspeed_io_handle_t { int pwrite(const torch::Tensor& buffer, const char* filename, const bool validate, + const int64_t file_offset, const bool async); int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset); - int sync_pwrite(const torch::Tensor& buffer, const char* filename); + int sync_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset); int async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset); - int async_pwrite(const torch::Tensor& buffer, const char* filename); + int async_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset); // TODO: Make API's args to be shape and dtype. 
torch::Tensor new_cpu_locked_tensor(const int64_t num_elem, @@ -77,12 +84,11 @@ struct deepspeed_io_handle_t { bool _is_valid_parallel_aio_op(const bool read_op, const int64_t num_bytes); - virtual std::shared_ptr _create_io_op_desc( - const bool read_op, - const torch::Tensor& buffer, - const int fd, - const char* filename, - const int64_t file_num_bytes, - const int64_t file_offset, - const bool validate); + virtual std::shared_ptr _create_io_op_desc(const bool read_op, + const torch::Tensor& buffer, + const int fd, + const char* filename, + const int64_t file_num_bytes, + const int64_t file_offset, + const bool validate); }; diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp index b55302ee89e6..9bb1888dbded 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.cpp +++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp @@ -97,7 +97,14 @@ gds_op_desc_t::gds_op_desc_t(const bool read_op, const int64_t file_offset, const int intra_op_parallelism, const bool validate) - : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, file_offset, intra_op_parallelism, validate) + : io_op_desc_t(read_op, + buffer, + fd, + filename, + file_num_bytes, + file_offset, + intra_op_parallelism, + validate) { _contiguous_buffer = _buffer.contiguous(); const int64_t device = _buffer.get_device(); diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index 19ae1f8bb1bd..7da4afb85239 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -111,8 +111,14 @@ std::shared_ptr deepspeed_gds_handle_t::_create_io_op_desc( const bool validate) { if (buffer.is_cuda()) { - return std::make_shared( - read_op, buffer, fd, filename, file_num_bytes, file_offset, _intra_op_parallelism, validate); + return std::make_shared(read_op, + buffer, + fd, + filename, + file_num_bytes, + file_offset, + _intra_op_parallelism, + validate); } return deepspeed_io_handle_t::_create_io_op_desc( read_op, buffer, fd, filename, file_num_bytes, file_offset, validate); From 3d89bf38a9156550814404405ef7cddaaad71c2c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 18 Oct 2024 20:07:01 +0000 Subject: [PATCH 10/17] fixing compiles --- csrc/aio/py_lib/py_ds_aio.cpp | 9 +++++++-- csrc/gds/py_lib/py_ds_gds.cpp | 9 +++++++-- deepspeed/runtime/swap_tensor/utils.py | 2 +- tests/unit/ops/aio/test_aio.py | 12 ++++++------ tests/unit/ops/aio/test_gds.py | 12 ++++++------ 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/csrc/aio/py_lib/py_ds_aio.cpp b/csrc/aio/py_lib/py_ds_aio.cpp index 96f3064656a0..ff29480bd6f3 100644 --- a/csrc/aio/py_lib/py_ds_aio.cpp +++ b/csrc/aio/py_lib/py_ds_aio.cpp @@ -40,6 +40,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, + "file_offset"_a, "validate"_a) .def("write", @@ -47,6 +48,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, + "file_offset"_a, "validate"_a) .def("pread", @@ -64,6 +66,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, + "file_offset"_a, "async"_a) .def("sync_pread", @@ -77,7 +80,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) &deepspeed_aio_handle_t::sync_pwrite, "Synchronous parallel file write. 
Returns count of completed write ops", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a) .def("async_pread", &deepspeed_aio_handle_t::async_pread, @@ -92,7 +96,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Asynchronous parallel file write. Returns 0 on success, and following wait() returns " "count of completed ops.", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a) .def("new_cpu_locked_tensor", &deepspeed_aio_handle_t::new_cpu_locked_tensor, diff --git a/csrc/gds/py_lib/py_ds_gds.cpp b/csrc/gds/py_lib/py_ds_gds.cpp index 4b451034de1e..ecdc526624b3 100644 --- a/csrc/gds/py_lib/py_ds_gds.cpp +++ b/csrc/gds/py_lib/py_ds_gds.cpp @@ -33,6 +33,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, + "file_offset"_a, "validate"_a) .def("write", @@ -40,6 +41,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, + "file_offset"_a, "validate"_a) .def("pread", @@ -57,6 +59,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, + "file_offset"_a, "async"_a) .def("sync_pread", @@ -70,7 +73,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) &deepspeed_gds_handle_t::sync_pwrite, "Synchronous parallel file write. Returns count of completed write ops", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a) .def("async_pread", &deepspeed_gds_handle_t::async_pread, @@ -85,7 +89,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Asynchronous parallel file write. Returns 0 on success, and following wait() returns " "count of completed ops.", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a) .def("new_cpu_locked_tensor", &deepspeed_gds_handle_t::new_cpu_locked_tensor, diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index 3ed5a5710aa7..1f9825c34638 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -23,7 +23,7 @@ def swap_in_tensors(swap_handle, tensor_buffers, swap_paths): def swap_out_tensors(swap_handle, tensor_buffers, swap_paths): for buffer, path in zip(tensor_buffers, swap_paths): - assert (swap_handle.async_pwrite(buffer, path) == 0) + assert (swap_handle.async_pwrite(buffer, path, 0) == 0) def print_object(obj, name, exclude_list=[]): diff --git a/tests/unit/ops/aio/test_aio.py b/tests/unit/ops/aio/test_aio.py index a074cfca317f..542570e79954 100644 --- a/tests/unit/ops/aio/test_aio.py +++ b/tests/unit/ops/aio/test_aio.py @@ -103,7 +103,7 @@ def test_parallel_read(self, tmpdir, use_cuda_pinned_tensor, single_submit, over _validate_handle_state(h, single_submit, overlap_events) ref_file, _ = _do_ref_write(tmpdir) - read_status = h.sync_pread(aio_buffer, ref_file) + read_status = h.sync_pread(aio_buffer, ref_file, 0) assert read_status == 1 with open(ref_file, 'rb') as f: @@ -131,7 +131,7 @@ def test_async_read(self, tmpdir, use_cuda_pinned_tensor, single_submit, overlap _validate_handle_state(h, single_submit, overlap_events) ref_file, _ = _do_ref_write(tmpdir) - read_status = h.async_pread(aio_buffer, ref_file) + read_status = h.async_pread(aio_buffer, ref_file, 0) assert read_status == 0 wait_status = h.wait() @@ -172,7 +172,7 @@ def test_parallel_write(self, tmpdir, use_cuda_pinned_tensor, single_submit, ove _validate_handle_state(h, single_submit, overlap_events) - write_status = h.sync_pwrite(aio_buffer, aio_file) + write_status = 
h.sync_pwrite(aio_buffer, aio_file, 0) assert write_status == 1 if not use_cuda_pinned_tensor: @@ -201,7 +201,7 @@ def test_async_write(self, tmpdir, use_cuda_pinned_tensor, single_submit, overla _validate_handle_state(h, single_submit, overlap_events) - write_status = h.async_pwrite(aio_buffer, aio_file) + write_status = h.async_pwrite(aio_buffer, aio_file, 0) assert write_status == 0 wait_status = h.wait() @@ -258,7 +258,7 @@ def test_read(self, tmpdir, async_queue, use_cuda_pinned_tensor, use_unpinned_te _validate_handle_state(h, single_submit, overlap_events) for i in range(async_queue): - read_status = h.async_pread(aio_buffers[i], ref_files[i]) + read_status = h.async_pread(aio_buffers[i], ref_files[i], 0) assert read_status == 0 wait_status = h.wait() @@ -305,7 +305,7 @@ def test_write(self, tmpdir, use_cuda_pinned_tensor, async_queue, use_unpinned_t _validate_handle_state(h, single_submit, overlap_events) for i in range(async_queue): - read_status = h.async_pwrite(aio_buffers[i], aio_files[i]) + read_status = h.async_pwrite(aio_buffers[i], aio_files[i], 0) assert read_status == 0 wait_status = h.wait() diff --git a/tests/unit/ops/aio/test_gds.py b/tests/unit/ops/aio/test_gds.py index e94d42cd22af..0ec55e724ca5 100644 --- a/tests/unit/ops/aio/test_gds.py +++ b/tests/unit/ops/aio/test_gds.py @@ -78,7 +78,7 @@ def test_parallel_read(self, tmpdir, single_submit, overlap_events): _validate_handle_state(h, single_submit, overlap_events) ref_file, _ = _do_ref_write(tmpdir) - read_status = h.sync_pread(gds_buffer, ref_file) + read_status = h.sync_pread(gds_buffer, ref_file, 0) assert read_status == 1 with open(ref_file, 'rb') as f: @@ -97,7 +97,7 @@ def test_async_read(self, tmpdir, single_submit, overlap_events): _validate_handle_state(h, single_submit, overlap_events) ref_file, _ = _do_ref_write(tmpdir) - read_status = h.async_pread(gds_buffer, ref_file) + read_status = h.async_pread(gds_buffer, ref_file, 0) assert read_status == 0 wait_status = h.wait() @@ -128,7 +128,7 @@ def test_parallel_write(self, tmpdir, single_submit, overlap_events): _validate_handle_state(h, single_submit, overlap_events) - write_status = h.sync_pwrite(gds_buffer, gds_file) + write_status = h.sync_pwrite(gds_buffer, gds_file, 0) assert write_status == 1 h.unpin_device_tensor(gds_buffer) @@ -146,7 +146,7 @@ def test_async_write(self, tmpdir, single_submit, overlap_events): _validate_handle_state(h, single_submit, overlap_events) - write_status = h.async_pwrite(gds_buffer, gds_file) + write_status = h.async_pwrite(gds_buffer, gds_file, 0) assert write_status == 0 wait_status = h.wait() @@ -188,7 +188,7 @@ def test_read(self, tmpdir, async_queue): _validate_handle_state(h, single_submit, overlap_events) for i in range(async_queue): - read_status = h.async_pread(gds_buffers[i], ref_files[i]) + read_status = h.async_pread(gds_buffers[i], ref_files[i], 0) assert read_status == 0 wait_status = h.wait() @@ -225,7 +225,7 @@ def test_write(self, tmpdir, async_queue): _validate_handle_state(h, single_submit, overlap_events) for i in range(async_queue): - read_status = h.async_pwrite(gds_buffers[i], gds_files[i]) + read_status = h.async_pwrite(gds_buffers[i], gds_files[i], 0) assert read_status == 0 wait_status = h.wait() From 1400c3f1d165350541e2de8505eea8b2d6c087aa Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 19 Oct 2024 00:33:14 +0000 Subject: [PATCH 11/17] pytest additions --- csrc/aio/py_lib/py_ds_aio.cpp | 16 +++--- csrc/gds/py_lib/py_ds_gds.cpp | 16 +++--- tests/unit/ops/aio/test_aio.py | 102 
++++++++++++++++++++++++++++++++- tests/unit/ops/aio/test_gds.py | 85 ++++++++++++++++++++++++++- 4 files changed, 199 insertions(+), 20 deletions(-) diff --git a/csrc/aio/py_lib/py_ds_aio.cpp b/csrc/aio/py_lib/py_ds_aio.cpp index ff29480bd6f3..2511a4d29b21 100644 --- a/csrc/aio/py_lib/py_ds_aio.cpp +++ b/csrc/aio/py_lib/py_ds_aio.cpp @@ -40,7 +40,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, - "file_offset"_a, + "file_offset"_a = 0, "validate"_a) .def("write", @@ -48,7 +48,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, - "file_offset"_a, + "file_offset"_a = 0, "validate"_a) .def("pread", @@ -57,7 +57,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "file_offset"_a, + "file_offset"_a = 0, "async"_a) .def("pwrite", @@ -66,7 +66,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "file_offset"_a, + "file_offset"_a = 0, "async"_a) .def("sync_pread", @@ -74,14 +74,14 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchrononous parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, - "file_offset"_a) + "file_offset"_a = 0) .def("sync_pwrite", &deepspeed_aio_handle_t::sync_pwrite, "Synchronous parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, - "file_offset"_a) + "file_offset"_a = 0) .def("async_pread", &deepspeed_aio_handle_t::async_pread, @@ -89,7 +89,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "following wait() returns count of completed ops.", "buffer"_a, "filename"_a, - "file_offset"_a) + "file_offset"_a = 0) .def("async_pwrite", &deepspeed_aio_handle_t::async_pwrite, @@ -97,7 +97,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "count of completed ops.", "buffer"_a, "filename"_a, - "file_offset"_a) + "file_offset"_a = 0) .def("new_cpu_locked_tensor", &deepspeed_aio_handle_t::new_cpu_locked_tensor, diff --git a/csrc/gds/py_lib/py_ds_gds.cpp b/csrc/gds/py_lib/py_ds_gds.cpp index ecdc526624b3..c235c2cc9eb6 100644 --- a/csrc/gds/py_lib/py_ds_gds.cpp +++ b/csrc/gds/py_lib/py_ds_gds.cpp @@ -33,7 +33,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, - "file_offset"_a, + "file_offset"_a = 0, "validate"_a) .def("write", @@ -41,7 +41,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, - "file_offset"_a, + "file_offset"_a = 0, "validate"_a) .def("pread", @@ -50,7 +50,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "file_offset"_a, + "file_offset"_a = 0, "async"_a) .def("pwrite", @@ -59,7 +59,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "file_offset"_a, + "file_offset"_a = 0, "async"_a) .def("sync_pread", @@ -67,14 +67,14 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchrononous parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, - "file_offset"_a) + "file_offset"_a = 0) .def("sync_pwrite", &deepspeed_gds_handle_t::sync_pwrite, "Synchronous parallel file write. 
Returns count of completed write ops", "buffer"_a, "filename"_a, - "file_offset"_a) + "file_offset"_a = 0) .def("async_pread", &deepspeed_gds_handle_t::async_pread, @@ -82,7 +82,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "following wait() returns count of completed ops.", "buffer"_a, "filename"_a, - "file_offset"_a) + "file_offset"_a = 0) .def("async_pwrite", &deepspeed_gds_handle_t::async_pwrite, @@ -90,7 +90,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "count of completed ops.", "buffer"_a, "filename"_a, - "file_offset"_a) + "file_offset"_a = 0) .def("new_cpu_locked_tensor", &deepspeed_gds_handle_t::new_cpu_locked_tensor, diff --git a/tests/unit/ops/aio/test_aio.py b/tests/unit/ops/aio/test_aio.py index 542570e79954..3afbc01d54ab 100644 --- a/tests/unit/ops/aio/test_aio.py +++ b/tests/unit/ops/aio/test_aio.py @@ -35,16 +35,21 @@ def _get_local_rank(): return 0 -def _do_ref_write(tmpdir, index=0): +def _do_ref_write(tmpdir, index=0, file_size = IO_SIZE): file_suffix = f'{_get_local_rank()}_{index}' ref_file = os.path.join(tmpdir, f'_py_random_{file_suffix}.pt') - ref_buffer = os.urandom(IO_SIZE) + ref_buffer = os.urandom(file_size) with open(ref_file, 'wb') as f: f.write(ref_buffer) return ref_file, ref_buffer +def _get_file_path(tmpdir, file_prefix, index=0): + file_suffix = f'{_get_local_rank()}_{index}' + return os.path.join(tmpdir, f'{file_prefix}_{file_suffix}.pt') + + def _get_test_write_file(tmpdir, index): file_suffix = f'{_get_local_rank()}_{index}' return os.path.join(tmpdir, f'_aio_write_random_{file_suffix}.pt') @@ -320,3 +325,96 @@ def test_write(self, tmpdir, use_cuda_pinned_tensor, async_queue, use_unpinned_t filecmp.clear_cache() assert filecmp.cmp(ref_files[i], aio_files[i], shallow=False) + +@pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False]) +@pytest.mark.parametrize('file_partitions', [[1, 1, 1], [1, 1, 2], [1, 2, 1], [2, 1, 1]]) +class TestAsyncFileOffset(DistributedTest): + world_size = 1 + + def test_offset_write(self, tmpdir, file_partitions, use_cuda_pinned_tensor): + + + ref_file = _get_file_path(tmpdir, '_py_random') + aio_file = _get_file_path(tmpdir, '_aio_random') + partition_unit_size = BLOCK_SIZE + file_size = sum(file_partitions) * partition_unit_size + + h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, + QUEUE_DEPTH, + True, + True, + IO_PARALLEL) + + if use_cuda_pinned_tensor: + data_buffer = torch.ByteTensor(list(os.urandom(file_size))).pin_memory() + else: + data_buffer = h.new_cpu_locked_tensor(file_size, + torch.empty(0, + dtype=torch.uint8)) + + file_offsets = [] + next_offset = 0 + for i in range(len(file_partitions)): + file_offsets.append(next_offset) + next_offset += file_partitions[i] * partition_unit_size + + ref_fd = open(ref_file, 'wb') + for i in range(len(file_partitions)): + src_buffer = torch.narrow(data_buffer, + 0, + file_offsets[i], + file_partitions[i] * partition_unit_size) + + ref_fd.write(src_buffer.numpy().tobytes()) + ref_fd.flush() + + assert 1 == h.sync_pwrite(buffer=src_buffer, + filename=aio_file, + file_offset=file_offsets[i]) + + filecmp.clear_cache() + assert filecmp.cmp(ref_file, aio_file, shallow=False) + + ref_fd.close() + + if not use_cuda_pinned_tensor: + h.free_cpu_locked_tensor(data_buffer) + + def test_offset_read(self, tmpdir, file_partitions, use_cuda_pinned_tensor): + + partition_unit_size = BLOCK_SIZE + file_size = sum(file_partitions) * partition_unit_size + ref_file, _ = _do_ref_write(tmpdir, 0, file_size) + h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, + QUEUE_DEPTH, + True, + True, 
+ IO_PARALLEL) + + if use_cuda_pinned_tensor: + data_buffer = torch.zeros(file_size, + dtype=torch.uint8, + device='cpu').pin_memory() + else: + data_buffer = h.new_cpu_locked_tensor(file_size, + torch.empty(0, + dtype=torch.uint8)) + + file_offsets = [] + next_offset = 0 + for i in range(len(file_partitions)): + file_offsets.append(next_offset) + next_offset += file_partitions[i] * partition_unit_size + + with open(ref_file, 'rb') as ref_fd: + for i in range(len(file_partitions)): + ref_fd.seek(file_offsets[i]) + bytes_to_read = file_partitions[i] * partition_unit_size + ref_buf = list(ref_fd.read(bytes_to_read)) + + dst_tensor = torch.narrow(data_buffer, 0, 0, bytes_to_read) + assert 1 == h.sync_pread(dst_tensor, ref_file, file_offsets[i]) + assert dst_tensor.tolist() == ref_buf + + if not use_cuda_pinned_tensor: + h.free_cpu_locked_tensor(data_buffer) diff --git a/tests/unit/ops/aio/test_gds.py b/tests/unit/ops/aio/test_gds.py index 0ec55e724ca5..6880c3d26292 100644 --- a/tests/unit/ops/aio/test_gds.py +++ b/tests/unit/ops/aio/test_gds.py @@ -29,15 +29,19 @@ def _get_local_rank(): return 0 -def _do_ref_write(tmpdir, index=0): +def _do_ref_write(tmpdir, index=0, file_size = IO_SIZE): file_suffix = f'{_get_local_rank()}_{index}' ref_file = os.path.join(tmpdir, f'_py_random_{file_suffix}.pt') - ref_buffer = os.urandom(IO_SIZE) + ref_buffer = os.urandom(file_size) with open(ref_file, 'wb') as f: f.write(ref_buffer) return ref_file, ref_buffer +def _get_file_path(tmpdir, file_prefix, index=0): + file_suffix = f'{_get_local_rank()}_{index}' + return os.path.join(tmpdir, f'{file_prefix}_{file_suffix}.pt') + def _get_test_write_file(tmpdir, index): file_suffix = f'{_get_local_rank()}_{index}' @@ -268,3 +272,80 @@ def test_pin_device_tensor(self, use_new_api): h.free_pinned_device_tensor(pinned_buffer) else: h.unpin_device_tensor(pinned_buffer) + +@pytest.mark.parametrize('file_partitions', [[1, 1, 1], [1, 1, 2], [1, 2, 1], [2, 1, 1]]) +class TestAsyncFileOffset(DistributedTest): + world_size = 1 + + def test_offset_write(self, tmpdir, file_partitions): + ref_file = _get_file_path(tmpdir, '_py_random') + aio_file = _get_file_path(tmpdir, '_aio_random') + partition_unit_size = IO_SIZE + file_size = sum(file_partitions) * partition_unit_size + + h = GDSBuilder().load().gds_handle(BLOCK_SIZE, + QUEUE_DEPTH, + True, + True, + IO_PARALLEL) + + gds_buffer = torch.empty(file_size, dtype=torch.uint8, device=get_accelerator().device_name()) + h.pin_device_tensor(gds_buffer) + + file_offsets = [] + next_offset = 0 + for i in range(len(file_partitions)): + file_offsets.append(next_offset) + next_offset += file_partitions[i] * partition_unit_size + + ref_fd = open(ref_file, 'wb') + for i in range(len(file_partitions)): + src_buffer = torch.narrow(gds_buffer, + 0, + file_offsets[i], + file_partitions[i] * partition_unit_size).to(device='cpu') + + ref_fd.write(src_buffer.numpy().tobytes()) + ref_fd.flush() + + assert 1 == h.sync_pwrite(buffer=src_buffer, + filename=aio_file, + file_offset=file_offsets[i]) + + filecmp.clear_cache() + assert filecmp.cmp(ref_file, aio_file, shallow=False) + + ref_fd.close() + + h.unpin_device_tensor(gds_buffer) + + def test_offset_read(self, tmpdir, file_partitions): + partition_unit_size = BLOCK_SIZE + file_size = sum(file_partitions) * partition_unit_size + ref_file, _ = _do_ref_write(tmpdir, 0, file_size) + h = GDSBuilder().load().gds_handle(BLOCK_SIZE, + QUEUE_DEPTH, + True, + True, + IO_PARALLEL) + + gds_buffer = torch.empty(file_size, dtype=torch.uint8, 
device=get_accelerator().device_name()) + h.pin_device_tensor(gds_buffer) + + file_offsets = [] + next_offset = 0 + for i in range(len(file_partitions)): + file_offsets.append(next_offset) + next_offset += file_partitions[i] * partition_unit_size + + with open(ref_file, 'rb') as ref_fd: + for i in range(len(file_partitions)): + ref_fd.seek(file_offsets[i]) + bytes_to_read = file_partitions[i] * partition_unit_size + ref_buf = list(ref_fd.read(bytes_to_read)) + + dst_tensor = torch.narrow(gds_buffer, 0, 0, bytes_to_read) + assert 1 == h.sync_pread(dst_tensor, ref_file, file_offsets[i]) + assert dst_tensor.tolist() == ref_buf + + h.unpin_device_tensor(gds_buffer) From b8da1cc90fd426e9a65b35fa6d19c4473b85bb90 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 19 Oct 2024 00:33:49 +0000 Subject: [PATCH 12/17] fix formatting --- csrc/aio/py_lib/py_ds_aio.cpp | 2 +- tests/unit/ops/aio/test_aio.py | 37 +++++++++------------------------- tests/unit/ops/aio/test_gds.py | 24 +++++++--------------- 3 files changed, 17 insertions(+), 46 deletions(-) diff --git a/csrc/aio/py_lib/py_ds_aio.cpp b/csrc/aio/py_lib/py_ds_aio.cpp index 2511a4d29b21..f324e6e533be 100644 --- a/csrc/aio/py_lib/py_ds_aio.cpp +++ b/csrc/aio/py_lib/py_ds_aio.cpp @@ -80,7 +80,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) &deepspeed_aio_handle_t::sync_pwrite, "Synchronous parallel file write. Returns count of completed write ops", "buffer"_a, - "filename"_a, + "filename"_a, "file_offset"_a = 0) .def("async_pread", diff --git a/tests/unit/ops/aio/test_aio.py b/tests/unit/ops/aio/test_aio.py index 3afbc01d54ab..82d41d18b66b 100644 --- a/tests/unit/ops/aio/test_aio.py +++ b/tests/unit/ops/aio/test_aio.py @@ -35,7 +35,7 @@ def _get_local_rank(): return 0 -def _do_ref_write(tmpdir, index=0, file_size = IO_SIZE): +def _do_ref_write(tmpdir, index=0, file_size=IO_SIZE): file_suffix = f'{_get_local_rank()}_{index}' ref_file = os.path.join(tmpdir, f'_py_random_{file_suffix}.pt') ref_buffer = os.urandom(file_size) @@ -326,6 +326,7 @@ def test_write(self, tmpdir, use_cuda_pinned_tensor, async_queue, use_unpinned_t filecmp.clear_cache() assert filecmp.cmp(ref_files[i], aio_files[i], shallow=False) + @pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False]) @pytest.mark.parametrize('file_partitions', [[1, 1, 1], [1, 1, 2], [1, 2, 1], [2, 1, 1]]) class TestAsyncFileOffset(DistributedTest): @@ -333,24 +334,17 @@ class TestAsyncFileOffset(DistributedTest): def test_offset_write(self, tmpdir, file_partitions, use_cuda_pinned_tensor): - ref_file = _get_file_path(tmpdir, '_py_random') aio_file = _get_file_path(tmpdir, '_aio_random') partition_unit_size = BLOCK_SIZE file_size = sum(file_partitions) * partition_unit_size - h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, - QUEUE_DEPTH, - True, - True, - IO_PARALLEL) + h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL) if use_cuda_pinned_tensor: data_buffer = torch.ByteTensor(list(os.urandom(file_size))).pin_memory() else: - data_buffer = h.new_cpu_locked_tensor(file_size, - torch.empty(0, - dtype=torch.uint8)) + data_buffer = h.new_cpu_locked_tensor(file_size, torch.empty(0, dtype=torch.uint8)) file_offsets = [] next_offset = 0 @@ -360,17 +354,12 @@ def test_offset_write(self, tmpdir, file_partitions, use_cuda_pinned_tensor): ref_fd = open(ref_file, 'wb') for i in range(len(file_partitions)): - src_buffer = torch.narrow(data_buffer, - 0, - file_offsets[i], - file_partitions[i] * partition_unit_size) + src_buffer = torch.narrow(data_buffer, 0, file_offsets[i], 
file_partitions[i] * partition_unit_size) ref_fd.write(src_buffer.numpy().tobytes()) ref_fd.flush() - assert 1 == h.sync_pwrite(buffer=src_buffer, - filename=aio_file, - file_offset=file_offsets[i]) + assert 1 == h.sync_pwrite(buffer=src_buffer, filename=aio_file, file_offset=file_offsets[i]) filecmp.clear_cache() assert filecmp.cmp(ref_file, aio_file, shallow=False) @@ -385,20 +374,12 @@ def test_offset_read(self, tmpdir, file_partitions, use_cuda_pinned_tensor): partition_unit_size = BLOCK_SIZE file_size = sum(file_partitions) * partition_unit_size ref_file, _ = _do_ref_write(tmpdir, 0, file_size) - h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, - QUEUE_DEPTH, - True, - True, - IO_PARALLEL) + h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL) if use_cuda_pinned_tensor: - data_buffer = torch.zeros(file_size, - dtype=torch.uint8, - device='cpu').pin_memory() + data_buffer = torch.zeros(file_size, dtype=torch.uint8, device='cpu').pin_memory() else: - data_buffer = h.new_cpu_locked_tensor(file_size, - torch.empty(0, - dtype=torch.uint8)) + data_buffer = h.new_cpu_locked_tensor(file_size, torch.empty(0, dtype=torch.uint8)) file_offsets = [] next_offset = 0 diff --git a/tests/unit/ops/aio/test_gds.py b/tests/unit/ops/aio/test_gds.py index 6880c3d26292..d97eff452eb5 100644 --- a/tests/unit/ops/aio/test_gds.py +++ b/tests/unit/ops/aio/test_gds.py @@ -29,7 +29,7 @@ def _get_local_rank(): return 0 -def _do_ref_write(tmpdir, index=0, file_size = IO_SIZE): +def _do_ref_write(tmpdir, index=0, file_size=IO_SIZE): file_suffix = f'{_get_local_rank()}_{index}' ref_file = os.path.join(tmpdir, f'_py_random_{file_suffix}.pt') ref_buffer = os.urandom(file_size) @@ -38,6 +38,7 @@ def _do_ref_write(tmpdir, index=0, file_size = IO_SIZE): return ref_file, ref_buffer + def _get_file_path(tmpdir, file_prefix, index=0): file_suffix = f'{_get_local_rank()}_{index}' return os.path.join(tmpdir, f'{file_prefix}_{file_suffix}.pt') @@ -273,6 +274,7 @@ def test_pin_device_tensor(self, use_new_api): else: h.unpin_device_tensor(pinned_buffer) + @pytest.mark.parametrize('file_partitions', [[1, 1, 1], [1, 1, 2], [1, 2, 1], [2, 1, 1]]) class TestAsyncFileOffset(DistributedTest): world_size = 1 @@ -283,11 +285,7 @@ def test_offset_write(self, tmpdir, file_partitions): partition_unit_size = IO_SIZE file_size = sum(file_partitions) * partition_unit_size - h = GDSBuilder().load().gds_handle(BLOCK_SIZE, - QUEUE_DEPTH, - True, - True, - IO_PARALLEL) + h = GDSBuilder().load().gds_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL) gds_buffer = torch.empty(file_size, dtype=torch.uint8, device=get_accelerator().device_name()) h.pin_device_tensor(gds_buffer) @@ -300,17 +298,13 @@ def test_offset_write(self, tmpdir, file_partitions): ref_fd = open(ref_file, 'wb') for i in range(len(file_partitions)): - src_buffer = torch.narrow(gds_buffer, - 0, - file_offsets[i], + src_buffer = torch.narrow(gds_buffer, 0, file_offsets[i], file_partitions[i] * partition_unit_size).to(device='cpu') ref_fd.write(src_buffer.numpy().tobytes()) ref_fd.flush() - assert 1 == h.sync_pwrite(buffer=src_buffer, - filename=aio_file, - file_offset=file_offsets[i]) + assert 1 == h.sync_pwrite(buffer=src_buffer, filename=aio_file, file_offset=file_offsets[i]) filecmp.clear_cache() assert filecmp.cmp(ref_file, aio_file, shallow=False) @@ -323,11 +317,7 @@ def test_offset_read(self, tmpdir, file_partitions): partition_unit_size = BLOCK_SIZE file_size = sum(file_partitions) * partition_unit_size ref_file, _ = 
_do_ref_write(tmpdir, 0, file_size) - h = GDSBuilder().load().gds_handle(BLOCK_SIZE, - QUEUE_DEPTH, - True, - True, - IO_PARALLEL) + h = GDSBuilder().load().gds_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL) gds_buffer = torch.empty(file_size, dtype=torch.uint8, device=get_accelerator().device_name()) h.pin_device_tensor(gds_buffer) From 441c7e3494247669536b1bfd65254e4964767fe7 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 28 Oct 2024 20:29:41 +0000 Subject: [PATCH 13/17] removing commented code --- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index 036e84fa1591..003bf5bd46c1 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -238,13 +238,9 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, report_file_error(filename, " fstat for read", error_code); return -1; } + + // buffer can exceed file size to enable 4k alignment const auto buffer_bytes = static_cast(buffer.nbytes()); - // No longer check, b/c buffer can exceed file size to enable 4k alignment - // if ((buffer_bytes+file_offset) > num_file_bytes) { - // std::cout << filename << ": buffer + file offset > file bytes " << buffer_bytes - // << "+ " << file_offset << " > " << num_file_bytes << std::endl; - // //assert(0); - //} assert((num_file_bytes % _intra_op_parallelism) == 0); if (!_is_valid_parallel_aio_op(true, buffer_bytes)) { return -1; } From 988be5a9bb9a315b4d53ddbd52392d77804aa6ad Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 30 Oct 2024 21:22:56 +0000 Subject: [PATCH 14/17] moving offset to last argument --- csrc/aio/py_lib/py_ds_aio.cpp | 16 ++++++++-------- csrc/gds/py_lib/py_ds_gds.cpp | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/csrc/aio/py_lib/py_ds_aio.cpp b/csrc/aio/py_lib/py_ds_aio.cpp index f324e6e533be..bf298b691b81 100644 --- a/csrc/aio/py_lib/py_ds_aio.cpp +++ b/csrc/aio/py_lib/py_ds_aio.cpp @@ -40,16 +40,16 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, - "file_offset"_a = 0, - "validate"_a) + "validate"_a, + "file_offset"_a = 0) .def("write", &deepspeed_aio_handle_t::write, "Synchronous and non-parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, - "file_offset"_a = 0, - "validate"_a) + "validate"_a, + "file_offset"_a = 0) .def("pread", &deepspeed_aio_handle_t::pread, @@ -57,8 +57,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "file_offset"_a = 0, - "async"_a) + "async"_a, + "file_offset"_a = 0) .def("pwrite", &deepspeed_aio_handle_t::pwrite, @@ -66,8 +66,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "file_offset"_a = 0, - "async"_a) + "async"_a, + "file_offset"_a = 0) .def("sync_pread", &deepspeed_aio_handle_t::sync_pread, diff --git a/csrc/gds/py_lib/py_ds_gds.cpp b/csrc/gds/py_lib/py_ds_gds.cpp index c235c2cc9eb6..2f165ee2c32a 100644 --- a/csrc/gds/py_lib/py_ds_gds.cpp +++ b/csrc/gds/py_lib/py_ds_gds.cpp @@ -33,16 +33,16 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file read. 
Returns count of completed read ops", "buffer"_a, "filename"_a, - "file_offset"_a = 0, - "validate"_a) + "validate"_a, + "file_offset"_a = 0) .def("write", &deepspeed_gds_handle_t::write, "Synchronous and non-parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, - "file_offset"_a = 0, - "validate"_a) + "validate"_a, + "file_offset"_a = 0) .def("pread", &deepspeed_gds_handle_t::pread, @@ -50,8 +50,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "file_offset"_a = 0, - "async"_a) + "async"_a, + "file_offset"_a = 0) .def("pwrite", &deepspeed_gds_handle_t::pwrite, @@ -59,8 +59,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "file_offset"_a = 0, - "async"_a) + "async"_a, + "file_offset"_a = 0) .def("sync_pread", &deepspeed_gds_handle_t::sync_pread, From 7454159cf9d7abaaabb54c8b72aab78e7c19b0e4 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 30 Oct 2024 23:05:01 +0000 Subject: [PATCH 15/17] moving offset argumetn --- csrc/aio/py_lib/deepspeed_aio_op_desc.cpp | 4 +-- csrc/aio/py_lib/deepspeed_aio_op_desc.h | 6 ++-- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 8 ++--- csrc/aio/py_lib/deepspeed_cpu_op.h | 4 +-- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 36 ++++++++++----------- csrc/aio/py_lib/deepspeed_py_io_handle.h | 20 ++++++------ csrc/gds/py_lib/deepspeed_gds_op.cpp | 8 ++--- csrc/gds/py_lib/deepspeed_gds_op.h | 4 +-- csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 10 +++--- csrc/gds/py_lib/deepspeed_py_gds_handle.h | 4 +-- 10 files changed, 52 insertions(+), 52 deletions(-) diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp index 8594eb7a2e9d..945251397225 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp @@ -12,9 +12,9 @@ io_op_desc_t::io_op_desc_t(const bool read_op, const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, const int intra_op_parallelism, - const bool validate) + const bool validate, + const int64_t file_offset) : _read_op(read_op), _buffer(buffer), _fd(fd), diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.h b/csrc/aio/py_lib/deepspeed_aio_op_desc.h index 086a562bc893..ac1cdf90f78b 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.h +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.h @@ -15,20 +15,20 @@ struct io_op_desc_t { int _fd; const std::string _filename; const int64_t _file_num_bytes; - const int64_t _file_offset; const int _intra_op_parallelism; const int64_t _num_bytes_per_thread; torch::Tensor _contiguous_buffer; const bool _validate; + const int64_t _file_offset; io_op_desc_t(const bool read_op, const torch::Tensor& buffer, const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, const int intra_op_parallelism, - const bool validate); + const bool validate, + const int64_t file_offset); virtual void run(const int tid, std::unique_ptr& aio_ctxt, diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index e48c2762b010..56fb33fb1886 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -15,17 +15,17 @@ cpu_op_desc_t::cpu_op_desc_t( const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, const int intra_op_parallelism, - const bool validate) + const bool validate, + const int64_t file_offset) : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, - file_offset, 
intra_op_parallelism, - validate), + validate, + file_offset), _cpu_buffer(buffer), _pinned_tensor_mgr(pinned_tensor_mgr), _is_managed_bounce_buffer(false) diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.h b/csrc/aio/py_lib/deepspeed_cpu_op.h index 6f39c492988f..debaf4a90731 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.h +++ b/csrc/aio/py_lib/deepspeed_cpu_op.h @@ -19,9 +19,9 @@ struct cpu_op_desc_t : io_op_desc_t { const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, const int intra_op_parallelism, - const bool validate); + const bool validate, + const int64_t file_offset); void run(const int tid, std::unique_ptr& aio_ctxt, diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index 003bf5bd46c1..64d7c2e0541e 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -60,8 +60,8 @@ const int deepspeed_io_handle_t::get_intra_op_parallelism() const { return _intr int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, - const int64_t file_offset, - const bool validate) + const bool validate, + const int64_t file_offset) { const auto start_time = std::chrono::high_resolution_clock::now(); @@ -102,8 +102,8 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, int deepspeed_io_handle_t::write(const torch::Tensor& buffer, const char* filename, - const int64_t file_offset, - const bool validate) + const bool validate, + const int64_t file_offset) { assert(_aio_ctxt); @@ -212,8 +212,8 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, - const bool validate) + const bool validate, + const int64_t file_offset) { return std::make_shared(read_op, buffer, @@ -221,16 +221,16 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( fd, filename, file_num_bytes, - file_offset, _intra_op_parallelism, - validate); + validate, + file_offset); } int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, const char* filename, const bool validate, - const int64_t file_offset, - const bool async) + const bool async, + const int64_t file_offset) { int64_t num_file_bytes; if (-1 == get_file_size(filename, num_file_bytes)) { @@ -249,7 +249,7 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, if (fd == -1) { return -1; } auto scheduled_op = - _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, file_offset, validate); + _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, validate, file_offset); _schedule_aio_work(scheduled_op); @@ -261,8 +261,8 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, const char* filename, const bool validate, - const int64_t file_offset, - const bool async) + const bool async, + const int64_t file_offset) { const auto num_write_bytes = static_cast(buffer.nbytes()); assert((num_write_bytes % _intra_op_parallelism) == 0); @@ -273,7 +273,7 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, if (fd == -1) { return -1; } auto scheduled_op = - _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, file_offset, validate); + _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, validate, file_offset); _schedule_aio_work(scheduled_op); @@ -286,28 +286,28 @@ int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset) { - return 
pread(buffer, filename, false, file_offset, false); + return pread(buffer, filename, false, false, file_offset); } int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset) { - return pwrite(buffer, filename, false, file_offset, false); + return pwrite(buffer, filename, false, false, file_offset); } int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset) { - return pread(buffer, filename, false, file_offset, true); + return pread(buffer, filename, false, true, file_offset); } int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset) { - return pwrite(buffer, filename, false, file_offset, true); + return pwrite(buffer, filename, false, true, file_offset); } at::Tensor deepspeed_io_handle_t::new_cpu_locked_tensor(const int64_t num_elem, diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index 16671b7e7c05..dfcb4125ab9a 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -40,25 +40,25 @@ struct deepspeed_io_handle_t { int read(torch::Tensor& buffer, const char* filename, - const int64_t file_offset, - const bool validate); + const bool validate, + const int64_t file_offset); int write(const torch::Tensor& buffer, const char* filename, - const int64_t file_offset, - const bool validate); + const bool validate, + const int64_t file_offset); int pread(const torch::Tensor& buffer, const char* filename, const bool validate, - const int64_t file_offset, - const bool async); + const bool async, + const int64_t file_offset); int pwrite(const torch::Tensor& buffer, const char* filename, const bool validate, - const int64_t file_offset, - const bool async); + const bool async, + const int64_t file_offset); int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset); @@ -89,6 +89,6 @@ struct deepspeed_io_handle_t { const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, - const bool validate); + const bool validate, + const int64_t file_offset); }; diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp index 9bb1888dbded..b7055c8cc72b 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.cpp +++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp @@ -94,17 +94,17 @@ gds_op_desc_t::gds_op_desc_t(const bool read_op, const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, const int intra_op_parallelism, - const bool validate) + const bool validate, + const int64_t file_offset) : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, - file_offset, intra_op_parallelism, - validate) + validate, + file_offset) { _contiguous_buffer = _buffer.contiguous(); const int64_t device = _buffer.get_device(); diff --git a/csrc/gds/py_lib/deepspeed_gds_op.h b/csrc/gds/py_lib/deepspeed_gds_op.h index f94b98920de7..d955527b1ba3 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.h +++ b/csrc/gds/py_lib/deepspeed_gds_op.h @@ -23,9 +23,9 @@ struct gds_op_desc_t : io_op_desc_t { const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, const int intra_op_parallelism, - const bool validate); + const bool validate, + const int64_t file_offset); void run(const int tid, std::unique_ptr& aio_ctxt, diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index 
7da4afb85239..f11245c75a5e 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -107,8 +107,8 @@ std::shared_ptr deepspeed_gds_handle_t::_create_io_op_desc( const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, - const bool validate) + const bool validate, + const int64_t file_offset) { if (buffer.is_cuda()) { return std::make_shared(read_op, @@ -116,10 +116,10 @@ std::shared_ptr deepspeed_gds_handle_t::_create_io_op_desc( fd, filename, file_num_bytes, - file_offset, _intra_op_parallelism, - validate); + validate, + file_offset); } return deepspeed_io_handle_t::_create_io_op_desc( - read_op, buffer, fd, filename, file_num_bytes, file_offset, validate); + read_op, buffer, fd, filename, file_num_bytes, validate, file_offset); } diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.h b/csrc/gds/py_lib/deepspeed_py_gds_handle.h index adf9d0b1c4b6..25f68e177b2c 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.h +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.h @@ -42,8 +42,8 @@ struct deepspeed_gds_handle_t : deepspeed_io_handle_t { const int fd, const char* filename, const int64_t file_num_bytes, - const int64_t file_offset, - const bool validate); + const bool validate, + const int64_t file_offset); static int s_cuFile_init; }; From 6d0a89e947291dffc18ae554d32aa2e1c02aef4f Mon Sep 17 00:00:00 2001 From: Joe Mayer Date: Mon, 4 Nov 2024 12:21:29 -0800 Subject: [PATCH 16/17] skipping pin mem when not cuda --- accelerator/cpu_accelerator.py | 2 ++ deepspeed/utils/numa.py | 5 ++++- tests/unit/ops/aio/test_aio.py | 2 ++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/accelerator/cpu_accelerator.py b/accelerator/cpu_accelerator.py index 1e4335b19292..b611dc56eacd 100644 --- a/accelerator/cpu_accelerator.py +++ b/accelerator/cpu_accelerator.py @@ -71,6 +71,8 @@ def device_count(self): # In flat mode, HBM is in separate NUMA node with no cores on this node. # Ignore these NUMA nodes with no cores. 
numa_core_lists = get_numa_cores() + if not numa_core_lists: + return 1 numa_count = 0 prev_core_list = [] for core_list in numa_core_lists: diff --git a/deepspeed/utils/numa.py b/deepspeed/utils/numa.py index 4fe7cbba90ae..aba3b5179d41 100644 --- a/deepspeed/utils/numa.py +++ b/deepspeed/utils/numa.py @@ -23,7 +23,10 @@ # ] def get_numa_cores(): ret = [] - output = subprocess.check_output(['numactl', '--hardware']).decode("utf-8") + try: + output = subprocess.check_output(['numactl', '--hardware']).decode("utf-8") + except: + return [] lines = output.split('\n') for line in lines: if line.startswith('available:'): diff --git a/tests/unit/ops/aio/test_aio.py b/tests/unit/ops/aio/test_aio.py index 82d41d18b66b..1aa5f647a8aa 100644 --- a/tests/unit/ops/aio/test_aio.py +++ b/tests/unit/ops/aio/test_aio.py @@ -334,6 +334,7 @@ class TestAsyncFileOffset(DistributedTest): def test_offset_write(self, tmpdir, file_partitions, use_cuda_pinned_tensor): + _skip_for_invalid_environment(use_cuda_pinned_tensor=use_cuda_pinned_tensor) ref_file = _get_file_path(tmpdir, '_py_random') aio_file = _get_file_path(tmpdir, '_aio_random') partition_unit_size = BLOCK_SIZE @@ -371,6 +372,7 @@ def test_offset_write(self, tmpdir, file_partitions, use_cuda_pinned_tensor): def test_offset_read(self, tmpdir, file_partitions, use_cuda_pinned_tensor): + _skip_for_invalid_environment(use_cuda_pinned_tensor=use_cuda_pinned_tensor) partition_unit_size = BLOCK_SIZE file_size = sum(file_partitions) * partition_unit_size ref_file, _ = _do_ref_write(tmpdir, 0, file_size) From 25f94d71728ecab980a0457c4e9625216542a370 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Nov 2024 20:41:01 +0000 Subject: [PATCH 17/17] formatting --- accelerator/cpu_accelerator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accelerator/cpu_accelerator.py b/accelerator/cpu_accelerator.py index b611dc56eacd..0e49bd9f6458 100644 --- a/accelerator/cpu_accelerator.py +++ b/accelerator/cpu_accelerator.py @@ -72,7 +72,7 @@ def device_count(self): # Ignore these NUMA nodes with no cores. numa_core_lists = get_numa_cores() if not numa_core_lists: - return 1 + return 1 numa_count = 0 prev_core_list = [] for core_list in numa_core_lists:
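
A minimal usage sketch of the offset-aware handle API introduced by this series, modeled on the
TestAsyncFileOffset unit tests (the use_cuda_pinned_tensor=True path). The AsyncIOBuilder import
path, the scratch file path, and the handle parameters (block size, queue depth, single_submit,
overlap_events, intra_op_parallelism) are illustrative assumptions, not values taken from these
patches.

    import os
    import torch
    from deepspeed.ops.op_builder import AsyncIOBuilder  # assumed import path

    # Illustrative handle parameters: block_size, queue_depth, single_submit,
    # overlap_events, intra_op_parallelism.
    h = AsyncIOBuilder().load().aio_handle(1024 * 1024, 8, True, True, 1)

    chunk = 1024 * 1024
    data = torch.ByteTensor(list(os.urandom(2 * chunk))).pin_memory()
    path = '/tmp/_aio_offset_demo.pt'  # hypothetical scratch file

    # Write each half of the pinned buffer at its matching byte offset in the file.
    for i in range(2):
        src = torch.narrow(data, 0, i * chunk, chunk)
        assert 1 == h.sync_pwrite(buffer=src, filename=path, file_offset=i * chunk)

    # Read the second half back from its offset into another pinned buffer.
    dst = torch.zeros(chunk, dtype=torch.uint8, device='cpu').pin_memory()
    assert 1 == h.sync_pread(dst, path, chunk)
    assert torch.equal(dst, torch.narrow(data, 0, chunk, chunk))

Because the bindings declare "file_offset"_a = 0 as the trailing argument, existing callers that
omit the offset keep their previous whole-file behavior.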