-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathdeepspeed_py_aio.cpp
123 lines (98 loc) · 4.13 KB
/
deepspeed_py_aio.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright (c) Microsoft Corporation.
// SPDX-License-Identifier: Apache-2.0
// DeepSpeed Team
/*
Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
*/
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>
#include <chrono>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include "deepspeed_py_aio.h"
using namespace std;
using namespace std::chrono;
#define DEBUG_DS_AIO_READ 0
#define DEBUG_DS_AIO_WRITE 0
static const std::string c_library_name = "deepspeed_aio";
/// @brief Write the full contents of `buffer` to `filename` through the aio helpers
///        (aio_context / do_aio_operation_*), then print elapsed times to stdout.
///
/// @param buffer          Source tensor; all `buffer.nbytes()` bytes starting at
///                        `buffer.data_ptr()` are written at file offset 0.
/// @param filename        Path of the file to write (opened via open_file(filename, false)).
/// @param block_size      Forwarded to deepspeed_aio_config_t and aio_context.
/// @param queue_depth     Forwarded to deepspeed_aio_config_t and aio_context.
/// @param single_submit   Forwarded to deepspeed_aio_config_t.
/// @param overlap_events  true selects do_aio_operation_overlap, false selects
///                        do_aio_operation_sequential.
/// @param validate        When true, call validate_aio_operation after the file is
///                        closed (presumably re-reads and compares the data — confirm).
/// @return 0 on success, -1 if the file could not be opened.
int deepspeed_py_aio_write(const torch::Tensor& buffer,
                           const char* filename,
                           const int block_size,
                           const int queue_depth,
                           const bool single_submit,
                           const bool overlap_events,
                           const bool validate)
{
    const auto start_time = std::chrono::high_resolution_clock::now();
    deepspeed_aio_config_t config(block_size, queue_depth, single_submit, overlap_events, false);

    const auto fd = open_file(filename, false);
    if (fd == -1) { return -1; }

    // Close the descriptor on every exit path, including exceptions thrown by the
    // tensor accessors, the allocations, or the aio helpers below. The guard is
    // disarmed before the explicit close() so the file is closed exactly once.
    struct fd_closer {
        int _fd;
        ~fd_closer()
        {
            if (_fd != -1) { close(_fd); }
        }
    } fd_guard{fd};

    auto write_buffer = static_cast<char*>(buffer.data_ptr());
    const auto num_write_bytes = static_cast<int64_t>(buffer.nbytes());
    auto xfer_ctxt = std::make_unique<io_xfer_ctxt>(fd, 0, 0, num_write_bytes, write_buffer);
    auto aio_ctxt = std::make_unique<aio_context>(config._block_size, config._queue_depth);

    if (config._overlap_events) {
        do_aio_operation_overlap(false, aio_ctxt, xfer_ctxt, &config, nullptr);
    } else {
        do_aio_operation_sequential(false, aio_ctxt, xfer_ctxt, &config, nullptr);
    }
    const std::chrono::duration<double> aio_time =
        std::chrono::high_resolution_clock::now() - start_time;

    // Close before validation (original ordering); disarm the guard first.
    fd_guard._fd = -1;
    close(fd);

    if (validate) { validate_aio_operation(false, filename, write_buffer, num_write_bytes); }

    const std::chrono::duration<double> fn_time =
        std::chrono::high_resolution_clock::now() - start_time;
    std::cout << "Elapsed time(usec): " << "aio = " << aio_time.count() * 1e6
              << " call = " << fn_time.count() * 1e6 << std::endl;
    return 0;
}
/// @brief Read the whole of `filename` into the storage of `buffer` through the aio
///        helpers (aio_context / do_aio_operation_*), then print elapsed times to stdout.
///
/// @param buffer          Destination tensor; must provide at least as many bytes as
///                        the file (debug builds additionally assert exact equality).
/// @param filename        Path of the file to read (opened via open_file(filename, true)).
/// @param block_size      Forwarded to deepspeed_aio_config_t and aio_context.
/// @param queue_depth     Forwarded to deepspeed_aio_config_t and aio_context.
/// @param single_submit   Forwarded to deepspeed_aio_config_t.
/// @param overlap_events  true selects do_aio_operation_overlap, false selects
///                        do_aio_operation_sequential.
/// @param validate        When true, call validate_aio_operation after the file is
///                        closed (presumably re-reads and compares the data — confirm).
/// @return 0 on success; -1 if the file size cannot be obtained, the buffer is smaller
///         than the file, or the file cannot be opened.
int deepspeed_py_aio_read(torch::Tensor& buffer,
                          const char* filename,
                          const int block_size,
                          const int queue_depth,
                          const bool single_submit,
                          const bool overlap_events,
                          const bool validate)
{
    const auto start_time = std::chrono::high_resolution_clock::now();
    int64_t num_file_bytes = 0;
    if (-1 == get_file_size(filename, num_file_bytes)) {
        const auto error_code = errno;
        report_file_error(filename, " fstat for read", error_code);
        return -1;
    }

    // assert() below is compiled out under NDEBUG; without this runtime check a
    // buffer smaller than the file would be overrun by the read in release builds.
    const auto num_buffer_bytes = static_cast<int64_t>(buffer.nbytes());
    if (num_buffer_bytes < num_file_bytes) {
        std::cerr << "deepspeed_py_aio_read: buffer of " << num_buffer_bytes
                  << " bytes is smaller than file " << filename << " (" << num_file_bytes
                  << " bytes)" << std::endl;
        return -1;
    }

    deepspeed_aio_config_t config(block_size, queue_depth, single_submit, overlap_events, false);

    const auto fd = open_file(filename, true);
    if (fd == -1) { return -1; }

    // Close the descriptor on every exit path, including exceptions thrown by the
    // tensor accessors, the allocations, or the aio helpers below. The guard is
    // disarmed before the explicit close() so the file is closed exactly once.
    struct fd_closer {
        int _fd;
        ~fd_closer()
        {
            if (_fd != -1) { close(_fd); }
        }
    } fd_guard{fd};

    auto read_buffer = static_cast<char*>(buffer.data_ptr());
    // Debug builds still require an exact size match, as before.
    assert(num_buffer_bytes == num_file_bytes);
    auto xfer_ctxt = std::make_unique<io_xfer_ctxt>(fd, 0, 0, num_file_bytes, read_buffer);
    auto aio_ctxt = std::make_unique<aio_context>(config._block_size, config._queue_depth);

    if (config._overlap_events) {
        do_aio_operation_overlap(true, aio_ctxt, xfer_ctxt, &config, nullptr);
    } else {
        do_aio_operation_sequential(true, aio_ctxt, xfer_ctxt, &config, nullptr);
    }
    const std::chrono::duration<double> aio_time =
        std::chrono::high_resolution_clock::now() - start_time;

    // Close before validation (original ordering); disarm the guard first.
    fd_guard._fd = -1;
    close(fd);

    if (validate) { validate_aio_operation(true, filename, read_buffer, num_file_bytes); }

    const std::chrono::duration<double> fn_time =
        std::chrono::high_resolution_clock::now() - start_time;
    std::cout << "Elapsed time(usec): " << "aio = " << aio_time.count() * 1e6
              << " call = " << fn_time.count() * 1e6 << std::endl;
    return 0;
}