diff --git a/CMakeLists.txt b/CMakeLists.txt index 00c5f607733..b3bde509767 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -457,6 +457,7 @@ add_library (seastar STATIC include/seastar/core/units.hh include/seastar/core/vector-data-sink.hh include/seastar/core/weak_ptr.hh + include/seastar/fs/file.hh include/seastar/fs/block_device.hh include/seastar/fs/temporary_file.hh include/seastar/http/api_docs.hh @@ -564,6 +565,7 @@ add_library (seastar STATIC src/core/uname.cc src/core/vla.hh src/core/io_queue.cc + src/fs/file.cc src/http/api_docs.cc src/http/common.cc src/http/file_handler.cc diff --git a/include/seastar/fs/file.hh b/include/seastar/fs/file.hh new file mode 100644 index 00000000000..5c16ba4b66d --- /dev/null +++ b/include/seastar/fs/file.hh @@ -0,0 +1,59 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2019 ScyllaDB + */ + +#pragma once + +#include +#include +#include + +namespace seastar { + +namespace fs { + +class seastarfs_file_impl : public file_impl { + block_device _block_device; + open_flags _open_flags; +public: + seastarfs_file_impl(block_device dev, open_flags flags); + ~seastarfs_file_impl() override = default; + + future write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) override; + future write_dma(uint64_t pos, std::vector iov, const io_priority_class& pc) override; + future read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) override; + future read_dma(uint64_t pos, std::vector iov, const io_priority_class& pc) override; + future<> flush() override; + future stat() override; + future<> truncate(uint64_t length) override; + future<> discard(uint64_t offset, uint64_t length) override; + future<> allocate(uint64_t position, uint64_t length) override; + future size() override; + future<> close() noexcept override; + std::unique_ptr dup() override; + subscription list_directory(std::function (directory_entry de)> next) override; + future> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) override; +}; + +future open_file_dma(sstring name, open_flags flags); + +} + +} diff --git a/src/fs/file.cc b/src/fs/file.cc new file mode 100644 index 00000000000..5202b0d6b30 --- /dev/null +++ b/src/fs/file.cc @@ -0,0 +1,112 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2019 ScyllaDB + */ + +#include +#include +#include + +namespace seastar { + +namespace fs { + +seastarfs_file_impl::seastarfs_file_impl(block_device dev, open_flags flags) + : _block_device(std::move(dev)) + , _open_flags(flags) {} + +future +seastarfs_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) { + return _block_device.write(pos, buffer, len, pc); +} + +future +seastarfs_file_impl::write_dma(uint64_t pos, std::vector iov, const io_priority_class& pc) { + throw std::bad_function_call(); +} + +future +seastarfs_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) { + return _block_device.read(pos, buffer, len, pc); +} + +future +seastarfs_file_impl::read_dma(uint64_t pos, std::vector iov, const io_priority_class& pc) { + throw std::bad_function_call(); +} + +future<> +seastarfs_file_impl::flush() { + return _block_device.flush(); +} + +future +seastarfs_file_impl::stat() { + throw std::bad_function_call(); +} + +future<> +seastarfs_file_impl::truncate(uint64_t) { + throw std::bad_function_call(); +} + +future<> +seastarfs_file_impl::discard(uint64_t offset, uint64_t length) { + throw std::bad_function_call(); +} + +future<> +seastarfs_file_impl::allocate(uint64_t position, uint64_t length) { + throw std::bad_function_call(); +} + +future +seastarfs_file_impl::size() { + throw std::bad_function_call(); +} + +future<> +seastarfs_file_impl::close() noexcept { + return _block_device.close(); +} + +std::unique_ptr +seastarfs_file_impl::dup() { + throw std::bad_function_call(); +} + +subscription +seastarfs_file_impl::list_directory(std::function (directory_entry de)> next) { + throw std::bad_function_call(); +} + +future> +seastarfs_file_impl::dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) { + throw std::bad_function_call(); +} + +future open_file_dma(sstring name, open_flags flags) { + return open_block_device(name).then([flags] (block_device bd) { + return file(make_shared(std::move(bd), flags)); + }); +} + +} + +} diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index aa15bb36583..3a9f2c29516 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -349,6 +349,9 @@ seastar_add_test (rpc loopback_socket.hh rpc_test.cc) +seastar_add_test (seastarfs + SOURCES seastarfs_test.cc) + seastar_add_test (semaphore SOURCES semaphore_test.cc) diff --git a/tests/unit/seastarfs_test.cc b/tests/unit/seastarfs_test.cc new file mode 100644 index 00000000000..ccd82ee49f8 --- /dev/null +++ b/tests/unit/seastarfs_test.cc @@ -0,0 +1,63 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2019 ScyllaDB + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace seastar; + +SEASTAR_TEST_CASE(parallel_read_write_test) { + constexpr auto size = 16 * MB; + constexpr auto path = "/tmp/seastarfs"; + + return async([] { + const auto tf = fs::temporary_file(path); + auto f = fs::open_file_dma(tf.path(), open_flags::rw).get0(); + static auto alignment = f.memory_dma_alignment(); + + parallel_for_each(boost::irange(0, size / alignment), [&f] (auto i) { + auto wbuf = allocate_aligned_buffer(alignment, alignment); + std::fill(wbuf.get(), wbuf.get() + alignment, i); + auto wb = wbuf.get(); + + return f.dma_write(i * alignment, wb, alignment).then( + [&f, i, wbuf = std::move(wbuf)] (auto ret) mutable { + BOOST_REQUIRE(ret == alignment); + auto rbuf = allocate_aligned_buffer(alignment, alignment); + auto rb = rbuf.get(); + return f.dma_read(i * alignment, rb, alignment).then( + [f, rbuf = std::move(rbuf), wbuf = std::move(wbuf)] (auto ret) { + BOOST_REQUIRE(ret == alignment); + BOOST_REQUIRE(std::equal(rbuf.get(), rbuf.get() + alignment, wbuf.get())); + }); + }); + }).wait(); + + f.flush().wait(); + f.close().wait(); + }); +}