Compare commits

...

6 Commits

Author SHA1 Message Date
Hunk Zhu 2fcd1a7b0a
Merge b05fa15123 into eb3adf0f0a 2025-05-30 09:53:14 +08:00
Hunk Zhu b05fa15123 Merge remote-tracking branch 'nacos_origin/master' into feature/improve_currentConnect 2023-11-20 07:06:07 +00:00
Hunk Zhu dc731b46d8 优化代码 2023-10-31 11:26:44 +00:00
Hunk Zhu 035d58be4c 加固代码 2023-10-27 09:23:33 +00:00
Hunk Zhu 4254799661 improve rpcClientStatus load 2023-10-08 17:39:45 +08:00
Hunk Zhu 48593d516b 改进currentConnect的使用方式,避免Data Race发生。 2023-10-03 05:23:24 +00:00
1 changed files with 51 additions and 22 deletions

View File

@ -100,7 +100,7 @@ type RpcClient struct {
ctx context.Context
name string
labels map[string]string
currentConnection IConnection
currentConnection atomic.Value
rpcClientStatus RpcClientStatus
eventChan chan ConnectionEvent
reconnectionChan chan ReconnectContext
@ -128,6 +128,18 @@ type ConnectionEvent struct {
eventType ConnectionStatus
}
func (r *RpcClient) getCurrentConnection() IConnection {
if conn, ok := r.currentConnection.Load().(IConnection); ok {
return conn
} else {
return nil
}
}
func (r *RpcClient) setCurrentConnection(conn IConnection) {
r.currentConnection.Store(conn)
}
func (r *RpcClient) putAllLabels(labels map[string]string) {
for k, v := range labels {
r.labels[k] = v
@ -336,7 +348,7 @@ func (r *RpcClient) Start() {
if currentConnection != nil {
logger.Infof("%s success to connect to server %+v on start up, connectionId=%s", r.name,
currentConnection.getServerInfo(), currentConnection.getConnectionId())
r.currentConnection = currentConnection
r.setCurrentConnection(currentConnection)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED)
} else {
@ -349,11 +361,12 @@ func (r *RpcClient) notifyConnectionChange(eventType ConnectionStatus) {
}
func (r *RpcClient) notifyServerSrvChange() {
if r.currentConnection == nil {
currentConnection := r.getCurrentConnection()
if currentConnection == nil {
r.switchServerAsync(ServerInfo{}, false)
return
}
curServerInfo := r.currentConnection.getServerInfo()
curServerInfo := currentConnection.getServerInfo()
var found bool
for _, ele := range r.nacosServer.GetServerList() {
if ele.IpAddr == curServerInfo.serverIp {
@ -411,7 +424,11 @@ func (r *RpcClient) switchServerAsync(recommendServerInfo ServerInfo, onRequestF
func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
if onRequestFail && r.sendHealthCheck() {
logger.Infof("%s server check success, currentServer is %+v", r.name, r.currentConnection.getServerInfo())
var serverInfo interface{} = nil
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
serverInfo = currentConnection.getServerInfo()
}
logger.Infof("%s server check success, currentServer is %+v", r.name, serverInfo)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED)
return
@ -438,14 +455,14 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
if connectionNew != nil && err == nil {
logger.Infof("%s success to connect a server %+v, connectionId=%s", r.name, serverInfo,
connectionNew.getConnectionId())
if r.currentConnection != nil {
currentConnection := r.getCurrentConnection()
if currentConnection != nil {
logger.Infof("%s abandon prev connection, server is %+v, connectionId is %s", r.name, serverInfo,
r.currentConnection.getConnectionId())
r.currentConnection.setAbandon(true)
currentConnection.getConnectionId())
currentConnection.setAbandon(true)
r.closeConnection()
}
r.currentConnection = connectionNew
r.setCurrentConnection(connectionNew)
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.notifyConnectionChange(CONNECTED)
return
@ -471,8 +488,9 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
}
func (r *RpcClient) closeConnection() {
if r.currentConnection != nil {
r.currentConnection.close()
currentConnection := r.getCurrentConnection()
if currentConnection != nil {
currentConnection.close()
r.notifyConnectionChange(DISCONNECTED)
}
}
@ -483,7 +501,11 @@ func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) {
if len(listeners) == 0 {
return
}
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), r.currentConnection.getConnectionId())
var connectionId string
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
connectionId = currentConnection.getConnectionId()
}
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), connectionId)
for _, v := range listeners {
if event.isConnected() {
v.OnConnected()
@ -505,10 +527,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
r.lastActiveTimestamp.Store(time.Now())
return
} else {
if r.currentConnection == nil || r.isShutdown() {
currentConnection := r.getCurrentConnection()
if currentConnection == nil || r.isShutdown() {
return
}
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, r.currentConnection.getConnectionId())
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, currentConnection.getConnectionId())
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(UNHEALTHY))
reconnectContext = ReconnectContext{onRequestFail: false}
}
@ -516,10 +539,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
}
func (r *RpcClient) sendHealthCheck() bool {
if r.currentConnection == nil {
currentConnection := r.getCurrentConnection()
if currentConnection == nil {
return false
}
response, err := r.currentConnection.request(rpc_request.NewHealthCheckRequest(),
response, err := currentConnection.request(rpc_request.NewHealthCheckRequest(),
constant.DEFAULT_TIMEOUT_MILLS, r)
if err != nil {
logger.Errorf("client sendHealthCheck failed,err=%v", err)
@ -584,14 +608,19 @@ func (c *ConnectionEvent) toString() string {
func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (rpc_response.IResponse, error) {
retryTimes := 0
start := util.CurrentMillis()
var currentErr error
var (
currentErr error
currentConnection IConnection
)
for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills {
if r.currentConnection == nil || !r.IsRunning() {
currentConnection = r.getCurrentConnection()
if currentConnection == nil || !r.IsRunning() {
rpcClientStatus := RpcClientStatus(atomic.LoadInt32((*int32)(&r.rpcClientStatus)))
currentErr = waitReconnect(timeoutMills, &retryTimes, request,
errors.Errorf("client not connected, current status:%s", r.rpcClientStatus.getDesc()))
errors.Errorf("client not connected, current status:%s", rpcClientStatus.getDesc()))
continue
}
response, err := r.currentConnection.request(request, timeoutMills, r)
response, err := currentConnection.request(request, timeoutMills, r)
if err != nil {
currentErr = waitReconnect(timeoutMills, &retryTimes, request, err)
continue
@ -601,7 +630,7 @@ func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (r
r.mux.Lock()
if atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING), (int32)(UNHEALTHY)) {
logger.Infof("Connection is unregistered, switch server, connectionId=%s, request=%s",
r.currentConnection.getConnectionId(), request.GetRequestType())
currentConnection.getConnectionId(), request.GetRequestType())
r.switchServerAsync(ServerInfo{}, false)
}
r.mux.Unlock()