From 9d2a960b3fc6aa6ac40348fc8682fff03a57b0f6 Mon Sep 17 00:00:00 2001 From: Feng Li Date: Wed, 1 Mar 2017 10:39:37 -0800 Subject: [PATCH] Implements the new GRPC-Web spec. --- net/grpc/gateway/backend/grpc_backend.cc | 68 +++++++---- net/grpc/gateway/backend/grpc_backend.h | 3 +- net/grpc/gateway/codec/base64.cc | 2 +- net/grpc/gateway/codec/grpc_decoder.cc | 11 +- net/grpc/gateway/codec/grpc_web_decoder.cc | 85 ++++++++++++++ net/grpc/gateway/codec/grpc_web_decoder.h | 51 +++++++++ net/grpc/gateway/codec/grpc_web_encoder.cc | 107 ++++++++++++++++++ net/grpc/gateway/codec/grpc_web_encoder.h | 26 +++++ net/grpc/gateway/codec/json_decoder.cc | 2 +- net/grpc/gateway/codec/proto_decoder.cc | 2 +- net/grpc/gateway/codec/stream_body_decoder.cc | 2 +- net/grpc/gateway/frontend/nginx_bridge.h | 2 +- .../gateway/frontend/nginx_http_frontend.cc | 60 ++-------- net/grpc/gateway/nginx_includes.h | 16 +++ net/grpc/gateway/nginx_utils.cc | 43 +++++++ net/grpc/gateway/nginx_utils.h | 19 ++++ net/grpc/gateway/runtime/constants.h | 21 ++++ .../gateway/runtime/nginx_notify_queue.cc | 3 - net/grpc/gateway/runtime/nginx_notify_queue.h | 4 +- net/grpc/gateway/runtime/runtime.cc | 17 +++ net/grpc/gateway/runtime/runtime.h | 5 +- 21 files changed, 452 insertions(+), 97 deletions(-) create mode 100644 net/grpc/gateway/codec/grpc_web_decoder.cc create mode 100644 net/grpc/gateway/codec/grpc_web_decoder.h create mode 100644 net/grpc/gateway/codec/grpc_web_encoder.cc create mode 100644 net/grpc/gateway/codec/grpc_web_encoder.h create mode 100644 net/grpc/gateway/nginx_includes.h create mode 100644 net/grpc/gateway/nginx_utils.cc create mode 100644 net/grpc/gateway/nginx_utils.h diff --git a/net/grpc/gateway/backend/grpc_backend.cc b/net/grpc/gateway/backend/grpc_backend.cc index baa5ad0..1b54020 100644 --- a/net/grpc/gateway/backend/grpc_backend.cc +++ b/net/grpc/gateway/backend/grpc_backend.cc @@ -10,7 +10,6 @@ #include "net/grpc/gateway/log.h" #include "net/grpc/gateway/runtime/runtime.h" #include "net/grpc/gateway/runtime/types.h" -#include "third_party/grpc/include/grpc++/support/string_ref.h" #include "third_party/grpc/include/grpc/byte_buffer.h" #include "third_party/grpc/include/grpc/byte_buffer_reader.h" #include "third_party/grpc/include/grpc/grpc.h" @@ -40,8 +39,7 @@ GrpcBackend::GrpcBackend() request_buffer_(nullptr), response_buffer_(nullptr), status_code_(grpc_status_code::GRPC_STATUS_OK), - status_details_(nullptr), - status_details_capacity_(0), + status_details_(grpc_empty_slice()), is_cancelled_(false) { BACKEND_DEBUG("Creating GRPC backend proxy."); grpc_metadata_array_init(&response_initial_metadata_); @@ -58,9 +56,7 @@ GrpcBackend::~GrpcBackend() { if (response_buffer_ != nullptr) { grpc_byte_buffer_destroy(response_buffer_); } - if (status_details_ != nullptr) { - gpr_free(status_details_); - } + grpc_slice_unref(status_details_); if (call_ != nullptr) { BACKEND_DEBUG("Destroying GRPC call."); grpc_call_destroy(call_); @@ -77,9 +73,14 @@ grpc_channel* GrpcBackend::CreateChannel() { grpc_call* GrpcBackend::CreateCall() { BACKEND_DEBUG("Creating GRPC call."); - return grpc_channel_create_call( - channel_, nullptr, 0, Runtime::Get().grpc_event_queue(), method_.c_str(), - host_.c_str(), gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); + grpc_slice method_slice = grpc_slice_from_copied_string(method_.c_str()); + grpc_slice host_slice = grpc_slice_from_static_string(host_.c_str()); + grpc_call* call = grpc_channel_create_call( + channel_, nullptr, 0, Runtime::Get().grpc_event_queue(), method_slice, + host_.empty() ? nullptr : &host_slice, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr); + grpc_slice_unref(method_slice); + return call; } void GrpcBackend::Start() { @@ -88,7 +89,8 @@ void GrpcBackend::Start() { // Receives GRPC response initial metadata. grpc_op ops[1]; ops[0].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[0].data.recv_initial_metadata = &response_initial_metadata_; + ops[0].data.recv_initial_metadata.recv_initial_metadata = + &response_initial_metadata_; ops[0].flags = 0; ops[0].reserved = nullptr; grpc_call_error error = grpc_call_start_batch( @@ -111,9 +113,13 @@ void GrpcBackend::OnResponseInitialMetadata(bool result) { std::unique_ptr response_headers(new Headers()); for (size_t i = 0; i < response_initial_metadata_.count; i++) { grpc_metadata* metadata = response_initial_metadata_.metadata + i; - response_headers->push_back( - Header(std::string(metadata->key), - string_ref(metadata->value, metadata->value_length))); + response_headers->push_back(Header( + std::string( + reinterpret_cast(GRPC_SLICE_START_PTR(metadata->key)), + GRPC_SLICE_LENGTH(metadata->key)), + string_ref( + reinterpret_cast(GRPC_SLICE_START_PTR(metadata->value)), + GRPC_SLICE_LENGTH(metadata->value)))); } response->set_headers(std::move(response_headers)); frontend()->Send(std::move(response)); @@ -121,7 +127,7 @@ void GrpcBackend::OnResponseInitialMetadata(bool result) { // Receives next GRPC response message. grpc_op ops[1]; ops[0].op = GRPC_OP_RECV_MESSAGE; - ops[0].data.recv_message = &response_buffer_; + ops[0].data.recv_message.recv_message = &response_buffer_; ops[0].flags = 0; ops[0].reserved = nullptr; grpc_call_error error = grpc_call_start_batch( @@ -144,8 +150,6 @@ void GrpcBackend::OnResponseMessage(bool result) { ops[0].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[0].data.recv_status_on_client.status = &status_code_; ops[0].data.recv_status_on_client.status_details = &status_details_; - ops[0].data.recv_status_on_client.status_details_capacity = - &status_details_capacity_; ops[0].data.recv_status_on_client.trailing_metadata = &response_trailing_metadata_; ops[0].flags = 0; @@ -176,7 +180,7 @@ void GrpcBackend::OnResponseMessage(bool result) { // Receives next GRPC response message. grpc_op ops[1]; ops[0].op = GRPC_OP_RECV_MESSAGE; - ops[0].data.recv_message = &response_buffer_; + ops[0].data.recv_message.recv_message = &response_buffer_; ops[0].flags = 0; ops[0].reserved = nullptr; grpc_call_error error = grpc_call_start_batch( @@ -194,15 +198,25 @@ void GrpcBackend::OnResponseStatus(bool result) { } std::unique_ptr response(new Response()); + grpc::string status_details; + if (!GRPC_SLICE_IS_EMPTY(status_details_)) { + status_details = grpc::string( + reinterpret_cast(GRPC_SLICE_START_PTR(status_details_)), + GRPC_SLICE_LENGTH(status_details_)); + } response->set_status(std::unique_ptr(new grpc::Status( - static_cast(status_code_), status_details_))); + static_cast(status_code_), status_details))); std::unique_ptr response_trailers(new Trailers()); for (size_t i = 0; i < response_trailing_metadata_.count; i++) { grpc_metadata* metadata = response_trailing_metadata_.metadata + i; - response_trailers->push_back( - Trailer(std::string(metadata->key), - string_ref(metadata->value, metadata->value_length))); + response_trailers->push_back(Trailer( + std::string( + reinterpret_cast(GRPC_SLICE_START_PTR(metadata->key)), + GRPC_SLICE_LENGTH(metadata->key)), + string_ref( + reinterpret_cast(GRPC_SLICE_START_PTR(metadata->value)), + GRPC_SLICE_LENGTH(metadata->value)))); } response->set_trailers(std::move(response_trailers)); frontend()->Send(std::move(response)); @@ -216,10 +230,14 @@ void GrpcBackend::Send(std::unique_ptr request, Tag* on_done) { for (Header& header : *request->headers()) { std::transform(header.first.begin(), header.first.end(), header.first.begin(), ::tolower); + if (header.first == kGrpcAcceptEncoding) { + continue; + } grpc_metadata initial_metadata; - initial_metadata.key = header.first.c_str(); - initial_metadata.value = header.second.data(); - initial_metadata.value_length = header.second.size(); + initial_metadata.key = grpc_slice_intern( + grpc_slice_from_copied_string(header.first.c_str())); + initial_metadata.value = grpc_slice_from_copied_buffer( + header.second.data(), header.second.size()); initial_metadata.flags = 0; request_initial_metadata_.push_back(initial_metadata); } @@ -248,7 +266,7 @@ void GrpcBackend::Send(std::unique_ptr request, Tag* on_done) { for (auto& slice : slices) { grpc_slice_unref(slice); } - op->data.send_message = request_buffer_; + op->data.send_message.send_message = request_buffer_; op->flags = 0; op->reserved = nullptr; op++; diff --git a/net/grpc/gateway/backend/grpc_backend.h b/net/grpc/gateway/backend/grpc_backend.h index 00b0818..e00e05d 100644 --- a/net/grpc/gateway/backend/grpc_backend.h +++ b/net/grpc/gateway/backend/grpc_backend.h @@ -68,8 +68,7 @@ class GrpcBackend : public Backend { // The GRPC response buffer. grpc_byte_buffer* response_buffer_; grpc_status_code status_code_; - char* status_details_; - size_t status_details_capacity_; + grpc_slice status_details_; grpc_metadata_array response_trailing_metadata_; // True if the GRPC call has been cancelled by client. bool is_cancelled_; diff --git a/net/grpc/gateway/codec/base64.cc b/net/grpc/gateway/codec/base64.cc index de12d7b..ee38f52 100644 --- a/net/grpc/gateway/codec/base64.cc +++ b/net/grpc/gateway/codec/base64.cc @@ -216,7 +216,7 @@ bool Base64::Decode(const std::vector& input, } if (output->empty()) { - output->push_back(Slice(gpr_empty_slice(), Slice::STEAL_REF)); + output->push_back(Slice(grpc_empty_slice(), Slice::STEAL_REF)); } return true; } diff --git a/net/grpc/gateway/codec/grpc_decoder.cc b/net/grpc/gateway/codec/grpc_decoder.cc index 6e649ae..4ad75db 100644 --- a/net/grpc/gateway/codec/grpc_decoder.cc +++ b/net/grpc/gateway/codec/grpc_decoder.cc @@ -4,6 +4,7 @@ #include "net/grpc/gateway/utils.h" extern "C" { #include "third_party/grpc/src/core/lib/compression/message_compress.h" +#include "third_party/grpc/src/core/lib/iomgr/exec_ctx.h" } namespace grpc { @@ -16,6 +17,7 @@ GrpcDecoder::GrpcDecoder() GrpcDecoder::~GrpcDecoder() {} Status GrpcDecoder::Decode() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; for (const Slice& slice : *inputs()) { if (slice.size() == 0) { continue; @@ -32,6 +34,7 @@ Status GrpcDecoder::Decode() { Status status(StatusCode::INVALID_ARGUMENT, Format("Receives invalid compressed flag: %c.", c)); DEBUG("%s", status.error_message().c_str()); + grpc_exec_ctx_finish(&exec_ctx); return status; } compressed_flag_ = c; @@ -56,7 +59,7 @@ Status GrpcDecoder::Decode() { case kExpectingMessageLengthByte3: { message_length_ += c; if (message_length_ == 0) { - buffer_.reset(new Slice(gpr_empty_slice(), Slice::STEAL_REF)); + buffer_.reset(new Slice(grpc_empty_slice(), Slice::STEAL_REF)); results()->push_back( std::unique_ptr(new ByteBuffer(buffer_.get(), 1))); state_ = kExpectingCompressedFlag; @@ -84,10 +87,11 @@ Status GrpcDecoder::Decode() { grpc_slice_buffer output; grpc_slice_buffer_init(&output); if (grpc_msg_decompress( - grpc_compression_algorithm::GRPC_COMPRESS_GZIP, &input, - &output) != 1) { + &exec_ctx, grpc_compression_algorithm::GRPC_COMPRESS_GZIP, + &input, &output) != 1) { grpc_slice_buffer_destroy(&input); grpc_slice_buffer_destroy(&output); + grpc_exec_ctx_finish(&exec_ctx); return Status(StatusCode::INTERNAL, "Failed to uncompress the GRPC data frame."); } @@ -113,6 +117,7 @@ Status GrpcDecoder::Decode() { } } inputs()->clear(); + grpc_exec_ctx_finish(&exec_ctx); return Status::OK; } } // namespace gateway diff --git a/net/grpc/gateway/codec/grpc_web_decoder.cc b/net/grpc/gateway/codec/grpc_web_decoder.cc new file mode 100644 index 0000000..271056d --- /dev/null +++ b/net/grpc/gateway/codec/grpc_web_decoder.cc @@ -0,0 +1,85 @@ +#include "net/grpc/gateway/codec/grpc_web_decoder.h" + +#include "net/grpc/gateway/log.h" +#include "net/grpc/gateway/utils.h" + +namespace grpc { +namespace gateway { + +const uint8_t GrpcWebDecoder::kGrpcWebMessage = 0; + +GrpcWebDecoder::GrpcWebDecoder() + : state_(kExpectingFlags), message_length_(0) {} +GrpcWebDecoder::~GrpcWebDecoder() {} + +Status GrpcWebDecoder::Decode() { + for (const Slice& slice : *inputs()) { + if (slice.size() == 0) { + continue; + } + + for (size_t i = 0; i < slice.size(); i++) { + uint8_t c = *(slice.begin() + i); + switch (state_) { + case kExpectingFlags: { + if (c != kGrpcWebMessage) { + // TODO(fengli): The following code is repeated 12 times. Extract it + // into a function or a macro. + Status status(StatusCode::INVALID_ARGUMENT, + Format("Receives invalid compressed flag: %X.", c)); + DEBUG("%s", status.error_message().c_str()); + return status; + } + state_ = kExpectingMessageLengthByte0; + continue; + } + case kExpectingMessageLengthByte0: { + message_length_ = c << 24; + state_ = kExpectingMessageLengthByte1; + continue; + } + case kExpectingMessageLengthByte1: { + message_length_ += c << 16; + state_ = kExpectingMessageLengthByte2; + continue; + } + case kExpectingMessageLengthByte2: { + message_length_ += c << 8; + state_ = kExpectingMessageLengthByte3; + continue; + } + case kExpectingMessageLengthByte3: { + message_length_ += c; + if (message_length_ == 0) { + buffer_.reset(new Slice(grpc_empty_slice(), Slice::STEAL_REF)); + results()->push_back( + std::unique_ptr(new ByteBuffer(buffer_.get(), 1))); + state_ = kExpectingFlags; + } else { + buffer_.reset(new Slice(grpc_slice_malloc(message_length_), + Slice::STEAL_REF)); + state_ = kExpectingMessageData; + } + continue; + } + case kExpectingMessageData: { + uint8_t* end = const_cast(buffer_->end()); + *(end - message_length_) = c; + message_length_--; + if (message_length_ == 0) { + results()->push_back( + std::unique_ptr(new ByteBuffer(buffer_.get(), 1))); + buffer_.reset(); + state_ = kExpectingFlags; + } + continue; + } + } + } + } + inputs()->clear(); + return Status::OK; +} + +} // namespace gateway +} // namespace grpc diff --git a/net/grpc/gateway/codec/grpc_web_decoder.h b/net/grpc/gateway/codec/grpc_web_decoder.h new file mode 100644 index 0000000..58d3a2e --- /dev/null +++ b/net/grpc/gateway/codec/grpc_web_decoder.h @@ -0,0 +1,51 @@ +#ifndef NET_GRPC_GATEWAY_CODEC_GRPC_WEB_DECODER_H_ +#define NET_GRPC_GATEWAY_CODEC_GRPC_WEB_DECODER_H_ + +#include +#include + +#include "net/grpc/gateway/codec/decoder.h" +#include "third_party/grpc/include/grpc++/support/slice.h" + +namespace grpc { +namespace gateway { + +class GrpcWebDecoder : public Decoder { + public: + static const uint8_t kGrpcWebMessage; + + enum State : uint8_t { + // The initial decode state, expecting the flags (1 byte). + kExpectingFlags, + // Expecting the 1st byte of message length (4 bytes in total). + kExpectingMessageLengthByte0, + // Expecting the 2nd byte of message length (4 bytes in total). + kExpectingMessageLengthByte1, + // Expecting the 3rd byte of message length (4 bytes in total). + kExpectingMessageLengthByte2, + // Expecting the 4th byte of message length (4 bytes in total). + kExpectingMessageLengthByte3, + // Expecting the message data. + kExpectingMessageData + }; + + GrpcWebDecoder(); + virtual ~GrpcWebDecoder(); + + // GrpcWebDecoder is neither copyable nor movable. + GrpcWebDecoder(const GrpcWebDecoder&) = delete; + GrpcWebDecoder& operator=(const GrpcWebDecoder&) = delete; + + Status Decode() override; + + private: + State state_; + // The message length of the current decoding GRPC-Web frame. + uint32_t message_length_; + // The data buffered for the current decoding GRPC-Web frame. + std::unique_ptr buffer_; +}; + +} // namespace gateway +} // namespace grpc +#endif // NET_GRPC_GATEWAY_CODEC_GRPC_WEB_DECODER_H_ diff --git a/net/grpc/gateway/codec/grpc_web_encoder.cc b/net/grpc/gateway/codec/grpc_web_encoder.cc new file mode 100644 index 0000000..446e03b --- /dev/null +++ b/net/grpc/gateway/codec/grpc_web_encoder.cc @@ -0,0 +1,107 @@ +#include "net/grpc/gateway/codec/grpc_web_encoder.h" + +#include +#include +#include + +#include "net/grpc/gateway/runtime/types.h" +#include "third_party/grpc/include/grpc++/support/byte_buffer.h" +#include "third_party/grpc/include/grpc++/support/slice.h" + +namespace grpc { +namespace gateway { +namespace { + +const char kGrpcStatus[] = "grpc-status: %i\r\n"; +const char kGrpcMessage[] = "grpc-message: %s\r\n"; +const char kGrpcTrailer[] = "%s: %s\r\n"; + +// GRPC Web message frame. +const uint8_t GRPC_WEB_FH_DATA = 0b0u; +// GRPC Web trailer frame. +const uint8_t GRPC_WEB_FH_TRAILER = 0b10000000u; + +// Creates a new GRPC data frame with the given flags and length. +// @param flags supplies the GRPC data frame flags. +// @param length supplies the GRPC data frame length. +// @param output the buffer to store the encoded data, it's size must be 5. +void NewFrame(uint8_t flags, uint64_t length, uint8_t* output) { + output[0] = flags; + output[1] = static_cast(length >> 24); + output[2] = static_cast(length >> 16); + output[3] = static_cast(length >> 8); + output[4] = static_cast(length); +} +} // namespace + +GrpcWebEncoder::GrpcWebEncoder() {} + +GrpcWebEncoder::~GrpcWebEncoder() {} + +void GrpcWebEncoder::Encode(grpc::ByteBuffer* input, + std::vector* result) { + uint8_t header[5]; + NewFrame(GRPC_WEB_FH_DATA, input->Length(), header); + result->push_back( + Slice(gpr_slice_from_copied_buffer(reinterpret_cast(header), 5), + Slice::STEAL_REF)); + std::vector buffer; + // TODO(fengli): Optimize if needed. Today we cannot dump data to the result + // directly since it will clear the target. + input->Dump(&buffer); + for (Slice& s : buffer) { + result->push_back(s); + } +} + +void GrpcWebEncoder::EncodeStatus(const grpc::Status& status, + const Trailers* trailers, + std::vector* result) { + std::vector buffer; + uint64_t length = 0; + + // Encodes GRPC status. + size_t grpc_status_size = + snprintf(nullptr, 0, kGrpcStatus, status.error_code()); + grpc_slice grpc_status = grpc_slice_malloc(grpc_status_size + 1); + snprintf(reinterpret_cast(GPR_SLICE_START_PTR(grpc_status)), + grpc_status_size + 1, kGrpcStatus, status.error_code()); + GPR_SLICE_SET_LENGTH(grpc_status, grpc_status_size); + buffer.push_back(Slice(grpc_status, Slice::STEAL_REF)); + length += grpc_status_size; + + // Encodes GRPC message. + if (!status.error_message().empty()) { + size_t grpc_message_size = + snprintf(nullptr, 0, kGrpcMessage, status.error_message().c_str()); + grpc_slice grpc_message = grpc_slice_malloc(grpc_message_size + 1); + snprintf(reinterpret_cast(GPR_SLICE_START_PTR(grpc_message)), + grpc_message_size + 1, kGrpcMessage, + status.error_message().c_str()); + GPR_SLICE_SET_LENGTH(grpc_message, grpc_message_size); + buffer.push_back(Slice(grpc_message, Slice::STEAL_REF)); + length += grpc_message_size; + } + + // Encodes GRPC trailers. + for (auto& trailer : *trailers) { + size_t grpc_trailer_size = snprintf( + nullptr, 0, kGrpcTrailer, trailer.first.c_str(), trailer.second.data()); + grpc_slice grpc_trailer = grpc_slice_malloc(grpc_trailer_size + 1); + snprintf(reinterpret_cast(GPR_SLICE_START_PTR(grpc_trailer)), + grpc_trailer_size + 1, kGrpcTrailer, trailer.first.c_str(), + trailer.second.data()); + GPR_SLICE_SET_LENGTH(grpc_trailer, grpc_trailer_size); + buffer.push_back(Slice(grpc_trailer, Slice::STEAL_REF)); + length += grpc_trailer_size; + } + + // Encodes GRPC trailer frame. + grpc_slice header = grpc_slice_malloc(5); + NewFrame(GRPC_WEB_FH_TRAILER, length, GPR_SLICE_START_PTR(header)); + result->push_back(Slice(header, Slice::STEAL_REF)); + result->insert(result->end(), buffer.begin(), buffer.end()); +} + +} // namespace gateway +} // namespace grpc diff --git a/net/grpc/gateway/codec/grpc_web_encoder.h b/net/grpc/gateway/codec/grpc_web_encoder.h new file mode 100644 index 0000000..d2ed86b --- /dev/null +++ b/net/grpc/gateway/codec/grpc_web_encoder.h @@ -0,0 +1,26 @@ +#ifndef NET_GRPC_GATEWAY_CODEC_GRPC_WEB_ENCODER_H_ +#define NET_GRPC_GATEWAY_CODEC_GRPC_WEB_ENCODER_H_ + +#include "net/grpc/gateway/codec/encoder.h" + +namespace grpc { +namespace gateway { + +class GrpcWebEncoder : public Encoder { + public: + GrpcWebEncoder(); + virtual ~GrpcWebEncoder(); + + // GrpcWebEncoder is neither copyable nor movable. + GrpcWebEncoder(const GrpcWebEncoder&) = delete; + GrpcWebEncoder& operator=(const GrpcWebEncoder&) = delete; + + void Encode(grpc::ByteBuffer* input, std::vector* result) override; + + void EncodeStatus(const grpc::Status& status, const Trailers* trailers, + std::vector* result) override; +}; + +} // namespace gateway +} // namespace grpc +#endif // NET_GRPC_GATEWAY_CODEC_GRPC_WEB_ENCODER_H_ diff --git a/net/grpc/gateway/codec/json_decoder.cc b/net/grpc/gateway/codec/json_decoder.cc index 219a700..77c7d03 100644 --- a/net/grpc/gateway/codec/json_decoder.cc +++ b/net/grpc/gateway/codec/json_decoder.cc @@ -146,7 +146,7 @@ Status JsonDecoder::Decode() { } if (static_cast(start) == i) { base64_buffer_.push_back( - Slice(gpr_empty_slice(), Slice::STEAL_REF)); + Slice(grpc_empty_slice(), Slice::STEAL_REF)); } else { base64_buffer_.push_back(Slice( grpc_slice_from_copied_buffer( diff --git a/net/grpc/gateway/codec/proto_decoder.cc b/net/grpc/gateway/codec/proto_decoder.cc index 0a3e96e..5ff7107 100644 --- a/net/grpc/gateway/codec/proto_decoder.cc +++ b/net/grpc/gateway/codec/proto_decoder.cc @@ -12,7 +12,7 @@ ProtoDecoder::~ProtoDecoder() {} Status ProtoDecoder::Decode() { if (inputs()->empty()) { - Slice* slice = new Slice(gpr_empty_slice(), Slice::STEAL_REF); + Slice* slice = new Slice(grpc_empty_slice(), Slice::STEAL_REF); ByteBuffer* buffer = new ByteBuffer(slice, 1); results()->push_back(std::unique_ptr(buffer)); } else { diff --git a/net/grpc/gateway/codec/stream_body_decoder.cc b/net/grpc/gateway/codec/stream_body_decoder.cc index 3e0a1ce..4759d25 100644 --- a/net/grpc/gateway/codec/stream_body_decoder.cc +++ b/net/grpc/gateway/codec/stream_body_decoder.cc @@ -47,7 +47,7 @@ Status StreamBodyDecoder::Decode() { if ((c & 0x80) == 0) { varint_bytes_ = 0; if (varint_value_ == 0) { - buffer_.reset(new Slice(gpr_empty_slice(), Slice::STEAL_REF)); + buffer_.reset(new Slice(grpc_empty_slice(), Slice::STEAL_REF)); results()->push_back(std::unique_ptr( new ByteBuffer(buffer_.release(), 1))); state_ = EXPECTING_MESSAGE_KEY_TYPE; diff --git a/net/grpc/gateway/frontend/nginx_bridge.h b/net/grpc/gateway/frontend/nginx_bridge.h index dba2032..7c5d180 100644 --- a/net/grpc/gateway/frontend/nginx_bridge.h +++ b/net/grpc/gateway/frontend/nginx_bridge.h @@ -1,7 +1,7 @@ #ifndef NET_GRPC_GATEWAY_FRONTEND_NGINX_BRIDGE_H_ #define NET_GRPC_GATEWAY_FRONTEND_NGINX_BRIDGE_H_ -#include +#include "net/grpc/gateway/nginx_includes.h" typedef struct ngx_grpc_gateway_loc_conf_s ngx_grpc_gateway_loc_conf_t; struct ngx_grpc_gateway_loc_conf_s { diff --git a/net/grpc/gateway/frontend/nginx_http_frontend.cc b/net/grpc/gateway/frontend/nginx_http_frontend.cc index 6604d0f..462ec3b 100644 --- a/net/grpc/gateway/frontend/nginx_http_frontend.cc +++ b/net/grpc/gateway/frontend/nginx_http_frontend.cc @@ -1,11 +1,10 @@ #include "net/grpc/gateway/frontend/nginx_http_frontend.h" -#include - #include #include "net/grpc/gateway/frontend/nginx_bridge.h" #include "net/grpc/gateway/log.h" +#include "net/grpc/gateway/nginx_utils.h" #include "net/grpc/gateway/runtime/constants.h" #include "net/grpc/gateway/runtime/request.h" #include "net/grpc/gateway/runtime/runtime.h" @@ -110,16 +109,6 @@ void continue_write_response(ngx_http_request_t *r) { namespace grpc { namespace gateway { -namespace { -void AddElementToNginxElementTable(ngx_pool_t *pool, ngx_list_t *table, - const string &name, const string_ref &value); - -void AddHTTPHeader(ngx_http_request_t *http_request, const string &name, - const string_ref &value); - -void AddHTTPTrailer(ngx_http_request_t *http_request, const string &name, - const string_ref &value); -} // namespace NginxHttpFrontend::NginxHttpFrontend(std::unique_ptr backend) : Frontend(std::move(backend)), @@ -274,10 +263,11 @@ void NginxHttpFrontend::SendResponseStatusToClient(Response *response) { is_response_status_sent_ = true; std::vector trancoded_status; - encoder_->EncodeStatus(*response->status(), response->trailers(), - &trancoded_status); - if (trancoded_status.empty()) { + if (protocol_ == Protocol::GRPC) { SendResponseTrailersToClient(response); + } else { + encoder_->EncodeStatus(*response->status(), response->trailers(), + &trancoded_status); } ngx_chain_t *output = ngx_alloc_chain_link(http_request_->pool); if (output == nullptr) { @@ -481,6 +471,9 @@ void NginxHttpFrontend::SendResponseHeadersToClient(Response *response) { AddHTTPHeader(http_request_, kGrpcAcceptEncoding, kGrpcAcceptEncoding_AcceptAll); break; + case GRPC_WEB: + AddHTTPHeader(http_request_, kContentType, kContentTypeGrpcWeb); + break; case JSON_STREAM_BODY: AddHTTPHeader(http_request_, kContentType, kContentTypeJson); break; @@ -526,42 +519,5 @@ void NginxHttpFrontend::SendErrorToClient(const grpc::Status &status) { backend()->Cancel(status); } -namespace { -void AddElementToNginxElementTable(ngx_pool_t *pool, ngx_list_t *table, - const string &name, - const string_ref &value) { - ngx_table_elt_t *ngx_key_value = - reinterpret_cast(ngx_list_push(table)); - if (ngx_key_value == nullptr) { - ERROR("Failed to allocate response initial metadata for nginx."); - } - ngx_key_value->key.len = name.size(); - ngx_key_value->key.data = - reinterpret_cast(ngx_palloc(pool, name.size())); - ngx_copy(ngx_key_value->key.data, name.c_str(), name.size()); - ngx_key_value->value.len = value.size(); - ngx_key_value->value.data = - reinterpret_cast(ngx_palloc(pool, value.size())); - ngx_copy(ngx_key_value->value.data, value.data(), value.size()); - ngx_key_value->lowcase_key = - reinterpret_cast(ngx_pnalloc(pool, ngx_key_value->key.len)); - ngx_strlow(ngx_key_value->lowcase_key, ngx_key_value->key.data, - ngx_key_value->key.len); - ngx_key_value->hash = - ngx_hash_key_lc(ngx_key_value->key.data, ngx_key_value->key.len); -} - -void AddHTTPHeader(ngx_http_request_t *http_request, const string &name, - const string_ref &value) { - AddElementToNginxElementTable( - http_request->pool, &http_request->headers_out.headers, name, value); -} - -void AddHTTPTrailer(ngx_http_request_t *http_request, const string &name, - const string_ref &value) { - AddElementToNginxElementTable( - http_request->pool, &http_request->headers_out.trailers, name, value); -} -} // namespace } // namespace gateway } // namespace grpc diff --git a/net/grpc/gateway/nginx_includes.h b/net/grpc/gateway/nginx_includes.h new file mode 100644 index 0000000..22b2356 --- /dev/null +++ b/net/grpc/gateway/nginx_includes.h @@ -0,0 +1,16 @@ +#ifndef NET_GRPC_GATEWAY_NGINX_INCLUDES_H_ +#define NET_GRPC_GATEWAY_NGINX_INCLUDES_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include + +#ifdef __cplusplus +} +#endif + +#endif // NET_GRPC_GATEWAY_NGINX_INCLUDES_H_ diff --git a/net/grpc/gateway/nginx_utils.cc b/net/grpc/gateway/nginx_utils.cc new file mode 100644 index 0000000..b1fa5a1 --- /dev/null +++ b/net/grpc/gateway/nginx_utils.cc @@ -0,0 +1,43 @@ +#include "net/grpc/gateway/nginx_utils.h" + +namespace grpc { +namespace gateway { +namespace { +void AddElementToNginxElementTable(ngx_pool_t *pool, ngx_list_t *table, + const string &name, + const string_ref &value) { + ngx_table_elt_t *ngx_key_value = + reinterpret_cast(ngx_list_push(table)); + if (ngx_key_value == nullptr) { + ERROR("Failed to allocate response initial metadata for nginx."); + } + ngx_key_value->key.len = name.size(); + ngx_key_value->key.data = + reinterpret_cast(ngx_palloc(pool, name.size())); + ngx_copy(ngx_key_value->key.data, name.c_str(), name.size()); + ngx_key_value->value.len = value.size(); + ngx_key_value->value.data = + reinterpret_cast(ngx_palloc(pool, value.size())); + ngx_copy(ngx_key_value->value.data, value.data(), value.size()); + ngx_key_value->lowcase_key = + reinterpret_cast(ngx_pnalloc(pool, ngx_key_value->key.len)); + ngx_strlow(ngx_key_value->lowcase_key, ngx_key_value->key.data, + ngx_key_value->key.len); + ngx_key_value->hash = + ngx_hash_key_lc(ngx_key_value->key.data, ngx_key_value->key.len); +} +} // namespace + +void AddHTTPHeader(ngx_http_request_t *http_request, const string &name, + const string_ref &value) { + AddElementToNginxElementTable( + http_request->pool, &http_request->headers_out.headers, name, value); +} + +void AddHTTPTrailer(ngx_http_request_t *http_request, const string &name, + const string_ref &value) { + AddElementToNginxElementTable( + http_request->pool, &http_request->headers_out.trailers, name, value); +} +} // namespace gateway +} // namespace grpc diff --git a/net/grpc/gateway/nginx_utils.h b/net/grpc/gateway/nginx_utils.h new file mode 100644 index 0000000..b676248 --- /dev/null +++ b/net/grpc/gateway/nginx_utils.h @@ -0,0 +1,19 @@ +#ifndef NET_GRPC_GATEWAY_NGINX_UTILS_H_ +#define NET_GRPC_GATEWAY_NGINX_UTILS_H_ + +#include "net/grpc/gateway/log.h" +#include "net/grpc/gateway/nginx_includes.h" +#include "third_party/grpc/include/grpc++/support/string_ref.h" + +namespace grpc { +namespace gateway { + +void AddHTTPHeader(ngx_http_request_t *http_request, const string &name, + const string_ref &value); + +void AddHTTPTrailer(ngx_http_request_t *http_request, const string &name, + const string_ref &value); +} // namespace gateway +} // namespace grpc + +#endif // NET_GRPC_GATEWAY_NGINX_UTILS_H_ diff --git a/net/grpc/gateway/runtime/constants.h b/net/grpc/gateway/runtime/constants.h index dc8cb90..4418416 100644 --- a/net/grpc/gateway/runtime/constants.h +++ b/net/grpc/gateway/runtime/constants.h @@ -24,6 +24,15 @@ const size_t kContentTypeJsonLength = sizeof(kContentTypeJson) - 1; const char kContentTypeGrpc[] = "application/grpc"; const size_t kContentTypeGrpcLength = sizeof(kContentTypeGrpc) - 1; +// The content type of GRPC Web. +const char kContentTypeGrpcWeb[] = "application/grpc-web"; +const size_t kContentTypeGrpcWebLength = sizeof(kContentTypeGrpcWeb) - 1; + +// The content type of GRPC Web Text. +const char kContentTypeGrpcWebText[] = "application/grpc-web-text"; +const size_t kContentTypeGrpcWebTextLength = + sizeof(kContentTypeGrpcWebText) - 1; + // The type url of google.rpc.Pair. const char kTypeUrlPair[] = "type.googleapis.com/google.rpc.Pair"; @@ -64,10 +73,22 @@ const char kContentTransferEncoding_Base64[] = "base64"; const size_t kContentTransferEncoding_Base64_Length = sizeof(kContentTransferEncoding_Base64) - 1; +const char kXAcceptContentTransferEncoding[] = + "x-accept-content-transfer-encoding"; +const size_t kXAcceptContentTransferEncodingLength = + sizeof(kXAcceptContentTransferEncoding) - 1; +const char kXAcceptContentTransferEncoding_Base64[] = "base64"; + +const char kXAcceptResponseStreaming[] = "x-accept-response-streaming"; +const size_t kXAcceptResponseStreamingLength = + sizeof(kXAcceptResponseStreaming) - 1; +const char kXAcceptResponseStreaming_True[] = "true"; + // The frontend protocols supported by GRPC-Web gateway. enum Protocol { UNKNOWN = 0, GRPC, + GRPC_WEB, JSON_STREAM_BODY, PROTO_STREAM_BODY, B64_STREAM_BODY, diff --git a/net/grpc/gateway/runtime/nginx_notify_queue.cc b/net/grpc/gateway/runtime/nginx_notify_queue.cc index 9958596..9685947 100644 --- a/net/grpc/gateway/runtime/nginx_notify_queue.cc +++ b/net/grpc/gateway/runtime/nginx_notify_queue.cc @@ -1,8 +1,5 @@ #include "net/grpc/gateway/runtime/nginx_notify_queue.h" -#include -#include - #include #include "net/grpc/gateway/log.h" diff --git a/net/grpc/gateway/runtime/nginx_notify_queue.h b/net/grpc/gateway/runtime/nginx_notify_queue.h index 7c46d6b..c320fb2 100644 --- a/net/grpc/gateway/runtime/nginx_notify_queue.h +++ b/net/grpc/gateway/runtime/nginx_notify_queue.h @@ -1,12 +1,10 @@ #ifndef NET_GRPC_GATEWAY_RUNTIME_NGINX_NOTIFY_QUEUE_H_ #define NET_GRPC_GATEWAY_RUNTIME_NGINX_NOTIFY_QUEUE_H_ -#include -#include - #include #include +#include "net/grpc/gateway/nginx_includes.h" #include "net/grpc/gateway/runtime/tag.h" #include "third_party/grpc/include/grpc/support/sync.h" diff --git a/net/grpc/gateway/runtime/runtime.cc b/net/grpc/gateway/runtime/runtime.cc index d63470f..4cde901 100644 --- a/net/grpc/gateway/runtime/runtime.cc +++ b/net/grpc/gateway/runtime/runtime.cc @@ -10,6 +10,8 @@ #include "net/grpc/gateway/codec/b64_stream_body_encoder.h" #include "net/grpc/gateway/codec/grpc_decoder.h" #include "net/grpc/gateway/codec/grpc_encoder.h" +#include "net/grpc/gateway/codec/grpc_web_decoder.h" +#include "net/grpc/gateway/codec/grpc_web_encoder.h" #include "net/grpc/gateway/codec/json_decoder.h" #include "net/grpc/gateway/codec/json_encoder.h" #include "net/grpc/gateway/codec/proto_decoder.h" @@ -82,6 +84,11 @@ std::unique_ptr Runtime::CreateEncoder( const char* content_type = reinterpret_cast( http_request->headers_in.content_type->value.data); size_t content_type_length = http_request->headers_in.content_type->value.len; + if (content_type_length == kContentTypeGrpcWebLength && + strncasecmp(kContentTypeGrpcWeb, content_type, kContentTypeGrpcLength) == + 0) { + return std::unique_ptr(new GrpcWebEncoder()); + } if (content_type_length == kContentTypeStreamBodyLength && strncasecmp(kContentTypeStreamBody, content_type, kContentTypeStreamBodyLength) == 0) { @@ -128,6 +135,11 @@ std::unique_ptr Runtime::CreateDecoder( const char* content_type = reinterpret_cast( http_request->headers_in.content_type->value.data); size_t content_type_length = http_request->headers_in.content_type->value.len; + if (content_type_length == kContentTypeGrpcWebLength && + strncasecmp(kContentTypeGrpcWeb, content_type, kContentTypeGrpcLength) == + 0) { + return std::unique_ptr(new GrpcWebDecoder()); + } if (content_type_length == kContentTypeProtoLength && strncasecmp(kContentTypeProto, content_type, kContentTypeProtoLength) == 0) { @@ -218,6 +230,11 @@ Protocol Runtime::DetectFrontendProtocol(ngx_http_request_t* http_request) { 0) { return GRPC; } + if (content_type_length == kContentTypeGrpcWebLength && + strncasecmp(kContentTypeGrpcWeb, content_type, + kContentTypeGrpcWebLength) == 0) { + return GRPC_WEB; + } return UNKNOWN; } diff --git a/net/grpc/gateway/runtime/runtime.h b/net/grpc/gateway/runtime/runtime.h index 9fb5b20..4bdf631 100644 --- a/net/grpc/gateway/runtime/runtime.h +++ b/net/grpc/gateway/runtime/runtime.h @@ -1,16 +1,13 @@ #ifndef NET_GRPC_GATEWAY_RUNTIME_RUNTIME_H_ #define NET_GRPC_GATEWAY_RUNTIME_RUNTIME_H_ -#include -#include -#include - #include #include #include "net/grpc/gateway/codec/decoder.h" #include "net/grpc/gateway/codec/encoder.h" #include "net/grpc/gateway/frontend/frontend.h" +#include "net/grpc/gateway/nginx_includes.h" #include "net/grpc/gateway/runtime/constants.h" #include "net/grpc/gateway/runtime/grpc_event_queue.h"