From 2ffbc2c0713ca7cf73d78ddb318ceb000ea5b400 Mon Sep 17 00:00:00 2001 From: Oleg Shaldybin Date: Sat, 6 May 2023 03:20:57 +0000 Subject: [PATCH] pw_rpc_transport: Add support for non-HDLC framing Adds simple framing for the case when HDLC is not needed e.g. for inter-core communication over mailboxes or shared memory where HDLC only adds overhead (due to escaping etc). This 'simple' framing is basically a length-prefix encoding with a rudimentary support for recovery on lost frames. Change-Id: Id209453b547aeb3a7565d0c5215637cddf415c7d Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/141273 Reviewed-by: Carlos Chinchilla Pigweed-Auto-Submit: Carlos Chinchilla Commit-Queue: Auto-Submit --- pw_rpc_transport/BUILD.bazel | 26 ++ pw_rpc_transport/BUILD.gn | 24 ++ .../public/pw_rpc_transport/simple_framing.h | 252 ++++++++++++ pw_rpc_transport/simple_framing.cc | 37 ++ pw_rpc_transport/simple_framing_test.cc | 381 ++++++++++++++++++ 5 files changed, 720 insertions(+) create mode 100644 pw_rpc_transport/public/pw_rpc_transport/simple_framing.h create mode 100644 pw_rpc_transport/simple_framing.cc create mode 100644 pw_rpc_transport/simple_framing_test.cc diff --git a/pw_rpc_transport/BUILD.bazel b/pw_rpc_transport/BUILD.bazel index 3beb24b9e5..b0176c9ed7 100644 --- a/pw_rpc_transport/BUILD.bazel +++ b/pw_rpc_transport/BUILD.bazel @@ -147,6 +147,32 @@ pw_cc_test( ], ) +pw_cc_library( + name = "simple_framing", + srcs = [ + "simple_framing.cc", + ], + hdrs = ["public/pw_rpc_transport/simple_framing.h"], + deps = [ + ":rpc_transport", + "//pw_assert", + "//pw_bytes", + "//pw_log", + "//pw_status", + ], +) + +pw_cc_test( + name = "simple_framing_test", + srcs = ["simple_framing_test.cc"], + deps = [ + ":simple_framing", + "//pw_bytes", + "//pw_log", + "//pw_status", + ], +) + pw_proto_filegroup( name = "test_protos_and_options", srcs = ["internal/test.proto"], diff --git a/pw_rpc_transport/BUILD.gn b/pw_rpc_transport/BUILD.gn index 1f5326a340..2e2bc48cdd 100644 --- a/pw_rpc_transport/BUILD.gn +++ b/pw_rpc_transport/BUILD.gn @@ -34,6 +34,7 @@ pw_test_group("tests") { tests = [ ":hdlc_framing_test", ":packet_buffer_queue_test", + ":simple_framing_test", ] if (pw_thread_THREAD_BACKEND != "") { @@ -149,6 +150,29 @@ pw_test("hdlc_framing_test") { ] } +pw_source_set("simple_framing") { + public = [ "public/pw_rpc_transport/simple_framing.h" ] + public_configs = [ ":public_include_path" ] + sources = [ "simple_framing.cc" ] + public_deps = [ + ":rpc_transport", + "$dir_pw_assert", + "$dir_pw_bytes", + "$dir_pw_status", + ] + deps = [ "$dir_pw_log" ] +} + +pw_test("simple_framing_test") { + sources = [ "simple_framing_test.cc" ] + deps = [ + ":simple_framing", + "$dir_pw_bytes", + "$dir_pw_log", + "$dir_pw_status", + ] +} + pw_proto_library("test_protos") { sources = [ "internal/test.proto" ] inputs = [ "internal/test.options" ] diff --git a/pw_rpc_transport/public/pw_rpc_transport/simple_framing.h b/pw_rpc_transport/public/pw_rpc_transport/simple_framing.h new file mode 100644 index 0000000000..71c4665502 --- /dev/null +++ b/pw_rpc_transport/public/pw_rpc_transport/simple_framing.h @@ -0,0 +1,252 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include "pw_assert/assert.h" +#include "pw_bytes/span.h" +#include "pw_rpc_transport/rpc_transport.h" +#include "pw_status/status.h" +#include "pw_status/try.h" +#include "rpc_transport.h" + +namespace pw::rpc { + +// The following encoder and decoder implement a very simple RPC framing +// protocol where the first frame contains the total packet size in the header +// and up to max frame size bytes in the payload. The subsequent frames of the +// same packet have an empty header and the rest of the packet in their payload. +// +// First frame header also contains a special marker as an attempt to +// resynchronize the receiver if some frames were not sent (although we expect +// all transports using this framing type to be reliable, it's still possible +// that some random transport write timeout result in only the first few frames +// being sent and others dropped; in that case we attempt best effort recovery +// by effectively skipping the input until we see something that resembles a +// valid header). +// +// Both encoder and decoder are not thread-safe. The caller must ensure their +// correct use in a multi-threaded environment. + +namespace internal { + +void LogReceivedRpcPacketTooLarge(size_t packet_size, size_t max_packet_size); +void LogMalformedRpcFrameHeader(); + +} // namespace internal + +template +class SimpleRpcPacketEncoder + : public RpcPacketEncoder> { + static_assert(kMaxPacketSize <= 1 << 16); + + public: + static constexpr size_t kHeaderSize = 4; + static constexpr uint16_t kFrameMarker = 0x27f1; + + // Encodes `packet` with a simple framing protocol and split the resulting + // frame into chunks of `RpcFrame`s where every `RpcFrame` is no longer than + // `max_frame_size`. Calls `callback` for for each of the resulting + // `RpcFrame`s. + Status Encode(ConstByteSpan rpc_packet, + size_t max_frame_size, + OnRpcFrameEncodedCallback&& callback) { + if (rpc_packet.size() > kMaxPacketSize) { + return Status::FailedPrecondition(); + } + if (max_frame_size <= kHeaderSize) { + return Status::FailedPrecondition(); + } + + // First frame. This is the only frame that contains a header. + const auto first_frame_size = + std::min(max_frame_size - kHeaderSize, rpc_packet.size()); + + std::array header{ + std::byte{kFrameMarker & 0xff}, + std::byte{(kFrameMarker >> 8) & 0xff}, + static_cast(rpc_packet.size() & 0xff), + static_cast((rpc_packet.size() >> 8) & 0xff), + }; + + RpcFrame frame{.header = span(header), + .payload = rpc_packet.first(first_frame_size)}; + PW_TRY(callback(frame)); + auto remaining = rpc_packet.subspan(first_frame_size); + + // Second and subsequent frames (if any). + while (!remaining.empty()) { + auto fragment_size = std::min(max_frame_size, remaining.size()); + RpcFrame next_frame{.header = {}, + .payload = remaining.first(fragment_size)}; + PW_TRY(callback(next_frame)); + remaining = remaining.subspan(fragment_size); + } + + return OkStatus(); + } +}; + +template +class SimpleRpcPacketDecoder + : public RpcPacketDecoder> { + using Encoder = SimpleRpcPacketEncoder; + + public: + SimpleRpcPacketDecoder() { ExpectHeader(); } + + // Find and decodes `RpcFrame`s in `buffer`. `buffer` may contain zero or + // more frames for zero or more packets. Calls `callback` for each + // well-formed packet. Malformed packets are ignored and dropped. + Status Decode(ConstByteSpan buffer, OnRpcPacketDecodedCallback&& callback) { + while (!buffer.empty()) { + switch (state_) { + case State::kReadingHeader: { + buffer = buffer.subspan(ReadHeader(buffer)); + break; + } + case State::kReadingPayload: { + // Payload can only follow a valid header, reset the flag here so + // that next invalid header logs again. + already_logged_invalid_header_ = false; + buffer = buffer.subspan(ReadPayload(buffer, callback)); + break; + } + } + } + return OkStatus(); + } + + private: + enum class State { + kReadingHeader, + kReadingPayload, + }; + + size_t ReadHeader(ConstByteSpan buffer); + + size_t ReadPayload(ConstByteSpan buffer, + const OnRpcPacketDecodedCallback& callback); + + void ExpectHeader() { + state_ = State::kReadingHeader; + bytes_read_ = 0; + bytes_remaining_ = Encoder::kHeaderSize; + } + + void ExpectPayload(size_t size) { + state_ = State::kReadingPayload; + bytes_read_ = 0; + bytes_remaining_ = size; + } + + std::array packet_{}; + std::array header_{}; + + // Current decoder state. + State state_; + // How many bytes were read in the current state. + size_t bytes_read_ = 0; + // How many bytes remain to read in the current state. + size_t bytes_remaining_ = 0; + // When true, discard the received payload instead of buffering it (because + // it's too big to buffer). + bool discard_payload_ = false; + // When true, skip logging on invalid header if we already logged. This is + // to prevent logging on every payload byte of a malformed frame. + bool already_logged_invalid_header_ = false; +}; + +template +size_t SimpleRpcPacketDecoder::ReadHeader( + ConstByteSpan buffer) { + const auto read_size = std::min(buffer.size(), bytes_remaining_); + bool header_available = false; + PW_DASSERT(read_size <= Encoder::kHeaderSize); + + std::memcpy(header_.data() + bytes_read_, buffer.data(), read_size); + bytes_read_ += read_size; + bytes_remaining_ -= read_size; + header_available = bytes_remaining_ == 0; + + if (header_available) { + uint16_t marker = (static_cast(header_[1]) << 8) | + static_cast(header_[0]); + uint16_t packet_size = (static_cast(header_[3]) << 8) | + static_cast(header_[2]); + + if (marker != Encoder::kFrameMarker) { + // We expected a header but received some data that is definitely not + // a header. Skip it and keep waiting for the next header. This could + // also be a false positive, e.g. in the worst case we treat some + // random data as a header: even then we should eventually be able to + // stumble upon a real header and start processing packets again. + ExpectHeader(); + // Consume only a single byte since we're looking for a header in a + // broken stream and it could start at the next byte. + if (!already_logged_invalid_header_) { + internal::LogMalformedRpcFrameHeader(); + already_logged_invalid_header_ = true; + } + return 1; + } + if (packet_size > kMaxPacketSize) { + // Consume both header and packet without saving it, as it's too big + // for the buffer. This is likely due to max packet size mismatch + // between the encoder and the decoder. + internal::LogReceivedRpcPacketTooLarge(packet_size, kMaxPacketSize); + discard_payload_ = true; + } + ExpectPayload(packet_size); + } + + return read_size; +} + +template +size_t SimpleRpcPacketDecoder::ReadPayload( + ConstByteSpan buffer, const OnRpcPacketDecodedCallback& callback) { + if (buffer.size() >= bytes_remaining_ && bytes_read_ == 0) { + const auto read_size = bytes_remaining_; + // We have the whole packet available in the buffer, no need to copy + // it into an internal buffer. + callback(buffer.first(read_size)); + ExpectHeader(); + return read_size; + } + // Frame has been split between multiple inputs: assembling it in + // an internal buffer. + const auto read_size = std::min(buffer.size(), bytes_remaining_); + + // We could be discarding the payload if it was too big to fit into our + // packet buffer. + if (!discard_payload_) { + PW_DASSERT(bytes_read_ + read_size <= packet_.size()); + std::memcpy(packet_.data() + bytes_read_, buffer.data(), read_size); + } + + bytes_read_ += read_size; + bytes_remaining_ -= read_size; + if (bytes_remaining_ == 0) { + if (discard_payload_) { + discard_payload_ = false; + } else { + auto packet_span = span(packet_); + callback(packet_span.first(bytes_read_)); + } + ExpectHeader(); + } + return read_size; +} + +} // namespace pw::rpc diff --git a/pw_rpc_transport/simple_framing.cc b/pw_rpc_transport/simple_framing.cc new file mode 100644 index 0000000000..8a3655684a --- /dev/null +++ b/pw_rpc_transport/simple_framing.cc @@ -0,0 +1,37 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#define PW_LOG_MODULE_NAME "PW_RPC" + +#include "pw_rpc_transport/simple_framing.h" + +#include + +#include "pw_log/log.h" + +namespace pw::rpc::internal { + +void LogReceivedRpcPacketTooLarge(size_t packet_size, size_t max_packet_size) { + PW_LOG_WARN( + "Received RPC packet (%d) bytes) is larger than max packet size (%d " + "bytes)", + static_cast(packet_size), + static_cast(max_packet_size)); +} + +void LogMalformedRpcFrameHeader() { + PW_LOG_WARN("Skipping malformed RPC frame header"); +} + +} // namespace pw::rpc::internal diff --git a/pw_rpc_transport/simple_framing_test.cc b/pw_rpc_transport/simple_framing_test.cc new file mode 100644 index 0000000000..2ac75a5b4e --- /dev/null +++ b/pw_rpc_transport/simple_framing_test.cc @@ -0,0 +1,381 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_rpc_transport/simple_framing.h" + +#include +#include +#include + +#include "gtest/gtest.h" +#include "pw_bytes/span.h" +#include "pw_log/log.h" +#include "pw_status/status.h" + +namespace pw::rpc { +namespace { + +constexpr size_t kMaxPacketSize = 256; + +struct TestParams { + size_t packet_size = 0; + size_t max_frame_size = 0; +}; + +constexpr std::array kTestCases = { + // Packet fits in one frame. + TestParams{.packet_size = 5, .max_frame_size = 100}, + // Typical parameters for RPC packet and mailbox frame size. + TestParams{.packet_size = 100, .max_frame_size = 128}, + // Smallest packet. + TestParams{.packet_size = 1, .max_frame_size = 16}, + // Small packet, small frame. + TestParams{.packet_size = 16, .max_frame_size = 5}, + // Odd-sized packet, small frame. + TestParams{.packet_size = 77, .max_frame_size = 16}, + // Frame size and packet size off by one. + TestParams{.packet_size = 11, .max_frame_size = 10}, + // Almost at the limit. + TestParams{.packet_size = kMaxPacketSize - 1, + .max_frame_size = kMaxPacketSize - 2}, + // At the limit. + TestParams{.packet_size = kMaxPacketSize, + .max_frame_size = kMaxPacketSize}}; + +void MakePacket(ByteSpan dst_buffer) { + static uint32_t rg_seed = 0x123; + unsigned char c = 0; + for (auto& i : dst_buffer) { + i = std::byte{c++}; + } + std::mt19937 rg(rg_seed++); + std::shuffle(dst_buffer.begin(), dst_buffer.end(), rg); +} + +void CopyFrame(RpcFrame frame, std::vector& dst) { + std::copy(frame.header.begin(), frame.header.end(), std::back_inserter(dst)); + std::copy( + frame.payload.begin(), frame.payload.end(), std::back_inserter(dst)); +} + +TEST(SimpleRpcFrameEncodeDecodeTest, EncodeThenDecode) { + for (auto test_case : kTestCases) { + const size_t packet_size = test_case.packet_size; + const size_t max_frame_size = test_case.max_frame_size; + PW_LOG_INFO("EncodeThenDecode: packet_size = %d, max_frame_size = %d", + static_cast(packet_size), + static_cast(max_frame_size)); + + std::vector src(packet_size); + MakePacket(src); + + std::vector encoded; + std::vector decoded; + + SimpleRpcPacketEncoder encoder; + + ASSERT_EQ(encoder.Encode(src, + max_frame_size, + [&encoded](RpcFrame& frame) { + CopyFrame(frame, encoded); + return OkStatus(); + }), + OkStatus()); + + SimpleRpcPacketDecoder decoder; + + ASSERT_EQ(decoder.Decode(encoded, + [&decoded](ConstByteSpan packet) { + std::copy(packet.begin(), + packet.end(), + std::back_inserter(decoded)); + }), + OkStatus()); + + EXPECT_TRUE(std::equal(src.begin(), src.end(), decoded.begin())); + } +} + +TEST(SimpleRpcFrameEncodeDecodeTest, OneByteAtTimeDecoding) { + for (auto test_case : kTestCases) { + const size_t packet_size = test_case.packet_size; + const size_t max_frame_size = test_case.max_frame_size; + PW_LOG_INFO("EncodeThenDecode: packet_size = %d, max_frame_size = %d", + static_cast(packet_size), + static_cast(max_frame_size)); + + std::vector src(packet_size); + MakePacket(src); + + std::vector encoded; + std::vector decoded; + + SimpleRpcPacketEncoder encoder; + + ASSERT_EQ(encoder.Encode(src, + max_frame_size, + [&encoded](RpcFrame& frame) { + CopyFrame(frame, encoded); + return OkStatus(); + }), + OkStatus()); + + SimpleRpcPacketDecoder decoder; + + for (std::byte b : encoded) { + auto buffer_span = span(&b, 1); + ASSERT_EQ(decoder.Decode(buffer_span, + [&decoded](ConstByteSpan packet) { + std::copy(packet.begin(), + packet.end(), + std::back_inserter(decoded)); + }), + OkStatus()); + } + + EXPECT_TRUE(std::equal(src.begin(), src.end(), decoded.begin())); + } +} + +TEST(SimpleRpcFrameTest, MissingFirstFrame) { + // Sends two packets, the first packet is missing its first frame. The decoder + // ignores the remaining frames of the first packet but still picks up the + // second packet. + constexpr size_t kPacketSize = 77; + constexpr size_t kMaxFrameSize = 16; + + std::vector src1(kPacketSize); + MakePacket(src1); + + std::vector src2(kPacketSize); + MakePacket(src2); + + std::vector decoded; + + SimpleRpcPacketEncoder encoder; + struct EncodeState { + size_t frame_counter = 0; + std::vector encoded; + } state; + + ASSERT_EQ(encoder.Encode(src1, + kMaxFrameSize, + [&state](RpcFrame& frame) { + state.frame_counter++; + if (state.frame_counter > 1) { + // Skip the first frame. + CopyFrame(frame, state.encoded); + } + return OkStatus(); + }), + OkStatus()); + + ASSERT_EQ(encoder.Encode(src2, + kMaxFrameSize, + [&state](RpcFrame& frame) { + CopyFrame(frame, state.encoded); + return OkStatus(); + }), + OkStatus()); + + SimpleRpcPacketDecoder decoder; + + ASSERT_EQ(decoder.Decode(state.encoded, + [&decoded](ConstByteSpan packet) { + std::copy(packet.begin(), + packet.end(), + std::back_inserter(decoded)); + }), + OkStatus()); + + EXPECT_TRUE(std::equal(src2.begin(), src2.end(), decoded.begin())); +} + +TEST(SimpleRpcFrameTest, MissingInternalFrame) { + // Sends two packets, the first packet is missing its second frame. The + // decoder ignores the remaining frames of the first packet and the second + // packet as well but eventually stumbles upon the frame header in the third + // packet and processes that packet. + constexpr size_t kPacketSize = 77; + constexpr size_t kMaxFrameSize = 16; + + std::vector src1(kPacketSize); + MakePacket(src1); + + std::vector src2(kPacketSize); + MakePacket(src2); + + std::vector src3(kPacketSize); + MakePacket(src3); + + std::vector decoded; + + SimpleRpcPacketEncoder encoder; + struct EncodeState { + size_t frame_counter = 0; + std::vector encoded; + } encode_state; + + ASSERT_EQ(encoder.Encode(src1, + kMaxFrameSize, + [&encode_state](RpcFrame& frame) { + encode_state.frame_counter++; + if (encode_state.frame_counter != 2) { + // Skip the second frame. + CopyFrame(frame, encode_state.encoded); + } + return OkStatus(); + }), + OkStatus()); + + ASSERT_EQ(encoder.Encode(src2, + kMaxFrameSize, + [&encode_state](RpcFrame& frame) { + CopyFrame(frame, encode_state.encoded); + return OkStatus(); + }), + OkStatus()); + + ASSERT_EQ(encoder.Encode(src3, + kMaxFrameSize, + [&encode_state](RpcFrame& frame) { + CopyFrame(frame, encode_state.encoded); + return OkStatus(); + }), + OkStatus()); + + SimpleRpcPacketDecoder decoder; + + // First packet is decoded but it doesn't have correct bytes, as one of its + // frames has never been received. Second packet is not received because its + // header has been consumed by the first packet. By that point the decoder + // knows that something is wrong and tries to recover as soon as it receives + // bytes that look as the valid header. So we eventually receive the third + // packet and it is correct. + struct DecodeState { + std::vector decoded1; + std::vector decoded2; + size_t packet_counter = 0; + } decode_state; + + ASSERT_EQ( + decoder.Decode(encode_state.encoded, + [&decode_state](ConstByteSpan packet) { + decode_state.packet_counter++; + if (decode_state.packet_counter == 1) { + std::copy(packet.begin(), + packet.end(), + std::back_inserter(decode_state.decoded1)); + } + if (decode_state.packet_counter == 2) { + std::copy(packet.begin(), + packet.end(), + std::back_inserter(decode_state.decoded2)); + } + }), + OkStatus()); + + EXPECT_EQ(decode_state.packet_counter, 2ul); + + EXPECT_EQ(decode_state.decoded1.size(), src1.size()); + EXPECT_FALSE( + std::equal(src1.begin(), src1.end(), decode_state.decoded1.begin())); + + EXPECT_TRUE( + std::equal(src3.begin(), src3.end(), decode_state.decoded2.begin())); +} + +TEST(SimpleRpcPacketEncoder, PacketTooBig) { + SimpleRpcPacketEncoder encoder; + constexpr size_t kMaxFrameSize = 100; + std::array src{}; + + EXPECT_EQ( + encoder.Encode(src, kMaxFrameSize, [](RpcFrame&) { return OkStatus(); }), + Status::FailedPrecondition()); +} + +TEST(SimpleRpcPacketEncoder, MaxFrameSizeTooSmall) { + SimpleRpcPacketEncoder encoder; + std::array src{}; + + EXPECT_EQ(encoder.Encode( + src, encoder.kHeaderSize, [](RpcFrame&) { return OkStatus(); }), + Status::FailedPrecondition()); + + EXPECT_EQ( + encoder.Encode( + src, encoder.kHeaderSize + 1, [](RpcFrame&) { return OkStatus(); }), + OkStatus()); +} + +TEST(SimpleRpcFrameTest, EncoderBufferLargerThanDecoderBuffer) { + constexpr size_t kLargePacketSize = 150; + constexpr size_t kSmallPacketSize = 120; + constexpr size_t kMaxFrameSize = 16; + + // Decoder isn't able to receive the whole packet because it needs to be + // buffered but the internal buffer is too small; the packet is thus + // discarded. The second packet is received without issues as it's small + // enough to fit in the decoder buffer. + constexpr size_t kEncoderMaxPacketSize = 256; + constexpr size_t kDecoderMaxPacketSize = 128; + + std::vector src1(kLargePacketSize); + MakePacket(src1); + + std::vector src2(kSmallPacketSize); + MakePacket(src1); + + std::vector encoded; + std::vector decoded; + + SimpleRpcPacketEncoder encoder; + + ASSERT_EQ(encoder.Encode(src1, + kMaxFrameSize, + [&encoded](RpcFrame& frame) { + CopyFrame(frame, encoded); + return OkStatus(); + }), + OkStatus()); + + ASSERT_EQ(encoder.Encode(src2, + kMaxFrameSize, + [&encoded](RpcFrame& frame) { + CopyFrame(frame, encoded); + return OkStatus(); + }), + OkStatus()); + + SimpleRpcPacketDecoder decoder; + + // We have to decode piecemeal here because otherwise the decoder can just + // pluck the packet from `encoded` without internally buffering it. + for (std::byte b : encoded) { + auto buffer_span = span(&b, 1); + ASSERT_EQ(decoder.Decode(buffer_span, + [&decoded](ConstByteSpan packet) { + std::copy(packet.begin(), + packet.end(), + std::back_inserter(decoded)); + }), + OkStatus()); + } + + EXPECT_TRUE(std::equal(src2.begin(), src2.end(), decoded.begin())); +} + +} // namespace +} // namespace pw::rpc