Compare commits

...

10 Commits

Author SHA1 Message Date
Hunk Zhu e1a42bfa6d
Merge b05fa15123 into e868a001ec 2025-08-18 23:05:38 +00:00
Sunrisea e868a001ec
feat: add default log rolling config , update version of nacos go client (#841)
* add default log rolling config , update version of nacos go client

* add v2.2.9 deprecated information
2025-08-18 15:09:40 +08:00
Sunrisea 8bad3fed0f
Update grpc version and go version (#839)
* Update Grpc and protobuf version

* fix version

* fix workflow

* fix codestyle

* fix go mod
2025-08-14 11:29:27 +08:00
Sunrisea 6d3e89ff89
fix: Supports multiple subscriptions to services across different clusters. (#838)
* Supports multiple subscriptions to services across different clusters.

* fix bug
2025-08-13 19:08:52 +08:00
shalk(xiao kun) ead2368c2f
fix: nacos client unsubscribe (#836) 2025-08-13 11:01:30 +08:00
Hunk Zhu b05fa15123 Merge remote-tracking branch 'nacos_origin/master' into feature/improve_currentConnect 2023-11-20 07:06:07 +00:00
Hunk Zhu dc731b46d8 优化代码 2023-10-31 11:26:44 +00:00
Hunk Zhu 035d58be4c 加固代码 2023-10-27 09:23:33 +00:00
Hunk Zhu 4254799661 improve rpcClientStatus load 2023-10-08 17:39:45 +08:00
Hunk Zhu 48593d516b 改进currentConnect的使用方式,避免Data Race发生。 2023-10-03 05:23:24 +00:00
21 changed files with 989 additions and 549 deletions

View File

@ -14,7 +14,7 @@ jobs:
strategy:
matrix:
config:
- go_version: 1.18
- go_version: 1.21
steps:
- name: Set up Go 1.x

View File

@ -1,467 +1,233 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: nacos_grpc_service.proto
// versions:
// protoc-gen-go v1.36.7
// protoc v5.29.3
// source: api/proto/nacos_grpc_service.proto
package grpc
package auto
import (
context "context"
fmt "fmt"
math "math"
proto "github.com/golang/protobuf/proto"
any "github.com/golang/protobuf/ptypes/any"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
anypb "google.golang.org/protobuf/types/known/anypb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Metadata struct {
Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"`
ClientIp string `protobuf:"bytes,8,opt,name=clientIp,proto3" json:"clientIp,omitempty"`
Headers map[string]string `protobuf:"bytes,7,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
state protoimpl.MessageState `protogen:"open.v1"`
Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"`
ClientIp string `protobuf:"bytes,8,opt,name=clientIp,proto3" json:"clientIp,omitempty"`
Headers map[string]string `protobuf:"bytes,7,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (m *Metadata) Reset() { *m = Metadata{} }
func (m *Metadata) String() string { return proto.CompactTextString(m) }
func (*Metadata) ProtoMessage() {}
func (x *Metadata) Reset() {
*x = Metadata{}
mi := &file_api_proto_nacos_grpc_service_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Metadata) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Metadata) ProtoMessage() {}
func (x *Metadata) ProtoReflect() protoreflect.Message {
mi := &file_api_proto_nacos_grpc_service_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Metadata.ProtoReflect.Descriptor instead.
func (*Metadata) Descriptor() ([]byte, []int) {
return fileDescriptor_f908b146bdb05ce9, []int{0}
return file_api_proto_nacos_grpc_service_proto_rawDescGZIP(), []int{0}
}
func (m *Metadata) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Metadata.Unmarshal(m, b)
}
func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Metadata.Marshal(b, m, deterministic)
}
func (m *Metadata) XXX_Merge(src proto.Message) {
xxx_messageInfo_Metadata.Merge(m, src)
}
func (m *Metadata) XXX_Size() int {
return xxx_messageInfo_Metadata.Size(m)
}
func (m *Metadata) XXX_DiscardUnknown() {
xxx_messageInfo_Metadata.DiscardUnknown(m)
}
var xxx_messageInfo_Metadata proto.InternalMessageInfo
func (m *Metadata) GetType() string {
if m != nil {
return m.Type
func (x *Metadata) GetType() string {
if x != nil {
return x.Type
}
return ""
}
func (m *Metadata) GetClientIp() string {
if m != nil {
return m.ClientIp
func (x *Metadata) GetClientIp() string {
if x != nil {
return x.ClientIp
}
return ""
}
func (m *Metadata) GetHeaders() map[string]string {
if m != nil {
return m.Headers
func (x *Metadata) GetHeaders() map[string]string {
if x != nil {
return x.Headers
}
return nil
}
type Payload struct {
Metadata *Metadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
Body *any.Any `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
state protoimpl.MessageState `protogen:"open.v1"`
Metadata *Metadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
Body *anypb.Any `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (m *Payload) Reset() { *m = Payload{} }
func (m *Payload) String() string { return proto.CompactTextString(m) }
func (*Payload) ProtoMessage() {}
func (x *Payload) Reset() {
*x = Payload{}
mi := &file_api_proto_nacos_grpc_service_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Payload) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Payload) ProtoMessage() {}
func (x *Payload) ProtoReflect() protoreflect.Message {
mi := &file_api_proto_nacos_grpc_service_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Payload.ProtoReflect.Descriptor instead.
func (*Payload) Descriptor() ([]byte, []int) {
return fileDescriptor_f908b146bdb05ce9, []int{1}
return file_api_proto_nacos_grpc_service_proto_rawDescGZIP(), []int{1}
}
func (m *Payload) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Payload.Unmarshal(m, b)
}
func (m *Payload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Payload.Marshal(b, m, deterministic)
}
func (m *Payload) XXX_Merge(src proto.Message) {
xxx_messageInfo_Payload.Merge(m, src)
}
func (m *Payload) XXX_Size() int {
return xxx_messageInfo_Payload.Size(m)
}
func (m *Payload) XXX_DiscardUnknown() {
xxx_messageInfo_Payload.DiscardUnknown(m)
}
var xxx_messageInfo_Payload proto.InternalMessageInfo
func (m *Payload) GetMetadata() *Metadata {
if m != nil {
return m.Metadata
func (x *Payload) GetMetadata() *Metadata {
if x != nil {
return x.Metadata
}
return nil
}
func (m *Payload) GetBody() *any.Any {
if m != nil {
return m.Body
func (x *Payload) GetBody() *anypb.Any {
if x != nil {
return x.Body
}
return nil
}
func init() {
proto.RegisterType((*Metadata)(nil), "Metadata")
proto.RegisterMapType((map[string]string)(nil), "Metadata.HeadersEntry")
proto.RegisterType((*Payload)(nil), "Payload")
var File_api_proto_nacos_grpc_service_proto protoreflect.FileDescriptor
const file_api_proto_nacos_grpc_service_proto_rawDesc = "" +
"\n" +
"\"api/proto/nacos_grpc_service.proto\x1a\x19google/protobuf/any.proto\"\xa8\x01\n" +
"\bMetadata\x12\x12\n" +
"\x04type\x18\x03 \x01(\tR\x04type\x12\x1a\n" +
"\bclientIp\x18\b \x01(\tR\bclientIp\x120\n" +
"\aheaders\x18\a \x03(\v2\x16.Metadata.HeadersEntryR\aheaders\x1a:\n" +
"\fHeadersEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"Z\n" +
"\aPayload\x12%\n" +
"\bmetadata\x18\x02 \x01(\v2\t.MetadataR\bmetadata\x12(\n" +
"\x04body\x18\x03 \x01(\v2\x14.google.protobuf.AnyR\x04body28\n" +
"\rRequestStream\x12'\n" +
"\rrequestStream\x12\b.Payload\x1a\b.Payload\"\x000\x012*\n" +
"\aRequest\x12\x1f\n" +
"\arequest\x12\b.Payload\x1a\b.Payload\"\x002>\n" +
"\x0fBiRequestStream\x12+\n" +
"\x0frequestBiStream\x12\b.Payload\x1a\b.Payload\"\x00(\x010\x01B^\n" +
"\x1fcom.alibaba.nacos.api.grpc.autoP\x01Z9github.com/nacos-group/nacos-sdk-go/v2/api/grpc/auto;autob\x06proto3"
var (
file_api_proto_nacos_grpc_service_proto_rawDescOnce sync.Once
file_api_proto_nacos_grpc_service_proto_rawDescData []byte
)
func file_api_proto_nacos_grpc_service_proto_rawDescGZIP() []byte {
file_api_proto_nacos_grpc_service_proto_rawDescOnce.Do(func() {
file_api_proto_nacos_grpc_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_api_proto_nacos_grpc_service_proto_rawDesc), len(file_api_proto_nacos_grpc_service_proto_rawDesc)))
})
return file_api_proto_nacos_grpc_service_proto_rawDescData
}
func init() { proto.RegisterFile("nacos_grpc_service.proto", fileDescriptor_f908b146bdb05ce9) }
var fileDescriptor_f908b146bdb05ce9 = []byte{
// 333 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x51, 0x4f, 0x4b, 0xeb, 0x40,
0x10, 0x7f, 0xdb, 0xf6, 0xbd, 0xa4, 0xd3, 0x57, 0x2a, 0x4b, 0x91, 0x98, 0x4b, 0x4b, 0x45, 0x0c,
0x0a, 0xdb, 0x12, 0x2f, 0xa5, 0x07, 0xc1, 0x82, 0xa0, 0x07, 0xa1, 0xc4, 0x9b, 0x97, 0x32, 0x49,
0xd6, 0x1a, 0x4c, 0xb3, 0x71, 0xb3, 0x29, 0xec, 0x37, 0xf2, 0x63, 0x4a, 0x37, 0x69, 0xb0, 0x20,
0xde, 0x66, 0x7e, 0x7f, 0xe6, 0xc7, 0xcc, 0x80, 0x93, 0x61, 0x24, 0x8a, 0xf5, 0x46, 0xe6, 0xd1,
0xba, 0xe0, 0x72, 0x97, 0x44, 0x9c, 0xe5, 0x52, 0x28, 0xe1, 0x9e, 0x6d, 0x84, 0xd8, 0xa4, 0x7c,
0x6a, 0xba, 0xb0, 0x7c, 0x9d, 0x62, 0xa6, 0x2b, 0x6a, 0xf2, 0x49, 0xc0, 0x7e, 0xe2, 0x0a, 0x63,
0x54, 0x48, 0x29, 0x74, 0x94, 0xce, 0xb9, 0xd3, 0x1e, 0x13, 0xaf, 0x1b, 0x98, 0x9a, 0xba, 0x60,
0x47, 0x69, 0xc2, 0x33, 0xf5, 0x98, 0x3b, 0xb6, 0xc1, 0x9b, 0x9e, 0xce, 0xc0, 0x7a, 0xe3, 0x18,
0x73, 0x59, 0x38, 0xd6, 0xb8, 0xed, 0xf5, 0xfc, 0x53, 0x76, 0x98, 0xc5, 0x1e, 0x2a, 0xe2, 0x3e,
0x53, 0x52, 0x07, 0x07, 0x99, 0xbb, 0x80, 0xff, 0xdf, 0x09, 0x7a, 0x02, 0xed, 0x77, 0xae, 0x1d,
0x62, 0x06, 0xef, 0x4b, 0x3a, 0x84, 0xbf, 0x3b, 0x4c, 0x4b, 0xee, 0xb4, 0x0c, 0x56, 0x35, 0x8b,
0xd6, 0x9c, 0x4c, 0x5e, 0xc0, 0x5a, 0xa1, 0x4e, 0x05, 0xc6, 0xf4, 0x02, 0xec, 0x6d, 0x1d, 0x64,
0x74, 0x3d, 0xbf, 0xdb, 0x24, 0x07, 0x0d, 0x45, 0x3d, 0xe8, 0x84, 0x22, 0xd6, 0x66, 0x9f, 0x9e,
0x3f, 0x64, 0xd5, 0x19, 0xd8, 0xe1, 0x0c, 0xec, 0x2e, 0xd3, 0x81, 0x51, 0xf8, 0x73, 0xe8, 0x07,
0xfc, 0xa3, 0xe4, 0x85, 0x7a, 0x56, 0x92, 0xe3, 0x96, 0x5e, 0x42, 0x5f, 0x1e, 0x01, 0x36, 0xab,
0xc3, 0xdd, 0xa6, 0x9a, 0xfc, 0x99, 0x11, 0xff, 0x0a, 0xac, 0xda, 0x49, 0x47, 0x60, 0xd5, 0x9e,
0x9f, 0xd5, 0xfe, 0x2d, 0x0c, 0x96, 0xc9, 0x71, 0xce, 0x35, 0x0c, 0x6a, 0xcf, 0x32, 0xf9, 0x2d,
0xc9, 0x23, 0x33, 0xb2, 0x3c, 0x87, 0x51, 0x24, 0xb6, 0x0c, 0xd3, 0x24, 0xc4, 0x10, 0x99, 0xf9,
0x37, 0xc3, 0x3c, 0x61, 0xfb, 0x9f, 0x33, 0x2c, 0x95, 0x58, 0x91, 0xf0, 0x9f, 0x59, 0xef, 0xe6,
0x2b, 0x00, 0x00, 0xff, 0xff, 0x0f, 0x9e, 0xc7, 0x2d, 0x0f, 0x02, 0x00, 0x00,
var file_api_proto_nacos_grpc_service_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_api_proto_nacos_grpc_service_proto_goTypes = []any{
(*Metadata)(nil), // 0: Metadata
(*Payload)(nil), // 1: Payload
nil, // 2: Metadata.HeadersEntry
(*anypb.Any)(nil), // 3: google.protobuf.Any
}
var file_api_proto_nacos_grpc_service_proto_depIdxs = []int32{
2, // 0: Metadata.headers:type_name -> Metadata.HeadersEntry
0, // 1: Payload.metadata:type_name -> Metadata
3, // 2: Payload.body:type_name -> google.protobuf.Any
1, // 3: RequestStream.requestStream:input_type -> Payload
1, // 4: Request.request:input_type -> Payload
1, // 5: BiRequestStream.requestBiStream:input_type -> Payload
1, // 6: RequestStream.requestStream:output_type -> Payload
1, // 7: Request.request:output_type -> Payload
1, // 8: BiRequestStream.requestBiStream:output_type -> Payload
6, // [6:9] is the sub-list for method output_type
3, // [3:6] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// RequestStreamClient is the client API for RequestStream service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type RequestStreamClient interface {
// build a streamRequest
RequestStream(ctx context.Context, in *Payload, opts ...grpc.CallOption) (RequestStream_RequestStreamClient, error)
}
type requestStreamClient struct {
cc *grpc.ClientConn
}
func NewRequestStreamClient(cc *grpc.ClientConn) RequestStreamClient {
return &requestStreamClient{cc}
}
func (c *requestStreamClient) RequestStream(ctx context.Context, in *Payload, opts ...grpc.CallOption) (RequestStream_RequestStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_RequestStream_serviceDesc.Streams[0], "/RequestStream/requestStream", opts...)
if err != nil {
return nil, err
func init() { file_api_proto_nacos_grpc_service_proto_init() }
func file_api_proto_nacos_grpc_service_proto_init() {
if File_api_proto_nacos_grpc_service_proto != nil {
return
}
x := &requestStreamRequestStreamClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type RequestStream_RequestStreamClient interface {
Recv() (*Payload, error)
grpc.ClientStream
}
type requestStreamRequestStreamClient struct {
grpc.ClientStream
}
func (x *requestStreamRequestStreamClient) Recv() (*Payload, error) {
m := new(Payload)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// RequestStreamServer is the server API for RequestStream service.
type RequestStreamServer interface {
// build a streamRequest
RequestStream(*Payload, RequestStream_RequestStreamServer) error
}
// UnimplementedRequestStreamServer can be embedded to have forward compatible implementations.
type UnimplementedRequestStreamServer struct {
}
func (*UnimplementedRequestStreamServer) RequestStream(req *Payload, srv RequestStream_RequestStreamServer) error {
return status.Errorf(codes.Unimplemented, "method RequestStream not implemented")
}
func RegisterRequestStreamServer(s *grpc.Server, srv RequestStreamServer) {
s.RegisterService(&_RequestStream_serviceDesc, srv)
}
func _RequestStream_RequestStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Payload)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RequestStreamServer).RequestStream(m, &requestStreamRequestStreamServer{stream})
}
type RequestStream_RequestStreamServer interface {
Send(*Payload) error
grpc.ServerStream
}
type requestStreamRequestStreamServer struct {
grpc.ServerStream
}
func (x *requestStreamRequestStreamServer) Send(m *Payload) error {
return x.ServerStream.SendMsg(m)
}
var _RequestStream_serviceDesc = grpc.ServiceDesc{
ServiceName: "RequestStream",
HandlerType: (*RequestStreamServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "requestStream",
Handler: _RequestStream_RequestStream_Handler,
ServerStreams: true,
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_api_proto_nacos_grpc_service_proto_rawDesc), len(file_api_proto_nacos_grpc_service_proto_rawDesc)),
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 3,
},
},
Metadata: "nacos_grpc_service.proto",
}
// RequestClient is the client API for Request service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type RequestClient interface {
// Sends a commonRequest
Request(ctx context.Context, in *Payload, opts ...grpc.CallOption) (*Payload, error)
}
type requestClient struct {
cc *grpc.ClientConn
}
func NewRequestClient(cc *grpc.ClientConn) RequestClient {
return &requestClient{cc}
}
func (c *requestClient) Request(ctx context.Context, in *Payload, opts ...grpc.CallOption) (*Payload, error) {
out := new(Payload)
err := c.cc.Invoke(ctx, "/Request/request", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// RequestServer is the server API for Request service.
type RequestServer interface {
// Sends a commonRequest
Request(context.Context, *Payload) (*Payload, error)
}
// UnimplementedRequestServer can be embedded to have forward compatible implementations.
type UnimplementedRequestServer struct {
}
func (*UnimplementedRequestServer) Request(ctx context.Context, req *Payload) (*Payload, error) {
return nil, status.Errorf(codes.Unimplemented, "method Request not implemented")
}
func RegisterRequestServer(s *grpc.Server, srv RequestServer) {
s.RegisterService(&_Request_serviceDesc, srv)
}
func _Request_Request_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Payload)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RequestServer).Request(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Request/Request",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RequestServer).Request(ctx, req.(*Payload))
}
return interceptor(ctx, in, info, handler)
}
var _Request_serviceDesc = grpc.ServiceDesc{
ServiceName: "Request",
HandlerType: (*RequestServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "request",
Handler: _Request_Request_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "nacos_grpc_service.proto",
}
// BiRequestStreamClient is the client API for BiRequestStream service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type BiRequestStreamClient interface {
// Sends a commonRequest
RequestBiStream(ctx context.Context, opts ...grpc.CallOption) (BiRequestStream_RequestBiStreamClient, error)
}
type biRequestStreamClient struct {
cc *grpc.ClientConn
}
func NewBiRequestStreamClient(cc *grpc.ClientConn) BiRequestStreamClient {
return &biRequestStreamClient{cc}
}
func (c *biRequestStreamClient) RequestBiStream(ctx context.Context, opts ...grpc.CallOption) (BiRequestStream_RequestBiStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_BiRequestStream_serviceDesc.Streams[0], "/BiRequestStream/requestBiStream", opts...)
if err != nil {
return nil, err
}
x := &biRequestStreamRequestBiStreamClient{stream}
return x, nil
}
type BiRequestStream_RequestBiStreamClient interface {
Send(*Payload) error
Recv() (*Payload, error)
grpc.ClientStream
}
type biRequestStreamRequestBiStreamClient struct {
grpc.ClientStream
}
func (x *biRequestStreamRequestBiStreamClient) Send(m *Payload) error {
return x.ClientStream.SendMsg(m)
}
func (x *biRequestStreamRequestBiStreamClient) Recv() (*Payload, error) {
m := new(Payload)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// BiRequestStreamServer is the server API for BiRequestStream service.
type BiRequestStreamServer interface {
// Sends a commonRequest
RequestBiStream(BiRequestStream_RequestBiStreamServer) error
}
// UnimplementedBiRequestStreamServer can be embedded to have forward compatible implementations.
type UnimplementedBiRequestStreamServer struct {
}
func (*UnimplementedBiRequestStreamServer) RequestBiStream(srv BiRequestStream_RequestBiStreamServer) error {
return status.Errorf(codes.Unimplemented, "method RequestBiStream not implemented")
}
func RegisterBiRequestStreamServer(s *grpc.Server, srv BiRequestStreamServer) {
s.RegisterService(&_BiRequestStream_serviceDesc, srv)
}
func _BiRequestStream_RequestBiStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(BiRequestStreamServer).RequestBiStream(&biRequestStreamRequestBiStreamServer{stream})
}
type BiRequestStream_RequestBiStreamServer interface {
Send(*Payload) error
Recv() (*Payload, error)
grpc.ServerStream
}
type biRequestStreamRequestBiStreamServer struct {
grpc.ServerStream
}
func (x *biRequestStreamRequestBiStreamServer) Send(m *Payload) error {
return x.ServerStream.SendMsg(m)
}
func (x *biRequestStreamRequestBiStreamServer) Recv() (*Payload, error) {
m := new(Payload)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _BiRequestStream_serviceDesc = grpc.ServiceDesc{
ServiceName: "BiRequestStream",
HandlerType: (*BiRequestStreamServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "requestBiStream",
Handler: _BiRequestStream_RequestBiStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "nacos_grpc_service.proto",
GoTypes: file_api_proto_nacos_grpc_service_proto_goTypes,
DependencyIndexes: file_api_proto_nacos_grpc_service_proto_depIdxs,
MessageInfos: file_api_proto_nacos_grpc_service_proto_msgTypes,
}.Build()
File_api_proto_nacos_grpc_service_proto = out.File
file_api_proto_nacos_grpc_service_proto_goTypes = nil
file_api_proto_nacos_grpc_service_proto_depIdxs = nil
}

View File

@ -0,0 +1,343 @@
//
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v5.29.3
// source: api/proto/nacos_grpc_service.proto
package auto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
RequestStream_RequestStream_FullMethodName = "/RequestStream/requestStream"
)
// RequestStreamClient is the client API for RequestStream service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RequestStreamClient interface {
// build a streamRequest
RequestStream(ctx context.Context, in *Payload, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Payload], error)
}
type requestStreamClient struct {
cc grpc.ClientConnInterface
}
func NewRequestStreamClient(cc grpc.ClientConnInterface) RequestStreamClient {
return &requestStreamClient{cc}
}
func (c *requestStreamClient) RequestStream(ctx context.Context, in *Payload, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Payload], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &RequestStream_ServiceDesc.Streams[0], RequestStream_RequestStream_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[Payload, Payload]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RequestStream_RequestStreamClient = grpc.ServerStreamingClient[Payload]
// RequestStreamServer is the server API for RequestStream service.
// All implementations must embed UnimplementedRequestStreamServer
// for forward compatibility.
type RequestStreamServer interface {
// build a streamRequest
RequestStream(*Payload, grpc.ServerStreamingServer[Payload]) error
mustEmbedUnimplementedRequestStreamServer()
}
// UnimplementedRequestStreamServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedRequestStreamServer struct{}
func (UnimplementedRequestStreamServer) RequestStream(*Payload, grpc.ServerStreamingServer[Payload]) error {
return status.Errorf(codes.Unimplemented, "method RequestStream not implemented")
}
func (UnimplementedRequestStreamServer) mustEmbedUnimplementedRequestStreamServer() {}
func (UnimplementedRequestStreamServer) testEmbeddedByValue() {}
// UnsafeRequestStreamServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to RequestStreamServer will
// result in compilation errors.
type UnsafeRequestStreamServer interface {
mustEmbedUnimplementedRequestStreamServer()
}
func RegisterRequestStreamServer(s grpc.ServiceRegistrar, srv RequestStreamServer) {
// If the following call pancis, it indicates UnimplementedRequestStreamServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&RequestStream_ServiceDesc, srv)
}
func _RequestStream_RequestStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Payload)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RequestStreamServer).RequestStream(m, &grpc.GenericServerStream[Payload, Payload]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type RequestStream_RequestStreamServer = grpc.ServerStreamingServer[Payload]
// RequestStream_ServiceDesc is the grpc.ServiceDesc for RequestStream service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var RequestStream_ServiceDesc = grpc.ServiceDesc{
ServiceName: "RequestStream",
HandlerType: (*RequestStreamServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "requestStream",
Handler: _RequestStream_RequestStream_Handler,
ServerStreams: true,
},
},
Metadata: "api/proto/nacos_grpc_service.proto",
}
const (
Request_Request_FullMethodName = "/Request/request"
)
// RequestClient is the client API for Request service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RequestClient interface {
// Sends a commonRequest
Request(ctx context.Context, in *Payload, opts ...grpc.CallOption) (*Payload, error)
}
type requestClient struct {
cc grpc.ClientConnInterface
}
func NewRequestClient(cc grpc.ClientConnInterface) RequestClient {
return &requestClient{cc}
}
func (c *requestClient) Request(ctx context.Context, in *Payload, opts ...grpc.CallOption) (*Payload, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(Payload)
err := c.cc.Invoke(ctx, Request_Request_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// RequestServer is the server API for Request service.
// All implementations must embed UnimplementedRequestServer
// for forward compatibility.
type RequestServer interface {
// Sends a commonRequest
Request(context.Context, *Payload) (*Payload, error)
mustEmbedUnimplementedRequestServer()
}
// UnimplementedRequestServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedRequestServer struct{}
func (UnimplementedRequestServer) Request(context.Context, *Payload) (*Payload, error) {
return nil, status.Errorf(codes.Unimplemented, "method Request not implemented")
}
func (UnimplementedRequestServer) mustEmbedUnimplementedRequestServer() {}
func (UnimplementedRequestServer) testEmbeddedByValue() {}
// UnsafeRequestServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to RequestServer will
// result in compilation errors.
type UnsafeRequestServer interface {
mustEmbedUnimplementedRequestServer()
}
func RegisterRequestServer(s grpc.ServiceRegistrar, srv RequestServer) {
// If the following call pancis, it indicates UnimplementedRequestServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&Request_ServiceDesc, srv)
}
func _Request_Request_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Payload)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RequestServer).Request(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Request_Request_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RequestServer).Request(ctx, req.(*Payload))
}
return interceptor(ctx, in, info, handler)
}
// Request_ServiceDesc is the grpc.ServiceDesc for Request service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Request_ServiceDesc = grpc.ServiceDesc{
ServiceName: "Request",
HandlerType: (*RequestServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "request",
Handler: _Request_Request_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "api/proto/nacos_grpc_service.proto",
}
const (
BiRequestStream_RequestBiStream_FullMethodName = "/BiRequestStream/requestBiStream"
)
// BiRequestStreamClient is the client API for BiRequestStream service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type BiRequestStreamClient interface {
// Sends a commonRequest
RequestBiStream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Payload, Payload], error)
}
type biRequestStreamClient struct {
cc grpc.ClientConnInterface
}
func NewBiRequestStreamClient(cc grpc.ClientConnInterface) BiRequestStreamClient {
return &biRequestStreamClient{cc}
}
func (c *biRequestStreamClient) RequestBiStream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Payload, Payload], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &BiRequestStream_ServiceDesc.Streams[0], BiRequestStream_RequestBiStream_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[Payload, Payload]{ClientStream: stream}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type BiRequestStream_RequestBiStreamClient = grpc.BidiStreamingClient[Payload, Payload]
// BiRequestStreamServer is the server API for BiRequestStream service.
// All implementations must embed UnimplementedBiRequestStreamServer
// for forward compatibility.
type BiRequestStreamServer interface {
// Sends a commonRequest
RequestBiStream(grpc.BidiStreamingServer[Payload, Payload]) error
mustEmbedUnimplementedBiRequestStreamServer()
}
// UnimplementedBiRequestStreamServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedBiRequestStreamServer struct{}
func (UnimplementedBiRequestStreamServer) RequestBiStream(grpc.BidiStreamingServer[Payload, Payload]) error {
return status.Errorf(codes.Unimplemented, "method RequestBiStream not implemented")
}
func (UnimplementedBiRequestStreamServer) mustEmbedUnimplementedBiRequestStreamServer() {}
func (UnimplementedBiRequestStreamServer) testEmbeddedByValue() {}
// UnsafeBiRequestStreamServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to BiRequestStreamServer will
// result in compilation errors.
type UnsafeBiRequestStreamServer interface {
mustEmbedUnimplementedBiRequestStreamServer()
}
func RegisterBiRequestStreamServer(s grpc.ServiceRegistrar, srv BiRequestStreamServer) {
// If the following call pancis, it indicates UnimplementedBiRequestStreamServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&BiRequestStream_ServiceDesc, srv)
}
func _BiRequestStream_RequestBiStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(BiRequestStreamServer).RequestBiStream(&grpc.GenericServerStream[Payload, Payload]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type BiRequestStream_RequestBiStreamServer = grpc.BidiStreamingServer[Payload, Payload]
// BiRequestStream_ServiceDesc is the grpc.ServiceDesc for BiRequestStream service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var BiRequestStream_ServiceDesc = grpc.ServiceDesc{
ServiceName: "BiRequestStream",
HandlerType: (*BiRequestStreamServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "requestBiStream",
Handler: _BiRequestStream_RequestBiStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "api/proto/nacos_grpc_service.proto",
}

View File

@ -21,7 +21,8 @@ import "google/protobuf/any.proto";
option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto";
option go_package = "nacos/api/grpc/auto";
option go_package = "github.com/nacos-group/nacos-sdk-go/v2/api/grpc/auto;auto";
message Metadata {
string type = 3;

View File

@ -296,7 +296,7 @@ func (m ConcurrentMap) Keys() []string {
return keys
}
//Reviles ConcurrentMap "private" variables to json marshal.
// Reviles ConcurrentMap "private" variables to json marshal.
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
// Create a temporary map, which will hold all item spread across shards.
tmp := make(map[string]interface{})

View File

@ -115,12 +115,12 @@ func (s *ServiceInfoHolder) GetServiceInfo(serviceName, groupName, clusters stri
return model.Service{}, ok
}
func (s *ServiceInfoHolder) RegisterCallback(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
s.subCallback.AddCallbackFunc(serviceName, clusters, callbackFunc)
func (s *ServiceInfoHolder) RegisterCallback(serviceName string, clusters string, callbackWrapper *SubscribeCallbackFuncWrapper) {
s.subCallback.AddCallbackFunc(serviceName, clusters, callbackWrapper)
}
func (s *ServiceInfoHolder) DeregisterCallback(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
s.subCallback.RemoveCallbackFunc(serviceName, clusters, callbackFunc)
func (s *ServiceInfoHolder) DeregisterCallback(serviceName string, clusters string, callbackWrapper *SubscribeCallbackFuncWrapper) {
s.subCallback.RemoveCallbackFunc(serviceName, clusters, callbackWrapper)
}
func (s *ServiceInfoHolder) StopUpdateIfContain(serviceName, clusters string) {

View File

@ -36,31 +36,34 @@ func NewSubscribeCallback() *SubscribeCallback {
func (ed *SubscribeCallback) IsSubscribed(serviceName, clusters string) bool {
key := util.GetServiceCacheKey(serviceName, clusters)
_, ok := ed.callbackFuncMap.Get(key)
return ok
funcs, ok := ed.callbackFuncMap.Get(key)
if ok {
return len(funcs.([]*SubscribeCallbackFuncWrapper)) > 0
}
return false
}
func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackWrapper *SubscribeCallbackFuncWrapper) {
key := util.GetServiceCacheKey(serviceName, clusters)
ed.mux.Lock()
defer ed.mux.Unlock()
var funcSlice []*func(services []model.Instance, err error)
var funcSlice []*SubscribeCallbackFuncWrapper
old, ok := ed.callbackFuncMap.Get(key)
if ok {
funcSlice = append(funcSlice, old.([]*func(services []model.Instance, err error))...)
funcSlice = append(funcSlice, old.([]*SubscribeCallbackFuncWrapper)...)
}
funcSlice = append(funcSlice, callbackFunc)
funcSlice = append(funcSlice, callbackWrapper)
ed.callbackFuncMap.Set(key, funcSlice)
}
func (ed *SubscribeCallback) RemoveCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
func (ed *SubscribeCallback) RemoveCallbackFunc(serviceName string, clusters string, callbackWrapper *SubscribeCallbackFuncWrapper) {
logger.Info("removing " + serviceName + " with " + clusters + " to listener map")
key := util.GetServiceCacheKey(serviceName, clusters)
funcs, ok := ed.callbackFuncMap.Get(key)
if ok && funcs != nil {
var newFuncs []*func(services []model.Instance, err error)
for _, funcItem := range funcs.([]*func(services []model.Instance, err error)) {
if funcItem != callbackFunc {
var newFuncs []*SubscribeCallbackFuncWrapper
for _, funcItem := range funcs.([]*SubscribeCallbackFuncWrapper) {
if funcItem.CallbackFunc != callbackWrapper.CallbackFunc || !funcItem.Selector.Equals(callbackWrapper.Selector) {
newFuncs = append(newFuncs, funcItem)
}
}
@ -72,8 +75,8 @@ func (ed *SubscribeCallback) RemoveCallbackFunc(serviceName string, clusters str
func (ed *SubscribeCallback) ServiceChanged(cacheKey string, service *model.Service) {
funcs, ok := ed.callbackFuncMap.Get(cacheKey)
if ok {
for _, funcItem := range funcs.([]*func(services []model.Instance, err error)) {
(*funcItem)(service.Hosts, nil)
for _, funcItem := range funcs.([]*SubscribeCallbackFuncWrapper) {
funcItem.notifyListener(service)
}
}
}

View File

@ -58,13 +58,15 @@ func TestEventDispatcher_AddCallbackFuncs(t *testing.T) {
fmt.Println(util.ToJsonString(ed.callbackFuncMap))
},
}
ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)
clusterSelector := NewClusterSelector(param.Clusters)
callbackWrapper := NewSubscribeCallbackFuncWrapper(clusterSelector, &param.SubscribeCallback)
ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), callbackWrapper)
key := util.GetServiceCacheKey(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
for k, v := range ed.callbackFuncMap.Items() {
assert.Equal(t, key, k, "key should be equal!")
funcs := v.([]*func(services []model.Instance, err error))
funcs := v.([]*SubscribeCallbackFuncWrapper)
assert.Equal(t, len(funcs), 1)
assert.Equal(t, funcs[0], &param.SubscribeCallback, "callback function must be equal!")
assert.Equal(t, funcs[0].CallbackFunc, &param.SubscribeCallback, "callback function must be equal!")
}
}
@ -98,7 +100,9 @@ func TestEventDispatcher_RemoveCallbackFuncs(t *testing.T) {
fmt.Printf("func1:%s \n", util.ToJsonString(services))
},
}
ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)
clusterSelector := NewClusterSelector(param.Clusters)
callbackWrapper := NewSubscribeCallbackFuncWrapper(clusterSelector, &param.SubscribeCallback)
ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), callbackWrapper)
assert.Equal(t, len(ed.callbackFuncMap.Items()), 1, "callback funcs map length should be 1")
param2 := vo.SubscribeParam{
@ -109,21 +113,23 @@ func TestEventDispatcher_RemoveCallbackFuncs(t *testing.T) {
fmt.Printf("func2:%s \n", util.ToJsonString(services))
},
}
ed.AddCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), &param2.SubscribeCallback)
clusterSelector2 := NewClusterSelector(param2.Clusters)
callbackWrapper2 := NewSubscribeCallbackFuncWrapper(clusterSelector2, &param2.SubscribeCallback)
ed.AddCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), callbackWrapper2)
assert.Equal(t, len(ed.callbackFuncMap.Items()), 1, "callback funcs map length should be 2")
for k, v := range ed.callbackFuncMap.Items() {
log.Printf("key:%s,%d", k, len(v.([]*func(services []model.Instance, err error))))
log.Printf("key:%s,%d", k, len(v.([]*SubscribeCallbackFuncWrapper)))
}
ed.RemoveCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), &param2.SubscribeCallback)
ed.RemoveCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), callbackWrapper2)
key := util.GetServiceCacheKey(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
for k, v := range ed.callbackFuncMap.Items() {
assert.Equal(t, key, k, "key should be equal!")
funcs := v.([]*func(services []model.Instance, err error))
funcs := v.([]*SubscribeCallbackFuncWrapper)
assert.Equal(t, len(funcs), 1)
assert.Equal(t, funcs[0], &param.SubscribeCallback, "callback function must be equal!")
assert.Equal(t, funcs[0].CallbackFunc, &param.SubscribeCallback, "callback function must be equal!")
}
}
@ -158,7 +164,9 @@ func TestSubscribeCallback_ServiceChanged(t *testing.T) {
log.Printf("func1:%s \n", util.ToJsonString(services))
},
}
ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)
clusterSelector := NewClusterSelector(param.Clusters)
callbackWrapper := NewSubscribeCallbackFuncWrapper(clusterSelector, &param.SubscribeCallback)
ed.AddCallbackFunc(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), callbackWrapper)
param2 := vo.SubscribeParam{
ServiceName: "Test",
@ -169,7 +177,54 @@ func TestSubscribeCallback_ServiceChanged(t *testing.T) {
},
}
ed.AddCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), &param2.SubscribeCallback)
clusterSelector2 := NewClusterSelector(param2.Clusters)
callbackWrapper2 := NewSubscribeCallbackFuncWrapper(clusterSelector2, &param2.SubscribeCallback)
ed.AddCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), callbackWrapper2)
cacheKey := util.GetServiceCacheKey(util.GetGroupName(service.Name, service.GroupName), service.Clusters)
ed.ServiceChanged(cacheKey, &service)
}
func TestSubscribeCallback_RemoveCallbackFunc(t *testing.T) {
ed := NewSubscribeCallback()
serviceName := "Test"
clusters := "default"
groupName := "public"
callback1 := func(services []model.Instance, err error) {
log.Printf("callback1:%s \n", util.ToJsonString(services))
}
clusterSelector1 := NewClusterSelector([]string{clusters})
callbackWrapper1 := NewSubscribeCallbackFuncWrapper(clusterSelector1, &callback1)
callback2 := func(services []model.Instance, err error) {
log.Printf("callback2:%s \n", util.ToJsonString(services))
}
clusterSelector2 := NewClusterSelector([]string{clusters})
callbackWrapper2 := NewSubscribeCallbackFuncWrapper(clusterSelector2, &callback2)
// Add both callbacks
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, callbackWrapper1)
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, callbackWrapper2)
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
// Remove the first callback
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, callbackWrapper1)
// Check if only the second callback remains
cacheKey := util.GetServiceCacheKey(util.GetGroupName(serviceName, groupName), clusters)
funcs, ok := ed.callbackFuncMap.Get(cacheKey)
if !ok || len(funcs.([]*SubscribeCallbackFuncWrapper)) != 1 {
t.Errorf("Expected 1 callback function, got %d", len(funcs.([]*SubscribeCallbackFuncWrapper)))
}
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
// Remove the second callback
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, callbackWrapper2)
// Check if no callbacks remain
funcs, ok = ed.callbackFuncMap.Get(cacheKey)
if ok && len(funcs.([]*SubscribeCallbackFuncWrapper)) != 0 {
t.Errorf("Expected 0 callback functions, got %d", len(funcs.([]*func(services []model.Instance, err error))))
}
assert.False(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
}

View File

@ -0,0 +1,106 @@
package naming_cache
import (
"sort"
"strings"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/util"
)
type Selector interface {
SelectInstance(service *model.Service) []model.Instance
Equals(o Selector) bool
}
type ClusterSelector struct {
ClusterNames string
Clusters []string
}
func NewClusterSelector(clusters []string) *ClusterSelector {
if len(clusters) == 0 {
return &ClusterSelector{
ClusterNames: "",
Clusters: []string{},
}
}
// 创建副本避免外部修改
clustersCopy := make([]string, len(clusters))
copy(clustersCopy, clusters)
return &ClusterSelector{
ClusterNames: joinCluster(clusters),
Clusters: clustersCopy,
}
}
func NewSubscribeCallbackFuncWrapper(selector Selector, callback *func(services []model.Instance, err error)) *SubscribeCallbackFuncWrapper {
if selector == nil {
panic("selector cannot be nil")
}
if callback == nil {
panic("callback cannot be nil")
}
return &SubscribeCallbackFuncWrapper{
Selector: selector,
CallbackFunc: callback,
}
}
type SubscribeCallbackFuncWrapper struct {
Selector Selector
CallbackFunc *func(services []model.Instance, err error)
}
func (ed *SubscribeCallbackFuncWrapper) notifyListener(service *model.Service) {
instances := ed.Selector.SelectInstance(service)
if ed.CallbackFunc != nil {
(*ed.CallbackFunc)(instances, nil)
}
}
func (cs *ClusterSelector) SelectInstance(service *model.Service) []model.Instance {
var instances []model.Instance
if cs.ClusterNames == "" {
return service.Hosts
}
for _, instance := range service.Hosts {
if util.Contains(cs.Clusters, instance.ClusterName) {
instances = append(instances, instance)
}
}
return instances
}
func (cs *ClusterSelector) Equals(o Selector) bool {
if o == nil {
return false
}
if o, ok := o.(*ClusterSelector); ok {
return cs.ClusterNames == o.ClusterNames
}
return false
}
func joinCluster(cluster []string) string {
// 使用map实现去重
uniqueSet := make(map[string]struct{})
for _, item := range cluster {
if item != "" { // 过滤空字符串类似Java中的isNotEmpty
uniqueSet[item] = struct{}{}
}
}
uniqueSlice := make([]string, 0, len(uniqueSet))
for item := range uniqueSet {
uniqueSlice = append(uniqueSlice, item)
}
sort.Strings(uniqueSlice)
// 使用逗号连接
return strings.Join(uniqueSlice, ",")
}

View File

@ -18,13 +18,14 @@ package naming_client
import (
"context"
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
"math"
"math/rand"
"strings"
"sync"
"time"
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
"github.com/pkg/errors"
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
@ -200,11 +201,14 @@ func (sc *NamingClient) GetService(param vo.GetServiceParam) (service model.Serv
param.GroupName = constant.DEFAULT_GROUP
}
var ok bool
clusterSelector := naming_cache.NewClusterSelector(param.Clusters)
clusters := strings.Join(param.Clusters, ",")
service, ok = sc.serviceInfoHolder.GetServiceInfo(param.ServiceName, param.GroupName, clusters)
service, ok = sc.serviceInfoHolder.GetServiceInfo(param.ServiceName, param.GroupName, "")
if !ok {
service, err = sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, clusters)
service, err = sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, "")
}
service.Clusters = clusters
service.Hosts = clusterSelector.SelectInstance(&service)
return service, err
}
@ -230,21 +234,24 @@ func (sc *NamingClient) SelectAllInstances(param vo.SelectAllInstancesParam) ([]
if len(param.GroupName) == 0 {
param.GroupName = constant.DEFAULT_GROUP
}
clusters := strings.Join(param.Clusters, ",")
var (
service model.Service
ok bool
err error
)
service, ok = sc.serviceInfoHolder.GetServiceInfo(param.ServiceName, param.GroupName, clusters)
clusterSelector := naming_cache.NewClusterSelector(param.Clusters)
service, ok = sc.serviceInfoHolder.GetServiceInfo(param.ServiceName, param.GroupName, "")
if !ok {
service, err = sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, clusters)
service, err = sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, "")
}
if err != nil || service.Hosts == nil || len(service.Hosts) == 0 {
if err != nil {
return []model.Instance{}, err
}
return service.Hosts, err
instances := clusterSelector.SelectInstance(&service)
if instances == nil || len(instances) == 0 {
return []model.Instance{}, err
}
return instances, err
}
// SelectInstances Get all instance by DataId, Group and Health
@ -257,14 +264,15 @@ func (sc *NamingClient) SelectInstances(param vo.SelectInstancesParam) ([]model.
ok bool
err error
)
clusters := strings.Join(param.Clusters, ",")
service, ok = sc.serviceInfoHolder.GetServiceInfo(param.ServiceName, param.GroupName, clusters)
clusterSelector := naming_cache.NewClusterSelector(param.Clusters)
service, ok = sc.serviceInfoHolder.GetServiceInfo(param.ServiceName, param.GroupName, "")
if !ok {
service, err = sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, clusters)
service, err = sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, "")
if err != nil {
return nil, err
}
}
service.Hosts = clusterSelector.SelectInstance(&service)
return sc.selectInstances(service, param.HealthyOnly)
}
@ -293,15 +301,15 @@ func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanc
ok bool
err error
)
clusters := strings.Join(param.Clusters, ",")
service, ok = sc.serviceInfoHolder.GetServiceInfo(param.ServiceName, param.GroupName, clusters)
clusterSelector := naming_cache.NewClusterSelector(param.Clusters)
service, ok = sc.serviceInfoHolder.GetServiceInfo(param.ServiceName, param.GroupName, "")
if !ok {
service, err = sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, clusters)
service, err = sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, "")
if err != nil {
return nil, err
}
}
service.Hosts = clusterSelector.SelectInstance(&service)
return sc.selectOneHealthyInstances(service)
}
@ -334,19 +342,21 @@ func (sc *NamingClient) Subscribe(param *vo.SubscribeParam) error {
if len(param.GroupName) == 0 {
param.GroupName = constant.DEFAULT_GROUP
}
clusters := strings.Join(param.Clusters, ",")
sc.serviceInfoHolder.RegisterCallback(util.GetGroupName(param.ServiceName, param.GroupName), clusters, &param.SubscribeCallback)
_, err := sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, clusters)
clusterSelector := naming_cache.NewClusterSelector(param.Clusters)
callbackWrapper := naming_cache.NewSubscribeCallbackFuncWrapper(clusterSelector, &param.SubscribeCallback)
sc.serviceInfoHolder.RegisterCallback(util.GetGroupName(param.ServiceName, param.GroupName), "", callbackWrapper)
_, err := sc.serviceProxy.Subscribe(param.ServiceName, param.GroupName, "")
return err
}
// Unsubscribe ...
func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) (err error) {
clusters := strings.Join(param.Clusters, ",")
clusterSelector := naming_cache.NewClusterSelector(param.Clusters)
callbackWrapper := naming_cache.NewSubscribeCallbackFuncWrapper(clusterSelector, &param.SubscribeCallback)
serviceFullName := util.GetGroupName(param.ServiceName, param.GroupName)
sc.serviceInfoHolder.DeregisterCallback(serviceFullName, clusters, &param.SubscribeCallback)
if sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
err = sc.serviceProxy.Unsubscribe(param.ServiceName, param.GroupName, clusters)
sc.serviceInfoHolder.DeregisterCallback(serviceFullName, "", callbackWrapper)
if !sc.serviceInfoHolder.IsSubscribed(serviceFullName, "") {
err = sc.serviceProxy.Unsubscribe(param.ServiceName, param.GroupName, "")
}
return err

View File

@ -37,6 +37,8 @@ var clientConfigTest = *constant.NewClientConfig(
var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos"))
type MockNamingProxy struct {
unsubscribeCalled bool
unsubscribeParams []string // 记录调用参数
}
func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
@ -68,6 +70,8 @@ func (m *MockNamingProxy) Subscribe(serviceName, groupName, clusters string) (mo
}
func (m *MockNamingProxy) Unsubscribe(serviceName, groupName, clusters string) error {
m.unsubscribeCalled = true
m.unsubscribeParams = []string{serviceName, groupName, clusters}
return nil
}
@ -452,3 +456,108 @@ func BenchmarkNamingClient_SelectOneHealthyInstances(b *testing.B) {
}
}
func TestNamingClient_Unsubscribe_WithCallback_ShouldNotCallServiceProxyUnsubscribe(t *testing.T) {
// 创建一个带有回调函数的订阅参数
callback := func(services []model.Instance, err error) {
// 空回调函数
}
param := &vo.SubscribeParam{
ServiceName: "test-service",
GroupName: "test-group",
Clusters: []string{"test-cluster"},
SubscribeCallback: callback,
}
// 创建测试客户端
client := NewTestNamingClient()
mockProxy := client.serviceProxy.(*MockNamingProxy)
// 执行 Unsubscribe
err := client.Unsubscribe(param)
// 验证没有错误
assert.Nil(t, err)
assert.True(t, mockProxy.unsubscribeCalled)
}
func TestNamingClient_Unsubscribe_WithoutCallback_ShouldCallServiceProxyUnsubscribe(t *testing.T) {
// 创建一个没有回调函数的订阅参数
param := &vo.SubscribeParam{
ServiceName: "test-service",
GroupName: "test-group",
Clusters: []string{"test-cluster"},
// SubscribeCallback 为 nil
}
// 创建测试客户端
client := NewTestNamingClient()
// 获取原始的 MockNamingProxy 来检查调用状态
mockProxy := client.serviceProxy.(*MockNamingProxy)
// 执行 Unsubscribe
err := client.Unsubscribe(param)
// 验证没有错误
assert.Nil(t, err)
assert.True(t, mockProxy.unsubscribeCalled)
}
// TestNamingClient_Unsubscribe_Integration_Test 集成测试,使用真实的 ServiceInfoHolder 来测试修复后的逻辑
func TestNamingClient_Unsubscribe_Integration_Test(t *testing.T) {
// 创建测试客户端
client := NewTestNamingClient()
// 获取原始的 MockNamingProxy 来检查调用状态
mockProxy := client.serviceProxy.(*MockNamingProxy)
// 创建回调函数
callback1 := func(services []model.Instance, err error) {
// 回调函数1
}
callback2 := func(services []model.Instance, err error) {
// 回调函数2
}
// 测试场景1先注册两个回调函数然后取消订阅第一个
// 这种情况下,取消订阅第一个回调函数后,还有其他回调函数,所以不应该调用 serviceProxy.Unsubscribe
// 注册第一个回调函数
param1 := &vo.SubscribeParam{
ServiceName: "test-service",
GroupName: "test-group",
Clusters: []string{"test-cluster"},
SubscribeCallback: callback1,
}
// 注册第二个回调函数
param2 := &vo.SubscribeParam{
ServiceName: "test-service",
GroupName: "test-group",
Clusters: []string{"test-cluster"},
SubscribeCallback: callback2,
}
// 先注册两个回调函数
err := client.Subscribe(param1)
assert.Nil(t, err)
err = client.Subscribe(param2)
assert.Nil(t, err)
// 重置 MockNamingProxy 的调用状态
mockProxy.unsubscribeCalled = false
mockProxy.unsubscribeParams = nil
// 取消订阅第一个回调函数
err = client.Unsubscribe(param1)
assert.Nil(t, err)
assert.False(t, mockProxy.unsubscribeCalled)
// 取消订阅第二个回调函数
err = client.Unsubscribe(param2)
assert.Nil(t, err)
assert.True(t, mockProxy.unsubscribeCalled)
}

View File

@ -76,7 +76,7 @@ const (
KEY_BEAT = "beat"
KEY_DOM = "dom"
DEFAULT_CONTEXT_PATH = "/nacos"
CLIENT_VERSION = "Nacos-Go-Client:v2.2.7"
CLIENT_VERSION = "Nacos-Go-Client:v2.3.3"
REQUEST_DOMAIN_RETRY_TIME = 3
SERVICE_INFO_SPLITER = "@@"
CONFIG_INFO_SPLITER = "@@"

View File

@ -34,35 +34,35 @@ func NewServerConfig(ipAddr string, port uint64, opts ...ServerOption) *ServerCo
// ServerOption ...
type ServerOption func(*ServerConfig)
//WithScheme set Scheme for server
// WithScheme set Scheme for server
func WithScheme(scheme string) ServerOption {
return func(config *ServerConfig) {
config.Scheme = scheme
}
}
//WithContextPath set contextPath for server
// WithContextPath set contextPath for server
func WithContextPath(contextPath string) ServerOption {
return func(config *ServerConfig) {
config.ContextPath = contextPath
}
}
//WithIpAddr set ip address for server
// WithIpAddr set ip address for server
func WithIpAddr(ipAddr string) ServerOption {
return func(config *ServerConfig) {
config.IpAddr = ipAddr
}
}
//WithPort set port for server
// WithPort set port for server
func WithPort(port uint64) ServerOption {
return func(config *ServerConfig) {
config.Port = port
}
}
//WithGrpcPort set grpc port for server
// WithGrpcPort set grpc port for server
func WithGrpcPort(port uint64) ServerOption {
return func(config *ServerConfig) {
config.GrpcPort = port

View File

@ -110,6 +110,12 @@ func BuildLoggerConfig(clientConfig constant.ClientConfig) Config {
loggerConfig.LogRollingConfig.MaxBackups = logRollingConfig.MaxBackups
loggerConfig.LogRollingConfig.LocalTime = logRollingConfig.LocalTime
loggerConfig.LogRollingConfig.Compress = logRollingConfig.Compress
} else {
loggerConfig.LogRollingConfig.MaxSize = 100
loggerConfig.LogRollingConfig.MaxAge = 30
loggerConfig.LogRollingConfig.MaxBackups = 5
loggerConfig.LogRollingConfig.LocalTime = true
loggerConfig.LogRollingConfig.Compress = false
}
return loggerConfig
}

View File

@ -18,15 +18,16 @@ package rpc
import (
"context"
"github.com/nacos-group/nacos-sdk-go/v2/common/logger"
"time"
"github.com/golang/protobuf/ptypes/any"
"github.com/nacos-group/nacos-sdk-go/v2/common/logger"
nacos_grpc_service "github.com/nacos-group/nacos-sdk-go/v2/api/grpc"
"github.com/nacos-group/nacos-sdk-go/v2/common/remote/rpc/rpc_request"
"github.com/nacos-group/nacos-sdk-go/v2/common/remote/rpc/rpc_response"
"github.com/nacos-group/nacos-sdk-go/v2/util"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/grpc"
)
@ -86,7 +87,7 @@ func convertRequest(r rpc_request.IRequest) *nacos_grpc_service.Payload {
}
return &nacos_grpc_service.Payload{
Metadata: &Metadata,
Body: &any.Any{Value: []byte(r.GetBody(r))},
Body: &anypb.Any{Value: []byte(r.GetBody(r))},
}
}
@ -97,6 +98,6 @@ func convertResponse(r rpc_response.IResponse) *nacos_grpc_service.Payload {
}
return &nacos_grpc_service.Payload{
Metadata: &Metadata,
Body: &any.Any{Value: []byte(r.GetBody())},
Body: &anypb.Any{Value: []byte(r.GetBody())},
}
}

View File

@ -100,7 +100,7 @@ type RpcClient struct {
ctx context.Context
name string
labels map[string]string
currentConnection IConnection
currentConnection atomic.Value
rpcClientStatus RpcClientStatus
eventChan chan ConnectionEvent
reconnectionChan chan ReconnectContext
@ -128,6 +128,18 @@ type ConnectionEvent struct {
eventType ConnectionStatus
}
func (r *RpcClient) getCurrentConnection() IConnection {
if conn, ok := r.currentConnection.Load().(IConnection); ok {
return conn
} else {
return nil
}
}
func (r *RpcClient) setCurrentConnection(conn IConnection) {
r.currentConnection.Store(conn)
}
func (r *RpcClient) putAllLabels(labels map[string]string) {
for k, v := range labels {
r.labels[k] = v
@ -336,7 +348,7 @@ func (r *RpcClient) Start() {
if currentConnection != nil {
logger.Infof("%s success to connect to server %+v on start up, connectionId=%s", r.name,
currentConnection.getServerInfo(), currentConnection.getConnectionId())
r.currentConnection = currentConnection
r.setCurrentConnection(currentConnection)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED)
} else {
@ -349,11 +361,12 @@ func (r *RpcClient) notifyConnectionChange(eventType ConnectionStatus) {
}
func (r *RpcClient) notifyServerSrvChange() {
if r.currentConnection == nil {
currentConnection := r.getCurrentConnection()
if currentConnection == nil {
r.switchServerAsync(ServerInfo{}, false)
return
}
curServerInfo := r.currentConnection.getServerInfo()
curServerInfo := currentConnection.getServerInfo()
var found bool
for _, ele := range r.nacosServer.GetServerList() {
if ele.IpAddr == curServerInfo.serverIp {
@ -411,7 +424,11 @@ func (r *RpcClient) switchServerAsync(recommendServerInfo ServerInfo, onRequestF
func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
if onRequestFail && r.sendHealthCheck() {
logger.Infof("%s server check success, currentServer is %+v", r.name, r.currentConnection.getServerInfo())
var serverInfo interface{} = nil
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
serverInfo = currentConnection.getServerInfo()
}
logger.Infof("%s server check success, currentServer is %+v", r.name, serverInfo)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED)
return
@ -438,14 +455,14 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
if connectionNew != nil && err == nil {
logger.Infof("%s success to connect a server %+v, connectionId=%s", r.name, serverInfo,
connectionNew.getConnectionId())
if r.currentConnection != nil {
currentConnection := r.getCurrentConnection()
if currentConnection != nil {
logger.Infof("%s abandon prev connection, server is %+v, connectionId is %s", r.name, serverInfo,
r.currentConnection.getConnectionId())
r.currentConnection.setAbandon(true)
currentConnection.getConnectionId())
currentConnection.setAbandon(true)
r.closeConnection()
}
r.currentConnection = connectionNew
r.setCurrentConnection(connectionNew)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED)
return
@ -471,8 +488,9 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
}
func (r *RpcClient) closeConnection() {
if r.currentConnection != nil {
r.currentConnection.close()
currentConnection := r.getCurrentConnection()
if currentConnection != nil {
currentConnection.close()
r.notifyConnectionChange(DISCONNECTED)
}
}
@ -483,7 +501,11 @@ func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) {
if len(listeners) == 0 {
return
}
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), r.currentConnection.getConnectionId())
var connectionId string
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
connectionId = currentConnection.getConnectionId()
}
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), connectionId)
for _, v := range listeners {
if event.isConnected() {
v.OnConnected()
@ -505,10 +527,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
r.lastActiveTimestamp.Store(time.Now())
return
} else {
if r.currentConnection == nil || r.isShutdown() {
currentConnection := r.getCurrentConnection()
if currentConnection == nil || r.isShutdown() {
return
}
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, r.currentConnection.getConnectionId())
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, currentConnection.getConnectionId())
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(UNHEALTHY))
reconnectContext = ReconnectContext{onRequestFail: false}
}
@ -516,10 +539,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
}
func (r *RpcClient) sendHealthCheck() bool {
if r.currentConnection == nil {
currentConnection := r.getCurrentConnection()
if currentConnection == nil {
return false
}
response, err := r.currentConnection.request(rpc_request.NewHealthCheckRequest(),
response, err := currentConnection.request(rpc_request.NewHealthCheckRequest(),
constant.DEFAULT_TIMEOUT_MILLS, r)
if err != nil {
logger.Errorf("client sendHealthCheck failed,err=%v", err)
@ -584,14 +608,19 @@ func (c *ConnectionEvent) toString() string {
func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (rpc_response.IResponse, error) {
retryTimes := 0
start := util.CurrentMillis()
var currentErr error
var (
currentErr error
currentConnection IConnection
)
for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills {
if r.currentConnection == nil || !r.IsRunning() {
currentConnection = r.getCurrentConnection()
if currentConnection == nil || !r.IsRunning() {
rpcClientStatus := RpcClientStatus(atomic.LoadInt32((*int32)(&r.rpcClientStatus)))
currentErr = waitReconnect(timeoutMills, &retryTimes, request,
errors.Errorf("client not connected, current status:%s", r.rpcClientStatus.getDesc()))
errors.Errorf("client not connected, current status:%s", rpcClientStatus.getDesc()))
continue
}
response, err := r.currentConnection.request(request, timeoutMills, r)
response, err := currentConnection.request(request, timeoutMills, r)
if err != nil {
currentErr = waitReconnect(timeoutMills, &retryTimes, request, err)
continue
@ -601,7 +630,7 @@ func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (r
r.mux.Lock()
if atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING), (int32)(UNHEALTHY)) {
logger.Infof("Connection is unregistered, switch server, connectionId=%s, request=%s",
r.currentConnection.getConnectionId(), request.GetRequestType())
currentConnection.getConnectionId(), request.GetRequestType())
r.switchServerAsync(ServerInfo{}, false)
}
r.mux.Unlock()

18
go.mod
View File

@ -1,6 +1,7 @@
module github.com/nacos-group/nacos-sdk-go/v2
go 1.18
// v2.2.9 is deprecated due to a critical bug. Use v2.3.0 or later
go 1.21
require (
github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.10
@ -10,20 +11,24 @@ require (
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1800
github.com/aliyun/alibabacloud-dkms-gcs-go-sdk v0.5.1
github.com/aliyun/alibabacloud-dkms-transfer-go-sdk v0.1.8
github.com/aliyun/aliyun-secretsmanager-client-go v1.1.5
github.com/aliyun/credentials-go v1.4.3
github.com/buger/jsonparser v1.1.1
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.2
github.com/stretchr/testify v1.8.1
go.uber.org/zap v1.21.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.1.0
google.golang.org/grpc v1.56.3
google.golang.org/grpc v1.67.3
google.golang.org/protobuf v1.36.5
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
require github.com/golang/protobuf v1.5.4 // indirect
require (
github.com/alibabacloud-go/alibabacloud-gateway-pop v0.0.6 // indirect
github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.5 // indirect
@ -37,10 +42,8 @@ require (
github.com/alibabacloud-go/openapi-util v0.1.0 // indirect
github.com/alibabacloud-go/tea-utils v1.4.4 // indirect
github.com/alibabacloud-go/tea-xml v1.1.3 // indirect
github.com/aliyun/aliyun-secretsmanager-client-go v1.1.5 // indirect
github.com/aliyun/credentials-go v1.4.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/clbanning/mxj/v2 v2.5.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set v1.7.1 // indirect
@ -63,8 +66,7 @@ require (
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

28
go.sum
View File

@ -88,12 +88,8 @@ github.com/alibabacloud-go/tea-xml v1.1.3 h1:7LYnm+JbOq2B+T/B0fHC4Ies4/FofC4zHzY
github.com/alibabacloud-go/tea-xml v1.1.3/go.mod h1:Rq08vgCcCAjHyRi/M7xlHKUykZCEtyBy9+DPF6GgEu8=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1800 h1:ie/8RxBOfKZWcrbYSJi2Z8uX8TcOlSMwPlEJh83OeOw=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1800/go.mod h1:RcDobYh8k5VP6TNybz9m++gL3ijVI5wueVr0EM10VsU=
github.com/aliyun/alibabacloud-dkms-gcs-go-sdk v0.2.2 h1:rWkH6D2XlXb/Y+tNAQROxBzp3a0p92ni+pXcaHBe/WI=
github.com/aliyun/alibabacloud-dkms-gcs-go-sdk v0.2.2/go.mod h1:GDtq+Kw+v0fO+j5BrrWiUHbBq7L+hfpzpPfXKOZMFE0=
github.com/aliyun/alibabacloud-dkms-gcs-go-sdk v0.5.1 h1:nJYyoFP+aqGKgPs9JeZgS1rWQ4NndNR0Zfhh161ZltU=
github.com/aliyun/alibabacloud-dkms-gcs-go-sdk v0.5.1/go.mod h1:WzGOmFFTlUzXM03CJnHWMQ85UN6QGpOXZocCjwkiyOg=
github.com/aliyun/alibabacloud-dkms-transfer-go-sdk v0.1.7 h1:olLiPI2iM8Hqq6vKnSxpM3awCrm9/BeOgHpzQkOYnI4=
github.com/aliyun/alibabacloud-dkms-transfer-go-sdk v0.1.7/go.mod h1:oDg1j4kFxnhgftaiLJABkGeSvuEvSF5Lo6UmRAMruX4=
github.com/aliyun/alibabacloud-dkms-transfer-go-sdk v0.1.8 h1:QeUdR7JF7iNCvO/81EhxEr3wDwxk4YBoYZOq6E0AjHI=
github.com/aliyun/alibabacloud-dkms-transfer-go-sdk v0.1.8/go.mod h1:xP0KIZry6i7oGPF24vhAPr1Q8vLZRcMcxtft5xDKwCU=
github.com/aliyun/aliyun-secretsmanager-client-go v1.1.5 h1:8S0mtD101RDYa0LXwdoqgN0RxdMmmJYjq8g2mk7/lQ4=
@ -101,7 +97,6 @@ github.com/aliyun/aliyun-secretsmanager-client-go v1.1.5/go.mod h1:M19fxYz3gpm0E
github.com/aliyun/credentials-go v1.1.2/go.mod h1:ozcZaMR5kLM7pwtCMEpVmQ242suV6qTJya2bDq4X1Tw=
github.com/aliyun/credentials-go v1.3.1/go.mod h1:8jKYhQuDawt8x2+fusqa1Y6mPxemTsBEN04dgcAcYz0=
github.com/aliyun/credentials-go v1.3.6/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM=
github.com/aliyun/credentials-go v1.3.10 h1:45Xxrae/evfzQL9V10zL3xX31eqgLWEaIdCoPipOEQA=
github.com/aliyun/credentials-go v1.3.10/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U=
github.com/aliyun/credentials-go v1.4.3 h1:N3iHyvHRMyOwY1+0qBLSf3hb5JFiOujVSVuEpgeGttY=
github.com/aliyun/credentials-go v1.4.3/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U=
@ -116,8 +111,8 @@ github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx2
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@ -175,8 +170,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@ -188,7 +183,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -609,8 +605,8 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@ -623,8 +619,8 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc=
google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8=
google.golang.org/grpc v1.67.3/go.mod h1:YGaHCc6Oap+FzBJTZLBzkGSYt/cvGPFTPxkn7QfSU8s=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@ -638,8 +634,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -70,27 +70,30 @@ func (u UUID) MarshalText() (text []byte, err error) {
// UnmarshalText implements the encoding.TextUnmarshaler interface.
// Following formats are supported:
// "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
// "{6ba7b810-9dad-11d1-80b4-00c04fd430c8}",
// "urn:uuid:6ba7b810-9dad-11d1-80b4-00c04fd430c8"
// "6ba7b8109dad11d180b400c04fd430c8"
//
// "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
// "{6ba7b810-9dad-11d1-80b4-00c04fd430c8}",
// "urn:uuid:6ba7b810-9dad-11d1-80b4-00c04fd430c8"
// "6ba7b8109dad11d180b400c04fd430c8"
//
// ABNF for supported UUID text representation follows:
// uuid := canonical | hashlike | braced | urn
// plain := canonical | hashlike
// canonical := 4hexoct '-' 2hexoct '-' 2hexoct '-' 6hexoct
// hashlike := 12hexoct
// braced := '{' plain '}'
// urn := URN ':' UUID-NID ':' plain
// URN := 'urn'
// UUID-NID := 'uuid'
// 12hexoct := 6hexoct 6hexoct
// 6hexoct := 4hexoct 2hexoct
// 4hexoct := 2hexoct 2hexoct
// 2hexoct := hexoct hexoct
// hexoct := hexdig hexdig
// hexdig := '0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9' |
// 'a' | 'b' | 'c' | 'd' | 'e' | 'f' |
// 'A' | 'B' | 'C' | 'D' | 'E' | 'F'
//
// uuid := canonical | hashlike | braced | urn
// plain := canonical | hashlike
// canonical := 4hexoct '-' 2hexoct '-' 2hexoct '-' 6hexoct
// hashlike := 12hexoct
// braced := '{' plain '}'
// urn := URN ':' UUID-NID ':' plain
// URN := 'urn'
// UUID-NID := 'uuid'
// 12hexoct := 6hexoct 6hexoct
// 6hexoct := 4hexoct 2hexoct
// 4hexoct := 2hexoct 2hexoct
// 2hexoct := hexoct hexoct
// hexoct := hexdig hexdig
// hexdig := '0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9' |
// 'a' | 'b' | 'c' | 'd' | 'e' | 'f' |
// 'A' | 'B' | 'C' | 'D' | 'E' | 'F'
func (u *UUID) UnmarshalText(text []byte) (err error) {
switch len(text) {
case 32:

View File

@ -152,6 +152,7 @@ func (u *UUID) SetVariant(v byte) {
// Must is a helper that wraps a call to a function returning (UUID, error)
// and panics if the error is non-nil. It is intended for use in variable
// initializations such as
//
// var packageUUID = uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000"));
func Must(u UUID, err error) UUID {
if err != nil {

View File

@ -139,3 +139,12 @@ func DeepCopyMap(params map[string]string) map[string]string {
}
return result
}
func Contains(slice []string, item string) bool {
for _, s := range slice {
if s == item {
return true
}
}
return false
}