Compare commits

...

4 Commits

Author SHA1 Message Date
sodaRyCN 4c1e1621f1
Merge 683dce244d into ead2368c2f 2025-08-13 19:00:04 +08:00
shalk(xiao kun) ead2368c2f
fix: nacos client unsubscribe (#836) 2025-08-13 11:01:30 +08:00
sodaRyCN 683dce244d remove unneeded code and fix wrong cache key 2024-01-10 15:32:34 +08:00
sodaRyCN 0cd7a26811 thread safe call AddCallbackFunc and RemoveCallbackFunc 2024-01-10 15:29:51 +08:00
4 changed files with 161 additions and 43 deletions

View File

@ -36,8 +36,11 @@ func NewSubscribeCallback() *SubscribeCallback {
func (ed *SubscribeCallback) IsSubscribed(serviceName, clusters string) bool {
key := util.GetServiceCacheKey(serviceName, clusters)
_, ok := ed.callbackFuncMap.Get(key)
return ok
funcs, ok := ed.callbackFuncMap.Get(key)
if ok {
return len(funcs.([]*func(services []model.Instance, err error))) > 0
}
return false
}
func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
@ -56,6 +59,8 @@ func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string
func (ed *SubscribeCallback) RemoveCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
logger.Info("removing " + serviceName + " with " + clusters + " to listener map")
key := util.GetServiceCacheKey(serviceName, clusters)
defer ed.mux.Unlock()
ed.mux.Lock()
funcs, ok := ed.callbackFuncMap.Get(key)
if ok && funcs != nil {
var newFuncs []*func(services []model.Instance, err error)

View File

@ -30,25 +30,6 @@ import (
)
func TestEventDispatcher_AddCallbackFuncs(t *testing.T) {
service := model.Service{
Clusters: strings.Join([]string{"default"}, ","),
CacheMillis: 10000,
Checksum: "abcd",
LastRefTime: uint64(time.Now().Unix()),
}
var hosts []model.Instance
host := model.Instance{
Enable: true,
InstanceId: "123",
Port: 8080,
Ip: "127.0.0.1",
Weight: 10,
ServiceName: "public@@Test",
ClusterName: strings.Join([]string{"default"}, ","),
}
hosts = append(hosts, host)
service.Hosts = hosts
ed := NewSubscribeCallback()
param := vo.SubscribeParam{
ServiceName: "Test",
@ -70,25 +51,6 @@ func TestEventDispatcher_AddCallbackFuncs(t *testing.T) {
}
func TestEventDispatcher_RemoveCallbackFuncs(t *testing.T) {
service := model.Service{
Clusters: strings.Join([]string{"default"}, ","),
CacheMillis: 10000,
Checksum: "abcd",
LastRefTime: uint64(time.Now().Unix()),
}
var hosts []model.Instance
host := model.Instance{
Enable: true,
InstanceId: "123",
Port: 8080,
Ip: "127.0.0.1",
Weight: 10,
ServiceName: "public@@Test",
ClusterName: strings.Join([]string{"default"}, ","),
}
hosts = append(hosts, host)
service.Hosts = hosts
ed := NewSubscribeCallback()
param := vo.SubscribeParam{
ServiceName: "Test",
@ -170,6 +132,47 @@ func TestSubscribeCallback_ServiceChanged(t *testing.T) {
},
}
ed.AddCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), &param2.SubscribeCallback)
cacheKey := util.GetServiceCacheKey(util.GetGroupName(service.Name, service.GroupName), service.Clusters)
cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters)
ed.ServiceChanged(cacheKey, &service)
}
func TestSubscribeCallback_RemoveCallbackFunc(t *testing.T) {
ed := NewSubscribeCallback()
serviceName := "Test"
clusters := "default"
groupName := "public"
callback1 := func(services []model.Instance, err error) {
log.Printf("callback1:%s \n", util.ToJsonString(services))
}
callback2 := func(services []model.Instance, err error) {
log.Printf("callback2:%s \n", util.ToJsonString(services))
}
// Add both callbacks
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback1)
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback2)
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
// Remove the first callback
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback1)
// Check if only the second callback remains
cacheKey := util.GetServiceCacheKey(util.GetGroupName(serviceName, groupName), clusters)
funcs, ok := ed.callbackFuncMap.Get(cacheKey)
if !ok || len(funcs.([]*func(services []model.Instance, err error))) != 1 {
t.Errorf("Expected 1 callback function, got %d", len(funcs.([]*func(services []model.Instance, err error))))
}
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
// Remove the second callback
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback2)
// Check if no callbacks remain
funcs, ok = ed.callbackFuncMap.Get(cacheKey)
if ok && len(funcs.([]*func(services []model.Instance, err error))) != 0 {
t.Errorf("Expected 0 callback functions, got %d", len(funcs.([]*func(services []model.Instance, err error))))
}
assert.False(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
}

View File

@ -18,13 +18,14 @@ package naming_client
import (
"context"
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
"math"
"math/rand"
"strings"
"sync"
"time"
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
"github.com/pkg/errors"
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
@ -345,7 +346,7 @@ func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) (err error) {
clusters := strings.Join(param.Clusters, ",")
serviceFullName := util.GetGroupName(param.ServiceName, param.GroupName)
sc.serviceInfoHolder.DeregisterCallback(serviceFullName, clusters, &param.SubscribeCallback)
if sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
if !sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
err = sc.serviceProxy.Unsubscribe(param.ServiceName, param.GroupName, clusters)
}

View File

@ -37,6 +37,8 @@ var clientConfigTest = *constant.NewClientConfig(
var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos"))
type MockNamingProxy struct {
unsubscribeCalled bool
unsubscribeParams []string // 记录调用参数
}
func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
@ -68,6 +70,8 @@ func (m *MockNamingProxy) Subscribe(serviceName, groupName, clusters string) (mo
}
func (m *MockNamingProxy) Unsubscribe(serviceName, groupName, clusters string) error {
m.unsubscribeCalled = true
m.unsubscribeParams = []string{serviceName, groupName, clusters}
return nil
}
@ -452,3 +456,108 @@ func BenchmarkNamingClient_SelectOneHealthyInstances(b *testing.B) {
}
}
func TestNamingClient_Unsubscribe_WithCallback_ShouldNotCallServiceProxyUnsubscribe(t *testing.T) {
// 创建一个带有回调函数的订阅参数
callback := func(services []model.Instance, err error) {
// 空回调函数
}
param := &vo.SubscribeParam{
ServiceName: "test-service",
GroupName: "test-group",
Clusters: []string{"test-cluster"},
SubscribeCallback: callback,
}
// 创建测试客户端
client := NewTestNamingClient()
mockProxy := client.serviceProxy.(*MockNamingProxy)
// 执行 Unsubscribe
err := client.Unsubscribe(param)
// 验证没有错误
assert.Nil(t, err)
assert.True(t, mockProxy.unsubscribeCalled)
}
func TestNamingClient_Unsubscribe_WithoutCallback_ShouldCallServiceProxyUnsubscribe(t *testing.T) {
// 创建一个没有回调函数的订阅参数
param := &vo.SubscribeParam{
ServiceName: "test-service",
GroupName: "test-group",
Clusters: []string{"test-cluster"},
// SubscribeCallback 为 nil
}
// 创建测试客户端
client := NewTestNamingClient()
// 获取原始的 MockNamingProxy 来检查调用状态
mockProxy := client.serviceProxy.(*MockNamingProxy)
// 执行 Unsubscribe
err := client.Unsubscribe(param)
// 验证没有错误
assert.Nil(t, err)
assert.True(t, mockProxy.unsubscribeCalled)
}
// TestNamingClient_Unsubscribe_Integration_Test 集成测试,使用真实的 ServiceInfoHolder 来测试修复后的逻辑
func TestNamingClient_Unsubscribe_Integration_Test(t *testing.T) {
// 创建测试客户端
client := NewTestNamingClient()
// 获取原始的 MockNamingProxy 来检查调用状态
mockProxy := client.serviceProxy.(*MockNamingProxy)
// 创建回调函数
callback1 := func(services []model.Instance, err error) {
// 回调函数1
}
callback2 := func(services []model.Instance, err error) {
// 回调函数2
}
// 测试场景1先注册两个回调函数然后取消订阅第一个
// 这种情况下,取消订阅第一个回调函数后,还有其他回调函数,所以不应该调用 serviceProxy.Unsubscribe
// 注册第一个回调函数
param1 := &vo.SubscribeParam{
ServiceName: "test-service",
GroupName: "test-group",
Clusters: []string{"test-cluster"},
SubscribeCallback: callback1,
}
// 注册第二个回调函数
param2 := &vo.SubscribeParam{
ServiceName: "test-service",
GroupName: "test-group",
Clusters: []string{"test-cluster"},
SubscribeCallback: callback2,
}
// 先注册两个回调函数
err := client.Subscribe(param1)
assert.Nil(t, err)
err = client.Subscribe(param2)
assert.Nil(t, err)
// 重置 MockNamingProxy 的调用状态
mockProxy.unsubscribeCalled = false
mockProxy.unsubscribeParams = nil
// 取消订阅第一个回调函数
err = client.Unsubscribe(param1)
assert.Nil(t, err)
assert.False(t, mockProxy.unsubscribeCalled)
// 取消订阅第二个回调函数
err = client.Unsubscribe(param2)
assert.Nil(t, err)
assert.True(t, mockProxy.unsubscribeCalled)
}