Compare commits
2 Commits
fbb487cb1b
...
4c1e1621f1
Author | SHA1 | Date |
---|---|---|
|
4c1e1621f1 | |
|
ead2368c2f |
|
@ -36,8 +36,11 @@ func NewSubscribeCallback() *SubscribeCallback {
|
||||||
|
|
||||||
func (ed *SubscribeCallback) IsSubscribed(serviceName, clusters string) bool {
|
func (ed *SubscribeCallback) IsSubscribed(serviceName, clusters string) bool {
|
||||||
key := util.GetServiceCacheKey(serviceName, clusters)
|
key := util.GetServiceCacheKey(serviceName, clusters)
|
||||||
_, ok := ed.callbackFuncMap.Get(key)
|
funcs, ok := ed.callbackFuncMap.Get(key)
|
||||||
return ok
|
if ok {
|
||||||
|
return len(funcs.([]*func(services []model.Instance, err error))) > 0
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
|
func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
|
||||||
|
|
|
@ -135,3 +135,44 @@ func TestSubscribeCallback_ServiceChanged(t *testing.T) {
|
||||||
cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters)
|
cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters)
|
||||||
ed.ServiceChanged(cacheKey, &service)
|
ed.ServiceChanged(cacheKey, &service)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSubscribeCallback_RemoveCallbackFunc(t *testing.T) {
|
||||||
|
ed := NewSubscribeCallback()
|
||||||
|
serviceName := "Test"
|
||||||
|
clusters := "default"
|
||||||
|
groupName := "public"
|
||||||
|
|
||||||
|
callback1 := func(services []model.Instance, err error) {
|
||||||
|
log.Printf("callback1:%s \n", util.ToJsonString(services))
|
||||||
|
}
|
||||||
|
|
||||||
|
callback2 := func(services []model.Instance, err error) {
|
||||||
|
log.Printf("callback2:%s \n", util.ToJsonString(services))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add both callbacks
|
||||||
|
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback1)
|
||||||
|
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback2)
|
||||||
|
|
||||||
|
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
||||||
|
// Remove the first callback
|
||||||
|
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback1)
|
||||||
|
|
||||||
|
// Check if only the second callback remains
|
||||||
|
cacheKey := util.GetServiceCacheKey(util.GetGroupName(serviceName, groupName), clusters)
|
||||||
|
funcs, ok := ed.callbackFuncMap.Get(cacheKey)
|
||||||
|
if !ok || len(funcs.([]*func(services []model.Instance, err error))) != 1 {
|
||||||
|
t.Errorf("Expected 1 callback function, got %d", len(funcs.([]*func(services []model.Instance, err error))))
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
||||||
|
// Remove the second callback
|
||||||
|
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback2)
|
||||||
|
|
||||||
|
// Check if no callbacks remain
|
||||||
|
funcs, ok = ed.callbackFuncMap.Get(cacheKey)
|
||||||
|
if ok && len(funcs.([]*func(services []model.Instance, err error))) != 0 {
|
||||||
|
t.Errorf("Expected 0 callback functions, got %d", len(funcs.([]*func(services []model.Instance, err error))))
|
||||||
|
}
|
||||||
|
assert.False(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
||||||
|
}
|
||||||
|
|
|
@ -18,13 +18,14 @@ package naming_client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
|
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
|
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
|
||||||
|
@ -345,7 +346,7 @@ func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) (err error) {
|
||||||
clusters := strings.Join(param.Clusters, ",")
|
clusters := strings.Join(param.Clusters, ",")
|
||||||
serviceFullName := util.GetGroupName(param.ServiceName, param.GroupName)
|
serviceFullName := util.GetGroupName(param.ServiceName, param.GroupName)
|
||||||
sc.serviceInfoHolder.DeregisterCallback(serviceFullName, clusters, ¶m.SubscribeCallback)
|
sc.serviceInfoHolder.DeregisterCallback(serviceFullName, clusters, ¶m.SubscribeCallback)
|
||||||
if sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
|
if !sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
|
||||||
err = sc.serviceProxy.Unsubscribe(param.ServiceName, param.GroupName, clusters)
|
err = sc.serviceProxy.Unsubscribe(param.ServiceName, param.GroupName, clusters)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,8 @@ var clientConfigTest = *constant.NewClientConfig(
|
||||||
var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos"))
|
var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos"))
|
||||||
|
|
||||||
type MockNamingProxy struct {
|
type MockNamingProxy struct {
|
||||||
|
unsubscribeCalled bool
|
||||||
|
unsubscribeParams []string // 记录调用参数
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
|
func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
|
||||||
|
@ -68,6 +70,8 @@ func (m *MockNamingProxy) Subscribe(serviceName, groupName, clusters string) (mo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockNamingProxy) Unsubscribe(serviceName, groupName, clusters string) error {
|
func (m *MockNamingProxy) Unsubscribe(serviceName, groupName, clusters string) error {
|
||||||
|
m.unsubscribeCalled = true
|
||||||
|
m.unsubscribeParams = []string{serviceName, groupName, clusters}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,3 +456,108 @@ func BenchmarkNamingClient_SelectOneHealthyInstances(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNamingClient_Unsubscribe_WithCallback_ShouldNotCallServiceProxyUnsubscribe(t *testing.T) {
|
||||||
|
// 创建一个带有回调函数的订阅参数
|
||||||
|
callback := func(services []model.Instance, err error) {
|
||||||
|
// 空回调函数
|
||||||
|
}
|
||||||
|
param := &vo.SubscribeParam{
|
||||||
|
ServiceName: "test-service",
|
||||||
|
GroupName: "test-group",
|
||||||
|
Clusters: []string{"test-cluster"},
|
||||||
|
SubscribeCallback: callback,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建测试客户端
|
||||||
|
client := NewTestNamingClient()
|
||||||
|
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
||||||
|
|
||||||
|
// 执行 Unsubscribe
|
||||||
|
err := client.Unsubscribe(param)
|
||||||
|
|
||||||
|
// 验证没有错误
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, mockProxy.unsubscribeCalled)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNamingClient_Unsubscribe_WithoutCallback_ShouldCallServiceProxyUnsubscribe(t *testing.T) {
|
||||||
|
// 创建一个没有回调函数的订阅参数
|
||||||
|
param := &vo.SubscribeParam{
|
||||||
|
ServiceName: "test-service",
|
||||||
|
GroupName: "test-group",
|
||||||
|
Clusters: []string{"test-cluster"},
|
||||||
|
// SubscribeCallback 为 nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建测试客户端
|
||||||
|
client := NewTestNamingClient()
|
||||||
|
// 获取原始的 MockNamingProxy 来检查调用状态
|
||||||
|
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
||||||
|
|
||||||
|
// 执行 Unsubscribe
|
||||||
|
err := client.Unsubscribe(param)
|
||||||
|
|
||||||
|
// 验证没有错误
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, mockProxy.unsubscribeCalled)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNamingClient_Unsubscribe_Integration_Test 集成测试,使用真实的 ServiceInfoHolder 来测试修复后的逻辑
|
||||||
|
func TestNamingClient_Unsubscribe_Integration_Test(t *testing.T) {
|
||||||
|
// 创建测试客户端
|
||||||
|
client := NewTestNamingClient()
|
||||||
|
|
||||||
|
// 获取原始的 MockNamingProxy 来检查调用状态
|
||||||
|
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
||||||
|
|
||||||
|
// 创建回调函数
|
||||||
|
callback1 := func(services []model.Instance, err error) {
|
||||||
|
// 回调函数1
|
||||||
|
}
|
||||||
|
|
||||||
|
callback2 := func(services []model.Instance, err error) {
|
||||||
|
// 回调函数2
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试场景1:先注册两个回调函数,然后取消订阅第一个
|
||||||
|
// 这种情况下,取消订阅第一个回调函数后,还有其他回调函数,所以不应该调用 serviceProxy.Unsubscribe
|
||||||
|
|
||||||
|
// 注册第一个回调函数
|
||||||
|
param1 := &vo.SubscribeParam{
|
||||||
|
ServiceName: "test-service",
|
||||||
|
GroupName: "test-group",
|
||||||
|
Clusters: []string{"test-cluster"},
|
||||||
|
SubscribeCallback: callback1,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 注册第二个回调函数
|
||||||
|
param2 := &vo.SubscribeParam{
|
||||||
|
ServiceName: "test-service",
|
||||||
|
GroupName: "test-group",
|
||||||
|
Clusters: []string{"test-cluster"},
|
||||||
|
SubscribeCallback: callback2,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 先注册两个回调函数
|
||||||
|
err := client.Subscribe(param1)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
err = client.Subscribe(param2)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
// 重置 MockNamingProxy 的调用状态
|
||||||
|
mockProxy.unsubscribeCalled = false
|
||||||
|
mockProxy.unsubscribeParams = nil
|
||||||
|
|
||||||
|
// 取消订阅第一个回调函数
|
||||||
|
err = client.Unsubscribe(param1)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.False(t, mockProxy.unsubscribeCalled)
|
||||||
|
|
||||||
|
// 取消订阅第二个回调函数
|
||||||
|
err = client.Unsubscribe(param2)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, mockProxy.unsubscribeCalled)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue