Compare commits

...

6 Commits

Author SHA1 Message Date
Hunk Zhu 2fcd1a7b0a
Merge b05fa15123 into eb3adf0f0a 2025-05-30 09:53:14 +08:00
Hunk Zhu b05fa15123 Merge remote-tracking branch 'nacos_origin/master' into feature/improve_currentConnect 2023-11-20 07:06:07 +00:00
Hunk Zhu dc731b46d8 优化代码 2023-10-31 11:26:44 +00:00
Hunk Zhu 035d58be4c 加固代码 2023-10-27 09:23:33 +00:00
Hunk Zhu 4254799661 improve rpcClientStatus load 2023-10-08 17:39:45 +08:00
Hunk Zhu 48593d516b 改进currentConnect的使用方式,避免Data Race发生。 2023-10-03 05:23:24 +00:00
1 changed files with 51 additions and 22 deletions

View File

@ -100,7 +100,7 @@ type RpcClient struct {
ctx context.Context ctx context.Context
name string name string
labels map[string]string labels map[string]string
currentConnection IConnection currentConnection atomic.Value
rpcClientStatus RpcClientStatus rpcClientStatus RpcClientStatus
eventChan chan ConnectionEvent eventChan chan ConnectionEvent
reconnectionChan chan ReconnectContext reconnectionChan chan ReconnectContext
@ -128,6 +128,18 @@ type ConnectionEvent struct {
eventType ConnectionStatus eventType ConnectionStatus
} }
func (r *RpcClient) getCurrentConnection() IConnection {
if conn, ok := r.currentConnection.Load().(IConnection); ok {
return conn
} else {
return nil
}
}
func (r *RpcClient) setCurrentConnection(conn IConnection) {
r.currentConnection.Store(conn)
}
func (r *RpcClient) putAllLabels(labels map[string]string) { func (r *RpcClient) putAllLabels(labels map[string]string) {
for k, v := range labels { for k, v := range labels {
r.labels[k] = v r.labels[k] = v
@ -336,7 +348,7 @@ func (r *RpcClient) Start() {
if currentConnection != nil { if currentConnection != nil {
logger.Infof("%s success to connect to server %+v on start up, connectionId=%s", r.name, logger.Infof("%s success to connect to server %+v on start up, connectionId=%s", r.name,
currentConnection.getServerInfo(), currentConnection.getConnectionId()) currentConnection.getServerInfo(), currentConnection.getConnectionId())
r.currentConnection = currentConnection r.setCurrentConnection(currentConnection)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING)) atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED) r.notifyConnectionChange(CONNECTED)
} else { } else {
@ -349,11 +361,12 @@ func (r *RpcClient) notifyConnectionChange(eventType ConnectionStatus) {
} }
func (r *RpcClient) notifyServerSrvChange() { func (r *RpcClient) notifyServerSrvChange() {
if r.currentConnection == nil { currentConnection := r.getCurrentConnection()
if currentConnection == nil {
r.switchServerAsync(ServerInfo{}, false) r.switchServerAsync(ServerInfo{}, false)
return return
} }
curServerInfo := r.currentConnection.getServerInfo() curServerInfo := currentConnection.getServerInfo()
var found bool var found bool
for _, ele := range r.nacosServer.GetServerList() { for _, ele := range r.nacosServer.GetServerList() {
if ele.IpAddr == curServerInfo.serverIp { if ele.IpAddr == curServerInfo.serverIp {
@ -411,7 +424,11 @@ func (r *RpcClient) switchServerAsync(recommendServerInfo ServerInfo, onRequestF
func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) { func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
if onRequestFail && r.sendHealthCheck() { if onRequestFail && r.sendHealthCheck() {
logger.Infof("%s server check success, currentServer is %+v", r.name, r.currentConnection.getServerInfo()) var serverInfo interface{} = nil
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
serverInfo = currentConnection.getServerInfo()
}
logger.Infof("%s server check success, currentServer is %+v", r.name, serverInfo)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING)) atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED) r.notifyConnectionChange(CONNECTED)
return return
@ -438,14 +455,14 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
if connectionNew != nil && err == nil { if connectionNew != nil && err == nil {
logger.Infof("%s success to connect a server %+v, connectionId=%s", r.name, serverInfo, logger.Infof("%s success to connect a server %+v, connectionId=%s", r.name, serverInfo,
connectionNew.getConnectionId()) connectionNew.getConnectionId())
currentConnection := r.getCurrentConnection()
if r.currentConnection != nil { if currentConnection != nil {
logger.Infof("%s abandon prev connection, server is %+v, connectionId is %s", r.name, serverInfo, logger.Infof("%s abandon prev connection, server is %+v, connectionId is %s", r.name, serverInfo,
r.currentConnection.getConnectionId()) currentConnection.getConnectionId())
r.currentConnection.setAbandon(true) currentConnection.setAbandon(true)
r.closeConnection() r.closeConnection()
} }
r.currentConnection = connectionNew r.setCurrentConnection(connectionNew)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING)) atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED) r.notifyConnectionChange(CONNECTED)
return return
@ -471,8 +488,9 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
} }
func (r *RpcClient) closeConnection() { func (r *RpcClient) closeConnection() {
if r.currentConnection != nil { currentConnection := r.getCurrentConnection()
r.currentConnection.close() if currentConnection != nil {
currentConnection.close()
r.notifyConnectionChange(DISCONNECTED) r.notifyConnectionChange(DISCONNECTED)
} }
} }
@ -483,7 +501,11 @@ func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) {
if len(listeners) == 0 { if len(listeners) == 0 {
return return
} }
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), r.currentConnection.getConnectionId()) var connectionId string
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
connectionId = currentConnection.getConnectionId()
}
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), connectionId)
for _, v := range listeners { for _, v := range listeners {
if event.isConnected() { if event.isConnected() {
v.OnConnected() v.OnConnected()
@ -505,10 +527,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
r.lastActiveTimestamp.Store(time.Now()) r.lastActiveTimestamp.Store(time.Now())
return return
} else { } else {
if r.currentConnection == nil || r.isShutdown() { currentConnection := r.getCurrentConnection()
if currentConnection == nil || r.isShutdown() {
return return
} }
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, r.currentConnection.getConnectionId()) logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, currentConnection.getConnectionId())
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(UNHEALTHY)) atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(UNHEALTHY))
reconnectContext = ReconnectContext{onRequestFail: false} reconnectContext = ReconnectContext{onRequestFail: false}
} }
@ -516,10 +539,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
} }
func (r *RpcClient) sendHealthCheck() bool { func (r *RpcClient) sendHealthCheck() bool {
if r.currentConnection == nil { currentConnection := r.getCurrentConnection()
if currentConnection == nil {
return false return false
} }
response, err := r.currentConnection.request(rpc_request.NewHealthCheckRequest(), response, err := currentConnection.request(rpc_request.NewHealthCheckRequest(),
constant.DEFAULT_TIMEOUT_MILLS, r) constant.DEFAULT_TIMEOUT_MILLS, r)
if err != nil { if err != nil {
logger.Errorf("client sendHealthCheck failed,err=%v", err) logger.Errorf("client sendHealthCheck failed,err=%v", err)
@ -584,14 +608,19 @@ func (c *ConnectionEvent) toString() string {
func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (rpc_response.IResponse, error) { func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (rpc_response.IResponse, error) {
retryTimes := 0 retryTimes := 0
start := util.CurrentMillis() start := util.CurrentMillis()
var currentErr error var (
currentErr error
currentConnection IConnection
)
for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills { for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills {
if r.currentConnection == nil || !r.IsRunning() { currentConnection = r.getCurrentConnection()
if currentConnection == nil || !r.IsRunning() {
rpcClientStatus := RpcClientStatus(atomic.LoadInt32((*int32)(&r.rpcClientStatus)))
currentErr = waitReconnect(timeoutMills, &retryTimes, request, currentErr = waitReconnect(timeoutMills, &retryTimes, request,
errors.Errorf("client not connected, current status:%s", r.rpcClientStatus.getDesc())) errors.Errorf("client not connected, current status:%s", rpcClientStatus.getDesc()))
continue continue
} }
response, err := r.currentConnection.request(request, timeoutMills, r) response, err := currentConnection.request(request, timeoutMills, r)
if err != nil { if err != nil {
currentErr = waitReconnect(timeoutMills, &retryTimes, request, err) currentErr = waitReconnect(timeoutMills, &retryTimes, request, err)
continue continue
@ -601,7 +630,7 @@ func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (r
r.mux.Lock() r.mux.Lock()
if atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING), (int32)(UNHEALTHY)) { if atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING), (int32)(UNHEALTHY)) {
logger.Infof("Connection is unregistered, switch server, connectionId=%s, request=%s", logger.Infof("Connection is unregistered, switch server, connectionId=%s, request=%s",
r.currentConnection.getConnectionId(), request.GetRequestType()) currentConnection.getConnectionId(), request.GetRequestType())
r.switchServerAsync(ServerInfo{}, false) r.switchServerAsync(ServerInfo{}, false)
} }
r.mux.Unlock() r.mux.Unlock()