Compare commits
1 Commits
4c1e1621f1
...
fbb487cb1b
Author | SHA1 | Date |
---|---|---|
|
fbb487cb1b |
|
@ -36,11 +36,8 @@ func NewSubscribeCallback() *SubscribeCallback {
|
||||||
|
|
||||||
func (ed *SubscribeCallback) IsSubscribed(serviceName, clusters string) bool {
|
func (ed *SubscribeCallback) IsSubscribed(serviceName, clusters string) bool {
|
||||||
key := util.GetServiceCacheKey(serviceName, clusters)
|
key := util.GetServiceCacheKey(serviceName, clusters)
|
||||||
funcs, ok := ed.callbackFuncMap.Get(key)
|
_, ok := ed.callbackFuncMap.Get(key)
|
||||||
if ok {
|
return ok
|
||||||
return len(funcs.([]*func(services []model.Instance, err error))) > 0
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
|
func (ed *SubscribeCallback) AddCallbackFunc(serviceName string, clusters string, callbackFunc *func(services []model.Instance, err error)) {
|
||||||
|
|
|
@ -135,44 +135,3 @@ func TestSubscribeCallback_ServiceChanged(t *testing.T) {
|
||||||
cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters)
|
cacheKey := util.GetServiceCacheKey(service.Name, service.Clusters)
|
||||||
ed.ServiceChanged(cacheKey, &service)
|
ed.ServiceChanged(cacheKey, &service)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSubscribeCallback_RemoveCallbackFunc(t *testing.T) {
|
|
||||||
ed := NewSubscribeCallback()
|
|
||||||
serviceName := "Test"
|
|
||||||
clusters := "default"
|
|
||||||
groupName := "public"
|
|
||||||
|
|
||||||
callback1 := func(services []model.Instance, err error) {
|
|
||||||
log.Printf("callback1:%s \n", util.ToJsonString(services))
|
|
||||||
}
|
|
||||||
|
|
||||||
callback2 := func(services []model.Instance, err error) {
|
|
||||||
log.Printf("callback2:%s \n", util.ToJsonString(services))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add both callbacks
|
|
||||||
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback1)
|
|
||||||
ed.AddCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback2)
|
|
||||||
|
|
||||||
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
|
||||||
// Remove the first callback
|
|
||||||
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback1)
|
|
||||||
|
|
||||||
// Check if only the second callback remains
|
|
||||||
cacheKey := util.GetServiceCacheKey(util.GetGroupName(serviceName, groupName), clusters)
|
|
||||||
funcs, ok := ed.callbackFuncMap.Get(cacheKey)
|
|
||||||
if !ok || len(funcs.([]*func(services []model.Instance, err error))) != 1 {
|
|
||||||
t.Errorf("Expected 1 callback function, got %d", len(funcs.([]*func(services []model.Instance, err error))))
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.True(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
|
||||||
// Remove the second callback
|
|
||||||
ed.RemoveCallbackFunc(util.GetGroupName(serviceName, groupName), clusters, &callback2)
|
|
||||||
|
|
||||||
// Check if no callbacks remain
|
|
||||||
funcs, ok = ed.callbackFuncMap.Get(cacheKey)
|
|
||||||
if ok && len(funcs.([]*func(services []model.Instance, err error))) != 0 {
|
|
||||||
t.Errorf("Expected 0 callback functions, got %d", len(funcs.([]*func(services []model.Instance, err error))))
|
|
||||||
}
|
|
||||||
assert.False(t, ed.IsSubscribed(util.GetGroupName(serviceName, groupName), clusters))
|
|
||||||
}
|
|
||||||
|
|
|
@ -18,14 +18,13 @@ package naming_client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
|
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
|
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
|
||||||
|
@ -346,7 +345,7 @@ func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) (err error) {
|
||||||
clusters := strings.Join(param.Clusters, ",")
|
clusters := strings.Join(param.Clusters, ",")
|
||||||
serviceFullName := util.GetGroupName(param.ServiceName, param.GroupName)
|
serviceFullName := util.GetGroupName(param.ServiceName, param.GroupName)
|
||||||
sc.serviceInfoHolder.DeregisterCallback(serviceFullName, clusters, ¶m.SubscribeCallback)
|
sc.serviceInfoHolder.DeregisterCallback(serviceFullName, clusters, ¶m.SubscribeCallback)
|
||||||
if !sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
|
if sc.serviceInfoHolder.IsSubscribed(serviceFullName, clusters) {
|
||||||
err = sc.serviceProxy.Unsubscribe(param.ServiceName, param.GroupName, clusters)
|
err = sc.serviceProxy.Unsubscribe(param.ServiceName, param.GroupName, clusters)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,8 +37,6 @@ var clientConfigTest = *constant.NewClientConfig(
|
||||||
var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos"))
|
var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos"))
|
||||||
|
|
||||||
type MockNamingProxy struct {
|
type MockNamingProxy struct {
|
||||||
unsubscribeCalled bool
|
|
||||||
unsubscribeParams []string // 记录调用参数
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
|
func (m *MockNamingProxy) RegisterInstance(serviceName string, groupName string, instance model.Instance) (bool, error) {
|
||||||
|
@ -70,8 +68,6 @@ func (m *MockNamingProxy) Subscribe(serviceName, groupName, clusters string) (mo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockNamingProxy) Unsubscribe(serviceName, groupName, clusters string) error {
|
func (m *MockNamingProxy) Unsubscribe(serviceName, groupName, clusters string) error {
|
||||||
m.unsubscribeCalled = true
|
|
||||||
m.unsubscribeParams = []string{serviceName, groupName, clusters}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -456,108 +452,3 @@ func BenchmarkNamingClient_SelectOneHealthyInstances(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNamingClient_Unsubscribe_WithCallback_ShouldNotCallServiceProxyUnsubscribe(t *testing.T) {
|
|
||||||
// 创建一个带有回调函数的订阅参数
|
|
||||||
callback := func(services []model.Instance, err error) {
|
|
||||||
// 空回调函数
|
|
||||||
}
|
|
||||||
param := &vo.SubscribeParam{
|
|
||||||
ServiceName: "test-service",
|
|
||||||
GroupName: "test-group",
|
|
||||||
Clusters: []string{"test-cluster"},
|
|
||||||
SubscribeCallback: callback,
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建测试客户端
|
|
||||||
client := NewTestNamingClient()
|
|
||||||
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
|
||||||
|
|
||||||
// 执行 Unsubscribe
|
|
||||||
err := client.Unsubscribe(param)
|
|
||||||
|
|
||||||
// 验证没有错误
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.True(t, mockProxy.unsubscribeCalled)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNamingClient_Unsubscribe_WithoutCallback_ShouldCallServiceProxyUnsubscribe(t *testing.T) {
|
|
||||||
// 创建一个没有回调函数的订阅参数
|
|
||||||
param := &vo.SubscribeParam{
|
|
||||||
ServiceName: "test-service",
|
|
||||||
GroupName: "test-group",
|
|
||||||
Clusters: []string{"test-cluster"},
|
|
||||||
// SubscribeCallback 为 nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建测试客户端
|
|
||||||
client := NewTestNamingClient()
|
|
||||||
// 获取原始的 MockNamingProxy 来检查调用状态
|
|
||||||
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
|
||||||
|
|
||||||
// 执行 Unsubscribe
|
|
||||||
err := client.Unsubscribe(param)
|
|
||||||
|
|
||||||
// 验证没有错误
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.True(t, mockProxy.unsubscribeCalled)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNamingClient_Unsubscribe_Integration_Test 集成测试,使用真实的 ServiceInfoHolder 来测试修复后的逻辑
|
|
||||||
func TestNamingClient_Unsubscribe_Integration_Test(t *testing.T) {
|
|
||||||
// 创建测试客户端
|
|
||||||
client := NewTestNamingClient()
|
|
||||||
|
|
||||||
// 获取原始的 MockNamingProxy 来检查调用状态
|
|
||||||
mockProxy := client.serviceProxy.(*MockNamingProxy)
|
|
||||||
|
|
||||||
// 创建回调函数
|
|
||||||
callback1 := func(services []model.Instance, err error) {
|
|
||||||
// 回调函数1
|
|
||||||
}
|
|
||||||
|
|
||||||
callback2 := func(services []model.Instance, err error) {
|
|
||||||
// 回调函数2
|
|
||||||
}
|
|
||||||
|
|
||||||
// 测试场景1:先注册两个回调函数,然后取消订阅第一个
|
|
||||||
// 这种情况下,取消订阅第一个回调函数后,还有其他回调函数,所以不应该调用 serviceProxy.Unsubscribe
|
|
||||||
|
|
||||||
// 注册第一个回调函数
|
|
||||||
param1 := &vo.SubscribeParam{
|
|
||||||
ServiceName: "test-service",
|
|
||||||
GroupName: "test-group",
|
|
||||||
Clusters: []string{"test-cluster"},
|
|
||||||
SubscribeCallback: callback1,
|
|
||||||
}
|
|
||||||
|
|
||||||
// 注册第二个回调函数
|
|
||||||
param2 := &vo.SubscribeParam{
|
|
||||||
ServiceName: "test-service",
|
|
||||||
GroupName: "test-group",
|
|
||||||
Clusters: []string{"test-cluster"},
|
|
||||||
SubscribeCallback: callback2,
|
|
||||||
}
|
|
||||||
|
|
||||||
// 先注册两个回调函数
|
|
||||||
err := client.Subscribe(param1)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
err = client.Subscribe(param2)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
// 重置 MockNamingProxy 的调用状态
|
|
||||||
mockProxy.unsubscribeCalled = false
|
|
||||||
mockProxy.unsubscribeParams = nil
|
|
||||||
|
|
||||||
// 取消订阅第一个回调函数
|
|
||||||
err = client.Unsubscribe(param1)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.False(t, mockProxy.unsubscribeCalled)
|
|
||||||
|
|
||||||
// 取消订阅第二个回调函数
|
|
||||||
err = client.Unsubscribe(param2)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.True(t, mockProxy.unsubscribeCalled)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue