Skip to content

Commit

Permalink
Added initial seastarfs file implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rokinsky committed Nov 17, 2019
1 parent 809b3ef commit b0d1b5a
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ add_library (seastar STATIC
include/seastar/core/units.hh
include/seastar/core/vector-data-sink.hh
include/seastar/core/weak_ptr.hh
include/seastar/fs/file.hh
include/seastar/http/api_docs.hh
include/seastar/http/common.hh
include/seastar/http/exception.hh
Expand Down Expand Up @@ -515,6 +516,7 @@ add_library (seastar STATIC
include/seastar/util/conversions.hh
include/seastar/util/defer.hh
include/seastar/util/eclipse.hh
include/seastar/util/file_utils.hh
include/seastar/util/function_input_iterator.hh
include/seastar/util/gcc6-concepts.hh
include/seastar/util/indirect.hh
Expand Down Expand Up @@ -562,6 +564,7 @@ add_library (seastar STATIC
src/core/uname.cc
src/core/vla.hh
src/core/io_queue.cc
src/fs/file.cc
src/http/api_docs.cc
src/http/common.cc
src/http/file_handler.cc
Expand Down
59 changes: 59 additions & 0 deletions include/seastar/fs/file.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2019 ScyllaDB
*/

#pragma once

#include <seastar/core/file.hh>
#include <seastar/core/future.hh>
#include <seastar/fs/block_device.hh>

namespace seastar {

namespace fs {

class seastarfs_file_impl : public file_impl {
block_device _block_device;
open_flags _open_flags;
public:
seastarfs_file_impl(block_device dev, open_flags flags);
~seastarfs_file_impl() override = default;

future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) override;
future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) override;
future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) override;
future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) override;
future<> flush() override;
future<struct stat> stat() override;
future<> truncate(uint64_t length) override;
future<> discard(uint64_t offset, uint64_t length) override;
future<> allocate(uint64_t position, uint64_t length) override;
future<uint64_t> size() override;
future<> close() noexcept override;
std::unique_ptr<file_handle_impl> dup() override;
subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override;
future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) override;
};

future<file> open_file_dma(sstring name, open_flags flags);

}

}
55 changes: 55 additions & 0 deletions include/seastar/util/file_utils.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2019 ScyllaDB
*/

#pragma once

#include <seastar/core/sstring.hh>

namespace seastar {

namespace util {

class temporary_file {
sstring _path;
public:
explicit temporary_file(sstring path) : _path(std::move(path) + ".XXXXXX") {
int fd = mkstemp(_path.data());
throw_system_error_on(fd == -1);
close(fd);
}

~temporary_file() {
unlink(_path.data());
}

temporary_file(const temporary_file &) = delete;
temporary_file &operator=(const temporary_file &) = delete;
temporary_file(temporary_file &&) noexcept = delete;
temporary_file &operator=(temporary_file &&) noexcept = delete;

const sstring& path() const {
return _path;
}
};

}

}
112 changes: 112 additions & 0 deletions src/fs/file.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2019 ScyllaDB
*/

#include <seastar/core/future.hh>
#include <seastar/fs/block_device.hh>
#include <seastar/fs/file.hh>

namespace seastar {

namespace fs {

seastarfs_file_impl::seastarfs_file_impl(block_device dev, open_flags flags)
: _block_device(std::move(dev))
, _open_flags(flags) {}

future<size_t>
seastarfs_file_impl::write_dma(uint64_t pos, const void *buffer, size_t len, const io_priority_class &pc) {
return _block_device.write(pos, buffer, len);
}

future<size_t>
seastarfs_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class &pc) {
throw std::bad_function_call();
}

future<size_t>
seastarfs_file_impl::read_dma(uint64_t pos, void *buffer, size_t len, const io_priority_class &pc) {
return _block_device.read(pos, buffer, len);
}

future<size_t>
seastarfs_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class &pc) {
throw std::bad_function_call();
}

future<>
seastarfs_file_impl::flush() {
return _block_device.flush();
}

future<struct stat>
seastarfs_file_impl::stat() {
throw std::bad_function_call();
}

future<>
seastarfs_file_impl::truncate(uint64_t) {
throw std::bad_function_call();
}

future<>
seastarfs_file_impl::discard(uint64_t offset, uint64_t length) {
throw std::bad_function_call();
}

future<>
seastarfs_file_impl::allocate(uint64_t position, uint64_t length) {
throw std::bad_function_call();
}

future<uint64_t>
seastarfs_file_impl::size() {
throw std::bad_function_call();
}

future<>
seastarfs_file_impl::close() noexcept {
return _block_device.close();
}

std::unique_ptr<file_handle_impl>
seastarfs_file_impl::dup() {
throw std::bad_function_call();
}

subscription<directory_entry>
seastarfs_file_impl::list_directory(std::function<future<>(directory_entry de)> next) {
throw std::bad_function_call();
}

future<temporary_buffer<uint8_t>>
seastarfs_file_impl::dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class &pc) {
throw std::bad_function_call();
}

future<file> open_file_dma(sstring name, open_flags flags) {
return open_block_device(name).then([f = std::move(flags)] (block_device bd) {
return file(make_shared<seastarfs_file_impl>(std::move(bd), f));
});
}

}

}
3 changes: 3 additions & 0 deletions tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ seastar_add_test (rpc
loopback_socket.hh
rpc_test.cc)

seastar_add_test (seastarfs
SOURCES seastarfs_test.cc)

seastar_add_test (semaphore
SOURCES semaphore_test.cc)

Expand Down
64 changes: 64 additions & 0 deletions tests/unit/seastarfs_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2019 ScyllaDB
*/

#include <seastar/core/aligned_buffer.hh>
#include <seastar/core/file-types.hh>
#include <seastar/core/file.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/units.hh>
#include <seastar/fs/file.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/util/file_utils.hh>

using namespace seastar;

SEASTAR_TEST_CASE(parallel_read_write_test) {
constexpr auto max = 16 * MB;
constexpr auto path = "/tmp/seastarfs";

return async([] {
const auto tf = util::temporary_file(path);
auto ft = fs::open_file_dma(tf.path(), open_flags::rw).get0();
static auto alignment = ft.memory_dma_alignment();

parallel_for_each(boost::irange<off_t>(0, max / alignment), [&ft] (auto i) {
auto wbuf = allocate_aligned_buffer<unsigned char>(alignment, alignment);
std::fill(wbuf.get(), wbuf.get() + alignment, i);
auto wb = wbuf.get();

return ft.dma_write(i * alignment, wb, alignment).then(
[&ft, i, wbuf = std::move(wbuf)](size_t ret) mutable {
BOOST_REQUIRE(ret == alignment);
auto rbuf = allocate_aligned_buffer<unsigned char>(alignment, alignment);
auto rb = rbuf.get();
return ft.dma_read(i * alignment, rb, alignment).then(
[ft, rbuf = std::move(rbuf), wbuf = std::move(wbuf)] (auto ret) {
BOOST_REQUIRE(ret == alignment);
BOOST_REQUIRE(std::equal(rbuf.get(), rbuf.get() + alignment, wbuf.get()));
});
});
}).wait();

ft.flush().wait();
ft.close().wait();
seastar_logger.info("done");
});
}

0 comments on commit b0d1b5a

Please sign in to comment.