Compare commits

...

7 Commits

Author SHA1 Message Date
zhwaaaaaa 378861e9c9 fix big message cause cpu 100% bug. #18 2025-04-25 20:34:27 +08:00
zhwaaaaaa 4542817ed2 nanopb upgrade 0.4.9. cluster script specified. weight. max_fails fails_timeout 2024-12-14 19:53:06 +08:00
zhwaaaaaa db782d97eb fix openresty patch dir. update readme. 2024-12-10 22:54:16 +08:00
zhwaaaaaa a08a727ccc memzero server 0 2024-12-09 22:59:00 +08:00
zhwaaaaaa 6e3eb9aece openresty readme 2024-12-07 15:36:07 +08:00
zhwaaaaaa 894574b562 openresty compile. #7 compile with openresty 2024-12-07 15:31:37 +08:00
zhwaaaaaa 794bc0fc87 update config module directory 2024-12-07 13:40:42 +08:00
29 changed files with 3128 additions and 1730 deletions

View File

@ -1,8 +1,9 @@
---
SortIncludes: Never
Language: Cpp
BasedOnStyle: Google
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
IndentWidth: 4
SpaceAfterCStyleCast: true
SpaceAfterCStyleCast: true

36
.vscode/launch.json vendored
View File

@ -1,21 +1,17 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(lldb) Launch",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/cmake-build-debug/nginx_nacos",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceFolder}/cmake-build-debug",
"environment": [],
"externalConsole": false,
"MIMode": "lldb",
"preLaunchTask": "cmake build"
}
]
}
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Launch",
"program": "${workspaceRoot}/cmake-build-debug/nginx_nacos",
"args": [],
"cwd": "${workspaceRoot}",
"preLaunchTask": "cmake build"
}
]
}

78
.vscode/tasks.json vendored
View File

@ -1,33 +1,49 @@
{
"version": "2.0.0",
"options": {
"cwd": "${workspaceFolder}/cmake-build-debug"
},
"tasks": [
{
"type": "shell",
"label": "cmake",
"command": "cmake",
"args": [
".."
]
},
{
"label": "make",
"group": {
"kind": "build",
"isDefault": true
},
"command": "make",
"args": ["nginx_nacos"]
},
{
"label": "cmake build",
"dependsOn": [
"mkdir",
"cmake",
"make"
]
}
]
"version": "2.0.0",
"options": {
"cwd": "${workspaceFolder}/cmake-build-debug"
},
"tasks": [
{
"label": "cmake",
"command": "cmake",
"args": [
"-DCMAKE_BUILD_TYPE=Debug",
".."
]
},
{
"label": "make",
"command": "make",
"args": [
"-j",
"4"
]
},
{
"label": "cmake build",
"dependsOn": [
"cmake",
"make"
]
},
{
"type": "process",
"command": "/usr/bin/env",
"args": [
"CC=clang",
"CFLAGS=-g -Wall",
"make",
"${fileBasenameNoExtension}"
],
"options": {
"cwd": "${fileDirname}"
},
"group": {
"kind": "build",
"isDefault": true
},
"label": "makelldb: selected file"
}
]
}

View File

@ -70,6 +70,10 @@ set(NGX_C_FILES nginx/src/core/nginx.c
nginx/src/os/unix/ngx_user.c
nginx/src/os/unix/ngx_dlopen.c
nginx/src/os/unix/ngx_process_cycle.c
nginx/src/os/unix/ngx_linux_init.c
nginx/src/event/modules/ngx_epoll_module.c
nginx/src/os/unix/ngx_linux_sendfile_chain.c
nginx/src/core/ngx_bpf.c
nginx/src/event/ngx_event_openssl.c
nginx/src/event/ngx_event_openssl_stapling.c
nginx/src/core/ngx_regex.c
@ -85,6 +89,8 @@ set(NGX_C_FILES nginx/src/core/nginx.c
nginx/src/http/ngx_http_upstream.c
nginx/src/http/ngx_http_upstream_round_robin.c
nginx/src/http/ngx_http_file_cache.c
nginx/src/http/ngx_http_huff_decode.c
nginx/src/http/ngx_http_huff_encode.c
nginx/src/http/ngx_http_write_filter_module.c
nginx/src/http/ngx_http_header_filter_module.c
nginx/src/http/modules/ngx_http_chunked_filter_module.c
@ -101,8 +107,6 @@ set(NGX_C_FILES nginx/src/core/nginx.c
nginx/src/http/v2/ngx_http_v2.c
nginx/src/http/v2/ngx_http_v2_table.c
nginx/src/http/v2/ngx_http_v2_encode.c
nginx/src/http/v2/ngx_http_v2_huff_decode.c
nginx/src/http/v2/ngx_http_v2_huff_encode.c
nginx/src/http/v2/ngx_http_v2_module.c
nginx/src/http/modules/ngx_http_static_module.c
nginx/src/http/modules/ngx_http_autoindex_module.c
@ -133,7 +137,7 @@ set(NGX_C_FILES nginx/src/core/nginx.c
nginx/src/http/modules/ngx_http_upstream_random_module.c
nginx/src/http/modules/ngx_http_upstream_keepalive_module.c
nginx/src/http/modules/ngx_http_upstream_zone_module.c
)
)
set(NGX_INC_DIR nginx/src/core
nginx/src/event

View File

@ -6,7 +6,7 @@ nginx 订阅 nacos实现 服务发现 和 配置 动态更新。 nacos 1.x
- nginx 订阅 nacos 获取配置,写入 nginx 标准变量。(配置功能,只支持 grpc 协议)。
### 配置示例
基于 NGINX 1.20.2 和 NACOS 2.x 版本开发。
基于 NACOS 2.x 版本开发。openresty is required if using lua.
```
nacos {
@ -50,15 +50,29 @@ http {
add_header X-Var-Nacos "$n_var";
return 200 "hear .... $n_var .... $dd";
}
# ===== openresty access nacos config data ===
location ^~ /echo-by-lua {
content_by_lua_block {
local md5 = ngx.var.md5_var;
local content = ngx.var.n_var;
if not md5 then
md5 = "not found md5";
end
if not content then
content = "not found content";
end
ngx.say("md5 = "..md5.." content = "..content);
}
}
}
}
```
### 编译
### nginx 编译
- 本项目支持 nginx 1.10 及以上版本,所以需要提前下载相应版本的 [nginx](https://nginx.org/download)。例如1.15.2
```bash
wget https://nginx.org/download/nginx-1.15.2.tar.gz
tar zxvf tar zxvf nginx-1.15.2.tar.gz
tar zxvf nginx-1.15.2.tar.gz
```
- 下载本项目源代码 nginx-nacos-upstream .本项目对 nginx 源代码有少量修改,所以需要 打上 patch
```bash
@ -68,13 +82,27 @@ cd nginx-1.15.2 && patch -p1 < ../nginx-nacos-upstream/patch/nginx.patch
```bash
sudo apt install build-essential libpcre3 libpcre3-dev zlib1g zlib1g-dev libssl-dev
```
./configure --add-module=../nginx-nacos-upstream/modules/auxiliary --add-module=../nginx-nacos-upstream/modules/nacos --with-http_ssl_module --with-http_v2_module && make
```
grpc 使用的是 http2 传输,所以 nacos 模块需要和 http2 模块一起安装:
### openresty 编译
- 本项目支持 openresty 1.10 及以上版本,所以需要提前下载相应版本的 [openresty](https://openresty.org/cn/download.html)。例如1.25.3.2
```bash
wget https://openresty.org/download/openresty-1.25.3.2.tar.gz
tar zxvf openresty-1.25.3.2.tar.gz
```
- 下载本项目源代码 nginx-nacos-upstream .本项目对 nginx 源代码有少量修改,所以需要 打上 patch. 注意这里使用的是openresty.patch 并且进入 bundle 下的 nginx 目录执行
```bash
cd openresty-1.25.3.2/bundle/nginx-1.25.3 && patch -p1 < ../../../nginx-nacos-upstream/patch/openresty.patch
```
- build nginx. ubuntu 下安装方式为
```bash
cp -r ../nginx-nacos-upstream/modules modules
./configure --add-module=modules/auxiliary --add-module=modules/nacos --with-http_ssl_module --with-http_v2_module && make
cd ../..
sudo apt install build-essential libpcre3 libpcre3-dev zlib1g zlib1g-dev libssl-dev
./configure --add-module=../nginx-nacos-upstream/modules/auxiliary --add-module=../nginx-nacos-upstream/modules/nacos
make
```
### 原理
@ -146,9 +174,9 @@ service_namespace "public";
```
#### cache_dir
nacos 的文件 缓存目录,下次启动 会优先从 这个目录读取数据,加快启动时间,否则从 http 地址拉取。"/"结尾
nacos 的文件 缓存目录,下次启动 会优先从 这个目录读取数据,加快启动时间,否则从 http 地址拉取。保证该目录 nobody 用户可读写
```
cache_dir nacos_cache/;
cache_dir nacos_cache;
```
@ -198,10 +226,23 @@ upstream backend {
# server 127.0.0.1:8080;
# 如果provider使用的springservice_name 要和 spring.application.name一致
# 不知道 provider 端怎么写请参考 https://github.com/zhwaaaaaa/springmvc-nacos-registry
nacos_subscribe_service service_name=springmvc-nacos-demo group=DEFAULT_GROUP;
# weight * nacos_weight 是 nginx 的权重默认1 max_fails=1 fail_timeout 对应 nginx 的server 配置
nacos_subscribe_service service_name=springmvc-nacos-demo group=DEFAULT_GROUP weight=1 max_fails=1 fail_timeout=10s;
}
```
### nacos_use_cluster
如果指定,则对应使用 cluster ip. 支持变量和字面量组合. 不指定,则使用所有集群 ip
```
set $cluster "DEFAULT";
upstream backend {
nacos_subscribe_service service_name=springmvc-nacos-demo group=DEFAULT_GROUP weight=1 max_fails=1 fail_timeout=10s;
nacos_use_cluster $cluster;
}
```
### nacos_config_var
订阅 nacos 的配置nginx把它写到 http 变量中。这个配置项可以出现在 http server location if {} 块中。
```
@ -217,7 +258,7 @@ nacos 变量功能让 nginx 的灵活性大大增强了。
* nginx 通过 GRPC 协议订阅 nacos 配置。(✅)
* 发布 1.0 版本,可以基本使用。(✅)
* 删除 nginx 原有代码,对 nginx 原有代码的修改通过 patch 支持各个 nginx 版本。(✅)
* 支持集成 openresty
* 支持集成 openresty。(✅)
# License
- The project is licensed under the Apache License Version 2.0 except for yaij and pb, Copyright (c) 2022-2024, Zhwaaaaaa

View File

@ -13,12 +13,12 @@ cd nginx
patch -p1 < ../patch/nginx.patch
cp -r ../modules modules && ./configure \
./configure \
--with-http_v2_module \
--with-http_ssl_module \
--add-module=modules/auxiliary \
--add-module=modules/nacos \
--prefix=.. \
--add-module=../modules/auxiliary \
--add-module=../modules/nacos \
--prefix= \
--conf-path=conf/my.conf \
--error-log-path=objs/logs/error.log \
--pid-path=objs/logs/nginx.pid \

View File

@ -14,16 +14,16 @@ events {
}
nacos {
server_list localhost:8848; # nacos 服务器列表,空格隔开
grpc_server_list localhost:9848; # nacos grpc服务器列表空格隔开
server_list 127.0.0.1:8848; # nacos 服务器列表,空格隔开
grpc_server_list 127.0.0.1:9848; # nacos grpc服务器列表空格隔开
#udp_port 19999; #udp 端口号
#udp_ip 127.0.0.1; #udp ip 地址。
#udp_bind 0.0.0.0:19999; # 绑定udp 地址
# username "nacos";
# password "nacos";
error_log cmake-build-debug/logs/nacos.log info;
username "nacos";
password "nacos";
error_log objs/logs/nacos.log info;
default_group DEFAULT_GROUP; # 默认的nacos group name
cache_dir cmake-build-debug/nacos/;
cache_dir objs/nacos/;
}
http {
include mime.types;
@ -39,26 +39,21 @@ http {
#gzip on;
upstream s {
nacos_subscribe_service service_name=springmvc-nacos-demo; # data_id 要和 spring.application.name一致
nacos_subscribe_service service_name=springmvc-nacos-demo weight=100 max_fails=1 fail_timeout=10s; # service_name 要和 spring.application.name一致
nacos_use_cluster DEFAULT;
keepalive 300;
}
nacos_config_var $n_var data_id=ccccdddddd group=DEFAULT_GROUP md5_var=$dd default=123456;
nacos_config_var $n_var data_id=tt.server.route.json group=stg1 md5_var=$dd default=123456;
server {
listen 9999 default_server;
proxy_set_header Connection "";
proxy_http_version 1.1;
location ^~ / {
add_header X-Var-Nacos "$n_var" always;
proxy_pass http://s;
}
nacos_config_var $n_bb data_id=aaabbbbccc;
location ^~ /echo {
nacos_config_var $n_var data_id=ccdd;
add_header X-Var-Nacos "$n_var";
return 200 "hear ....n_var: $n_var ... n_bb: $n_bb ....dd: $dd";
return 200 "hear ....n_var: $n_var ... ....dd: $dd";
}
}
}

10
generate_proto.sh Normal file
View File

@ -0,0 +1,10 @@
#!/bin/bash
set -e
mkdir -p objs
cd objs
curl -sSL https://jpa.kapsi.fi/nanopb/download/nanopb-0.4.9-linux-x86.tar.gz -o nanopb.tar.gz
tar zxvf nanopb.tar.gz
mv nanopb-0.4.9-linux-x86 nanopb
python3 nanopb/generator/nanopb_generator.py backup.proto nacos_grpc_service.proto -I ../modules/nacos

View File

@ -1,8 +1,8 @@
ngx_module_type=AUXILIARY
ngx_module_name=ngx_auxiliary_module
ngx_module_incs=modules/auxiliary
ngx_module_deps=modules/auxiliary/ngx_auxiliary_module.h
ngx_module_srcs=modules/auxiliary/ngx_auxiliary_module.c
ngx_module_incs=$ngx_addon_dir
ngx_module_deps=$ngx_addon_dir/ngx_auxiliary_module.h
ngx_module_srcs=$ngx_addon_dir/ngx_auxiliary_module.c
ngx_module_libs=
ngx_module_order=
ngx_module_link=ADDON

View File

@ -1,34 +1,18 @@
/* Automatically generated nanopb constant definitions */
/* Generated by nanopb-0.3.9.8 at Fri Aug 9 21:47:15 2024. */
/* Generated by nanopb-0.4.9 */
#include "backup.pb.h"
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#if PB_PROTO_HEADER_VERSION != 40
#error Regenerate this file with the current version of nanopb generator.
#endif
PB_BIND(Config, Config, AUTO)
const pb_field_t Config_fields[4] = {
PB_FIELD( 1, UINT64 , SINGULAR, STATIC , FIRST, Config, version, version, 0),
PB_FIELD( 2, STRING , SINGULAR, CALLBACK, OTHER, Config, content, version, 0),
PB_FIELD( 3, STRING , SINGULAR, CALLBACK, OTHER, Config, md5, content, 0),
PB_LAST_FIELD
};
const pb_field_t Instance_fields[4] = {
PB_FIELD( 1, STRING , SINGULAR, CALLBACK, FIRST, Instance, host, host, 0),
PB_FIELD( 2, INT32 , SINGULAR, STATIC , OTHER, Instance, port, host, 0),
PB_FIELD( 3, INT32 , SINGULAR, STATIC , OTHER, Instance, weight, port, 0),
PB_LAST_FIELD
};
const pb_field_t Service_fields[3] = {
PB_FIELD( 1, UINT64 , SINGULAR, STATIC , FIRST, Service, version, version, 0),
PB_FIELD( 2, MESSAGE , REPEATED, CALLBACK, OTHER, Service, instances, version, &Instance_fields),
PB_LAST_FIELD
};
PB_BIND(Instance, Instance, AUTO)
PB_BIND(Service, Service, AUTO)
/* @@protoc_insertion_point(eof) */

View File

@ -1,48 +1,44 @@
/* Automatically generated nanopb header */
/* Generated by nanopb-0.3.9.8 at Fri Aug 9 21:47:15 2024. */
/* Generated by nanopb-0.4.9 */
#ifndef PB_BACKUP_PB_H_INCLUDED
#define PB_BACKUP_PB_H_INCLUDED
#include <pb/pb.h>
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#if PB_PROTO_HEADER_VERSION != 40
#error Regenerate this file with the current version of nanopb generator.
#endif
#ifdef __cplusplus
extern "C" {
#endif
/* Struct definitions */
typedef struct _Config {
uint64_t version;
pb_callback_t content;
pb_callback_t md5;
/* @@protoc_insertion_point(struct:Config) */
} Config;
typedef struct _Instance {
pb_callback_t host;
int32_t port;
int32_t weight;
/* @@protoc_insertion_point(struct:Instance) */
pb_callback_t cluster;
} Instance;
typedef struct _Service {
uint64_t version;
pb_callback_t instances;
/* @@protoc_insertion_point(struct:Service) */
} Service;
/* Default values for struct fields */
#ifdef __cplusplus
extern "C" {
#endif
/* Initializer values for message structs */
#define Config_init_default {0, {{NULL}, NULL}, {{NULL}, NULL}}
#define Instance_init_default {{{NULL}, NULL}, 0, 0}
#define Instance_init_default {{{NULL}, NULL}, 0, 0, {{NULL}, NULL}}
#define Service_init_default {0, {{NULL}, NULL}}
#define Config_init_zero {0, {{NULL}, NULL}, {{NULL}, NULL}}
#define Instance_init_zero {{{NULL}, NULL}, 0, 0}
#define Instance_init_zero {{{NULL}, NULL}, 0, 0, {{NULL}, NULL}}
#define Service_init_zero {0, {{NULL}, NULL}}
/* Field tags (for use in manual encoding/decoding) */
@ -52,30 +48,49 @@ typedef struct _Service {
#define Instance_host_tag 1
#define Instance_port_tag 2
#define Instance_weight_tag 3
#define Instance_cluster_tag 4
#define Service_version_tag 1
#define Service_instances_tag 2
/* Struct field encoding specification for nanopb */
extern const pb_field_t Config_fields[4];
extern const pb_field_t Instance_fields[4];
extern const pb_field_t Service_fields[3];
#define Config_FIELDLIST(X, a) \
X(a, STATIC, SINGULAR, UINT64, version, 1) \
X(a, CALLBACK, SINGULAR, STRING, content, 2) \
X(a, CALLBACK, SINGULAR, STRING, md5, 3)
#define Config_CALLBACK pb_default_field_callback
#define Config_DEFAULT NULL
#define Instance_FIELDLIST(X, a) \
X(a, CALLBACK, SINGULAR, STRING, host, 1) \
X(a, STATIC, SINGULAR, INT32, port, 2) \
X(a, STATIC, SINGULAR, INT32, weight, 3) \
X(a, CALLBACK, SINGULAR, STRING, cluster, 4)
#define Instance_CALLBACK pb_default_field_callback
#define Instance_DEFAULT NULL
#define Service_FIELDLIST(X, a) \
X(a, STATIC, SINGULAR, UINT64, version, 1) \
X(a, CALLBACK, REPEATED, MESSAGE, instances, 2)
#define Service_CALLBACK pb_default_field_callback
#define Service_DEFAULT NULL
#define Service_instances_MSGTYPE Instance
extern const pb_msgdesc_t Config_msg;
extern const pb_msgdesc_t Instance_msg;
extern const pb_msgdesc_t Service_msg;
/* Defines for backwards compatibility with code written before nanopb-0.4.0 */
#define Config_fields &Config_msg
#define Instance_fields &Instance_msg
#define Service_fields &Service_msg
/* Maximum encoded size of messages (where known) */
/* Config_size depends on runtime parameters */
/* Instance_size depends on runtime parameters */
/* Service_size depends on runtime parameters */
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
#define BACKUP_MESSAGES \
#endif
#ifdef __cplusplus
} /* extern "C" */
#endif
/* @@protoc_insertion_point(eof) */
#endif

View File

@ -10,6 +10,7 @@ message Instance {
string host = 1;
int32 port = 2;
int32 weight = 3;
string cluster = 4;
}
message Service {

View File

@ -1,30 +1,30 @@
ngx_module_type=AUXILIARY
ngx_module_name=ngx_nacos_module
ngx_module_incs=modules/nacos
ngx_module_deps=modules/nacos/ngx_nacos_aux.h
ngx_module_srcs="modules/nacos/yaij/yajl.c \
modules/nacos/yaij/yajl_alloc.c \
modules/nacos/yaij/yajl_buf.c \
modules/nacos/yaij/yajl_encode.c \
modules/nacos/yaij/yajl_gen.c \
modules/nacos/yaij/yajl_lex.c \
modules/nacos/yaij/yajl_parser.c \
modules/nacos/yaij/yajl_tree.c \
modules/nacos/yaij/yajl_version.c \
modules/nacos/pb/pb_common.c \
modules/nacos/pb/pb_decode.c \
modules/nacos/pb/pb_encode.c \
modules/nacos/ngx_nacos_module.c \
modules/nacos/ngx_nacos_aux.c \
modules/nacos/ngx_nacos_http_parse.c \
modules/nacos/ngx_nacos_data.c \
modules/nacos/ngx_nacos_udp.c \
modules/nacos/ngx_nacos_grpc.c \
modules/nacos/backup.pb.c \
modules/nacos/ngx_nacos_http_v2.c \
modules/nacos/ngx_nacos_http_v2_huff_decode.c \
modules/nacos/ngx_nacos_http_v2_huff_encode.c \
modules/nacos/nacos_grpc_service.pb.c"
ngx_module_incs=$ngx_addon_dir
ngx_module_deps=$ngx_addon_dir/ngx_nacos_aux.h
ngx_module_srcs="$ngx_addon_dir/yaij/yajl.c \
$ngx_addon_dir/yaij/yajl_alloc.c \
$ngx_addon_dir/yaij/yajl_buf.c \
$ngx_addon_dir/yaij/yajl_encode.c \
$ngx_addon_dir/yaij/yajl_gen.c \
$ngx_addon_dir/yaij/yajl_lex.c \
$ngx_addon_dir/yaij/yajl_parser.c \
$ngx_addon_dir/yaij/yajl_tree.c \
$ngx_addon_dir/yaij/yajl_version.c \
$ngx_addon_dir/pb/pb_common.c \
$ngx_addon_dir/pb/pb_decode.c \
$ngx_addon_dir/pb/pb_encode.c \
$ngx_addon_dir/ngx_nacos_module.c \
$ngx_addon_dir/ngx_nacos_aux.c \
$ngx_addon_dir/ngx_nacos_http_parse.c \
$ngx_addon_dir/ngx_nacos_data.c \
$ngx_addon_dir/ngx_nacos_udp.c \
$ngx_addon_dir/ngx_nacos_grpc.c \
$ngx_addon_dir/backup.pb.c \
$ngx_addon_dir/ngx_nacos_http_v2.c \
$ngx_addon_dir/ngx_nacos_http_v2_huff_decode.c \
$ngx_addon_dir/ngx_nacos_http_v2_huff_encode.c \
$ngx_addon_dir/nacos_grpc_service.pb.c"
ngx_module_libs=
ngx_module_order=
@ -38,7 +38,7 @@ ngx_module_type=HTTP
ngx_module_name=ngx_http_nacos_upstream_module
ngx_module_incs=
ngx_module_deps=
ngx_module_srcs=modules/nacos/ngx_http_nacos_upstream_module.c
ngx_module_srcs=$ngx_addon_dir/ngx_http_nacos_upstream_module.c
ngx_module_libs=
ngx_module_order=
ngx_module_link=ADDON
@ -51,7 +51,7 @@ ngx_module_type=HTTP
ngx_module_name=ngx_http_nacos_config_module
ngx_module_incs=
ngx_module_deps=
ngx_module_srcs=modules/nacos/ngx_http_nacos_config_module.c
ngx_module_srcs=$ngx_addon_dir/ngx_http_nacos_config_module.c
ngx_module_libs=
ngx_module_order=
ngx_module_link=ADDON

View File

@ -1,73 +1,21 @@
/* Automatically generated nanopb constant definitions */
/* Generated by nanopb-0.3.9.10 at Mon Jun 3 14:49:02 2024. */
/* Generated by nanopb-0.4.9 */
#include "nacos_grpc_service.pb.h"
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#if PB_PROTO_HEADER_VERSION != 40
#error Regenerate this file with the current version of nanopb generator.
#endif
/**
* #define PB_FIELD(tag, type, rules, allocation, placement, message, field, prevfield, ptr) \
PB_ ## rules ## _ ## allocation(tag, message, field, \
PB_DATAOFFSET_ ## placement(message, field, prevfield), \
PB_LTYPE_MAP_ ## type, ptr)
*/
PB_BIND(Any, Any, AUTO)
const pb_field_t google_protobuf_Any_fields[3] = {
PB_FIELD( 1, STRING , SINGULAR, CALLBACK, FIRST, google_protobuf_Any, type_url, type_url, 0),
PB_FIELD( 2, BYTES , SINGULAR, CALLBACK, OTHER, google_protobuf_Any, value, type_url, 0),
PB_LAST_FIELD
};
PB_BIND(Metadata, Metadata, AUTO)
const pb_field_t Metadata_HeadersEntry_fields[3] = {
PB_FIELD( 1, STRING , SINGULAR, CALLBACK, FIRST, Metadata_HeadersEntry, key, key, 0),
PB_FIELD( 2, STRING , SINGULAR, CALLBACK, OTHER, Metadata_HeadersEntry, value, key, 0),
PB_LAST_FIELD
};
const pb_field_t Metadata_fields[4] = {
PB_FIELD( 3, STRING , SINGULAR, CALLBACK, FIRST, Metadata, type, type, 0),
// PB_SINGULAR_CALLBACK(3, Metadata, type, PB_DATAOFFSET_FIRST(Metadata, type, type), PB_LTYPE_MAP_STRING, 0),
PB_FIELD( 7, MESSAGE , REPEATED, CALLBACK, OTHER, Metadata, headers, type, &Metadata_HeadersEntry_fields),
PB_FIELD( 8, STRING , SINGULAR, CALLBACK, OTHER, Metadata, clientIp, headers, 0),
PB_LAST_FIELD
};
PB_BIND(Metadata_HeadersEntry, Metadata_HeadersEntry, AUTO)
const pb_field_t Payload_fields[3] = {
PB_FIELD( 2, MESSAGE , SINGULAR, STATIC , FIRST, Payload, metadata, metadata, &Metadata_fields),
// PB_SINGULAR_STATIC(2, Payload,metadata, PB_DATAOFFSET_FIRST(Payload, metadata, metadata), PB_LTYPE_MAP_MESSAGE, &Metadata_fields),
PB_FIELD( 3, MESSAGE , SINGULAR, STATIC , OTHER, Payload, body, metadata, &google_protobuf_Any_fields),
PB_LAST_FIELD
};
PB_BIND(Payload, Payload, AUTO)
/* Check that field information fits in pb_field_t */
#if !defined(PB_FIELD_32BIT)
/* If you get an error here, it means that you need to define PB_FIELD_32BIT
* compile-time option. You can do that in pb.h or on compiler command line.
*
* The reason you need to do this is that some of your messages contain tag
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
PB_STATIC_ASSERT((pb_membersize(Payload, metadata) < 65536 && pb_membersize(Payload, body) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_Metadata_Metadata_HeadersEntry_Payload)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
/* If you get an error here, it means that you need to define PB_FIELD_16BIT
* compile-time option. You can do that in pb.h or on compiler command line.
*
* The reason you need to do this is that some of your messages contain tag
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
PB_STATIC_ASSERT((pb_membersize(Payload, metadata) < 256 && pb_membersize(Payload, body) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_Metadata_Metadata_HeadersEntry_Payload)
#endif
/* @@protoc_insertion_point(eof) */

View File

@ -1,106 +1,112 @@
/* Automatically generated nanopb header */
/* Generated by nanopb-0.3.9.10 at Mon Jun 3 14:49:02 2024. */
/* Generated by nanopb-0.4.9 */
#ifndef PB_NACOS_GRPC_SERVICE_PB_H_INCLUDED
#define PB_NACOS_GRPC_SERVICE_PB_H_INCLUDED
#include <pb/pb.h>
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#if PB_PROTO_HEADER_VERSION != 40
#error Regenerate this file with the current version of nanopb generator.
#endif
/* Struct definitions */
typedef struct _Any {
pb_callback_t type_url;
pb_callback_t value;
} Any;
typedef struct _Metadata {
pb_callback_t type;
pb_callback_t headers;
pb_callback_t clientIp;
} Metadata;
typedef struct _Metadata_HeadersEntry {
pb_callback_t key;
pb_callback_t value;
} Metadata_HeadersEntry;
typedef struct _Payload {
bool has_metadata;
Metadata metadata;
bool has_body;
Any body;
} Payload;
#ifdef __cplusplus
extern "C" {
#endif
/* Struct definitions */
typedef struct google_protobuf_Any {
pb_callback_t type_url;
pb_callback_t value;
/* @@protoc_insertion_point(struct:google_protobuf_Any) */
} google_protobuf_Any;
/* Default values for struct fields */
/* Initializer values for message structs */
#define google_protobuf_Any_init_default \
{ \
{{NULL}, NULL}, { {NULL}, NULL } \
}
#define google_protobuf_Any_init_zero \
{ \
{{NULL}, NULL}, { {NULL}, NULL } \
}
/* Field tags (for use in manual encoding/decoding) */
#define google_protobuf_Any_type_url_tag 1
#define google_protobuf_Any_value_tag 2
/* Struct field encoding specification for nanopb */
extern const pb_field_t google_protobuf_Any_fields[3];
/* Struct definitions */
typedef struct Metadata {
pb_callback_t type;
pb_callback_t headers;
pb_callback_t clientIp;
/* @@protoc_insertion_point(struct:Metadata) */
} Metadata;
typedef struct Metadata_HeadersEntry {
pb_callback_t key;
pb_callback_t value;
/* @@protoc_insertion_point(struct:Metadata_HeadersEntry) */
} Metadata_HeadersEntry;
typedef struct Payload {
Metadata metadata;
google_protobuf_Any body;
/* @@protoc_insertion_point(struct:Payload) */
} Payload;
/* Default values for struct fields */
/* Initializer values for message structs */
#define Any_init_default {{{NULL}, NULL}, {{NULL}, NULL}}
#define Metadata_init_default {{{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}}
#define Metadata_HeadersEntry_init_default {{{NULL}, NULL}, {{NULL}, NULL}}
#define Payload_init_default {Metadata_init_default, google_protobuf_Any_init_default}
#define Payload_init_default {false, Metadata_init_default, false, Any_init_default}
#define Any_init_zero {{{NULL}, NULL}, {{NULL}, NULL}}
#define Metadata_init_zero {{{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}}
#define Metadata_HeadersEntry_init_zero {{{NULL}, NULL}, {{NULL}, NULL}}
#define Payload_init_zero {Metadata_init_zero, google_protobuf_Any_init_zero}
#define Payload_init_zero {false, Metadata_init_zero, false, Any_init_zero}
/* Field tags (for use in manual encoding/decoding) */
#define Any_type_url_tag 1
#define Any_value_tag 2
#define Metadata_type_tag 3
#define Metadata_clientIp_tag 8
#define Metadata_headers_tag 7
#define Metadata_clientIp_tag 8
#define Metadata_HeadersEntry_key_tag 1
#define Metadata_HeadersEntry_value_tag 2
#define Payload_metadata_tag 2
#define Payload_body_tag 3
/* Struct field encoding specification for nanopb */
extern const pb_field_t Metadata_fields[4];
extern const pb_field_t Metadata_HeadersEntry_fields[3];
extern const pb_field_t Payload_fields[3];
#define Any_FIELDLIST(X, a) \
X(a, CALLBACK, SINGULAR, STRING, type_url, 1) \
X(a, CALLBACK, SINGULAR, BYTES, value, 2)
#define Any_CALLBACK pb_default_field_callback
#define Any_DEFAULT NULL
#define Metadata_FIELDLIST(X, a) \
X(a, CALLBACK, SINGULAR, STRING, type, 3) \
X(a, CALLBACK, REPEATED, MESSAGE, headers, 7) \
X(a, CALLBACK, SINGULAR, STRING, clientIp, 8)
#define Metadata_CALLBACK pb_default_field_callback
#define Metadata_DEFAULT NULL
#define Metadata_headers_MSGTYPE Metadata_HeadersEntry
#define Metadata_HeadersEntry_FIELDLIST(X, a) \
X(a, CALLBACK, SINGULAR, STRING, key, 1) \
X(a, CALLBACK, SINGULAR, STRING, value, 2)
#define Metadata_HeadersEntry_CALLBACK pb_default_field_callback
#define Metadata_HeadersEntry_DEFAULT NULL
#define Payload_FIELDLIST(X, a) \
X(a, STATIC, OPTIONAL, MESSAGE, metadata, 2) \
X(a, STATIC, OPTIONAL, MESSAGE, body, 3)
#define Payload_CALLBACK NULL
#define Payload_DEFAULT NULL
#define Payload_metadata_MSGTYPE Metadata
#define Payload_body_MSGTYPE Any
extern const pb_msgdesc_t Any_msg;
extern const pb_msgdesc_t Metadata_msg;
extern const pb_msgdesc_t Metadata_HeadersEntry_msg;
extern const pb_msgdesc_t Payload_msg;
/* Defines for backwards compatibility with code written before nanopb-0.4.0 */
#define Any_fields &Any_msg
#define Metadata_fields &Metadata_msg
#define Metadata_HeadersEntry_fields &Metadata_HeadersEntry_msg
#define Payload_fields &Payload_msg
/* Maximum encoded size of messages (where known) */
/* Any_size depends on runtime parameters */
/* Metadata_size depends on runtime parameters */
/* Metadata_HeadersEntry_size depends on runtime parameters */
/* Payload_size depends on runtime parameters */
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
#define NACOS_GRPC_SERVICE_MESSAGES \
#endif
#ifdef __cplusplus
} /* extern "C" */
#endif
/* @@protoc_insertion_point(eof) */
#endif

View File

@ -0,0 +1,37 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto";
message Any {
string type_url = 1;
bytes value = 2;
}
message Metadata {
string type = 3;
string clientIp = 8;
map<string, string> headers = 7;
}
message Payload {
Metadata metadata = 2;
Any body = 3;
}

View File

@ -13,6 +13,12 @@ typedef struct {
ngx_http_upstream_init_pt original_init_upstream;
ngx_str_t data_id;
ngx_str_t group;
ngx_uint_t weight;
ngx_uint_t max_fails;
time_t fail_timeout;
ngx_str_t cluster;
ngx_array_t *cluster_lengths;
ngx_array_t *cluster_values;
} ngx_http_nacos_srv_conf_t;
typedef struct {
@ -21,7 +27,10 @@ typedef struct {
ngx_nacos_key_t *key;
ngx_uint_t version;
ngx_nacos_service_addrs_t addrs;
ngx_http_upstream_srv_conf_t *us;
ngx_flag_t use_cluster;
ngx_http_upstream_srv_conf_t *origin;
ngx_http_upstream_srv_conf_t *us; // no cluster
ngx_array_t *clustered_us; // ngx_http_upstream_srv_conf_t
} ngx_http_nacos_peers_t;
typedef struct {
@ -41,14 +50,17 @@ static void *ngx_http_nacos_create_srv_conf(ngx_conf_t *cf);
static char *ngx_http_conf_use_nacos_address(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_http_conf_nacos_use_cluster(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_int_t ngx_http_nacos_init_upstream(ngx_conf_t *cf,
ngx_http_upstream_srv_conf_t *us);
static ngx_int_t ngx_http_nacos_init_peers(ngx_http_request_t *r,
ngx_http_upstream_srv_conf_t *us);
static ngx_int_t ngx_http_nacos_create_new_us(ngx_http_nacos_peers_t *new_peers,
ngx_http_upstream_srv_conf_t *us);
static ngx_http_upstream_srv_conf_t *ngx_http_nacos_select_upstream(
ngx_http_nacos_peers_t *peers, ngx_str_t *cluster);
static ngx_http_nacos_peers_t *ngx_http_get_nacos_peers(
ngx_http_request_t *r, ngx_http_upstream_srv_conf_t *us);
@ -83,8 +95,11 @@ static ngx_http_module_t ngx_http_nacos_module_ctx = {
};
static ngx_command_t cmds[] = {
{ngx_string("nacos_subscribe_service"), NGX_HTTP_UPS_CONF | NGX_CONF_TAKE12,
ngx_http_conf_use_nacos_address, NGX_HTTP_SRV_CONF_OFFSET, 0, NULL},
{ngx_string("nacos_subscribe_service"),
NGX_HTTP_UPS_CONF | NGX_CONF_1MORE, ngx_http_conf_use_nacos_address,
NGX_HTTP_SRV_CONF_OFFSET, 0, NULL},
{ngx_string("nacos_use_cluster"), NGX_HTTP_UPS_CONF | NGX_CONF_TAKE1,
ngx_http_conf_nacos_use_cluster, NGX_HTTP_SRV_CONF_OFFSET, 0, NULL},
ngx_null_command};
ngx_module_t ngx_http_nacos_upstream_module = {NGX_MODULE_V1,
@ -103,20 +118,29 @@ ngx_module_t ngx_http_nacos_upstream_module = {NGX_MODULE_V1,
static void *ngx_http_nacos_create_srv_conf(ngx_conf_t *cf) {
return ngx_pcalloc(cf->pool, sizeof(ngx_http_nacos_srv_conf_t));
}
static ngx_int_t ngx_http_nacos_add_server(ngx_http_nacos_peers_t *peers,
ngx_http_nacos_srv_conf_t *nscf,
ngx_pool_t *temp_pool);
static char *ngx_http_conf_use_nacos_address(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf) {
ngx_http_nacos_srv_conf_t *nlcf = conf;
ngx_uint_t i;
ngx_uint_t n = cf->args->nelts;
ngx_str_t *value = cf->args->elts;
ngx_str_t *value = cf->args->elts, s;
ngx_nacos_sub_t tmp;
ngx_nacos_main_conf_t *mf;
ngx_int_t weight, max_fails;
time_t fail_timeout;
if (nlcf->uscf) {
return "is duplicate";
}
weight = 1;
max_fails = 1;
fail_timeout = 10;
ngx_memzero(&tmp, sizeof(tmp));
for (i = 1; i < n; ++i) {
@ -131,6 +155,62 @@ static char *ngx_http_conf_use_nacos_address(ngx_conf_t *cf, ngx_command_t *cmd,
tmp.group.len = value[i].len - 6;
continue;
}
if (value[i].len > 7 && ngx_strncmp(value[i].data, "weight=", 7) == 0) {
weight = ngx_atoi(value[i].data + 7, value[i].len - 7);
if (weight <= 0) {
ngx_conf_log_error(
NGX_LOG_EMERG, cf, 0,
"weight= must be number: invalid parameter \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
}
continue;
}
if (value[i].len > 10 &&
ngx_strncmp(value[i].data, "max_fails=", 10) == 0) {
max_fails = ngx_atoi(value[i].data + 10, value[i].len - 10);
if (max_fails < 0) {
ngx_conf_log_error(
NGX_LOG_EMERG, cf, 0,
"max_fails= must be number: invalid parameter \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
}
continue;
}
if (value[i].len > 10 &&
ngx_strncmp(value[i].data, "max_fails=", 10) == 0) {
max_fails = ngx_atoi(value[i].data + 10, value[i].len - 10);
if (max_fails < 0) {
ngx_conf_log_error(
NGX_LOG_EMERG, cf, 0,
"max_fails= must be number: invalid parameter \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
}
continue;
}
if (value[i].len > 13 &&
ngx_strncmp(value[i].data, "fail_timeout=", 13) == 0) {
s.len = value[i].len - 13;
s.data = &value[i].data[13];
fail_timeout = ngx_parse_time(&s, 1);
if (fail_timeout == (time_t) NGX_ERROR) {
ngx_conf_log_error(
NGX_LOG_EMERG, cf, 0,
"fail_timeout= must be time: invalid parameter \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
}
continue;
}
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "invalid parameter \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
@ -143,6 +223,10 @@ static char *ngx_http_conf_use_nacos_address(ngx_conf_t *cf, ngx_command_t *cmd,
nlcf->data_id = tmp.data_id;
nlcf->group = tmp.group;
nlcf->weight = weight;
nlcf->max_fails = max_fails;
nlcf->fail_timeout = fail_timeout;
nlcf->uscf =
ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
nlcf->original_init_upstream = nlcf->uscf->peer.init_upstream;
@ -161,6 +245,40 @@ static char *ngx_http_conf_use_nacos_address(ngx_conf_t *cf, ngx_command_t *cmd,
return NGX_CONF_OK;
}
static char *ngx_http_conf_nacos_use_cluster(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf) {
ngx_uint_t n;
ngx_http_script_compile_t sc;
ngx_str_t *value;
ngx_http_nacos_srv_conf_t *nscf = conf;
if (nscf->cluster_lengths != NULL || nscf->cluster.len > 0) {
return "is duplicate";
}
value = cf->args->elts;
nscf->cluster = value[1];
n = ngx_http_script_variables_count(&nscf->cluster);
ngx_memzero(&sc, sizeof(ngx_http_script_compile_t));
sc.variables = n;
if (n) {
sc.cf = cf;
sc.source = &nscf->cluster;
sc.lengths = &nscf->cluster_lengths;
sc.values = &nscf->cluster_values;
sc.complete_lengths = 1;
sc.complete_values = 1;
if (ngx_http_script_compile(&sc) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
return NGX_CONF_OK;
}
u_char *ngx_http_nacos_log_handler(ngx_log_t *log, u_char *buf, size_t len) {
ngx_http_nacos_peers_t *peers;
u_char *p = buf;
@ -175,12 +293,14 @@ u_char *ngx_http_nacos_log_handler(ngx_log_t *log, u_char *buf, size_t len) {
return p;
}
static ngx_http_nacos_peers_t *ngx_http_nacos_create_peers(ngx_log_t *log) {
static ngx_http_nacos_peers_t *ngx_http_nacos_create_peers(
ngx_log_t *log, ngx_flag_t clustered,
ngx_http_upstream_srv_conf_t *origin) {
ngx_pool_t *pool;
ngx_http_nacos_peers_t *peers;
ngx_log_t *new_log;
pool = ngx_create_pool(1024, log);
pool = ngx_create_pool(2048, log);
if (pool == NULL) {
return NULL;
}
@ -203,6 +323,8 @@ static ngx_http_nacos_peers_t *ngx_http_nacos_create_peers(ngx_log_t *log) {
new_log->handler = ngx_http_nacos_log_handler;
new_log->action = "nacos update addrs";
peers->origin = origin;
peers->use_cluster = clustered;
peers->pool = pool;
peers->ref = 1;
if (ngx_array_init(&peers->addrs.addrs, peers->pool, 16,
@ -213,13 +335,15 @@ static ngx_http_nacos_peers_t *ngx_http_nacos_create_peers(ngx_log_t *log) {
return peers;
}
static ngx_int_t ngx_http_nacos_add_server(ngx_http_nacos_peers_t *peers) {
static ngx_int_t ngx_http_nacos_add_server(ngx_http_nacos_peers_t *peers,
ngx_http_nacos_srv_conf_t *nscf,
ngx_pool_t *temp_pool) {
ngx_http_upstream_server_t *server;
ngx_http_upstream_srv_conf_t *us;
ngx_nacos_service_addr_t *adr;
ngx_uint_t i, n;
ngx_url_t u;
us = peers->us;
ngx_conf_t cf;
n = peers->addrs.addrs.nelts;
adr = peers->addrs.addrs.elts;
@ -230,17 +354,48 @@ static ngx_int_t ngx_http_nacos_add_server(ngx_http_nacos_peers_t *peers) {
if (ngx_parse_url(peers->pool, &u) != NGX_OK) {
continue;
}
us = ngx_http_nacos_select_upstream(peers, &adr->cluster);
if (us == NULL) {
return NGX_ERROR;
}
server = ngx_array_push(us->servers);
if (server == NULL) {
return NGX_ERROR;
}
ngx_memzero(server, sizeof(*server));
server->addrs = u.addrs;
server->naddrs = u.naddrs;
server->name = u.url;
server->weight = adr[i].weight;
server->down = 0;
server->backup = 0;
server->fail_timeout = 15000;
server->weight =
(ngx_uint_t) (adr[i].weight / 0.01 * (double) nscf->weight);
server->max_fails = nscf->max_fails;
server->fail_timeout = nscf->fail_timeout;
}
if (peers->addrs.addrs.nelts > 0) {
if (peers->us) {
memset(&cf, 0, sizeof(cf));
cf.pool = peers->pool;
cf.temp_pool = temp_pool;
cf.log = temp_pool->log;
if (nscf->original_init_upstream(&cf, peers->us) != NGX_OK) {
return NGX_ERROR;
}
} else {
us = peers->clustered_us->elts;
n = peers->clustered_us->nelts;
for (i = 0; i < n; ++i) {
memset(&cf, 0, sizeof(cf));
cf.pool = peers->pool;
cf.temp_pool = temp_pool;
cf.log = temp_pool->log;
if (nscf->original_init_upstream(&cf, us + i) != NGX_OK) {
return NGX_ERROR;
}
}
}
}
return NGX_OK;
}
@ -251,19 +406,15 @@ static ngx_int_t ngx_http_nacos_init_upstream(
ngx_pool_t *pool;
ngx_http_nacos_peers_t *peers;
ngx_http_nacos_srv_conf_t *ncf;
ngx_conf_t new_cf;
ncf = ngx_http_conf_upstream_srv_conf(us, ngx_http_nacos_upstream_module);
peers = ngx_http_nacos_create_peers(cf->log);
peers = ngx_http_nacos_create_peers(cf->log, ncf->cluster.len > 0, us);
if (peers == NULL) {
return NGX_ERROR;
}
pool = peers->pool;
if (ngx_http_nacos_create_new_us(peers, us) != NGX_OK) {
ngx_destroy_pool(pool);
return NGX_ERROR;
}
sub.key_ptr = &peers->key;
sub.data_id = ncf->data_id;
@ -285,49 +436,89 @@ static ngx_int_t ngx_http_nacos_init_upstream(
return NGX_ERROR;
}
if (ngx_http_nacos_add_server(peers) != NGX_OK) {
if (ngx_http_nacos_add_server(peers, ncf, cf->temp_pool) != NGX_OK) {
ngx_destroy_pool(pool);
return NGX_ERROR;
}
if (peers->addrs.addrs.nelts > 0) {
new_cf = *cf;
new_cf.pool = pool;
new_cf.log = pool->log;
if (ncf->original_init_upstream(&new_cf, peers->us) != NGX_OK) {
ngx_destroy_pool(pool);
return NGX_ERROR;
}
}
us->peer.init = ngx_http_nacos_init_peers;
us->peer.data = peers;
return NGX_OK;
}
static ngx_int_t ngx_http_nacos_create_new_us(
ngx_http_nacos_peers_t *new_peers, ngx_http_upstream_srv_conf_t *us) {
ngx_http_upstream_srv_conf_t *new_us;
static ngx_http_upstream_srv_conf_t *ngx_http_nacos_select_upstream(
ngx_http_nacos_peers_t *peers, ngx_str_t *cluster) {
ngx_http_upstream_srv_conf_t *us;
ngx_uint_t i, n;
new_us = ngx_palloc(new_peers->pool, sizeof(*new_us));
if (new_us == NULL) {
return NGX_ERROR;
}
*new_us = *us;
new_us->servers = ngx_array_create(new_peers->pool, 16,
if (!peers->use_cluster) {
us = peers->us;
if (us == NULL) {
us = ngx_palloc(peers->pool, sizeof(*us));
if (us == NULL) {
return NULL;
}
*us = *peers->origin;
us->servers = ngx_array_create(peers->pool, 16,
sizeof(ngx_http_upstream_server_t));
if (us->servers == NULL) {
return NULL;
}
peers->us = us;
ngx_str_set(&us->host, "nacos-no-cluster");
}
} else {
if (peers->clustered_us == NULL) {
peers->clustered_us = ngx_array_create(peers->pool, 4, sizeof(*us));
if (peers->clustered_us == NULL) {
return NULL;
}
}
us = peers->clustered_us->elts;
n = peers->clustered_us->nelts;
if (n > 0) {
for (i = 0; i < n; ++i) {
if (us[i].host.len == cluster->len &&
ngx_strncmp(us[i].host.data, cluster->data, cluster->len) ==
0) {
return us + i;
}
}
}
us = ngx_array_push(peers->clustered_us);
if (us == NULL) {
return NULL;
}
*us = *peers->origin;
us->servers = ngx_array_create(peers->pool, 16,
sizeof(ngx_http_upstream_server_t));
if (new_us->servers == NULL) {
return NGX_ERROR;
if (us->servers == NULL) {
return NULL;
}
us->host.len = cluster->len;
us->host.data = ngx_palloc(peers->pool, cluster->len + 1);
if (us->host.data == NULL) {
return NULL;
}
ngx_memcpy(us->host.data, cluster->data, cluster->len);
us->host.data[cluster->len] = 0;
}
new_peers->us = new_us;
return NGX_OK;
return us;
}
static ngx_int_t ngx_http_nacos_init_peers(ngx_http_request_t *r,
ngx_http_upstream_srv_conf_t *us) {
ngx_http_nacos_peers_t *peers;
ngx_http_nacos_rrp_t *rrp;
ngx_http_upstream_srv_conf_t *selected_us, *uc;
ngx_str_t cluster;
ngx_http_nacos_srv_conf_t *nusf;
ngx_int_t rc;
ngx_uint_t i, n;
rrp = r->upstream->peer.data;
if (rrp == NULL) {
@ -343,7 +534,42 @@ static ngx_int_t ngx_http_nacos_init_peers(ngx_http_request_t *r,
if (peers == NULL || peers->addrs.addrs.nelts == 0) {
return NGX_ERROR;
}
rc = peers->us->peer.init(r, peers->us);
nusf = ngx_http_conf_upstream_srv_conf(us, ngx_http_nacos_upstream_module);
if (!peers->use_cluster) {
selected_us = peers->us;
} else {
selected_us = NULL;
ngx_memzero(&cluster, sizeof(cluster));
if (nusf->cluster_lengths == NULL) {
cluster = nusf->cluster;
} else {
if (ngx_http_script_run(r, &cluster, nusf->cluster_lengths->elts, 0,
nusf->cluster_values->elts) == NULL) {
return NGX_ERROR;
}
}
if (peers->clustered_us == NULL || peers->clustered_us->nelts == 0) {
return NGX_ERROR;
}
n = peers->clustered_us->nelts;
uc = peers->clustered_us->elts;
for (i = 0; i < n; ++i) {
if (cluster.len == uc[i].host.len &&
ngx_strncmp(cluster.data, uc[i].host.data, cluster.len) == 0) {
selected_us = uc + i;
break;
}
}
}
if (selected_us == NULL) {
return NGX_ERROR;
}
rc = selected_us->peer.init(r, selected_us);
if (rc != NGX_OK) {
return rc;
}
@ -371,7 +597,6 @@ static ngx_http_nacos_peers_t *ngx_http_get_nacos_peers(
ngx_http_nacos_peers_t *peers, *new_peers;
ngx_http_nacos_srv_conf_t *nusf;
ngx_int_t rc;
ngx_conf_t cf;
peers = us->peer.data;
@ -379,7 +604,8 @@ static ngx_http_nacos_peers_t *ngx_http_get_nacos_peers(
return peers;
}
new_peers = ngx_http_nacos_create_peers(r->pool->log);
new_peers =
ngx_http_nacos_create_peers(r->pool->log, peers->use_cluster, us);
if (new_peers == NULL) {
return NULL;
}
@ -392,29 +618,12 @@ static ngx_http_nacos_peers_t *ngx_http_get_nacos_peers(
return NULL;
}
rc = ngx_http_nacos_create_new_us(new_peers, peers->us);
if (rc != NGX_OK) {
nusf = ngx_http_conf_upstream_srv_conf(us, ngx_http_nacos_upstream_module);
if (ngx_http_nacos_add_server(new_peers, nusf, r->pool) != NGX_OK) {
ngx_destroy_pool(new_peers->pool);
return NULL;
}
if (ngx_http_nacos_add_server(new_peers) != NGX_OK) {
ngx_destroy_pool(new_peers->pool);
return NULL;
}
if (new_peers->addrs.addrs.nelts > 0) {
memset(&cf, 0, sizeof(cf));
cf.pool = new_peers->pool;
cf.temp_pool = r->pool;
cf.log = r->connection->log;
nusf =
ngx_http_conf_upstream_srv_conf(us, ngx_http_nacos_upstream_module);
if (nusf->original_init_upstream(&cf, new_peers->us) != NGX_OK) {
ngx_destroy_pool(new_peers->pool);
return NULL;
}
}
us->peer.data = new_peers;
if (--peers->ref == 0) {
ngx_destroy_pool(peers->pool);

View File

@ -68,6 +68,7 @@ typedef struct {
ngx_str_t host;
int32_t port;
int32_t weight;
ngx_str_t cluster;
} ngx_nacos_service_addr_t;
typedef struct {
uint64_t version;

View File

@ -77,6 +77,7 @@ ngx_int_t ngx_nacos_write_disk_data(ngx_nacos_main_conf_t *mcf,
ssize_t rd;
ngx_fd_t fd;
ngx_err_t err;
ngx_flag_t dir_end;
char *c;
if (cache->adr == NULL) {
@ -86,13 +87,22 @@ ngx_int_t ngx_nacos_write_disk_data(ngx_nacos_main_conf_t *mcf,
pool = cache->pool;
filename.data = tmp;
filename.len = ngx_snprintf(tmp, sizeof(tmp) - 1, "%V@@%V", &cache->group,
filename.len = ngx_snprintf(tmp, sizeof(tmp) - 1, "-%V@@%V", &cache->group,
&cache->data_id) -
tmp;
dir_end = 0;
if (mcf->cache_dir.data[mcf->cache_dir.len - 1] == '/') {
--filename.len;
++filename.data;
dir_end = 1;
}
if (ngx_get_full_name(pool, &mcf->cache_dir, &filename) != NGX_OK) {
return NGX_ERROR;
}
if (!dir_end) {
filename.data[mcf->cache_dir.len] = '/';
}
fd = ngx_open_file(filename.data, NGX_FILE_WRONLY, NGX_FILE_TRUNCATE,
NGX_FILE_DEFAULT_ACCESS);
if (fd == NGX_INVALID_FILE) {
@ -420,6 +430,8 @@ static bool ngx_nacos_encode_hosts(pb_ostream_t *stream,
instance.host.funcs.encode = ngx_nacos_data_pb_encode_str;
instance.port = addr->port;
instance.weight = addr->weight;
instance.cluster.arg = &addr[i].cluster;
instance.cluster.funcs.encode = ngx_nacos_data_pb_encode_str;
if (!pb_encode_submessage(stream, Instance_fields, &instance)) {
return false;
}
@ -428,11 +440,11 @@ static bool ngx_nacos_encode_hosts(pb_ostream_t *stream,
}
char *ngx_nacos_parse_addrs_from_json(ngx_nacos_resp_json_parser_t *parser) {
yajl_val json, arr, item, ip, port, ref, weight, enable, healthy;
yajl_val json, arr, item, ip, port, ref, weight, enable, healthy, cluster;
size_t i, n;
int is;
ngx_log_t *log;
char *ts, *c;
char *ts, *c, *cs;
Service service;
ngx_nacos_service_addrs_t addrs;
ngx_nacos_service_addr_t *addr;
@ -481,9 +493,11 @@ char *ngx_nacos_parse_addrs_from_json(ngx_nacos_resp_json_parser_t *parser) {
}
ip = yajl_tree_get_field(item, "ip", yajl_t_string);
if (!ip) {
ngx_log_error(NGX_LOG_WARN, log, 0,
"nacos response json hosts ip is not string");
cluster = yajl_tree_get_field(item, "clusterName", yajl_t_string);
if (!ip || !cluster) {
ngx_log_error(
NGX_LOG_WARN, log, 0,
"nacos response json hosts ip or cluster is not string");
return NULL;
}
port = yajl_tree_get_field(item, "port", yajl_t_number);
@ -493,6 +507,7 @@ char *ngx_nacos_parse_addrs_from_json(ngx_nacos_resp_json_parser_t *parser) {
return NULL;
}
ts = YAJL_GET_STRING(ip);
cs = YAJL_GET_STRING(cluster);
is = (int) YAJL_GET_INTEGER(port);
weight = yajl_tree_get_field(item, "weight", yajl_t_number);
@ -514,6 +529,8 @@ char *ngx_nacos_parse_addrs_from_json(ngx_nacos_resp_json_parser_t *parser) {
}
addr->host.data = (u_char *) ts;
addr->host.len = strlen(ts);
addr->cluster.data = (u_char *) cs;
addr->cluster.len = strlen(cs);
addr->port = is;
addr->weight = (int32_t) w * 100;
}
@ -567,15 +584,20 @@ static bool ngx_nacos_pb_decode_str(pb_istream_t *stream,
static bool ngx_nacos_decode_instances(pb_istream_t *stream,
const pb_field_t *field, void **arg) {
struct ngx_pb_decode_ctx_t *ctx;
struct ngx_pb_decode_ctx_t *parent;
ngx_array_t *addrs;
ngx_nacos_service_addr_t *addr;
Instance instance;
struct ngx_pb_decode_ctx_t host, cluster;
ctx = *arg;
addrs = ctx->arg;
instance.host.arg = ctx;
parent = *arg;
addrs = parent->arg;
host.pool = parent->pool;
cluster.pool = parent->pool;
instance.host.arg = &host;
instance.host.funcs.decode = ngx_nacos_pb_decode_str;
instance.cluster.arg = &cluster;
instance.cluster.funcs.decode = ngx_nacos_pb_decode_str;
addr = ngx_array_push(addrs);
if (addr == NULL) {
@ -583,13 +605,13 @@ static bool ngx_nacos_decode_instances(pb_istream_t *stream,
}
instance.port = 0;
instance.weight = 0;
ctx->arg = &addr->host;
host.arg = &addr->host;
cluster.arg = &addr->cluster;
if (!pb_decode(stream, Instance_fields, &instance)) {
return false;
}
addr->weight = instance.weight;
addr->port = instance.port;
ctx->arg = addrs;
return true;
}

View File

@ -182,6 +182,7 @@ struct ngx_nacos_grpc_stream_s {
unsigned long_live : 1;
unsigned send_buf_block : 1;
unsigned send_buf_block_conn : 1;
unsigned store_proto_size_buf : 1;
};
#define NGX_NACOS_GRPC_DEFAULT_GRPC_STATUS 10000
@ -216,16 +217,18 @@ static ngx_int_t ngx_nacos_grpc_send_buf(ngx_nacos_grpc_buf_t *buf,
static ngx_int_t ngx_nacos_grpc_do_send(ngx_nacos_grpc_stream_t *st);
#define READ_BUF_CAP 65536
// read buf. 128K. max frame
#define READ_BUF_CAP (1 << 17)
#define MAX_FRAME_SIZE (READ_BUF_CAP - 9)
static u_char ngx_nacos_grpc_connection_start[] =
"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" /* connection preface */
"\x00\x00\x12\x04\x00\x00\x00\x00\x00" /* settings frame */
"\x00\x00\x18\x04\x00\x00\x00\x00\x00" /* settings frame */
"\x00\x01\x00\x00\x00\x00" /* header table size */
"\x00\x02\x00\x00\x00\x00" /* disable push */
"\x00\x04\x7f\xff\xff\xff" /* initial window */
"\x00\x05\x00\x01\xff\xf7" /* max frame size 128K - 9 */
"\x00\x00\x04\x08\x00\x00\x00\x00\x00" /* window update frame */
"\x7f\xff\x00\x00";
@ -544,8 +547,14 @@ static void ngx_nacos_grpc_event_handler(ngx_event_t *ev) {
gc = c->data;
if (ev == c->read) {
rc = ngx_nacos_grpc_read_handler(gc, ev);
if (rc == NGX_AGAIN && ngx_handle_read_event(ev, 0) != NGX_OK) {
rc = NGX_ERROR;
}
} else {
rc = ngx_nacos_grpc_write_handler(gc, ev);
if (rc == NGX_AGAIN && ngx_handle_write_event(ev, 0) != NGX_OK) {
rc = NGX_ERROR;
}
}
if (rc == NGX_ERROR || rc == NGX_DONE) {
@ -654,7 +663,7 @@ static ngx_int_t ngx_nacos_grpc_read_handler(ngx_nacos_grpc_conn_t *gc,
} else if (rc == 0) {
return NGX_DONE;
} else if (rc == NGX_AGAIN) {
return NGX_OK;
return NGX_AGAIN;
}
}
}
@ -672,7 +681,7 @@ static ngx_int_t ngx_nacos_grpc_parse_frame(ngx_nacos_grpc_conn_t *gc) {
if (gc->parse_stat == parse_frame_header) {
if (len < 9) {
return NGX_AGAIN;
break;
}
gc->frame_size =
(((size_t) b->pos[0]) << 16) | (b->pos[1] << 8) | (b->pos[2]);
@ -684,43 +693,55 @@ static ngx_int_t ngx_nacos_grpc_parse_frame(ngx_nacos_grpc_conn_t *gc) {
gc->parse_stat = parse_frame_payload;
b->pos += 9;
gc->frame_start = 1;
gc->frame_end = len - 9 >= gc->frame_size ? 1 : 0;
len -= 9;
if (gc->frame_type >
sizeof(frame_handlers) / sizeof(ngx_nacos_grpc_frame_handler)) {
ngx_log_error(NGX_LOG_ERR, gc->conn->log, 0,
"nacos http2 protocol error. error frame type");
return NGX_ERROR;
}
if (gc->frame_size > MAX_FRAME_SIZE) {
ngx_log_error(
NGX_LOG_ERR, gc->conn->log, 0,
"nacos http2 protocol error. exceed max frame size");
return NGX_ERROR;
}
}
if (gc->parse_stat == parse_frame_payload) {
if (gc->frame_type >
sizeof(frame_handlers) / sizeof(ngx_nacos_grpc_frame_handler)) {
return NGX_ERROR;
gc->frame_end = len >= gc->frame_size ? 1 : 0;
if (!gc->frame_end) {
break;
}
pp = b->pos;
lp = b->last;
if ((size_t) (lp - pp) > gc->frame_size) {
b->last = pp + gc->frame_size;
if ((size_t) (lp - b->pos) > gc->frame_size) {
b->last = b->pos + gc->frame_size;
}
pp = b->last;
rc = frame_handlers[gc->frame_type](gc);
if (b->pos == b->last) {
gc->parse_stat = parse_frame_header;
} else if (b->pos != pp) {
gc->frame_start = 0;
}
b->pos = pp;
b->last = lp;
if (rc == NGX_ERROR || rc == NGX_DONE) {
return rc;
} else if (rc == NGX_AGAIN) {
len = b->last - b->pos;
if (len > 0 && len * 4 < (size_t) (b->end - b->start) &&
(size_t) (b->end - b->pos) * 2 <
(size_t) (b->end - b->start)) {
ngx_memcpy(b->start, b->pos, len);
b->pos = b->start;
b->last = b->pos + len;
} else if (len == 0) {
b->pos = b->last = b->start;
}
if (rc != NGX_OK) {
return rc;
}
gc->parse_stat = parse_frame_header;
}
}
len = b->last - b->pos;
if (len == 0) {
b->pos = b->last = b->start;
} else if (len > 0 && len * 4 < (size_t) (b->end - b->start) &&
(size_t) (b->end - b->pos) * 2 < (size_t) (b->end - b->start)) {
ngx_memcpy(b->start, b->pos, len);
b->pos = b->start;
b->last = b->pos + len;
}
return NGX_AGAIN;
}
static ngx_nacos_grpc_stream_t *ngx_nacos_grpc_create_stream(
@ -977,7 +998,7 @@ static ngx_int_t ngx_nacos_grpc_parse_unknown_frame(ngx_nacos_grpc_conn_t *gc) {
static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
ngx_nacos_grpc_stream_t *st;
ngx_buf_t *buf, *tb;
u_char *p;
u_char *p, *t;
ngx_int_t rc;
size_t len, msg_size;
ngx_str_t proto_msg;
@ -1008,6 +1029,11 @@ static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
p = buf->pos;
len = buf->last - p;
if (len == 0) {
rc = NGX_OK;
break;
}
if (gc->frame_start) {
if (gc->frame_flags & HTTP_V2_PADDED_FLAG) {
if (len < 1) {
@ -1023,11 +1049,48 @@ static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
}
if (st->parsing_state == parsing_prefix) {
if (len < 5) {
if (st->store_proto_size_buf) {
tb = st->tmp_buf;
if (len > 5 - (size_t) (tb->last - tb->pos)) {
msg_size = 5 - (tb->last - tb->pos);
} else {
msg_size = len;
}
memcpy(tb->last, p, msg_size);
tb->last += msg_size;
p += msg_size;
buf->pos = p;
if (tb->last - tb->pos < 5) {
rc = NGX_OK;
break;
}
} else if (len < 5) {
if (ngx_nacos_grpc_realloc_tmp_buf(st, 256) != NGX_OK) {
rc = NGX_ERROR;
break;
}
tb = st->tmp_buf;
memcpy(tb->last, p, len);
tb->last += len;
p += len;
buf->pos = p;
rc = NGX_OK;
st->store_proto_size_buf = 1;
break;
}
if (p[0] != 0) {
if (st->store_proto_size_buf) {
t = tb->pos;
tb->pos = tb->last = tb->start;
} else {
t = p;
p += 5;
len -= 5;
buf->pos = p;
}
if (t[0] != 0) {
ngx_log_error(NGX_LOG_ERR, gc->conn->log, 0,
"nacos server sent data frame "
"send compressed msg: %uz",
@ -1035,15 +1098,13 @@ static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
rc = NGX_ERROR;
break;
}
msg_size = (p[1] << 24) | (p[2] << 16) | (p[3] << 8) | p[4];
msg_size = (t[1] << 24) | (t[2] << 16) | (t[3] << 8) | t[4];
if (ngx_nacos_grpc_realloc_tmp_buf(st, msg_size) != NGX_OK) {
rc = NGX_ERROR;
break;
}
tb = st->tmp_buf;
p += 5;
len -= 5;
buf->pos = p;
st->parsing_state = parsing_msg;
st->proto_len = msg_size;
st->recv_win -= 5;
@ -1063,6 +1124,9 @@ static ngx_int_t ngx_nacos_grpc_parse_data_frame(ngx_nacos_grpc_conn_t *gc) {
st->padding = 0;
}
len = tb->last - tb->pos;
if (st->proto_len > 300000) {
st->padding = 0;
}
if (len == st->proto_len) {
proto_msg.len = len;
proto_msg.data = tb->pos;
@ -1223,8 +1287,7 @@ static ngx_int_t ngx_nacos_grpc_parse_header_frame(ngx_nacos_grpc_conn_t *gc) {
b->pos = p;
len = b->last - p;
gc->frame_size -= min;
gc->frame_flags &=
~(HTTP_V2_PADDED_FLAG | HTTP_V2_PRIORITY_FLAG);
gc->frame_flags &= ~(HTTP_V2_PADDED_FLAG | HTTP_V2_PRIORITY_FLAG);
st->parsing_state = p_receiving;
}
}
@ -1372,7 +1435,8 @@ parse_header:
ch = 0;
tp = tmp;
if (ngx_nacos_http_v2_huff_decode(&ch, p, field_len, &tp, 1,
gc->conn->log) != NGX_OK) {
gc->conn->log) !=
NGX_OK) {
ngx_log_error(
NGX_LOG_ERR, gc->conn->log, 0,
"nacos server sent invalid encoded header");
@ -1421,7 +1485,8 @@ parse_header:
ch = 0;
tp = tmp;
if (ngx_nacos_http_v2_huff_decode(&ch, p, field_len, &tp, 1,
gc->conn->log) != NGX_OK) {
gc->conn->log) !=
NGX_OK) {
ngx_log_error(
NGX_LOG_ERR, gc->conn->log, 0,
"nacos server sent invalid encoded header");
@ -1634,8 +1699,8 @@ static ngx_int_t ngx_nacos_grpc_parse_ping_frame(ngx_nacos_grpc_conn_t *gc) {
if (buf == NULL) {
return NGX_ERROR;
}
ngx_nacos_grpc_encode_frame_header(
gc->m_stream, buf->b, HTTP_V2_PING_FRAME, HTTP_V2_ACK_FLAG, 8);
ngx_nacos_grpc_encode_frame_header(gc->m_stream, buf->b, HTTP_V2_PING_FRAME,
HTTP_V2_ACK_FLAG, 8);
buf->len = 9 + 8;
p = buf->b + 9;
p[0] = (data >> 56) & 0xFF;
@ -1845,26 +1910,27 @@ static ngx_nacos_grpc_buf_t *ngx_nacos_grpc_encode_request(
// AUTHORITY
*b++ = ngx_nacos_http_v2_inc_indexed(HTTP_V2_AUTHORITY_INDEX);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "nacos-server",
sizeof("nacos-server") - 1, tmp);
sizeof("nacos-server") - 1, tmp);
// user-agent
*b++ = ngx_nacos_http_v2_inc_indexed(HTTP_V2_USER_AGENT_INDEX);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "nginx-nacos-grpc-client",
sizeof("nginx-nacos-grpc-client") - 1, tmp);
sizeof("nginx-nacos-grpc-client") - 1,
tmp);
// content-type
*b++ = ngx_nacos_http_v2_inc_indexed(HTTP_V2_CONTENT_TYPE_INDEX);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "application/grpc",
sizeof("application/grpc") - 1, tmp);
sizeof("application/grpc") - 1, tmp);
// te: trailers
*b++ = 0;
b = ngx_nacos_http_v2_write_name(b, (u_char *) "te", sizeof("te") - 1, tmp);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "trailers",
sizeof("trailers") - 1, tmp);
sizeof("trailers") - 1, tmp);
// grpc-accept-encoding: identity
*b++ = 0;
b = ngx_nacos_http_v2_write_name(b, (u_char *) "grpc-accept-encoding",
sizeof("grpc-accept-encoding") - 1, tmp);
sizeof("grpc-accept-encoding") - 1, tmp);
b = ngx_nacos_http_v2_write_value(b, (u_char *) "identity",
sizeof("identity") - 1, tmp);
sizeof("identity") - 1, tmp);
buf->len = len = b - buf->b;
ngx_nacos_grpc_encode_frame_header(st, buf->b, HTTP_V2_HEADERS_FRAME,
HTTP_V2_END_HEADERS_FLAG, len - 9);
@ -2241,6 +2307,8 @@ static ngx_int_t ngx_nacos_grpc_encode_payload_init(
en->payload.body.value.funcs.encode = ngx_nacos_grpc_pb_encode_str;
en->payload.metadata.type.arg = &en->type;
en->payload.metadata.type.funcs.encode = ngx_nacos_grpc_pb_encode_str;
en->payload.has_metadata = 1;
en->payload.has_body = 1;
if (grpc_ctx.ncf->username.len > 0 && grpc_ctx.ncf->password.len > 0) {
en->payload.metadata.headers.arg = grpc_ctx.ncf;
@ -2292,9 +2360,9 @@ static ngx_nacos_grpc_buf_t *ngx_nacos_grpc_encode_data_msg(
goto err;
}
b = buf->b;
ngx_nacos_grpc_encode_frame_header(
st, b, HTTP_V2_DATA_FRAME,
end_stream ? HTTP_V2_END_STREAM_FLAG : 0, 5 + b_len);
ngx_nacos_grpc_encode_frame_header(st, b, HTTP_V2_DATA_FRAME,
end_stream ? HTTP_V2_END_STREAM_FLAG : 0,
5 + b_len);
b[9] = 0;
b[10] = (b_len >> 24) & 0xFF;
b[11] = (b_len >> 16) & 0xFF;
@ -2468,8 +2536,8 @@ static ngx_int_t ngx_nacos_grpc_send_win_update_frame(
if (buf == NULL) {
return NGX_ERROR;
}
ngx_nacos_grpc_encode_frame_header(st, buf->b,
HTTP_V2_WINDOW_UPDATE_FRAME, 0, 4);
ngx_nacos_grpc_encode_frame_header(st, buf->b, HTTP_V2_WINDOW_UPDATE_FRAME,
0, 4);
buf->len = 9 + 4;
p = buf->b + 9;
p[0] = (win_update >> 24) & 0x7F;
@ -2492,8 +2560,8 @@ static ngx_int_t ngx_nacos_send_ping_frame(ngx_nacos_grpc_conn_t *gc) {
if (buf == NULL) {
return NGX_ERROR;
}
ngx_nacos_grpc_encode_frame_header(gc->m_stream, buf->b,
HTTP_V2_PING_FRAME, 0, 8);
ngx_nacos_grpc_encode_frame_header(gc->m_stream, buf->b, HTTP_V2_PING_FRAME,
0, 8);
p = buf->b + 9;
data = ++gc->heartbeat;
p[0] = (data >> 56) & 0xFF;

View File

@ -233,7 +233,6 @@ static char *ngx_nacos_conf_block(ngx_conf_t *cf, ngx_command_t *cmd,
char *rv;
ngx_int_t i;
ngx_url_t u;
ngx_err_t err;
ngx_nacos_main_conf_t *ncf, **mncf = conf;
if (*mncf) {
@ -334,13 +333,6 @@ static char *ngx_nacos_conf_block(ngx_conf_t *cf, ngx_command_t *cmd,
rv = NGX_CONF_ERROR;
goto end;
}
if ((err = ngx_create_full_path(ncf->cache_dir.data, 0744))) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, err,
"nacos create cache dir \"%V\" error",
&ncf->cache_dir);
rv = NGX_CONF_ERROR;
goto end;
}
if (!ncf->default_group.data) {
ngx_str_set(&ncf->default_group, "DEFAULT_GROUP");
@ -508,10 +500,6 @@ static ngx_int_t ngx_nacos_subscribe(ngx_conf_t *cf, ngx_nacos_sub_t *sub,
return NGX_ERROR;
}
}
if (ngx_nacos_write_disk_data(mcf, &tmp) == NGX_ERROR) {
return NGX_ERROR;
}
}
kptr = ngx_array_push(all_keys);

File diff suppressed because it is too large Load Diff

View File

@ -5,102 +5,384 @@
#include "pb_common.h"
bool pb_field_iter_begin(pb_field_iter_t *iter, const pb_field_t *fields, void *dest_struct)
static bool load_descriptor_values(pb_field_iter_t *iter)
{
iter->start = fields;
iter->pos = fields;
iter->required_field_index = 0;
iter->dest_struct = dest_struct;
uint32_t word0;
uint32_t data_offset;
int_least8_t size_offset;
if (!dest_struct)
if (iter->index >= iter->descriptor->field_count)
return false;
word0 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index]);
iter->type = (pb_type_t)((word0 >> 8) & 0xFF);
switch(word0 & 3)
{
iter->pData = NULL;
case 0: {
/* 1-word format */
iter->array_size = 1;
iter->tag = (pb_size_t)((word0 >> 2) & 0x3F);
size_offset = (int_least8_t)((word0 >> 24) & 0x0F);
data_offset = (word0 >> 16) & 0xFF;
iter->data_size = (pb_size_t)((word0 >> 28) & 0x0F);
break;
}
case 1: {
/* 2-word format */
uint32_t word1 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index + 1]);
iter->array_size = (pb_size_t)((word0 >> 16) & 0x0FFF);
iter->tag = (pb_size_t)(((word0 >> 2) & 0x3F) | ((word1 >> 28) << 6));
size_offset = (int_least8_t)((word0 >> 28) & 0x0F);
data_offset = word1 & 0xFFFF;
iter->data_size = (pb_size_t)((word1 >> 16) & 0x0FFF);
break;
}
case 2: {
/* 4-word format */
uint32_t word1 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index + 1]);
uint32_t word2 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index + 2]);
uint32_t word3 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index + 3]);
iter->array_size = (pb_size_t)(word0 >> 16);
iter->tag = (pb_size_t)(((word0 >> 2) & 0x3F) | ((word1 >> 8) << 6));
size_offset = (int_least8_t)(word1 & 0xFF);
data_offset = word2;
iter->data_size = (pb_size_t)word3;
break;
}
default: {
/* 8-word format */
uint32_t word1 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index + 1]);
uint32_t word2 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index + 2]);
uint32_t word3 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index + 3]);
uint32_t word4 = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index + 4]);
iter->array_size = (pb_size_t)word4;
iter->tag = (pb_size_t)(((word0 >> 2) & 0x3F) | ((word1 >> 8) << 6));
size_offset = (int_least8_t)(word1 & 0xFF);
data_offset = word2;
iter->data_size = (pb_size_t)word3;
break;
}
}
if (!iter->message)
{
/* Avoid doing arithmetic on null pointers, it is undefined */
iter->pField = NULL;
iter->pSize = NULL;
}
else
{
iter->pData = (char*)dest_struct + iter->pos->data_offset;
iter->pSize = (char*)iter->pData + iter->pos->size_offset;
iter->pField = (char*)iter->message + data_offset;
if (size_offset)
{
iter->pSize = (char*)iter->pField - size_offset;
}
else if (PB_HTYPE(iter->type) == PB_HTYPE_REPEATED &&
(PB_ATYPE(iter->type) == PB_ATYPE_STATIC ||
PB_ATYPE(iter->type) == PB_ATYPE_POINTER))
{
/* Fixed count array */
iter->pSize = &iter->array_size;
}
else
{
iter->pSize = NULL;
}
if (PB_ATYPE(iter->type) == PB_ATYPE_POINTER && iter->pField != NULL)
{
iter->pData = *(void**)iter->pField;
}
else
{
iter->pData = iter->pField;
}
}
return (iter->pos->tag != 0);
if (PB_LTYPE_IS_SUBMSG(iter->type))
{
iter->submsg_desc = iter->descriptor->submsg_info[iter->submessage_index];
}
else
{
iter->submsg_desc = NULL;
}
return true;
}
static void advance_iterator(pb_field_iter_t *iter)
{
iter->index++;
if (iter->index >= iter->descriptor->field_count)
{
/* Restart */
iter->index = 0;
iter->field_info_index = 0;
iter->submessage_index = 0;
iter->required_field_index = 0;
}
else
{
/* Increment indexes based on previous field type.
* All field info formats have the following fields:
* - lowest 2 bits tell the amount of words in the descriptor (2^n words)
* - bits 2..7 give the lowest bits of tag number.
* - bits 8..15 give the field type.
*/
uint32_t prev_descriptor = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index]);
pb_type_t prev_type = (prev_descriptor >> 8) & 0xFF;
pb_size_t descriptor_len = (pb_size_t)(1 << (prev_descriptor & 3));
/* Add to fields.
* The cast to pb_size_t is needed to avoid -Wconversion warning.
* Because the data is is constants from generator, there is no danger of overflow.
*/
iter->field_info_index = (pb_size_t)(iter->field_info_index + descriptor_len);
iter->required_field_index = (pb_size_t)(iter->required_field_index + (PB_HTYPE(prev_type) == PB_HTYPE_REQUIRED));
iter->submessage_index = (pb_size_t)(iter->submessage_index + PB_LTYPE_IS_SUBMSG(prev_type));
}
}
bool pb_field_iter_begin(pb_field_iter_t *iter, const pb_msgdesc_t *desc, void *message)
{
memset(iter, 0, sizeof(*iter));
iter->descriptor = desc;
iter->message = message;
return load_descriptor_values(iter);
}
bool pb_field_iter_begin_extension(pb_field_iter_t *iter, pb_extension_t *extension)
{
const pb_msgdesc_t *msg = (const pb_msgdesc_t*)extension->type->arg;
bool status;
uint32_t word0 = PB_PROGMEM_READU32(msg->field_info[0]);
if (PB_ATYPE(word0 >> 8) == PB_ATYPE_POINTER)
{
/* For pointer extensions, the pointer is stored directly
* in the extension structure. This avoids having an extra
* indirection. */
status = pb_field_iter_begin(iter, msg, &extension->dest);
}
else
{
status = pb_field_iter_begin(iter, msg, extension->dest);
}
iter->pSize = &extension->found;
return status;
}
bool pb_field_iter_next(pb_field_iter_t *iter)
{
const pb_field_t *prev_field = iter->pos;
if (prev_field->tag == 0)
{
/* Handle empty message types, where the first field is already the terminator.
* In other cases, the iter->pos never points to the terminator. */
return false;
}
iter->pos++;
if (iter->pos->tag == 0)
{
/* Wrapped back to beginning, reinitialize */
(void)pb_field_iter_begin(iter, iter->start, iter->dest_struct);
return false;
}
else
{
/* Increment the pointers based on previous field size */
size_t prev_size = prev_field->data_size;
if (PB_HTYPE(prev_field->type) == PB_HTYPE_ONEOF &&
PB_HTYPE(iter->pos->type) == PB_HTYPE_ONEOF &&
iter->pos->data_offset == PB_SIZE_MAX)
{
/* Don't advance pointers inside unions */
return true;
}
else if (PB_ATYPE(prev_field->type) == PB_ATYPE_STATIC &&
PB_HTYPE(prev_field->type) == PB_HTYPE_REPEATED)
{
/* In static arrays, the data_size tells the size of a single entry and
* array_size is the number of entries */
prev_size *= prev_field->array_size;
}
else if (PB_ATYPE(prev_field->type) == PB_ATYPE_POINTER)
{
/* Pointer fields always have a constant size in the main structure.
* The data_size only applies to the dynamically allocated area. */
prev_size = sizeof(void*);
}
if (PB_HTYPE(prev_field->type) == PB_HTYPE_REQUIRED)
{
/* Count the required fields, in order to check their presence in the
* decoder. */
iter->required_field_index++;
}
iter->pData = (char*)iter->pData + prev_size + iter->pos->data_offset;
iter->pSize = (char*)iter->pData + iter->pos->size_offset;
return true;
}
advance_iterator(iter);
(void)load_descriptor_values(iter);
return iter->index != 0;
}
bool pb_field_iter_find(pb_field_iter_t *iter, uint32_t tag)
{
const pb_field_t *start = iter->pos;
do {
if (iter->pos->tag == tag &&
PB_LTYPE(iter->pos->type) != PB_LTYPE_EXTENSION)
if (iter->tag == tag)
{
return true; /* Nothing to do, correct field already. */
}
else if (tag > iter->descriptor->largest_tag)
{
return false;
}
else
{
pb_size_t start = iter->index;
uint32_t fieldinfo;
if (tag < iter->tag)
{
/* Found the wanted field */
return true;
/* Fields are in tag number order, so we know that tag is between
* 0 and our start position. Setting index to end forces
* advance_iterator() call below to restart from beginning. */
iter->index = iter->descriptor->field_count;
}
(void)pb_field_iter_next(iter);
} while (iter->pos != start);
/* Searched all the way back to start, and found nothing. */
return false;
do
{
/* Advance iterator but don't load values yet */
advance_iterator(iter);
/* Do fast check for tag number match */
fieldinfo = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index]);
if (((fieldinfo >> 2) & 0x3F) == (tag & 0x3F))
{
/* Good candidate, check further */
(void)load_descriptor_values(iter);
if (iter->tag == tag &&
PB_LTYPE(iter->type) != PB_LTYPE_EXTENSION)
{
/* Found it */
return true;
}
}
} while (iter->index != start);
/* Searched all the way back to start, and found nothing. */
(void)load_descriptor_values(iter);
return false;
}
}
bool pb_field_iter_find_extension(pb_field_iter_t *iter)
{
if (PB_LTYPE(iter->type) == PB_LTYPE_EXTENSION)
{
return true;
}
else
{
pb_size_t start = iter->index;
uint32_t fieldinfo;
do
{
/* Advance iterator but don't load values yet */
advance_iterator(iter);
/* Do fast check for field type */
fieldinfo = PB_PROGMEM_READU32(iter->descriptor->field_info[iter->field_info_index]);
if (PB_LTYPE((fieldinfo >> 8) & 0xFF) == PB_LTYPE_EXTENSION)
{
return load_descriptor_values(iter);
}
} while (iter->index != start);
/* Searched all the way back to start, and found nothing. */
(void)load_descriptor_values(iter);
return false;
}
}
static void *pb_const_cast(const void *p)
{
/* Note: this casts away const, in order to use the common field iterator
* logic for both encoding and decoding. The cast is done using union
* to avoid spurious compiler warnings. */
union {
void *p1;
const void *p2;
} t;
t.p2 = p;
return t.p1;
}
bool pb_field_iter_begin_const(pb_field_iter_t *iter, const pb_msgdesc_t *desc, const void *message)
{
return pb_field_iter_begin(iter, desc, pb_const_cast(message));
}
bool pb_field_iter_begin_extension_const(pb_field_iter_t *iter, const pb_extension_t *extension)
{
return pb_field_iter_begin_extension(iter, (pb_extension_t*)pb_const_cast(extension));
}
bool pb_default_field_callback(pb_istream_t *istream, pb_ostream_t *ostream, const pb_field_t *field)
{
if (field->data_size == sizeof(pb_callback_t))
{
pb_callback_t *pCallback = (pb_callback_t*)field->pData;
if (pCallback != NULL)
{
if (istream != NULL && pCallback->funcs.decode != NULL)
{
return pCallback->funcs.decode(istream, field, &pCallback->arg);
}
if (ostream != NULL && pCallback->funcs.encode != NULL)
{
return pCallback->funcs.encode(ostream, field, &pCallback->arg);
}
}
}
return true; /* Success, but didn't do anything */
}
#ifdef PB_VALIDATE_UTF8
/* This function checks whether a string is valid UTF-8 text.
*
* Algorithm is adapted from https://www.cl.cam.ac.uk/~mgk25/ucs/utf8_check.c
* Original copyright: Markus Kuhn <http://www.cl.cam.ac.uk/~mgk25/> 2005-03-30
* Licensed under "Short code license", which allows use under MIT license or
* any compatible with it.
*/
bool pb_validate_utf8(const char *str)
{
const pb_byte_t *s = (const pb_byte_t*)str;
while (*s)
{
if (*s < 0x80)
{
/* 0xxxxxxx */
s++;
}
else if ((s[0] & 0xe0) == 0xc0)
{
/* 110XXXXx 10xxxxxx */
if ((s[1] & 0xc0) != 0x80 ||
(s[0] & 0xfe) == 0xc0) /* overlong? */
return false;
else
s += 2;
}
else if ((s[0] & 0xf0) == 0xe0)
{
/* 1110XXXX 10Xxxxxx 10xxxxxx */
if ((s[1] & 0xc0) != 0x80 ||
(s[2] & 0xc0) != 0x80 ||
(s[0] == 0xe0 && (s[1] & 0xe0) == 0x80) || /* overlong? */
(s[0] == 0xed && (s[1] & 0xe0) == 0xa0) || /* surrogate? */
(s[0] == 0xef && s[1] == 0xbf &&
(s[2] & 0xfe) == 0xbe)) /* U+FFFE or U+FFFF? */
return false;
else
s += 3;
}
else if ((s[0] & 0xf8) == 0xf0)
{
/* 11110XXX 10XXxxxx 10xxxxxx 10xxxxxx */
if ((s[1] & 0xc0) != 0x80 ||
(s[2] & 0xc0) != 0x80 ||
(s[3] & 0xc0) != 0x80 ||
(s[0] == 0xf0 && (s[1] & 0xf0) == 0x80) || /* overlong? */
(s[0] == 0xf4 && s[1] > 0x8f) || s[0] > 0xf4) /* > U+10FFFF? */
return false;
else
s += 4;
}
else
{
return false;
}
}
return true;
}
#endif

View File

@ -11,20 +11,18 @@
extern "C" {
#endif
/* Iterator for pb_field_t list */
struct pb_field_iter_s {
const pb_field_t *start; /* Start of the pb_field_t array */
const pb_field_t *pos; /* Current position of the iterator */
unsigned required_field_index; /* Zero-based index that counts only the required fields */
void *dest_struct; /* Pointer to start of the structure */
void *pData; /* Pointer to current field value */
void *pSize; /* Pointer to count/has field */
};
typedef struct pb_field_iter_s pb_field_iter_t;
/* Initialize the field iterator structure to beginning.
* Returns false if the message type is empty. */
bool pb_field_iter_begin(pb_field_iter_t *iter, const pb_field_t *fields, void *dest_struct);
bool pb_field_iter_begin(pb_field_iter_t *iter, const pb_msgdesc_t *desc, void *message);
/* Get a field iterator for extension field. */
bool pb_field_iter_begin_extension(pb_field_iter_t *iter, pb_extension_t *extension);
/* Same as pb_field_iter_begin(), but for const message pointer.
* Note that the pointers in pb_field_iter_t will be non-const but shouldn't
* be written to when using these functions. */
bool pb_field_iter_begin_const(pb_field_iter_t *iter, const pb_msgdesc_t *desc, const void *message);
bool pb_field_iter_begin_extension_const(pb_field_iter_t *iter, const pb_extension_t *extension);
/* Advance the iterator to the next field.
* Returns false when the iterator wraps back to the first field. */
@ -34,6 +32,15 @@ bool pb_field_iter_next(pb_field_iter_t *iter);
* Returns false if no such field exists. */
bool pb_field_iter_find(pb_field_iter_t *iter, uint32_t tag);
/* Find a field with type PB_LTYPE_EXTENSION, or return false if not found.
* There can be only one extension range field per message. */
bool pb_field_iter_find_extension(pb_field_iter_t *iter);
#ifdef PB_VALIDATE_UTF8
/* Validate UTF-8 text string */
bool pb_validate_utf8(const char *s);
#endif
#ifdef __cplusplus
} /* extern "C" */
#endif

File diff suppressed because it is too large Load Diff

View File

@ -37,14 +37,31 @@ struct pb_istream_s
bool (*callback)(pb_istream_t *stream, pb_byte_t *buf, size_t count);
#endif
void *state; /* Free field for use by callback implementation */
/* state is a free field for use of the callback function defined above.
* Note that when pb_istream_from_buffer() is used, it reserves this field
* for its own use.
*/
void *state;
/* Maximum number of bytes left in this stream. Callback can report
* EOF before this limit is reached. Setting a limit is recommended
* when decoding directly from file or network streams to avoid
* denial-of-service by excessively long messages.
*/
size_t bytes_left;
#ifndef PB_NO_ERRMSG
/* Pointer to constant (ROM) string when decoding function returns error */
const char *errmsg;
#endif
};
#ifndef PB_NO_ERRMSG
#define PB_ISTREAM_EMPTY {0,0,0,0}
#else
#define PB_ISTREAM_EMPTY {0,0,0}
#endif
/***************************
* Main decoding functions *
***************************/
@ -65,57 +82,61 @@ struct pb_istream_s
* stream = pb_istream_from_buffer(buffer, count);
* pb_decode(&stream, MyMessage_fields, &msg);
*/
bool pb_decode(pb_istream_t *stream, const pb_field_t fields[], void *dest_struct);
bool pb_decode(pb_istream_t *stream, const pb_msgdesc_t *fields, void *dest_struct);
/* Same as pb_decode, except does not initialize the destination structure
* to default values. This is slightly faster if you need no default values
* and just do memset(struct, 0, sizeof(struct)) yourself.
/* Extended version of pb_decode, with several options to control
* the decoding process:
*
* This can also be used for 'merging' two messages, i.e. update only the
* fields that exist in the new message.
* PB_DECODE_NOINIT: Do not initialize the fields to default values.
* This is slightly faster if you do not need the default
* values and instead initialize the structure to 0 using
* e.g. memset(). This can also be used for merging two
* messages, i.e. combine already existing data with new
* values.
*
* Note: If this function returns with an error, it will not release any
* dynamically allocated fields. You will need to call pb_release() yourself.
* PB_DECODE_DELIMITED: Input message starts with the message size as varint.
* Corresponds to parseDelimitedFrom() in Google's
* protobuf API.
*
* PB_DECODE_NULLTERMINATED: Stop reading when field tag is read as 0. This allows
* reading null terminated messages.
* NOTE: Until nanopb-0.4.0, pb_decode() also allows
* null-termination. This behaviour is not supported in
* most other protobuf implementations, so PB_DECODE_DELIMITED
* is a better option for compatibility.
*
* Multiple flags can be combined with bitwise or (| operator)
*/
bool pb_decode_noinit(pb_istream_t *stream, const pb_field_t fields[], void *dest_struct);
#define PB_DECODE_NOINIT 0x01U
#define PB_DECODE_DELIMITED 0x02U
#define PB_DECODE_NULLTERMINATED 0x04U
bool pb_decode_ex(pb_istream_t *stream, const pb_msgdesc_t *fields, void *dest_struct, unsigned int flags);
/* Same as pb_decode, except expects the stream to start with the message size
* encoded as varint. Corresponds to parseDelimitedFrom() in Google's
* protobuf API.
*/
bool pb_decode_delimited(pb_istream_t *stream, const pb_field_t fields[], void *dest_struct);
/* Defines for backwards compatibility with code written before nanopb-0.4.0 */
#define pb_decode_noinit(s,f,d) pb_decode_ex(s,f,d, PB_DECODE_NOINIT)
#define pb_decode_delimited(s,f,d) pb_decode_ex(s,f,d, PB_DECODE_DELIMITED)
#define pb_decode_delimited_noinit(s,f,d) pb_decode_ex(s,f,d, PB_DECODE_DELIMITED | PB_DECODE_NOINIT)
#define pb_decode_nullterminated(s,f,d) pb_decode_ex(s,f,d, PB_DECODE_NULLTERMINATED)
/* Same as pb_decode_delimited, except that it does not initialize the destination structure.
* See pb_decode_noinit
*/
bool pb_decode_delimited_noinit(pb_istream_t *stream, const pb_field_t fields[], void *dest_struct);
/* Same as pb_decode, except allows the message to be terminated with a null byte.
* NOTE: Until nanopb-0.4.0, pb_decode() also allows null-termination. This behaviour
* is not supported in most other protobuf implementations, so pb_decode_delimited()
* is a better option for compatibility.
*/
bool pb_decode_nullterminated(pb_istream_t *stream, const pb_field_t fields[], void *dest_struct);
#ifdef PB_ENABLE_MALLOC
/* Release any allocated pointer fields. If you use dynamic allocation, you should
* call this for any successfully decoded message when you are done with it. If
* pb_decode() returns with an error, the message is already released.
*/
void pb_release(const pb_field_t fields[], void *dest_struct);
#endif
void pb_release(const pb_msgdesc_t *fields, void *dest_struct);
/**************************************
* Functions for manipulating streams *
**************************************/
/* Create an input stream for reading from a memory buffer.
*
* msglen should be the actual length of the message, not the full size of
* allocated buffer.
*
* Alternatively, you can use a custom stream that reads directly from e.g.
* a file or a network socket.
*/
pb_istream_t pb_istream_from_buffer(const pb_byte_t *buf, size_t bufsize);
pb_istream_t pb_istream_from_buffer(const pb_byte_t *buf, size_t msglen);
/* Function to read from a pb_istream_t. You can use this if you need to
* read some custom header data, or to read data in field callbacks.
@ -167,6 +188,11 @@ bool pb_decode_fixed32(pb_istream_t *stream, void *dest);
bool pb_decode_fixed64(pb_istream_t *stream, void *dest);
#endif
#ifdef PB_CONVERT_DOUBLE_FLOAT
/* Decode a double value into float variable. */
bool pb_decode_double_as_float(pb_istream_t *stream, float *dest);
#endif
/* Make a limited-length substream for reading a PB_WT_STRING field. */
bool pb_make_string_substream(pb_istream_t *stream, pb_istream_t *substream);
bool pb_close_string_substream(pb_istream_t *stream, pb_istream_t *substream);

File diff suppressed because it is too large Load Diff

View File

@ -33,15 +33,25 @@ struct pb_ostream_s
* Also, NULL pointer marks a 'sizing stream' that does not
* write anything.
*/
int *callback;
const int *callback;
#else
bool (*callback)(pb_ostream_t *stream, const pb_byte_t *buf, size_t count);
#endif
void *state; /* Free field for use by callback implementation. */
size_t max_size; /* Limit number of output bytes written (or use SIZE_MAX). */
size_t bytes_written; /* Number of bytes written so far. */
/* state is a free field for use of the callback function defined above.
* Note that when pb_ostream_from_buffer() is used, it reserves this field
* for its own use.
*/
void *state;
/* Limit number of output bytes written. Can be set to SIZE_MAX. */
size_t max_size;
/* Number of bytes written so far. */
size_t bytes_written;
#ifndef PB_NO_ERRMSG
/* Pointer to constant (ROM) string when decoding function returns error */
const char *errmsg;
#endif
};
@ -64,22 +74,31 @@ struct pb_ostream_s
* stream = pb_ostream_from_buffer(buffer, sizeof(buffer));
* pb_encode(&stream, MyMessage_fields, &msg);
*/
bool pb_encode(pb_ostream_t *stream, const pb_field_t fields[], const void *src_struct);
bool pb_encode(pb_ostream_t *stream, const pb_msgdesc_t *fields, const void *src_struct);
/* Same as pb_encode, but prepends the length of the message as a varint.
* Corresponds to writeDelimitedTo() in Google's protobuf API.
/* Extended version of pb_encode, with several options to control the
* encoding process:
*
* PB_ENCODE_DELIMITED: Prepend the length of message as a varint.
* Corresponds to writeDelimitedTo() in Google's
* protobuf API.
*
* PB_ENCODE_NULLTERMINATED: Append a null byte to the message for termination.
* NOTE: This behaviour is not supported in most other
* protobuf implementations, so PB_ENCODE_DELIMITED
* is a better option for compatibility.
*/
bool pb_encode_delimited(pb_ostream_t *stream, const pb_field_t fields[], const void *src_struct);
#define PB_ENCODE_DELIMITED 0x02U
#define PB_ENCODE_NULLTERMINATED 0x04U
bool pb_encode_ex(pb_ostream_t *stream, const pb_msgdesc_t *fields, const void *src_struct, unsigned int flags);
/* Same as pb_encode, but appends a null byte to the message for termination.
* NOTE: This behaviour is not supported in most other protobuf implementations, so pb_encode_delimited()
* is a better option for compatibility.
*/
bool pb_encode_nullterminated(pb_ostream_t *stream, const pb_field_t fields[], const void *src_struct);
/* Defines for backwards compatibility with code written before nanopb-0.4.0 */
#define pb_encode_delimited(s,f,d) pb_encode_ex(s,f,d, PB_ENCODE_DELIMITED)
#define pb_encode_nullterminated(s,f,d) pb_encode_ex(s,f,d, PB_ENCODE_NULLTERMINATED)
/* Encode the message to get the size of the encoded data, but do not store
* the data. */
bool pb_get_encoded_size(size_t *size, const pb_field_t fields[], const void *src_struct);
bool pb_get_encoded_size(size_t *size, const pb_msgdesc_t *fields, const void *src_struct);
/**************************************
* Functions for manipulating streams *
@ -121,7 +140,7 @@ bool pb_write(pb_ostream_t *stream, const pb_byte_t *buf, size_t count);
/* Encode field header based on type and field number defined in the field
* structure. Call this from the callback before writing out field contents. */
bool pb_encode_tag_for_field(pb_ostream_t *stream, const pb_field_t *field);
bool pb_encode_tag_for_field(pb_ostream_t *stream, const pb_field_iter_t *field);
/* Encode field header by manually specifying wire type. You need to use this
* if you want to write out packed arrays from a callback field. */
@ -156,12 +175,18 @@ bool pb_encode_fixed32(pb_ostream_t *stream, const void *value);
bool pb_encode_fixed64(pb_ostream_t *stream, const void *value);
#endif
#ifdef PB_CONVERT_DOUBLE_FLOAT
/* Encode a float value so that it appears like a double in the encoded
* message. */
bool pb_encode_float_as_double(pb_ostream_t *stream, float value);
#endif
/* Encode a submessage field.
* You need to pass the pb_field_t array and pointer to struct, just like
* with pb_encode(). This internally encodes the submessage twice, first to
* calculate message size and then to actually write it out.
*/
bool pb_encode_submessage(pb_ostream_t *stream, const pb_field_t fields[], const void *src_struct);
bool pb_encode_submessage(pb_ostream_t *stream, const pb_msgdesc_t *fields, const void *src_struct);
#ifdef __cplusplus
} /* extern "C" */

145
patch/openresty.patch Normal file
View File

@ -0,0 +1,145 @@
diff --git a/auto/make b/auto/make
index 25ee3fb..d61a188 100644
--- a/auto/make
+++ b/auto/make
@@ -38,7 +38,7 @@ fi
# ALL_INCS, required by the addons and by OpenWatcom C precompiled headers
-ngx_incs=`echo $CORE_INCS $NGX_OBJS $HTTP_INCS $MAIL_INCS $STREAM_INCS\
+ngx_incs=`echo $CORE_INCS $NGX_OBJS $AUXILIARY_INCS $HTTP_INCS $MAIL_INCS $STREAM_INCS\
| sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont$ngx_include_opt\1/g" \
-e "s/\//$ngx_regex_dirsep/g"`
@@ -58,7 +58,7 @@ ngx_deps=`echo $CORE_DEPS $NGX_AUTO_CONFIG_H $NGX_PCH \
| sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont\1/g" \
-e "s/\//$ngx_regex_dirsep/g"`
-ngx_incs=`echo $CORE_INCS $NGX_OBJS \
+ngx_incs=`echo $CORE_INCS $NGX_OBJS $AUXILIARY_INCS \
| sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont$ngx_include_opt\1/g" \
-e "s/\//$ngx_regex_dirsep/g"`
diff --git a/auto/modules b/auto/modules
index 300d07c..f72c9a9 100644
--- a/auto/modules
+++ b/auto/modules
@@ -1381,7 +1381,7 @@ if [ $USE_PCRE = YES ]; then
fi
-modules="$CORE_MODULES $EVENT_MODULES"
+modules="$CORE_MODULES $EVENT_MODULES $AUXILIARY_MODULES"
# thread pool module should be initialized after events
diff --git a/auto/sources b/auto/sources
index 46408ee..ab13c93 100644
--- a/auto/sources
+++ b/auto/sources
@@ -81,6 +81,9 @@ CORE_SRCS="src/core/nginx.c \
src/core/ngx_syslog.c"
+AUXILIARY_MODULES=
+AUXILIARY_INCS=
+
EVENT_MODULES="ngx_events_module ngx_event_core_module"
EVENT_INCS="src/event src/event/modules src/event/quic"
diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c
index 2be233c..f4ee563 100644
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -6146,11 +6146,13 @@ ngx_http_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
return rv;
}
+#if !NGX_HAVE_NACOS
if (uscf->servers->nelts == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"no servers are inside upstream");
return NGX_CONF_ERROR;
}
+#endif
return rv;
}
diff --git a/src/os/unix/ngx_process_cycle.c b/src/os/unix/ngx_process_cycle.c
index dadf03f..d003cc6 100644
--- a/src/os/unix/ngx_process_cycle.c
+++ b/src/os/unix/ngx_process_cycle.c
@@ -10,6 +10,9 @@
#include <ngx_event.h>
#include <ngx_channel.h>
+#if NGX_HAVE_AUXILIARY
+#include <ngx_auxiliary_module.h>
+#endif
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n,
ngx_int_t type);
@@ -135,6 +138,9 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_RESPAWN);
ngx_start_cache_manager_processes(cycle, 0);
+#if NGX_HAVE_AUXILIARY
+ ngx_aux_start_auxiliary_processes(cycle, 0);
+#endif
ngx_start_privileged_agent_processes(cycle, 0);
ngx_new_binary = 0;
@@ -221,6 +227,9 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_RESPAWN);
ngx_start_cache_manager_processes(cycle, 0);
+#if NGX_HAVE_AUXILIARY
+ ngx_aux_start_auxiliary_processes(cycle, 0);
+#endif
ngx_start_privileged_agent_processes(cycle, 0);
ngx_noaccepting = 0;
@@ -241,6 +250,9 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_JUST_RESPAWN);
ngx_start_cache_manager_processes(cycle, 1);
+#if NGX_HAVE_AUXILIARY
+ ngx_aux_start_auxiliary_processes(cycle, 1);
+#endif
ngx_start_privileged_agent_processes(cycle, 1);
/* allow new processes to start */
@@ -256,6 +268,9 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_RESPAWN);
ngx_start_cache_manager_processes(cycle, 0);
+#if NGX_HAVE_AUXILIARY
+ ngx_aux_start_auxiliary_processes(cycle, 0);
+#endif
ngx_start_privileged_agent_processes(cycle, 0);
live = 1;
}
@@ -1309,3 +1324,10 @@ ngx_cache_loader_process_handler(ngx_event_t *ev)
exit(0);
}
+
+
+#if NGX_HAVE_AUXILIARY
+void ngx_worker_aux_process_init(ngx_cycle_t *cycle){
+ ngx_worker_process_init(cycle, -1);
+}
+#endif
diff --git a/src/os/unix/ngx_process_cycle.h b/src/os/unix/ngx_process_cycle.h
index 5149396..dbce402 100644
--- a/src/os/unix/ngx_process_cycle.h
+++ b/src/os/unix/ngx_process_cycle.h
@@ -59,4 +59,8 @@ extern sig_atomic_t ngx_reopen;
extern sig_atomic_t ngx_change_binary;
+#if NGX_HAVE_AUXILIARY
+void ngx_worker_aux_process_init(ngx_cycle_t *cycle);
+#endif
+
#endif /* _NGX_PROCESS_CYCLE_H_INCLUDED_ */