Skip to content

Commit

Permalink
[chaotic-good] Revamp wire format (grpc#37765)
Browse files Browse the repository at this point in the history
Update the chaotic-good wire format with some learnings from the past year, and set up things for the next round of changes we'd like to make:

* Instead of a composite FRAGMENT frame, split out CLIENT_INITIAL_METADATA, CLIENT_END_OF_STREAM, MESSAGE, SERVER_INITIAL_METADATA, SERVER_TRAILING_METADATA as separate frame types - this eliminates a ton of complexity in the transport, and corresponds to how we used the wire format in practice anyway.
* Switch the frame payload for metadata, settings to be protobuf instead of HPACK - this eliminates the ordering requirements on interpreting these frames between streams, which I expect to open up some flexibility with head of line avoidance in the future. It's a heck of a lot easier to read and reason about the code. It's also easier to predict the size of the frame at encode time, which lets us treat metadata and payloads more uniformly in the protocol.
* Add a connection id field to our header, in preparation for allowing multiple data connections
* Allow payloads to be shipped on the control channel ('connection id 0') and use this for sending small messages

Closes grpc#37765

COPYBARA_INTEGRATE_REVIEW=grpc#37765 from ctiller:tiefling 7b57f72
PiperOrigin-RevId: 695766541
  • Loading branch information
ctiller authored and copybara-github committed Nov 12, 2024
1 parent 8342a10 commit 630d790
Show file tree
Hide file tree
Showing 44 changed files with 2,010 additions and 1,744 deletions.
552 changes: 461 additions & 91 deletions CMakeLists.txt

Large diffs are not rendered by default.

458 changes: 276 additions & 182 deletions build_autogenerated.yaml

Large diffs are not rendered by default.

66 changes: 33 additions & 33 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
load(
"//bazel:grpc_build_system.bzl",
"grpc_cc_library",
"grpc_cc_proto_library",
"grpc_generate_one_off_internal_targets",
"grpc_internal_proto_library",
"grpc_upb_proto_library",
"grpc_upb_proto_reflection_library",
)
Expand Down Expand Up @@ -696,6 +698,17 @@ grpc_cc_library(
],
)

grpc_cc_library(
name = "promise_variant",
external_deps = [
"absl/types:variant",
],
language = "c++",
public_hdrs = ["lib/promise/detail/promise_variant.h"],
deps = [
],
)

grpc_cc_library(
name = "match_promise",
external_deps = [
Expand All @@ -709,6 +722,7 @@ grpc_cc_library(
"poll",
"promise_factory",
"promise_like",
"promise_variant",
"//:gpr_platform",
],
)
Expand Down Expand Up @@ -830,6 +844,8 @@ grpc_cc_library(
deps = [
"if",
"promise_factory",
"promise_variant",
"//:gpr",
"//:gpr_platform",
],
)
Expand Down Expand Up @@ -7797,6 +7813,17 @@ grpc_cc_library(
],
)

grpc_internal_proto_library(
name = "chaotic_good_frame_proto",
srcs = ["ext/transport/chaotic_good/chaotic_good_frame.proto"],
has_services = False,
)

grpc_cc_proto_library(
name = "chaotic_good_frame_cc_proto",
deps = ["chaotic_good_frame_proto"],
)

grpc_cc_library(
name = "chaotic_good_frame",
srcs = [
Expand All @@ -7815,38 +7842,19 @@ grpc_cc_library(
deps = [
"arena",
"bitset",
"chaotic_good_frame_cc_proto",
"chaotic_good_frame_header",
"context",
"match",
"message",
"metadata",
"metadata_batch",
"no_destruct",
"slice",
"slice_buffer",
"status_helper",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:hpack_encoder",
"//:hpack_parser",
],
)

grpc_cc_library(
name = "chaotic_good_settings_metadata",
srcs = [
"ext/transport/chaotic_good/settings_metadata.cc",
],
hdrs = [
"ext/transport/chaotic_good/settings_metadata.h",
],
external_deps = [
"absl/status",
"absl/types:optional",
],
deps = [
"arena",
"metadata_batch",
"//:gpr",
],
)

Expand Down Expand Up @@ -8087,6 +8095,7 @@ grpc_cc_library(
external_deps = [
"absl/log:log",
"absl/random",
"absl/strings",
],
language = "c++",
deps = [
Expand All @@ -8099,7 +8108,6 @@ grpc_cc_library(
"try_seq",
"//:gpr_platform",
"//:grpc_trace",
"//:hpack_encoder",
"//:promise",
],
)
Expand Down Expand Up @@ -8150,14 +8158,13 @@ grpc_cc_library(
"resource_quota",
"slice",
"slice_buffer",
"switch",
"try_join",
"try_seq",
"//:exec_ctx",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:hpack_encoder",
"//:hpack_parser",
"//:ref_counted_ptr",
],
)
Expand Down Expand Up @@ -8217,8 +8224,6 @@ grpc_cc_library(
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:hpack_encoder",
"//:hpack_parser",
"//:ref_counted_ptr",
],
)
Expand Down Expand Up @@ -8714,7 +8719,6 @@ grpc_cc_library(
"chaotic_good_frame_header",
"chaotic_good_legacy_server",
"chaotic_good_server_transport",
"chaotic_good_settings_metadata",
"closure",
"context",
"error",
Expand Down Expand Up @@ -8747,8 +8751,6 @@ grpc_cc_library(
"//:gpr_platform",
"//:grpc_base",
"//:handshaker",
"//:hpack_encoder",
"//:hpack_parser",
"//:iomgr",
"//:orphanable",
"//:ref_counted_ptr",
Expand Down Expand Up @@ -8847,9 +8849,9 @@ grpc_cc_library(
"channel_args_endpoint_config",
"chaotic_good_client_transport",
"chaotic_good_frame",
"chaotic_good_frame_cc_proto",
"chaotic_good_frame_header",
"chaotic_good_legacy_connector",
"chaotic_good_settings_metadata",
"closure",
"context",
"error",
Expand Down Expand Up @@ -8884,8 +8886,6 @@ grpc_cc_library(
"//:grpc_base",
"//:grpc_client_channel",
"//:handshaker",
"//:hpack_encoder",
"//:hpack_parser",
"//:iomgr",
"//:ref_counted_ptr",
],
Expand Down
51 changes: 51 additions & 0 deletions src/core/ext/transport/chaotic_good/chaotic_good_frame.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 The gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package chaotic_good_frame;

message Settings {
// Connection id
// - sent server->client on the control channel to specify the
// data channel connection id
// - sent client->server on the data channel to complete the
// connection
bytes connection_id = 1;
// Flag true if this is a data channel (and not a control channel)
bool data_channel = 2;
// Requested alignment for the data channel
// Client and server each send this with their preferences
uint32 alignment = 3;
}

message UnknownMetadata {
string key = 1;
bytes value = 2;
}

message ClientMetadata {
optional string path = 1;
optional string authority = 2;
optional uint64 timeout_ms = 3;

repeated UnknownMetadata unknown_metadata = 100;
}

message ServerMetadata {
optional uint32 status = 1;
optional bytes message = 2;

repeated UnknownMetadata unknown_metadata = 100;
}
109 changes: 57 additions & 52 deletions src/core/ext/transport/chaotic_good/chaotic_good_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

#include "absl/log/log.h"
#include "absl/random/random.h"
#include "absl/strings/escaping.h"
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/promise/if.h"
Expand All @@ -38,31 +38,49 @@ namespace chaotic_good {

class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
public:
struct Options {
uint32_t encode_alignment = 64;
uint32_t decode_alignment = 64;
uint32_t inlined_payload_size_threshold = 8 * 1024;
};

ChaoticGoodTransport(PromiseEndpoint control_endpoint,
PromiseEndpoint data_endpoint, HPackParser hpack_parser,
HPackCompressor hpack_encoder)
PromiseEndpoint data_endpoint, Options options)
: control_endpoint_(std::move(control_endpoint)),
data_endpoint_(std::move(data_endpoint)),
encoder_(std::move(hpack_encoder)),
parser_(std::move(hpack_parser)) {
options_(options) {
// Enable RxMemoryAlignment and RPC receive coalescing after the transport
// setup is complete. At this point all the settings frames should have
// been read.
data_endpoint_.EnforceRxMemoryAlignmentAndCoalescing();
}

auto WriteFrame(const FrameInterface& frame) {
bool saw_encoding_errors = false;
auto buffers = frame.Serialize(&encoder_, saw_encoding_errors);
SliceBuffer control;
SliceBuffer data;
FrameHeader header = frame.MakeHeader();
if (header.payload_length > options_.inlined_payload_size_threshold) {
header.payload_connection_id = 1;
header.Serialize(control.AddTiny(FrameHeader::kFrameHeaderSize));
frame.SerializePayload(data);
const size_t padding = header.Padding(options_.encode_alignment);
if (padding != 0) {
auto slice = MutableSlice::CreateUninitialized(padding);
memset(slice.data(), 0, padding);
data.AppendIndexed(Slice(std::move(slice)));
}
} else {
header.Serialize(control.AddTiny(FrameHeader::kFrameHeaderSize));
frame.SerializePayload(control);
}
// ignore encoding errors: they will be logged separately already
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: WriteFrame to:"
<< ResolvedAddressToString(control_endpoint_.GetPeerAddress())
.value_or("<<unknown peer address>>")
<< " " << frame.ToString();
return TryJoin<absl::StatusOr>(
control_endpoint_.Write(std::move(buffers.control)),
data_endpoint_.Write(std::move(buffers.data)));
return TryJoin<absl::StatusOr>(control_endpoint_.Write(std::move(control)),
data_endpoint_.Write(std::move(data)));
}

// Read frame header and payloads for control and data portions of one frame.
Expand All @@ -81,59 +99,46 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
<< " "
<< (frame_header.ok() ? frame_header->ToString()
: frame_header.status().ToString());
// Read header and trailers from control endpoint.
// Read message padding and message from data endpoint.
return If(
frame_header.ok(),
[this, &frame_header] {
const uint32_t message_padding = frame_header->message_padding;
const uint32_t message_length = frame_header->message_length;
return Map(
TryJoin<absl::StatusOr>(
control_endpoint_.Read(frame_header->GetFrameLength()),
data_endpoint_.Read(message_length + message_padding)),
[frame_header = *frame_header, message_padding](
absl::StatusOr<std::tuple<SliceBuffer, SliceBuffer>>
buffers)
-> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
if (!buffers.ok()) return buffers.status();
SliceBuffer data_read = std::move(std::get<1>(*buffers));
if (message_padding > 0) {
data_read.RemoveLastNBytesNoInline(message_padding);
}
return std::tuple<FrameHeader, BufferPair>(
frame_header,
BufferPair{std::move(std::get<0>(*buffers)),
std::move(data_read)});
});
},
[&frame_header]() {
return
[status = frame_header.status()]() mutable
-> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
return std::move(status);
};
});
return frame_header;
},
[this](FrameHeader frame_header) {
current_frame_header_ = frame_header;
auto con = frame_header.payload_connection_id == 0
? &control_endpoint_
: &data_endpoint_;
return con->Read(frame_header.payload_length +
frame_header.Padding(options_.decode_alignment));
},
[this](SliceBuffer payload)
-> absl::StatusOr<std::tuple<FrameHeader, SliceBuffer>> {
payload.RemoveLastNBytesNoInline(
current_frame_header_.Padding(options_.decode_alignment));
return std::tuple<FrameHeader, SliceBuffer>(current_frame_header_,
std::move(payload));
});
}

absl::Status DeserializeFrame(FrameHeader header, BufferPair buffers,
Arena* arena, FrameInterface& frame,
FrameLimits limits) {
auto s = frame.Deserialize(&parser_, header, bitgen_, arena,
std::move(buffers), limits);
template <typename T>
absl::StatusOr<T> DeserializeFrame(const FrameHeader& header,
SliceBuffer payload) {
T frame;
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Deserialize " << header << " with payload "
<< absl::CEscape(payload.JoinIntoString());
CHECK_EQ(header.payload_length, payload.Length());
auto s = frame.Deserialize(header, std::move(payload));
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: DeserializeFrame "
<< (s.ok() ? frame.ToString() : s.ToString());
return s;
if (s.ok()) return std::move(frame);
return std::move(s);
}

private:
PromiseEndpoint control_endpoint_;
PromiseEndpoint data_endpoint_;
HPackCompressor encoder_;
HPackParser parser_;
absl::BitGen bitgen_;
FrameHeader current_frame_header_;
Options options_;
};

} // namespace chaotic_good
Expand Down
Loading

0 comments on commit 630d790

Please sign in to comment.