Compare commits
6 Commits
e1a42bfa6d
...
2fcd1a7b0a
Author | SHA1 | Date |
---|---|---|
|
2fcd1a7b0a | |
|
b05fa15123 | |
|
dc731b46d8 | |
|
035d58be4c | |
|
4254799661 | |
|
48593d516b |
|
@ -100,7 +100,7 @@ type RpcClient struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
name string
|
name string
|
||||||
labels map[string]string
|
labels map[string]string
|
||||||
currentConnection IConnection
|
currentConnection atomic.Value
|
||||||
rpcClientStatus RpcClientStatus
|
rpcClientStatus RpcClientStatus
|
||||||
eventChan chan ConnectionEvent
|
eventChan chan ConnectionEvent
|
||||||
reconnectionChan chan ReconnectContext
|
reconnectionChan chan ReconnectContext
|
||||||
|
@ -128,6 +128,18 @@ type ConnectionEvent struct {
|
||||||
eventType ConnectionStatus
|
eventType ConnectionStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *RpcClient) getCurrentConnection() IConnection {
|
||||||
|
if conn, ok := r.currentConnection.Load().(IConnection); ok {
|
||||||
|
return conn
|
||||||
|
} else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RpcClient) setCurrentConnection(conn IConnection) {
|
||||||
|
r.currentConnection.Store(conn)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *RpcClient) putAllLabels(labels map[string]string) {
|
func (r *RpcClient) putAllLabels(labels map[string]string) {
|
||||||
for k, v := range labels {
|
for k, v := range labels {
|
||||||
r.labels[k] = v
|
r.labels[k] = v
|
||||||
|
@ -336,7 +348,7 @@ func (r *RpcClient) Start() {
|
||||||
if currentConnection != nil {
|
if currentConnection != nil {
|
||||||
logger.Infof("%s success to connect to server %+v on start up, connectionId=%s", r.name,
|
logger.Infof("%s success to connect to server %+v on start up, connectionId=%s", r.name,
|
||||||
currentConnection.getServerInfo(), currentConnection.getConnectionId())
|
currentConnection.getServerInfo(), currentConnection.getConnectionId())
|
||||||
r.currentConnection = currentConnection
|
r.setCurrentConnection(currentConnection)
|
||||||
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
||||||
r.notifyConnectionChange(CONNECTED)
|
r.notifyConnectionChange(CONNECTED)
|
||||||
} else {
|
} else {
|
||||||
|
@ -349,11 +361,12 @@ func (r *RpcClient) notifyConnectionChange(eventType ConnectionStatus) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RpcClient) notifyServerSrvChange() {
|
func (r *RpcClient) notifyServerSrvChange() {
|
||||||
if r.currentConnection == nil {
|
currentConnection := r.getCurrentConnection()
|
||||||
|
if currentConnection == nil {
|
||||||
r.switchServerAsync(ServerInfo{}, false)
|
r.switchServerAsync(ServerInfo{}, false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
curServerInfo := r.currentConnection.getServerInfo()
|
curServerInfo := currentConnection.getServerInfo()
|
||||||
var found bool
|
var found bool
|
||||||
for _, ele := range r.nacosServer.GetServerList() {
|
for _, ele := range r.nacosServer.GetServerList() {
|
||||||
if ele.IpAddr == curServerInfo.serverIp {
|
if ele.IpAddr == curServerInfo.serverIp {
|
||||||
|
@ -411,7 +424,11 @@ func (r *RpcClient) switchServerAsync(recommendServerInfo ServerInfo, onRequestF
|
||||||
|
|
||||||
func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
|
func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
|
||||||
if onRequestFail && r.sendHealthCheck() {
|
if onRequestFail && r.sendHealthCheck() {
|
||||||
logger.Infof("%s server check success, currentServer is %+v", r.name, r.currentConnection.getServerInfo())
|
var serverInfo interface{} = nil
|
||||||
|
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
|
||||||
|
serverInfo = currentConnection.getServerInfo()
|
||||||
|
}
|
||||||
|
logger.Infof("%s server check success, currentServer is %+v", r.name, serverInfo)
|
||||||
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
||||||
r.notifyConnectionChange(CONNECTED)
|
r.notifyConnectionChange(CONNECTED)
|
||||||
return
|
return
|
||||||
|
@ -438,14 +455,14 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
|
||||||
if connectionNew != nil && err == nil {
|
if connectionNew != nil && err == nil {
|
||||||
logger.Infof("%s success to connect a server %+v, connectionId=%s", r.name, serverInfo,
|
logger.Infof("%s success to connect a server %+v, connectionId=%s", r.name, serverInfo,
|
||||||
connectionNew.getConnectionId())
|
connectionNew.getConnectionId())
|
||||||
|
currentConnection := r.getCurrentConnection()
|
||||||
if r.currentConnection != nil {
|
if currentConnection != nil {
|
||||||
logger.Infof("%s abandon prev connection, server is %+v, connectionId is %s", r.name, serverInfo,
|
logger.Infof("%s abandon prev connection, server is %+v, connectionId is %s", r.name, serverInfo,
|
||||||
r.currentConnection.getConnectionId())
|
currentConnection.getConnectionId())
|
||||||
r.currentConnection.setAbandon(true)
|
currentConnection.setAbandon(true)
|
||||||
r.closeConnection()
|
r.closeConnection()
|
||||||
}
|
}
|
||||||
r.currentConnection = connectionNew
|
r.setCurrentConnection(connectionNew)
|
||||||
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
|
||||||
r.notifyConnectionChange(CONNECTED)
|
r.notifyConnectionChange(CONNECTED)
|
||||||
return
|
return
|
||||||
|
@ -471,8 +488,9 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RpcClient) closeConnection() {
|
func (r *RpcClient) closeConnection() {
|
||||||
if r.currentConnection != nil {
|
currentConnection := r.getCurrentConnection()
|
||||||
r.currentConnection.close()
|
if currentConnection != nil {
|
||||||
|
currentConnection.close()
|
||||||
r.notifyConnectionChange(DISCONNECTED)
|
r.notifyConnectionChange(DISCONNECTED)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -483,7 +501,11 @@ func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) {
|
||||||
if len(listeners) == 0 {
|
if len(listeners) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), r.currentConnection.getConnectionId())
|
var connectionId string
|
||||||
|
if currentConnection := r.getCurrentConnection(); currentConnection != nil {
|
||||||
|
connectionId = currentConnection.getConnectionId()
|
||||||
|
}
|
||||||
|
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), connectionId)
|
||||||
for _, v := range listeners {
|
for _, v := range listeners {
|
||||||
if event.isConnected() {
|
if event.isConnected() {
|
||||||
v.OnConnected()
|
v.OnConnected()
|
||||||
|
@ -505,10 +527,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
|
||||||
r.lastActiveTimestamp.Store(time.Now())
|
r.lastActiveTimestamp.Store(time.Now())
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
if r.currentConnection == nil || r.isShutdown() {
|
currentConnection := r.getCurrentConnection()
|
||||||
|
if currentConnection == nil || r.isShutdown() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, r.currentConnection.getConnectionId())
|
logger.Infof("%s server healthy check fail, currentConnection=%s", r.name, currentConnection.getConnectionId())
|
||||||
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(UNHEALTHY))
|
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(UNHEALTHY))
|
||||||
reconnectContext = ReconnectContext{onRequestFail: false}
|
reconnectContext = ReconnectContext{onRequestFail: false}
|
||||||
}
|
}
|
||||||
|
@ -516,10 +539,11 @@ func (r *RpcClient) healthCheck(timer *time.Timer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RpcClient) sendHealthCheck() bool {
|
func (r *RpcClient) sendHealthCheck() bool {
|
||||||
if r.currentConnection == nil {
|
currentConnection := r.getCurrentConnection()
|
||||||
|
if currentConnection == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
response, err := r.currentConnection.request(rpc_request.NewHealthCheckRequest(),
|
response, err := currentConnection.request(rpc_request.NewHealthCheckRequest(),
|
||||||
constant.DEFAULT_TIMEOUT_MILLS, r)
|
constant.DEFAULT_TIMEOUT_MILLS, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("client sendHealthCheck failed,err=%v", err)
|
logger.Errorf("client sendHealthCheck failed,err=%v", err)
|
||||||
|
@ -584,14 +608,19 @@ func (c *ConnectionEvent) toString() string {
|
||||||
func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (rpc_response.IResponse, error) {
|
func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (rpc_response.IResponse, error) {
|
||||||
retryTimes := 0
|
retryTimes := 0
|
||||||
start := util.CurrentMillis()
|
start := util.CurrentMillis()
|
||||||
var currentErr error
|
var (
|
||||||
|
currentErr error
|
||||||
|
currentConnection IConnection
|
||||||
|
)
|
||||||
for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills {
|
for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills {
|
||||||
if r.currentConnection == nil || !r.IsRunning() {
|
currentConnection = r.getCurrentConnection()
|
||||||
|
if currentConnection == nil || !r.IsRunning() {
|
||||||
|
rpcClientStatus := RpcClientStatus(atomic.LoadInt32((*int32)(&r.rpcClientStatus)))
|
||||||
currentErr = waitReconnect(timeoutMills, &retryTimes, request,
|
currentErr = waitReconnect(timeoutMills, &retryTimes, request,
|
||||||
errors.Errorf("client not connected, current status:%s", r.rpcClientStatus.getDesc()))
|
errors.Errorf("client not connected, current status:%s", rpcClientStatus.getDesc()))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
response, err := r.currentConnection.request(request, timeoutMills, r)
|
response, err := currentConnection.request(request, timeoutMills, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
currentErr = waitReconnect(timeoutMills, &retryTimes, request, err)
|
currentErr = waitReconnect(timeoutMills, &retryTimes, request, err)
|
||||||
continue
|
continue
|
||||||
|
@ -601,7 +630,7 @@ func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (r
|
||||||
r.mux.Lock()
|
r.mux.Lock()
|
||||||
if atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING), (int32)(UNHEALTHY)) {
|
if atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING), (int32)(UNHEALTHY)) {
|
||||||
logger.Infof("Connection is unregistered, switch server, connectionId=%s, request=%s",
|
logger.Infof("Connection is unregistered, switch server, connectionId=%s, request=%s",
|
||||||
r.currentConnection.getConnectionId(), request.GetRequestType())
|
currentConnection.getConnectionId(), request.GetRequestType())
|
||||||
r.switchServerAsync(ServerInfo{}, false)
|
r.switchServerAsync(ServerInfo{}, false)
|
||||||
}
|
}
|
||||||
r.mux.Unlock()
|
r.mux.Unlock()
|
||||||
|
|
Loading…
Reference in New Issue