Implements the new GRPC-Web spec.

This commit is contained in:
Feng Li 2017-03-01 10:39:37 -08:00
parent be2ad621c8
commit 9d2a960b3f
21 changed files with 452 additions and 97 deletions

View File

@ -10,7 +10,6 @@
#include "net/grpc/gateway/log.h"
#include "net/grpc/gateway/runtime/runtime.h"
#include "net/grpc/gateway/runtime/types.h"
#include "third_party/grpc/include/grpc++/support/string_ref.h"
#include "third_party/grpc/include/grpc/byte_buffer.h"
#include "third_party/grpc/include/grpc/byte_buffer_reader.h"
#include "third_party/grpc/include/grpc/grpc.h"
@ -40,8 +39,7 @@ GrpcBackend::GrpcBackend()
request_buffer_(nullptr),
response_buffer_(nullptr),
status_code_(grpc_status_code::GRPC_STATUS_OK),
status_details_(nullptr),
status_details_capacity_(0),
status_details_(grpc_empty_slice()),
is_cancelled_(false) {
BACKEND_DEBUG("Creating GRPC backend proxy.");
grpc_metadata_array_init(&response_initial_metadata_);
@ -58,9 +56,7 @@ GrpcBackend::~GrpcBackend() {
if (response_buffer_ != nullptr) {
grpc_byte_buffer_destroy(response_buffer_);
}
if (status_details_ != nullptr) {
gpr_free(status_details_);
}
grpc_slice_unref(status_details_);
if (call_ != nullptr) {
BACKEND_DEBUG("Destroying GRPC call.");
grpc_call_destroy(call_);
@ -77,9 +73,14 @@ grpc_channel* GrpcBackend::CreateChannel() {
grpc_call* GrpcBackend::CreateCall() {
BACKEND_DEBUG("Creating GRPC call.");
return grpc_channel_create_call(
channel_, nullptr, 0, Runtime::Get().grpc_event_queue(), method_.c_str(),
host_.c_str(), gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
grpc_slice method_slice = grpc_slice_from_copied_string(method_.c_str());
grpc_slice host_slice = grpc_slice_from_static_string(host_.c_str());
grpc_call* call = grpc_channel_create_call(
channel_, nullptr, 0, Runtime::Get().grpc_event_queue(), method_slice,
host_.empty() ? nullptr : &host_slice, gpr_inf_future(GPR_CLOCK_REALTIME),
nullptr);
grpc_slice_unref(method_slice);
return call;
}
void GrpcBackend::Start() {
@ -88,7 +89,8 @@ void GrpcBackend::Start() {
// Receives GRPC response initial metadata.
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[0].data.recv_initial_metadata = &response_initial_metadata_;
ops[0].data.recv_initial_metadata.recv_initial_metadata =
&response_initial_metadata_;
ops[0].flags = 0;
ops[0].reserved = nullptr;
grpc_call_error error = grpc_call_start_batch(
@ -111,9 +113,13 @@ void GrpcBackend::OnResponseInitialMetadata(bool result) {
std::unique_ptr<Headers> response_headers(new Headers());
for (size_t i = 0; i < response_initial_metadata_.count; i++) {
grpc_metadata* metadata = response_initial_metadata_.metadata + i;
response_headers->push_back(
Header(std::string(metadata->key),
string_ref(metadata->value, metadata->value_length)));
response_headers->push_back(Header(
std::string(
reinterpret_cast<char*>(GRPC_SLICE_START_PTR(metadata->key)),
GRPC_SLICE_LENGTH(metadata->key)),
string_ref(
reinterpret_cast<char*>(GRPC_SLICE_START_PTR(metadata->value)),
GRPC_SLICE_LENGTH(metadata->value))));
}
response->set_headers(std::move(response_headers));
frontend()->Send(std::move(response));
@ -121,7 +127,7 @@ void GrpcBackend::OnResponseInitialMetadata(bool result) {
// Receives next GRPC response message.
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &response_buffer_;
ops[0].data.recv_message.recv_message = &response_buffer_;
ops[0].flags = 0;
ops[0].reserved = nullptr;
grpc_call_error error = grpc_call_start_batch(
@ -144,8 +150,6 @@ void GrpcBackend::OnResponseMessage(bool result) {
ops[0].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[0].data.recv_status_on_client.status = &status_code_;
ops[0].data.recv_status_on_client.status_details = &status_details_;
ops[0].data.recv_status_on_client.status_details_capacity =
&status_details_capacity_;
ops[0].data.recv_status_on_client.trailing_metadata =
&response_trailing_metadata_;
ops[0].flags = 0;
@ -176,7 +180,7 @@ void GrpcBackend::OnResponseMessage(bool result) {
// Receives next GRPC response message.
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &response_buffer_;
ops[0].data.recv_message.recv_message = &response_buffer_;
ops[0].flags = 0;
ops[0].reserved = nullptr;
grpc_call_error error = grpc_call_start_batch(
@ -194,15 +198,25 @@ void GrpcBackend::OnResponseStatus(bool result) {
}
std::unique_ptr<Response> response(new Response());
grpc::string status_details;
if (!GRPC_SLICE_IS_EMPTY(status_details_)) {
status_details = grpc::string(
reinterpret_cast<char*>(GRPC_SLICE_START_PTR(status_details_)),
GRPC_SLICE_LENGTH(status_details_));
}
response->set_status(std::unique_ptr<grpc::Status>(new grpc::Status(
static_cast<grpc::StatusCode>(status_code_), status_details_)));
static_cast<grpc::StatusCode>(status_code_), status_details)));
std::unique_ptr<Trailers> response_trailers(new Trailers());
for (size_t i = 0; i < response_trailing_metadata_.count; i++) {
grpc_metadata* metadata = response_trailing_metadata_.metadata + i;
response_trailers->push_back(
Trailer(std::string(metadata->key),
string_ref(metadata->value, metadata->value_length)));
response_trailers->push_back(Trailer(
std::string(
reinterpret_cast<char*>(GRPC_SLICE_START_PTR(metadata->key)),
GRPC_SLICE_LENGTH(metadata->key)),
string_ref(
reinterpret_cast<char*>(GRPC_SLICE_START_PTR(metadata->value)),
GRPC_SLICE_LENGTH(metadata->value))));
}
response->set_trailers(std::move(response_trailers));
frontend()->Send(std::move(response));
@ -216,10 +230,14 @@ void GrpcBackend::Send(std::unique_ptr<Request> request, Tag* on_done) {
for (Header& header : *request->headers()) {
std::transform(header.first.begin(), header.first.end(),
header.first.begin(), ::tolower);
if (header.first == kGrpcAcceptEncoding) {
continue;
}
grpc_metadata initial_metadata;
initial_metadata.key = header.first.c_str();
initial_metadata.value = header.second.data();
initial_metadata.value_length = header.second.size();
initial_metadata.key = grpc_slice_intern(
grpc_slice_from_copied_string(header.first.c_str()));
initial_metadata.value = grpc_slice_from_copied_buffer(
header.second.data(), header.second.size());
initial_metadata.flags = 0;
request_initial_metadata_.push_back(initial_metadata);
}
@ -248,7 +266,7 @@ void GrpcBackend::Send(std::unique_ptr<Request> request, Tag* on_done) {
for (auto& slice : slices) {
grpc_slice_unref(slice);
}
op->data.send_message = request_buffer_;
op->data.send_message.send_message = request_buffer_;
op->flags = 0;
op->reserved = nullptr;
op++;

View File

@ -68,8 +68,7 @@ class GrpcBackend : public Backend {
// The GRPC response buffer.
grpc_byte_buffer* response_buffer_;
grpc_status_code status_code_;
char* status_details_;
size_t status_details_capacity_;
grpc_slice status_details_;
grpc_metadata_array response_trailing_metadata_;
// True if the GRPC call has been cancelled by client.
bool is_cancelled_;

View File

@ -216,7 +216,7 @@ bool Base64::Decode(const std::vector<Slice>& input,
}
if (output->empty()) {
output->push_back(Slice(gpr_empty_slice(), Slice::STEAL_REF));
output->push_back(Slice(grpc_empty_slice(), Slice::STEAL_REF));
}
return true;
}

View File

@ -4,6 +4,7 @@
#include "net/grpc/gateway/utils.h"
extern "C" {
#include "third_party/grpc/src/core/lib/compression/message_compress.h"
#include "third_party/grpc/src/core/lib/iomgr/exec_ctx.h"
}
namespace grpc {
@ -16,6 +17,7 @@ GrpcDecoder::GrpcDecoder()
GrpcDecoder::~GrpcDecoder() {}
Status GrpcDecoder::Decode() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
for (const Slice& slice : *inputs()) {
if (slice.size() == 0) {
continue;
@ -32,6 +34,7 @@ Status GrpcDecoder::Decode() {
Status status(StatusCode::INVALID_ARGUMENT,
Format("Receives invalid compressed flag: %c.", c));
DEBUG("%s", status.error_message().c_str());
grpc_exec_ctx_finish(&exec_ctx);
return status;
}
compressed_flag_ = c;
@ -56,7 +59,7 @@ Status GrpcDecoder::Decode() {
case kExpectingMessageLengthByte3: {
message_length_ += c;
if (message_length_ == 0) {
buffer_.reset(new Slice(gpr_empty_slice(), Slice::STEAL_REF));
buffer_.reset(new Slice(grpc_empty_slice(), Slice::STEAL_REF));
results()->push_back(
std::unique_ptr<ByteBuffer>(new ByteBuffer(buffer_.get(), 1)));
state_ = kExpectingCompressedFlag;
@ -84,10 +87,11 @@ Status GrpcDecoder::Decode() {
grpc_slice_buffer output;
grpc_slice_buffer_init(&output);
if (grpc_msg_decompress(
grpc_compression_algorithm::GRPC_COMPRESS_GZIP, &input,
&output) != 1) {
&exec_ctx, grpc_compression_algorithm::GRPC_COMPRESS_GZIP,
&input, &output) != 1) {
grpc_slice_buffer_destroy(&input);
grpc_slice_buffer_destroy(&output);
grpc_exec_ctx_finish(&exec_ctx);
return Status(StatusCode::INTERNAL,
"Failed to uncompress the GRPC data frame.");
}
@ -113,6 +117,7 @@ Status GrpcDecoder::Decode() {
}
}
inputs()->clear();
grpc_exec_ctx_finish(&exec_ctx);
return Status::OK;
}
} // namespace gateway

View File

@ -0,0 +1,85 @@
#include "net/grpc/gateway/codec/grpc_web_decoder.h"
#include "net/grpc/gateway/log.h"
#include "net/grpc/gateway/utils.h"
namespace grpc {
namespace gateway {
const uint8_t GrpcWebDecoder::kGrpcWebMessage = 0;
GrpcWebDecoder::GrpcWebDecoder()
: state_(kExpectingFlags), message_length_(0) {}
GrpcWebDecoder::~GrpcWebDecoder() {}
Status GrpcWebDecoder::Decode() {
for (const Slice& slice : *inputs()) {
if (slice.size() == 0) {
continue;
}
for (size_t i = 0; i < slice.size(); i++) {
uint8_t c = *(slice.begin() + i);
switch (state_) {
case kExpectingFlags: {
if (c != kGrpcWebMessage) {
// TODO(fengli): The following code is repeated 12 times. Extract it
// into a function or a macro.
Status status(StatusCode::INVALID_ARGUMENT,
Format("Receives invalid compressed flag: %X.", c));
DEBUG("%s", status.error_message().c_str());
return status;
}
state_ = kExpectingMessageLengthByte0;
continue;
}
case kExpectingMessageLengthByte0: {
message_length_ = c << 24;
state_ = kExpectingMessageLengthByte1;
continue;
}
case kExpectingMessageLengthByte1: {
message_length_ += c << 16;
state_ = kExpectingMessageLengthByte2;
continue;
}
case kExpectingMessageLengthByte2: {
message_length_ += c << 8;
state_ = kExpectingMessageLengthByte3;
continue;
}
case kExpectingMessageLengthByte3: {
message_length_ += c;
if (message_length_ == 0) {
buffer_.reset(new Slice(grpc_empty_slice(), Slice::STEAL_REF));
results()->push_back(
std::unique_ptr<ByteBuffer>(new ByteBuffer(buffer_.get(), 1)));
state_ = kExpectingFlags;
} else {
buffer_.reset(new Slice(grpc_slice_malloc(message_length_),
Slice::STEAL_REF));
state_ = kExpectingMessageData;
}
continue;
}
case kExpectingMessageData: {
uint8_t* end = const_cast<uint8_t*>(buffer_->end());
*(end - message_length_) = c;
message_length_--;
if (message_length_ == 0) {
results()->push_back(
std::unique_ptr<ByteBuffer>(new ByteBuffer(buffer_.get(), 1)));
buffer_.reset();
state_ = kExpectingFlags;
}
continue;
}
}
}
}
inputs()->clear();
return Status::OK;
}
} // namespace gateway
} // namespace grpc

View File

@ -0,0 +1,51 @@
#ifndef NET_GRPC_GATEWAY_CODEC_GRPC_WEB_DECODER_H_
#define NET_GRPC_GATEWAY_CODEC_GRPC_WEB_DECODER_H_
#include <cstdint>
#include <memory>
#include "net/grpc/gateway/codec/decoder.h"
#include "third_party/grpc/include/grpc++/support/slice.h"
namespace grpc {
namespace gateway {
class GrpcWebDecoder : public Decoder {
public:
static const uint8_t kGrpcWebMessage;
enum State : uint8_t {
// The initial decode state, expecting the flags (1 byte).
kExpectingFlags,
// Expecting the 1st byte of message length (4 bytes in total).
kExpectingMessageLengthByte0,
// Expecting the 2nd byte of message length (4 bytes in total).
kExpectingMessageLengthByte1,
// Expecting the 3rd byte of message length (4 bytes in total).
kExpectingMessageLengthByte2,
// Expecting the 4th byte of message length (4 bytes in total).
kExpectingMessageLengthByte3,
// Expecting the message data.
kExpectingMessageData
};
GrpcWebDecoder();
virtual ~GrpcWebDecoder();
// GrpcWebDecoder is neither copyable nor movable.
GrpcWebDecoder(const GrpcWebDecoder&) = delete;
GrpcWebDecoder& operator=(const GrpcWebDecoder&) = delete;
Status Decode() override;
private:
State state_;
// The message length of the current decoding GRPC-Web frame.
uint32_t message_length_;
// The data buffered for the current decoding GRPC-Web frame.
std::unique_ptr<Slice> buffer_;
};
} // namespace gateway
} // namespace grpc
#endif // NET_GRPC_GATEWAY_CODEC_GRPC_WEB_DECODER_H_

View File

@ -0,0 +1,107 @@
#include "net/grpc/gateway/codec/grpc_web_encoder.h"
#include <cstdint>
#include <cstring>
#include <vector>
#include "net/grpc/gateway/runtime/types.h"
#include "third_party/grpc/include/grpc++/support/byte_buffer.h"
#include "third_party/grpc/include/grpc++/support/slice.h"
namespace grpc {
namespace gateway {
namespace {
const char kGrpcStatus[] = "grpc-status: %i\r\n";
const char kGrpcMessage[] = "grpc-message: %s\r\n";
const char kGrpcTrailer[] = "%s: %s\r\n";
// GRPC Web message frame.
const uint8_t GRPC_WEB_FH_DATA = 0b0u;
// GRPC Web trailer frame.
const uint8_t GRPC_WEB_FH_TRAILER = 0b10000000u;
// Creates a new GRPC data frame with the given flags and length.
// @param flags supplies the GRPC data frame flags.
// @param length supplies the GRPC data frame length.
// @param output the buffer to store the encoded data, it's size must be 5.
void NewFrame(uint8_t flags, uint64_t length, uint8_t* output) {
output[0] = flags;
output[1] = static_cast<uint8_t>(length >> 24);
output[2] = static_cast<uint8_t>(length >> 16);
output[3] = static_cast<uint8_t>(length >> 8);
output[4] = static_cast<uint8_t>(length);
}
} // namespace
GrpcWebEncoder::GrpcWebEncoder() {}
GrpcWebEncoder::~GrpcWebEncoder() {}
void GrpcWebEncoder::Encode(grpc::ByteBuffer* input,
std::vector<Slice>* result) {
uint8_t header[5];
NewFrame(GRPC_WEB_FH_DATA, input->Length(), header);
result->push_back(
Slice(gpr_slice_from_copied_buffer(reinterpret_cast<char*>(header), 5),
Slice::STEAL_REF));
std::vector<Slice> buffer;
// TODO(fengli): Optimize if needed. Today we cannot dump data to the result
// directly since it will clear the target.
input->Dump(&buffer);
for (Slice& s : buffer) {
result->push_back(s);
}
}
void GrpcWebEncoder::EncodeStatus(const grpc::Status& status,
const Trailers* trailers,
std::vector<Slice>* result) {
std::vector<Slice> buffer;
uint64_t length = 0;
// Encodes GRPC status.
size_t grpc_status_size =
snprintf(nullptr, 0, kGrpcStatus, status.error_code());
grpc_slice grpc_status = grpc_slice_malloc(grpc_status_size + 1);
snprintf(reinterpret_cast<char*>(GPR_SLICE_START_PTR(grpc_status)),
grpc_status_size + 1, kGrpcStatus, status.error_code());
GPR_SLICE_SET_LENGTH(grpc_status, grpc_status_size);
buffer.push_back(Slice(grpc_status, Slice::STEAL_REF));
length += grpc_status_size;
// Encodes GRPC message.
if (!status.error_message().empty()) {
size_t grpc_message_size =
snprintf(nullptr, 0, kGrpcMessage, status.error_message().c_str());
grpc_slice grpc_message = grpc_slice_malloc(grpc_message_size + 1);
snprintf(reinterpret_cast<char*>(GPR_SLICE_START_PTR(grpc_message)),
grpc_message_size + 1, kGrpcMessage,
status.error_message().c_str());
GPR_SLICE_SET_LENGTH(grpc_message, grpc_message_size);
buffer.push_back(Slice(grpc_message, Slice::STEAL_REF));
length += grpc_message_size;
}
// Encodes GRPC trailers.
for (auto& trailer : *trailers) {
size_t grpc_trailer_size = snprintf(
nullptr, 0, kGrpcTrailer, trailer.first.c_str(), trailer.second.data());
grpc_slice grpc_trailer = grpc_slice_malloc(grpc_trailer_size + 1);
snprintf(reinterpret_cast<char*>(GPR_SLICE_START_PTR(grpc_trailer)),
grpc_trailer_size + 1, kGrpcTrailer, trailer.first.c_str(),
trailer.second.data());
GPR_SLICE_SET_LENGTH(grpc_trailer, grpc_trailer_size);
buffer.push_back(Slice(grpc_trailer, Slice::STEAL_REF));
length += grpc_trailer_size;
}
// Encodes GRPC trailer frame.
grpc_slice header = grpc_slice_malloc(5);
NewFrame(GRPC_WEB_FH_TRAILER, length, GPR_SLICE_START_PTR(header));
result->push_back(Slice(header, Slice::STEAL_REF));
result->insert(result->end(), buffer.begin(), buffer.end());
}
} // namespace gateway
} // namespace grpc

View File

@ -0,0 +1,26 @@
#ifndef NET_GRPC_GATEWAY_CODEC_GRPC_WEB_ENCODER_H_
#define NET_GRPC_GATEWAY_CODEC_GRPC_WEB_ENCODER_H_
#include "net/grpc/gateway/codec/encoder.h"
namespace grpc {
namespace gateway {
class GrpcWebEncoder : public Encoder {
public:
GrpcWebEncoder();
virtual ~GrpcWebEncoder();
// GrpcWebEncoder is neither copyable nor movable.
GrpcWebEncoder(const GrpcWebEncoder&) = delete;
GrpcWebEncoder& operator=(const GrpcWebEncoder&) = delete;
void Encode(grpc::ByteBuffer* input, std::vector<Slice>* result) override;
void EncodeStatus(const grpc::Status& status, const Trailers* trailers,
std::vector<Slice>* result) override;
};
} // namespace gateway
} // namespace grpc
#endif // NET_GRPC_GATEWAY_CODEC_GRPC_WEB_ENCODER_H_

View File

@ -146,7 +146,7 @@ Status JsonDecoder::Decode() {
}
if (static_cast<size_t>(start) == i) {
base64_buffer_.push_back(
Slice(gpr_empty_slice(), Slice::STEAL_REF));
Slice(grpc_empty_slice(), Slice::STEAL_REF));
} else {
base64_buffer_.push_back(Slice(
grpc_slice_from_copied_buffer(

View File

@ -12,7 +12,7 @@ ProtoDecoder::~ProtoDecoder() {}
Status ProtoDecoder::Decode() {
if (inputs()->empty()) {
Slice* slice = new Slice(gpr_empty_slice(), Slice::STEAL_REF);
Slice* slice = new Slice(grpc_empty_slice(), Slice::STEAL_REF);
ByteBuffer* buffer = new ByteBuffer(slice, 1);
results()->push_back(std::unique_ptr<ByteBuffer>(buffer));
} else {

View File

@ -47,7 +47,7 @@ Status StreamBodyDecoder::Decode() {
if ((c & 0x80) == 0) {
varint_bytes_ = 0;
if (varint_value_ == 0) {
buffer_.reset(new Slice(gpr_empty_slice(), Slice::STEAL_REF));
buffer_.reset(new Slice(grpc_empty_slice(), Slice::STEAL_REF));
results()->push_back(std::unique_ptr<ByteBuffer>(
new ByteBuffer(buffer_.release(), 1)));
state_ = EXPECTING_MESSAGE_KEY_TYPE;

View File

@ -1,7 +1,7 @@
#ifndef NET_GRPC_GATEWAY_FRONTEND_NGINX_BRIDGE_H_
#define NET_GRPC_GATEWAY_FRONTEND_NGINX_BRIDGE_H_
#include <ngx_core.h>
#include "net/grpc/gateway/nginx_includes.h"
typedef struct ngx_grpc_gateway_loc_conf_s ngx_grpc_gateway_loc_conf_t;
struct ngx_grpc_gateway_loc_conf_s {

View File

@ -1,11 +1,10 @@
#include "net/grpc/gateway/frontend/nginx_http_frontend.h"
#include <ngx_http.h>
#include <algorithm>
#include "net/grpc/gateway/frontend/nginx_bridge.h"
#include "net/grpc/gateway/log.h"
#include "net/grpc/gateway/nginx_utils.h"
#include "net/grpc/gateway/runtime/constants.h"
#include "net/grpc/gateway/runtime/request.h"
#include "net/grpc/gateway/runtime/runtime.h"
@ -110,16 +109,6 @@ void continue_write_response(ngx_http_request_t *r) {
namespace grpc {
namespace gateway {
namespace {
void AddElementToNginxElementTable(ngx_pool_t *pool, ngx_list_t *table,
const string &name, const string_ref &value);
void AddHTTPHeader(ngx_http_request_t *http_request, const string &name,
const string_ref &value);
void AddHTTPTrailer(ngx_http_request_t *http_request, const string &name,
const string_ref &value);
} // namespace
NginxHttpFrontend::NginxHttpFrontend(std::unique_ptr<Backend> backend)
: Frontend(std::move(backend)),
@ -274,10 +263,11 @@ void NginxHttpFrontend::SendResponseStatusToClient(Response *response) {
is_response_status_sent_ = true;
std::vector<Slice> trancoded_status;
if (protocol_ == Protocol::GRPC) {
SendResponseTrailersToClient(response);
} else {
encoder_->EncodeStatus(*response->status(), response->trailers(),
&trancoded_status);
if (trancoded_status.empty()) {
SendResponseTrailersToClient(response);
}
ngx_chain_t *output = ngx_alloc_chain_link(http_request_->pool);
if (output == nullptr) {
@ -481,6 +471,9 @@ void NginxHttpFrontend::SendResponseHeadersToClient(Response *response) {
AddHTTPHeader(http_request_, kGrpcAcceptEncoding,
kGrpcAcceptEncoding_AcceptAll);
break;
case GRPC_WEB:
AddHTTPHeader(http_request_, kContentType, kContentTypeGrpcWeb);
break;
case JSON_STREAM_BODY:
AddHTTPHeader(http_request_, kContentType, kContentTypeJson);
break;
@ -526,42 +519,5 @@ void NginxHttpFrontend::SendErrorToClient(const grpc::Status &status) {
backend()->Cancel(status);
}
namespace {
void AddElementToNginxElementTable(ngx_pool_t *pool, ngx_list_t *table,
const string &name,
const string_ref &value) {
ngx_table_elt_t *ngx_key_value =
reinterpret_cast<ngx_table_elt_t *>(ngx_list_push(table));
if (ngx_key_value == nullptr) {
ERROR("Failed to allocate response initial metadata for nginx.");
}
ngx_key_value->key.len = name.size();
ngx_key_value->key.data =
reinterpret_cast<u_char *>(ngx_palloc(pool, name.size()));
ngx_copy(ngx_key_value->key.data, name.c_str(), name.size());
ngx_key_value->value.len = value.size();
ngx_key_value->value.data =
reinterpret_cast<u_char *>(ngx_palloc(pool, value.size()));
ngx_copy(ngx_key_value->value.data, value.data(), value.size());
ngx_key_value->lowcase_key =
reinterpret_cast<u_char *>(ngx_pnalloc(pool, ngx_key_value->key.len));
ngx_strlow(ngx_key_value->lowcase_key, ngx_key_value->key.data,
ngx_key_value->key.len);
ngx_key_value->hash =
ngx_hash_key_lc(ngx_key_value->key.data, ngx_key_value->key.len);
}
void AddHTTPHeader(ngx_http_request_t *http_request, const string &name,
const string_ref &value) {
AddElementToNginxElementTable(
http_request->pool, &http_request->headers_out.headers, name, value);
}
void AddHTTPTrailer(ngx_http_request_t *http_request, const string &name,
const string_ref &value) {
AddElementToNginxElementTable(
http_request->pool, &http_request->headers_out.trailers, name, value);
}
} // namespace
} // namespace gateway
} // namespace grpc

View File

@ -0,0 +1,16 @@
#ifndef NET_GRPC_GATEWAY_NGINX_INCLUDES_H_
#define NET_GRPC_GATEWAY_NGINX_INCLUDES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#ifdef __cplusplus
}
#endif
#endif // NET_GRPC_GATEWAY_NGINX_INCLUDES_H_

View File

@ -0,0 +1,43 @@
#include "net/grpc/gateway/nginx_utils.h"
namespace grpc {
namespace gateway {
namespace {
void AddElementToNginxElementTable(ngx_pool_t *pool, ngx_list_t *table,
const string &name,
const string_ref &value) {
ngx_table_elt_t *ngx_key_value =
reinterpret_cast<ngx_table_elt_t *>(ngx_list_push(table));
if (ngx_key_value == nullptr) {
ERROR("Failed to allocate response initial metadata for nginx.");
}
ngx_key_value->key.len = name.size();
ngx_key_value->key.data =
reinterpret_cast<u_char *>(ngx_palloc(pool, name.size()));
ngx_copy(ngx_key_value->key.data, name.c_str(), name.size());
ngx_key_value->value.len = value.size();
ngx_key_value->value.data =
reinterpret_cast<u_char *>(ngx_palloc(pool, value.size()));
ngx_copy(ngx_key_value->value.data, value.data(), value.size());
ngx_key_value->lowcase_key =
reinterpret_cast<u_char *>(ngx_pnalloc(pool, ngx_key_value->key.len));
ngx_strlow(ngx_key_value->lowcase_key, ngx_key_value->key.data,
ngx_key_value->key.len);
ngx_key_value->hash =
ngx_hash_key_lc(ngx_key_value->key.data, ngx_key_value->key.len);
}
} // namespace
void AddHTTPHeader(ngx_http_request_t *http_request, const string &name,
const string_ref &value) {
AddElementToNginxElementTable(
http_request->pool, &http_request->headers_out.headers, name, value);
}
void AddHTTPTrailer(ngx_http_request_t *http_request, const string &name,
const string_ref &value) {
AddElementToNginxElementTable(
http_request->pool, &http_request->headers_out.trailers, name, value);
}
} // namespace gateway
} // namespace grpc

View File

@ -0,0 +1,19 @@
#ifndef NET_GRPC_GATEWAY_NGINX_UTILS_H_
#define NET_GRPC_GATEWAY_NGINX_UTILS_H_
#include "net/grpc/gateway/log.h"
#include "net/grpc/gateway/nginx_includes.h"
#include "third_party/grpc/include/grpc++/support/string_ref.h"
namespace grpc {
namespace gateway {
void AddHTTPHeader(ngx_http_request_t *http_request, const string &name,
const string_ref &value);
void AddHTTPTrailer(ngx_http_request_t *http_request, const string &name,
const string_ref &value);
} // namespace gateway
} // namespace grpc
#endif // NET_GRPC_GATEWAY_NGINX_UTILS_H_

View File

@ -24,6 +24,15 @@ const size_t kContentTypeJsonLength = sizeof(kContentTypeJson) - 1;
const char kContentTypeGrpc[] = "application/grpc";
const size_t kContentTypeGrpcLength = sizeof(kContentTypeGrpc) - 1;
// The content type of GRPC Web.
const char kContentTypeGrpcWeb[] = "application/grpc-web";
const size_t kContentTypeGrpcWebLength = sizeof(kContentTypeGrpcWeb) - 1;
// The content type of GRPC Web Text.
const char kContentTypeGrpcWebText[] = "application/grpc-web-text";
const size_t kContentTypeGrpcWebTextLength =
sizeof(kContentTypeGrpcWebText) - 1;
// The type url of google.rpc.Pair.
const char kTypeUrlPair[] = "type.googleapis.com/google.rpc.Pair";
@ -64,10 +73,22 @@ const char kContentTransferEncoding_Base64[] = "base64";
const size_t kContentTransferEncoding_Base64_Length =
sizeof(kContentTransferEncoding_Base64) - 1;
const char kXAcceptContentTransferEncoding[] =
"x-accept-content-transfer-encoding";
const size_t kXAcceptContentTransferEncodingLength =
sizeof(kXAcceptContentTransferEncoding) - 1;
const char kXAcceptContentTransferEncoding_Base64[] = "base64";
const char kXAcceptResponseStreaming[] = "x-accept-response-streaming";
const size_t kXAcceptResponseStreamingLength =
sizeof(kXAcceptResponseStreaming) - 1;
const char kXAcceptResponseStreaming_True[] = "true";
// The frontend protocols supported by GRPC-Web gateway.
enum Protocol {
UNKNOWN = 0,
GRPC,
GRPC_WEB,
JSON_STREAM_BODY,
PROTO_STREAM_BODY,
B64_STREAM_BODY,

View File

@ -1,8 +1,5 @@
#include "net/grpc/gateway/runtime/nginx_notify_queue.h"
#include <ngx_config.h>
#include <ngx_event.h>
#include <algorithm>
#include "net/grpc/gateway/log.h"

View File

@ -1,12 +1,10 @@
#ifndef NET_GRPC_GATEWAY_RUNTIME_NGINX_NOTIFY_QUEUE_H_
#define NET_GRPC_GATEWAY_RUNTIME_NGINX_NOTIFY_QUEUE_H_
#include <ngx_core.h>
#include <ngx_event.h>
#include <deque>
#include <memory>
#include "net/grpc/gateway/nginx_includes.h"
#include "net/grpc/gateway/runtime/tag.h"
#include "third_party/grpc/include/grpc/support/sync.h"

View File

@ -10,6 +10,8 @@
#include "net/grpc/gateway/codec/b64_stream_body_encoder.h"
#include "net/grpc/gateway/codec/grpc_decoder.h"
#include "net/grpc/gateway/codec/grpc_encoder.h"
#include "net/grpc/gateway/codec/grpc_web_decoder.h"
#include "net/grpc/gateway/codec/grpc_web_encoder.h"
#include "net/grpc/gateway/codec/json_decoder.h"
#include "net/grpc/gateway/codec/json_encoder.h"
#include "net/grpc/gateway/codec/proto_decoder.h"
@ -82,6 +84,11 @@ std::unique_ptr<Encoder> Runtime::CreateEncoder(
const char* content_type = reinterpret_cast<const char*>(
http_request->headers_in.content_type->value.data);
size_t content_type_length = http_request->headers_in.content_type->value.len;
if (content_type_length == kContentTypeGrpcWebLength &&
strncasecmp(kContentTypeGrpcWeb, content_type, kContentTypeGrpcLength) ==
0) {
return std::unique_ptr<Encoder>(new GrpcWebEncoder());
}
if (content_type_length == kContentTypeStreamBodyLength &&
strncasecmp(kContentTypeStreamBody, content_type,
kContentTypeStreamBodyLength) == 0) {
@ -128,6 +135,11 @@ std::unique_ptr<Decoder> Runtime::CreateDecoder(
const char* content_type = reinterpret_cast<const char*>(
http_request->headers_in.content_type->value.data);
size_t content_type_length = http_request->headers_in.content_type->value.len;
if (content_type_length == kContentTypeGrpcWebLength &&
strncasecmp(kContentTypeGrpcWeb, content_type, kContentTypeGrpcLength) ==
0) {
return std::unique_ptr<Decoder>(new GrpcWebDecoder());
}
if (content_type_length == kContentTypeProtoLength &&
strncasecmp(kContentTypeProto, content_type, kContentTypeProtoLength) ==
0) {
@ -218,6 +230,11 @@ Protocol Runtime::DetectFrontendProtocol(ngx_http_request_t* http_request) {
0) {
return GRPC;
}
if (content_type_length == kContentTypeGrpcWebLength &&
strncasecmp(kContentTypeGrpcWeb, content_type,
kContentTypeGrpcWebLength) == 0) {
return GRPC_WEB;
}
return UNKNOWN;
}

View File

@ -1,16 +1,13 @@
#ifndef NET_GRPC_GATEWAY_RUNTIME_RUNTIME_H_
#define NET_GRPC_GATEWAY_RUNTIME_RUNTIME_H_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <map>
#include <memory>
#include "net/grpc/gateway/codec/decoder.h"
#include "net/grpc/gateway/codec/encoder.h"
#include "net/grpc/gateway/frontend/frontend.h"
#include "net/grpc/gateway/nginx_includes.h"
#include "net/grpc/gateway/runtime/constants.h"
#include "net/grpc/gateway/runtime/grpc_event_queue.h"