Compare commits

...

1 Commits

Author SHA1 Message Date
zhwaaaaaa 378861e9c9 fix big message cause cpu 100% bug. #18 2025-04-25 20:34:27 +08:00
6 changed files with 194 additions and 121 deletions

View File

@ -1,8 +1,9 @@
---
SortIncludes: Never
Language: Cpp
BasedOnStyle: Google
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
IndentWidth: 4
SpaceAfterCStyleCast: true
SpaceAfterCStyleCast: true

36
.vscode/launch.json vendored
View File

@ -1,21 +1,17 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(lldb) Launch",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/cmake-build-debug/nginx_nacos",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceFolder}/cmake-build-debug",
"environment": [],
"externalConsole": false,
"MIMode": "lldb",
"preLaunchTask": "cmake build"
}
]
}
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Launch",
"program": "${workspaceRoot}/cmake-build-debug/nginx_nacos",
"args": [],
"cwd": "${workspaceRoot}",
"preLaunchTask": "cmake build"
}
]
}

78
.vscode/tasks.json vendored
View File

@ -1,33 +1,49 @@
{
"version": "2.0.0",
"options": {
"cwd": "${workspaceFolder}/cmake-build-debug"
},
"tasks": [
{
"type": "shell",
"label": "cmake",
"command": "cmake",
"args": [
".."
]
},
{
"label": "make",
"group": {
"kind": "build",
"isDefault": true
},
"command": "make",
"args": ["nginx_nacos"]
},
{
"label": "cmake build",
"dependsOn": [
"mkdir",
"cmake",
"make"
]
}
]
"version": "2.0.0",
"options": {
"cwd": "${workspaceFolder}/cmake-build-debug"
},
"tasks": [
{
"label": "cmake",
"command": "cmake",
"args": [
"-DCMAKE_BUILD_TYPE=Debug",
".."
]
},
{
"label": "make",
"command": "make",
"args": [
"-j",
"4"
]
},
{
"label": "cmake build",
"dependsOn": [
"cmake",
"make"
]
},
{
"type": "process",
"command": "/usr/bin/env",
"args": [
"CC=clang",
"CFLAGS=-g -Wall",
"make",
"${fileBasenameNoExtension}"
],
"options": {
"cwd": "${fileDirname}"
},
"group": {
"kind": "build",
"isDefault": true
},
"label": "makelldb: selected file"
}
]
}

View File

@ -18,7 +18,7 @@ patch -p1 < ../patch/nginx.patch
--with-http_ssl_module \
--add-module=../modules/auxiliary \
--add-module=../modules/nacos \
--prefix=.. \
--prefix= \
--conf-path=conf/my.conf \
--error-log-path=objs/logs/error.log \
--pid-path=objs/logs/nginx.pid \

View File

@ -14,13 +14,13 @@ events {
}
nacos {
server_list localhost:8848; # nacos 服务器列表,空格隔开
grpc_server_list localhost:9848; # nacos grpc服务器列表空格隔开
server_list 127.0.0.1:8848; # nacos 服务器列表,空格隔开
grpc_server_list 127.0.0.1:9848; # nacos grpc服务器列表空格隔开
#udp_port 19999; #udp 端口号
#udp_ip 127.0.0.1; #udp ip 地址。
#udp_bind 0.0.0.0:19999; # 绑定udp 地址
# username "nacos";
# password "nacos";
username "nacos";
password "nacos";
error_log objs/logs/nacos.log info;
default_group DEFAULT_GROUP; # 默认的nacos group name
cache_dir objs/nacos/;
@ -43,23 +43,17 @@ http {
nacos_use_cluster DEFAULT;
keepalive 300;
}
nacos_config_var $n_var data_id=ccccdddddd group=DEFAULT_GROUP md5_var=$dd default=123456;
nacos_config_var $n_var data_id=tt.server.route.json group=stg1 md5_var=$dd default=123456;
server {
listen 9999 default_server;
proxy_set_header Connection "";
proxy_http_version 1.1;
location ^~ / {
add_header X-Var-Nacos "$n_var" always;
proxy_pass http://s;
}
nacos_config_var $n_bb data_id=aaabbbbccc;
location ^~ /echo {
nacos_config_var $n_var data_id=ccdd;
add_header X-Var-Nacos "$n_var";
return 200 "hear ....n_var: $n_var ... n_bb: $n_bb ....dd: $dd";
return 200 "hear ....n_var: $n_var ... ....dd: $dd";
}
}
}

View File

@ -182,6 +182,7 @@ struct ngx_nacos_grpc_stream_s {
unsigned long_live : 1;
unsigned send_buf_block : 1;
unsigned send_buf_block_conn : 1;
unsigned store_proto_size_buf : 1;
};
#define NGX_NACOS_GRPC_DEFAULT_GRPC_STATUS 10000
@ -216,16 +217,18 @@ static ngx_int_t ngx_nacos_grpc_send_buf(ngx_nacos_grpc_buf_t *buf,
static ngx_int_t ngx_nacos_grpc_do_send(ngx_nacos_grpc_stream_t *st);
#define READ_BUF_CAP 65536
// read buf. 128K. max frame
#define READ_BUF_CAP (1 << 17)
#define MAX_FRAME_SIZE (READ_BUF_CAP - 9)
static u_char ngx_nacos_grpc_connection_start[] =
"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" /* connection preface */
"\x00\x00\x12\x04\x00\x00\x00\x00\x00" /* settings frame */
"\x00\x00\x18\x04\x00\x00\x00\x00\x00" /* settings frame */
"\x00\x01\x00\x00\x00\x00" /* header table size */
"\x00\x02\x00\x00\x00\x00" /* disable push */
"\x00\x04\x7f\xff\xff\xff" /* initial window */
"\x00\x05\x00\x01\xff\xf7" /* max frame size 128K - 9 */
"\x00\x00\x04\x08\x00\x00\x00\x00\x00" /* window update frame */
"\x7f\xff\x00\x00";
@ -544,8 +547,14 @@ static void ngx_nacos_grpc_event_handler(ngx_event_t *ev) {
gc = c->data;
if (ev == c->read) {
rc = ngx_nacos_grpc_read_handler(gc, ev);
if (rc == NGX_AGAIN && ngx_handle_read_event(ev, 0) != NGX_OK) {
rc = NGX_ERROR;
}
} else {
rc = ngx_nacos_grpc_write_handler(gc, ev);
if (rc == NGX_AGAIN && ngx_handle_write_event(ev, 0) != NGX_OK) {
rc = NGX_ERROR;
}
}
if (rc == NGX_ERROR || rc == NGX_DONE) {
@ -654,7 +663,7 @@ static ngx_int_t ngx_nacos_grpc_read_handler(ngx_nacos_grpc_conn_t *gc,
} else if (rc == 0) {
return NGX_DONE;
} else if (rc == NGX_AGAIN) {
return NGX_OK;
return NGX_AGAIN;
}
}
}
@ -672,7 +681,7 @@ static ngx_int_t ngx_nacos_grpc_parse_frame(ngx_nacos_grpc_conn_t *gc) {
if (gc->parse_stat == parse_frame_header) {
if (len < 9) {
return NGX_AGAIN;
break;
}
gc->frame_size =
(((size_t) b->pos[0]) << 16) | (b->pos[1] << 8) | (b->pos[2]);
@ -684,43 +693,55 @@ static ngx_int_t ngx_nacos_grpc_parse_frame(ngx_nacos_grpc_conn_t *gc) {
gc->parse_stat = parse_frame_payload;
b->pos += 9;
gc->frame_start = 1;
gc->frame_end = len - 9 >= gc->frame_size ? 1 : 0;
len -= 9;
if (gc->frame_type >
sizeof(frame_handlers) / sizeof(ngx_nacos_grpc_frame_handler)) {
ngx_log_error(NGX_LOG_ERR, gc->conn->log, 0,
"nacos http2 protocol error. error frame type");
return NGX_ERROR;
}
if (gc->frame_size > MAX_FRAME_SIZE) {
ngx_log_error(
NGX_LOG_ERR, gc->conn->log, 0,
"nacos http2 protocol error. exceed max frame size");
return NGX_ERROR;
}
}
if (gc->parse_stat == parse_frame_payload) {
if (gc->frame_type >
sizeof(frame_handlers) / sizeof(ngx_nacos_grpc_frame_handler)) {
return NGX_ERROR;
gc->frame_end = len >= gc->frame_size ? 1 : 0;
if (!gc->frame_end) {
break;
}
pp = b->pos;
lp = b->last;
if ((size_t) (lp - pp) > gc->frame_size) {
b->last = pp + gc->frame_size;
if ((size_t) (lp - b->pos) > gc->frame_size) {
b->last = b->pos + gc->frame_size;
}
pp = b->last;
rc = frame_handlers[gc->frame_type](gc);
if (b->pos == b->last) {
gc->parse_stat = parse_frame_header;
} else if (b->pos != pp) {
gc->frame_start = 0;
}
b->pos = pp;
b->last = lp;
if (rc == NGX_ERROR || rc == NGX_DONE) {
return rc;
} else if (rc == NGX_AGAIN) {
len = b->last - b->pos;
if (len > 0 && len * 4 < (size_t) (b->end - b->start) &&
(size_t) (b->end - b->pos) * 2 <
(size_t) (b->end - b->start)) {
ngx_memcpy(b->start, b->pos, len);
b->pos = b->start;
b->last = b->pos + len;
} else if (len == 0) {
b->pos = b->last = b->start;
}
if (rc != NGX_OK) {
return rc;
}
gc->parse_stat = parse_frame_header;
}
}
len = b->last - b->pos;
if (len == 0) {
b->pos = b->last = b->start;
} else if (len > 0 && len * 4 < (size_t) (b->end - b->start) &&
(size_t) (b->end - b->pos) * 2 < (size_t) (b->end - b->start)) {
ngx_memcpy(b->start, b->pos, len);
b->pos = b->start;
b->last = b->pos + len;
}
return NGX_AGAIN;
}
static ngx_nacos_grpc_stream_t *ngx_nacos_grpc_create_stream(
@ -977,7 +998,7 @@ static ngx_int_t ngx_nacos_grpc_parse_unknown_frame(ngx_nacos_grpc_conn_t *gc) {
static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
ngx_nacos_grpc_stream_t *st;
ngx_buf_t *buf, *tb;
u_char *p;
u_char *p, *t;
ngx_int_t rc;
size_t len, msg_size;
ngx_str_t proto_msg;
@ -1008,6 +1029,11 @@ static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
p = buf->pos;
len = buf->last - p;
if (len == 0) {
rc = NGX_OK;
break;
}
if (gc->frame_start) {
if (gc->frame_flags & HTTP_V2_PADDED_FLAG) {
if (len < 1) {
@ -1023,11 +1049,48 @@ static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
}
if (st->parsing_state == parsing_prefix) {
if (len < 5) {
if (st->store_proto_size_buf) {
tb = st->tmp_buf;
if (len > 5 - (size_t) (tb->last - tb->pos)) {
msg_size = 5 - (tb->last - tb->pos);
} else {
msg_size = len;
}
memcpy(tb->last, p, msg_size);
tb->last += msg_size;
p += msg_size;
buf->pos = p;
if (tb->last - tb->pos < 5) {
rc = NGX_OK;
break;
}
} else if (len < 5) {
if (ngx_nacos_grpc_realloc_tmp_buf(st, 256) != NGX_OK) {
rc = NGX_ERROR;
break;
}
tb = st->tmp_buf;
memcpy(tb->last, p, len);
tb->last += len;
p += len;
buf->pos = p;
rc = NGX_OK;
st->store_proto_size_buf = 1;
break;
}
if (p[0] != 0) {
if (st->store_proto_size_buf) {
t = tb->pos;
tb->pos = tb->last = tb->start;
} else {
t = p;
p += 5;
len -= 5;
buf->pos = p;
}
if (t[0] != 0) {
ngx_log_error(NGX_LOG_ERR, gc->conn->log, 0,
"nacos server sent data frame "
"send compressed msg: %uz",
@ -1035,15 +1098,13 @@ static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
rc = NGX_ERROR;
break;
}
msg_size = (p[1] << 24) | (p[2] << 16) | (p[3] << 8) | p[4];
msg_size = (t[1] << 24) | (t[2] << 16) | (t[3] << 8) | t[4];
if (ngx_nacos_grpc_realloc_tmp_buf(st, msg_size) != NGX_OK) {
rc = NGX_ERROR;
break;
}
tb = st->tmp_buf;
p += 5;
len -= 5;
buf->pos = p;
st->parsing_state = parsing_msg;
st->proto_len = msg_size;
st->recv_win -= 5;
@ -1063,6 +1124,9 @@ static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
st->padding = 0;
}
len = tb->last - tb->pos;
if (st->proto_len > 300000) {
st->padding = 0;
}
if (len == st->proto_len) {
proto_msg.len = len;
proto_msg.data = tb->pos;
@ -1223,8 +1287,7 @@ static ngx_int_t ngx_nacos_grpc_parse_header_frame(ngx_nacos_grpc_conn_t *gc) {
b->pos = p;
len = b->last - p;
gc->frame_size -= min;
gc->frame_flags &=
~(HTTP_V2_PADDED_FLAG | HTTP_V2_PRIORITY_FLAG);
gc->frame_flags &= ~(HTTP_V2_PADDED_FLAG | HTTP_V2_PRIORITY_FLAG);
st->parsing_state = p_receiving;
}
}
@ -1372,7 +1435,8 @@ parse_header:
ch = 0;
tp = tmp;
if (ngx_nacos_http_v2_huff_decode(&ch, p, field_len, &tp, 1,
gc->conn->log) != NGX_OK) {
gc->conn->log) !=
NGX_OK) {
ngx_log_error(
NGX_LOG_ERR, gc->conn->log, 0,
"nacos server sent invalid encoded header");
@ -1421,7 +1485,8 @@ parse_header:
ch = 0;
tp = tmp;
if (ngx_nacos_http_v2_huff_decode(&ch, p, field_len, &tp, 1,
gc->conn->log) != NGX_OK) {
gc->conn->log) !=
NGX_OK) {
ngx_log_error(
NGX_LOG_ERR, gc->conn->log, 0,
"nacos server sent invalid encoded header");
@ -1634,8 +1699,8 @@ static ngx_int_t ngx_nacos_grpc_parse_ping_frame(ngx_nacos_grpc_conn_t *gc) {
if (buf == NULL) {
return NGX_ERROR;
}
ngx_nacos_grpc_encode_frame_header(
gc->m_stream, buf->b, HTTP_V2_PING_FRAME, HTTP_V2_ACK_FLAG, 8);
ngx_nacos_grpc_encode_frame_header(gc->m_stream, buf->b, HTTP_V2_PING_FRAME,
HTTP_V2_ACK_FLAG, 8);
buf->len = 9 + 8;
p = buf->b + 9;
p[0] = (data >> 56) & 0xFF;
@ -1845,26 +1910,27 @@ static ngx_nacos_grpc_buf_t *ngx_nacos_grpc_encode_request(
// AUTHORITY
*b++ = ngx_nacos_http_v2_inc_indexed(HTTP_V2_AUTHORITY_INDEX);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "nacos-server",
sizeof("nacos-server") - 1, tmp);
sizeof("nacos-server") - 1, tmp);
// user-agent
*b++ = ngx_nacos_http_v2_inc_indexed(HTTP_V2_USER_AGENT_INDEX);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "nginx-nacos-grpc-client",
sizeof("nginx-nacos-grpc-client") - 1, tmp);
sizeof("nginx-nacos-grpc-client") - 1,
tmp);
// content-type
*b++ = ngx_nacos_http_v2_inc_indexed(HTTP_V2_CONTENT_TYPE_INDEX);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "application/grpc",
sizeof("application/grpc") - 1, tmp);
sizeof("application/grpc") - 1, tmp);
// te: trailers
*b++ = 0;
b = ngx_nacos_http_v2_write_name(b, (u_char *) "te", sizeof("te") - 1, tmp);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "trailers",
sizeof("trailers") - 1, tmp);
sizeof("trailers") - 1, tmp);
// grpc-accept-encoding: identity
*b++ = 0;
b = ngx_nacos_http_v2_write_name(b, (u_char *) "grpc-accept-encoding",
sizeof("grpc-accept-encoding") - 1, tmp);
sizeof("grpc-accept-encoding") - 1, tmp);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "identity",
sizeof("identity") - 1, tmp);
sizeof("identity") - 1, tmp);
buf->len = len = b - buf->b;
ngx_nacos_grpc_encode_frame_header(st, buf->b, HTTP_V2_HEADERS_FRAME,
HTTP_V2_END_HEADERS_FLAG, len - 9);
@ -2294,9 +2360,9 @@ static ngx_nacos_grpc_buf_t *ngx_nacos_grpc_encode_data_msg(
goto err;
}
b = buf->b;
ngx_nacos_grpc_encode_frame_header(
st, b, HTTP_V2_DATA_FRAME,
end_stream ? HTTP_V2_END_STREAM_FLAG : 0, 5 + b_len);
ngx_nacos_grpc_encode_frame_header(st, b, HTTP_V2_DATA_FRAME,
end_stream ? HTTP_V2_END_STREAM_FLAG : 0,
5 + b_len);
b[9] = 0;
b[10] = (b_len >> 24) & 0xFF;
b[11] = (b_len >> 16) & 0xFF;
@ -2470,8 +2536,8 @@ static ngx_int_t ngx_nacos_grpc_send_win_update_frame(
if (buf == NULL) {
return NGX_ERROR;
}
ngx_nacos_grpc_encode_frame_header(st, buf->b,
HTTP_V2_WINDOW_UPDATE_FRAME, 0, 4);
ngx_nacos_grpc_encode_frame_header(st, buf->b, HTTP_V2_WINDOW_UPDATE_FRAME,
0, 4);
buf->len = 9 + 4;
p = buf->b + 9;
p[0] = (win_update >> 24) & 0x7F;
@ -2494,8 +2560,8 @@ static ngx_int_t ngx_nacos_send_ping_frame(ngx_nacos_grpc_conn_t *gc) {
if (buf == NULL) {
return NGX_ERROR;
}
ngx_nacos_grpc_encode_frame_header(gc->m_stream, buf->b,
HTTP_V2_PING_FRAME, 0, 8);
ngx_nacos_grpc_encode_frame_header(gc->m_stream, buf->b, HTTP_V2_PING_FRAME,
0, 8);
p = buf->b + 9;
data = ++gc->heartbeat;
p[0] = (data >> 56) & 0xFF;