Compare commits
6 Commits
e1a42bfa6d
...
2fcd1a7b0a
Author | SHA1 | Date |
---|---|---|
|
2fcd1a7b0a | |
|
b05fa15123 | |
|
dc731b46d8 | |
|
035d58be4c | |
|
4254799661 | |
|
48593d516b |
|
@ -100,7 +100,7 @@ type RpcClient struct {
|
|||
ctx context.Context
|
||||
name string
|
||||
labels map[string]string
|
||||
currentConnection IConnection
|
||||
currentConnection atomic.Value
|
||||
rpcClientStatus RpcClientStatus
|
||||
eventChan chan ConnectionEvent
|
||||
reconnectionChan chan ReconnectContext
|
||||
|
@ -128,6 +128,18 @@ type ConnectionEvent struct {
|
|||
eventType ConnectionStatus
|
||||
}
|
||||
|
||||
func (r *RpcClient) getCurrentConnection() IConnection {
|
||||
if conn, ok := r.currentConnection.Load().(IConnection); ok {
|
||||
return conn
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RpcClient) setCurrentConnection(conn IConnection) {
|
||||
r.currentConnection.Store(conn)
|
||||
}
|
||||
|
||||
func (r *RpcClient) putAllLabels(labels map[string]string) {
|
||||
for k, v := range labels {
|
||||
r.labels[k] = v
|
||||
|
@ -336,7 +348,7 @@ func (r *RpcClient) Start() {
|
|||
if currentConnection != nil {
|
||||
logger.Infof("%s success to connect to server %+v on start up, connectionId=%s", r.name,
|
||||
currentConnection.getServerInfo(), currentConnection.getConnectionId())
|
||||
r.currentConnection = currentConnection
|
||||
r.setCurrentConnection(currentConnection)
|
||||
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
||||
r.notifyConnectionChange(CONNECTED)
|
||||
} else {
|
||||
|
@ -349,11 +361,12 @@ func (r *RpcClient) notifyConnectionChange(eventType ConnectionStatus) {
|
|||
}
|
||||
|
||||
func (r *RpcClient) notifyServerSrvChange() {
|
||||
if r.currentConnection == nil {
|
||||
currentConnection := r.getCurrentConnection()
|
||||
if currentConnection == nil {
|
||||
r.switchServerAsync(ServerInfo{}, false)
|
||||
return
|
||||
}
|
||||
curServerInfo := r.currentConnection.getServerInfo()
|
||||
curServerInfo := currentConnection.getServerInfo()
|
||||
var found bool
|
||||
for _, ele := range r.nacosServer.GetServerList() {
|
||||
if ele.IpAddr == curServerInfo.serverIp {
|
||||
|
@ -411,7 +424,11 @@ func (r *RpcClient) switchServerAsync(recommendServerInfo ServerInfo, onRequestF
|
|||
|
||||
func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
|
||||
if onRequestFail && r.sendHealthCheck() {
|
||||
logger.Infof("%s server check success, currentServer is %+v", r.name, r.currentConnection.getServerInfo())
|
||||
var serverInfo interface{} = nil
|
||||
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
|
||||
serverInfo = currentConnection.getServerInfo()
|
||||
}
|
||||
logger.Infof("%s server check success, currentServer is %+v", r.name, serverInfo)
|
||||
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
||||
r.notifyConnectionChange(CONNECTED)
|
||||
return
|
||||
|
@ -438,14 +455,14 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
|
|||
if connectionNew != nil && err == nil {
|
||||
logger.Infof("%s success to connect a server %+v, connectionId=%s", r.name, serverInfo,
|
||||
connectionNew.getConnectionId())
|
||||
|
||||
if r.currentConnection != nil {
|
||||
currentConnection := r.getCurrentConnection()
|
||||
if currentConnection != nil {
|
||||
logger.Infof("%s abandon prev connection, server is %+v, connectionId is %s", r.name, serverInfo,
|
||||
r.currentConnection.getConnectionId())
|
||||
r.currentConnection.setAbandon(true)
|
||||
currentConnection.getConnectionId())
|
||||
currentConnection.setAbandon(true)
|
||||
r.closeConnection()
|
||||
}
|
||||
r.currentConnection = connectionNew
|
||||
r.setCurrentConnection(connectionNew)
|
||||
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
||||
r.notifyConnectionChange(CONNECTED)
|
||||
return
|
||||
|
@ -471,8 +488,9 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
|
|||
}
|
||||
|
||||
func (r *RpcClient) closeConnection() {
|
||||
if r.currentConnection != nil {
|
||||
r.currentConnection.close()
|
||||
currentConnection := r.getCurrentConnection()
|
||||
if currentConnection != nil {
|
||||
currentConnection.close()
|
||||
r.notifyConnectionChange(DISCONNECTED)
|
||||
}
|
||||
}
|
||||
|
@ -483,7 +501,11 @@ func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) {
|
|||
if len(listeners) == 0 {
|
||||
return
|
||||
}
|
||||
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), r.currentConnection.getConnectionId())
|
||||
var connectionId string
|
||||
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
|
||||
connectionId = currentConnection.getConnectionId()
|
||||
}
|
||||
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), connectionId)
|
||||
for _, v := range listeners {
|
||||
if event.isConnected() {
|
||||
v.OnConnected()
|
||||
|
@ -505,10 +527,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
|
|||
r.lastActiveTimestamp.Store(time.Now())
|
||||
return
|
||||
} else {
|
||||
if r.currentConnection == nil || r.isShutdown() {
|
||||
currentConnection := r.getCurrentConnection()
|
||||
if currentConnection == nil || r.isShutdown() {
|
||||
return
|
||||
}
|
||||
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, r.currentConnection.getConnectionId())
|
||||
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, currentConnection.getConnectionId())
|
||||
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(UNHEALTHY))
|
||||
reconnectContext = ReconnectContext{onRequestFail: false}
|
||||
}
|
||||
|
@ -516,10 +539,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
|
|||
}
|
||||
|
||||
func (r *RpcClient) sendHealthCheck() bool {
|
||||
if r.currentConnection == nil {
|
||||
currentConnection := r.getCurrentConnection()
|
||||
if currentConnection == nil {
|
||||
return false
|
||||
}
|
||||
response, err := r.currentConnection.request(rpc_request.NewHealthCheckRequest(),
|
||||
response, err := currentConnection.request(rpc_request.NewHealthCheckRequest(),
|
||||
constant.DEFAULT_TIMEOUT_MILLS, r)
|
||||
if err != nil {
|
||||
logger.Errorf("client sendHealthCheck failed,err=%v", err)
|
||||
|
@ -584,14 +608,19 @@ func (c *ConnectionEvent) toString() string {
|
|||
func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (rpc_response.IResponse, error) {
|
||||
retryTimes := 0
|
||||
start := util.CurrentMillis()
|
||||
var currentErr error
|
||||
var (
|
||||
currentErr error
|
||||
currentConnection IConnection
|
||||
)
|
||||
for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills {
|
||||
if r.currentConnection == nil || !r.IsRunning() {
|
||||
currentConnection = r.getCurrentConnection()
|
||||
if currentConnection == nil || !r.IsRunning() {
|
||||
rpcClientStatus := RpcClientStatus(atomic.LoadInt32((*int32)(&r.rpcClientStatus)))
|
||||
currentErr = waitReconnect(timeoutMills, &retryTimes, request,
|
||||
errors.Errorf("client not connected, current status:%s", r.rpcClientStatus.getDesc()))
|
||||
errors.Errorf("client not connected, current status:%s", rpcClientStatus.getDesc()))
|
||||
continue
|
||||
}
|
||||
response, err := r.currentConnection.request(request, timeoutMills, r)
|
||||
response, err := currentConnection.request(request, timeoutMills, r)
|
||||
if err != nil {
|
||||
currentErr = waitReconnect(timeoutMills, &retryTimes, request, err)
|
||||
continue
|
||||
|
@ -601,7 +630,7 @@ func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (r
|
|||
r.mux.Lock()
|
||||
if atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING), (int32)(UNHEALTHY)) {
|
||||
logger.Infof("Connection is unregistered, switch server, connectionId=%s, request=%s",
|
||||
r.currentConnection.getConnectionId(), request.GetRequestType())
|
||||
currentConnection.getConnectionId(), request.GetRequestType())
|
||||
r.switchServerAsync(ServerInfo{}, false)
|
||||
}
|
||||
r.mux.Unlock()
|
||||
|
|
Loading…
Reference in New Issue