Compare commits
4 Commits
fbb487cb1b
...
4c1e1621f1
Author | SHA1 | Date |
---|---|---|
|
4c1e1621f1 | |
|
ead2368c2f | |
|
683dce244d | |
|
0cd7a26811 |
|
@ -36,8 +36,11 @@ func NewSubscribeCallback() *SubscribeCallback {
|
|||
|
||||
func (ed *SubscribeCallback) IsSubscribed(serviceName, clusters string) bool {
|
||||
key := util.GetServiceCacheKey(serviceName, clusters)
|
||||
_, ok := ed.callbackFuncMap.Get(key)
|
||||
return ok
|
||||
funcs, ok := ed.callbackFuncMap.Get(key)
|
||||
if ok {
|
||||
return len(funcs.([]*func(services []model.Instance, err error))) > 0
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
|
||||
|
@ -56,6 +59,8 @@ func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string
|
|||
func (ed *SubscribeCallback) RemoveCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
|
||||
logger.Info("removing " + serviceName + " with " + clusters + " to listener map")
|
||||
key := util.GetServiceCacheKey(serviceName, clusters)
|
||||
defer ed.mux.Unlock()
|
||||
ed.mux.Lock()
|
||||
funcs, ok := ed.callbackFuncMap.Get(key)
|
||||
if ok && funcs != nil {
|
||||
var newFuncs []*func(services []model.Instance, err error)
|
||||
|
|
|
@ -30,25 +30,6 @@ import (
|
|||
)
|
||||
|
||||
func TestEventDispatcher_AddCallbackFuncs(t *testing.T) {
|
||||
service := model.Service{
|
||||
Clusters: strings.Join([]string{"default"}, ","),
|
||||
CacheMillis: 10000,
|
||||
Checksum: "abcd",
|
||||
LastRefTime: uint64(time.Now().Unix()),
|
||||
}
|
||||
var hosts []model.Instance
|
||||
host := model.Instance{
|
||||
Enable: true,
|
||||
InstanceId: "123",
|
||||
Port: 8080,
|
||||
Ip: "127.0.0.1",
|
||||
Weight: 10,
|
||||
ServiceName: "public@@Test",
|
||||
ClusterName: strings.Join([]string{"default"}, ","),
|
||||
}
|
||||
hosts = append(hosts, host)
|
||||
service.Hosts = hosts
|
||||
|
||||
ed := NewSubscribeCallback()
|
||||
param := vo.SubscribeParam{
|
||||
ServiceName: "Test",
|
||||
|
@ -70,25 +51,6 @@ func TestEventDispatcher_AddCallbackFuncs(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEventDispatcher_RemoveCallbackFuncs(t *testing.T) {
|
||||
service := model.Service{
|
||||
Clusters: strings.Join([]string{"default"}, ","),
|
||||
CacheMillis: 10000,
|
||||
Checksum: "abcd",
|
||||
LastRefTime: uint64(time.Now().Unix()),
|
||||
}
|
||||
var hosts []model.Instance
|
||||
host := model.Instance{
|
||||
Enable: true,
|
||||
InstanceId: "123",
|
||||
Port: 8080,
|
||||
Ip: "127.0.0.1",
|
||||
Weight: 10,
|
||||
ServiceName: "public@@Test",
|
||||
ClusterName: strings.Join([]string{"default"}, ","),
|
||||
}
|
||||
hosts = append(hosts, host)
|
||||
service.Hosts = hosts
|
||||
|
||||
ed := NewSubscribeCallback()
|
||||
param := vo.SubscribeParam{
|
||||
ServiceName: "Test",
|
||||
|
@ -170,6 +132,47 @@ func TestSubscribeCallback_ServiceChanged(t *testing.T) {
|
|||
},
|
||||
}
|
||||
ed.AddCallbackFunc(util.GetGroupName(param2.ServiceName, param2.GroupName), strings.Join(param2.Clusters, ","), ¶m2.SubscribeCallback)
|
||||
cacheKey := util.GetServiceCacheKey(util.GetGroupName(service.Name, service.GroupName), service.Clusters)
|
||||
cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters)
|
||||
ed.ServiceChanged(cacheKey, &service)
|
||||
}
|
||||
|
||||
func TestSubscribeCallback_RemoveCallbackFunc(t *testing.T) {
|
||||
ed := NewSubscribeCallback()
|
||||
serviceName := "Test"
|
||||
clusters := "default"
|
||||
groupName := "public"
|
||||
|
||||
callback1 := func(services []model.Instance, err error) {
|
||||
log.Printf("callback1:%s \n", util.ToJsonString(services))
|
||||
}
|
||||
|
||||
callback2 := func(services []model.Instance, err error) {
|
||||
log.Printf("callback2:%s \n", util.ToJsonString(services))
|
||||
}
|
||||
|
||||
// Add both callbacks
|
||||
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback1)
|
||||
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback2)
|
||||
|
||||
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
||||
// Remove the first callback
|
||||
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback1)
|
||||
|
||||
// Check if only the second callback remains
|
||||
cacheKey := util.GetServiceCacheKey(util.GetGroupName(serviceName, groupName), clusters)
|
||||
funcs, ok := ed.callbackFuncMap.Get(cacheKey)
|
||||
if !ok || len(funcs.([]*func(services []model.Instance, err error))) != 1 {
|
||||
t.Errorf("Expected 1 callback function, got %d", len(funcs.([]*func(services []model.Instance, err error))))
|
||||
}
|
||||
|
||||
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
||||
// Remove the second callback
|
||||
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback2)
|
||||
|
||||
// Check if no callbacks remain
|
||||
funcs, ok = ed.callbackFuncMap.Get(cacheKey)
|
||||
if ok && len(funcs.([]*func(services []model.Instance, err error))) != 0 {
|
||||
t.Errorf("Expected 0 callback functions, got %d", len(funcs.([]*func(services []model.Instance, err error))))
|
||||
}
|
||||
assert.False(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
||||
}
|
||||
|
|
|
@ -18,13 +18,14 @@ package naming_client
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
|
||||
"math"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
|
||||
|
@ -345,7 +346,7 @@ func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) (err error) {
|
|||
clusters := strings.Join(param.Clusters, ",")
|
||||
serviceFullName := util.GetGroupName(param.ServiceName, param.GroupName)
|
||||
sc.serviceInfoHolder.DeregisterCallback(serviceFullName, clusters, ¶m.SubscribeCallback)
|
||||
if sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
|
||||
if !sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
|
||||
err = sc.serviceProxy.Unsubscribe(param.ServiceName, param.GroupName, clusters)
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,8 @@ var clientConfigTest = *constant.NewClientConfig(
|
|||
var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos"))
|
||||
|
||||
type MockNamingProxy struct {
|
||||
unsubscribeCalled bool
|
||||
unsubscribeParams []string // 记录调用参数
|
||||
}
|
||||
|
||||
func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
|
||||
|
@ -68,6 +70,8 @@ func (m *MockNamingProxy) Subscribe(serviceName, groupName, clusters string) (mo
|
|||
}
|
||||
|
||||
func (m *MockNamingProxy) Unsubscribe(serviceName, groupName, clusters string) error {
|
||||
m.unsubscribeCalled = true
|
||||
m.unsubscribeParams = []string{serviceName, groupName, clusters}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -452,3 +456,108 @@ func BenchmarkNamingClient_SelectOneHealthyInstances(b *testing.B) {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
func TestNamingClient_Unsubscribe_WithCallback_ShouldNotCallServiceProxyUnsubscribe(t *testing.T) {
|
||||
// 创建一个带有回调函数的订阅参数
|
||||
callback := func(services []model.Instance, err error) {
|
||||
// 空回调函数
|
||||
}
|
||||
param := &vo.SubscribeParam{
|
||||
ServiceName: "test-service",
|
||||
GroupName: "test-group",
|
||||
Clusters: []string{"test-cluster"},
|
||||
SubscribeCallback: callback,
|
||||
}
|
||||
|
||||
// 创建测试客户端
|
||||
client := NewTestNamingClient()
|
||||
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
||||
|
||||
// 执行 Unsubscribe
|
||||
err := client.Unsubscribe(param)
|
||||
|
||||
// 验证没有错误
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, mockProxy.unsubscribeCalled)
|
||||
}
|
||||
|
||||
func TestNamingClient_Unsubscribe_WithoutCallback_ShouldCallServiceProxyUnsubscribe(t *testing.T) {
|
||||
// 创建一个没有回调函数的订阅参数
|
||||
param := &vo.SubscribeParam{
|
||||
ServiceName: "test-service",
|
||||
GroupName: "test-group",
|
||||
Clusters: []string{"test-cluster"},
|
||||
// SubscribeCallback 为 nil
|
||||
}
|
||||
|
||||
// 创建测试客户端
|
||||
client := NewTestNamingClient()
|
||||
// 获取原始的 MockNamingProxy 来检查调用状态
|
||||
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
||||
|
||||
// 执行 Unsubscribe
|
||||
err := client.Unsubscribe(param)
|
||||
|
||||
// 验证没有错误
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, mockProxy.unsubscribeCalled)
|
||||
}
|
||||
|
||||
// TestNamingClient_Unsubscribe_Integration_Test 集成测试,使用真实的 ServiceInfoHolder 来测试修复后的逻辑
|
||||
func TestNamingClient_Unsubscribe_Integration_Test(t *testing.T) {
|
||||
// 创建测试客户端
|
||||
client := NewTestNamingClient()
|
||||
|
||||
// 获取原始的 MockNamingProxy 来检查调用状态
|
||||
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
||||
|
||||
// 创建回调函数
|
||||
callback1 := func(services []model.Instance, err error) {
|
||||
// 回调函数1
|
||||
}
|
||||
|
||||
callback2 := func(services []model.Instance, err error) {
|
||||
// 回调函数2
|
||||
}
|
||||
|
||||
// 测试场景1:先注册两个回调函数,然后取消订阅第一个
|
||||
// 这种情况下,取消订阅第一个回调函数后,还有其他回调函数,所以不应该调用 serviceProxy.Unsubscribe
|
||||
|
||||
// 注册第一个回调函数
|
||||
param1 := &vo.SubscribeParam{
|
||||
ServiceName: "test-service",
|
||||
GroupName: "test-group",
|
||||
Clusters: []string{"test-cluster"},
|
||||
SubscribeCallback: callback1,
|
||||
}
|
||||
|
||||
// 注册第二个回调函数
|
||||
param2 := &vo.SubscribeParam{
|
||||
ServiceName: "test-service",
|
||||
GroupName: "test-group",
|
||||
Clusters: []string{"test-cluster"},
|
||||
SubscribeCallback: callback2,
|
||||
}
|
||||
|
||||
// 先注册两个回调函数
|
||||
err := client.Subscribe(param1)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = client.Subscribe(param2)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// 重置 MockNamingProxy 的调用状态
|
||||
mockProxy.unsubscribeCalled = false
|
||||
mockProxy.unsubscribeParams = nil
|
||||
|
||||
// 取消订阅第一个回调函数
|
||||
err = client.Unsubscribe(param1)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, mockProxy.unsubscribeCalled)
|
||||
|
||||
// 取消订阅第二个回调函数
|
||||
err = client.Unsubscribe(param2)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, mockProxy.unsubscribeCalled)
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue