Compare commits
24 Commits
Author | SHA1 | Date |
---|---|---|
|
ef4398d834 | |
|
7cb9d73a53 | |
|
e72d3a9ed0 | |
|
0720800755 | |
|
144d5f7a99 | |
|
8188868e16 | |
|
1713fd8d96 | |
|
fa1155911d | |
|
637cbbe327 | |
|
7abca3647d | |
|
9371be3854 | |
|
07770923c5 | |
|
4e8138d3a1 | |
|
c65e98972d | |
|
8b9e7859db | |
|
925a0e5435 | |
|
dd7ac4ae0d | |
|
ce5011e666 | |
|
4527321855 | |
|
0b638f82be | |
|
0067c76bd5 | |
|
6d8c265167 | |
|
3d7d43827b | |
|
ff9f7addfc |
|
@ -118,6 +118,7 @@ class ServiceSync extends React.Component {
|
||||||
<Table dataSource={taskModels} loading={loading}>
|
<Table dataSource={taskModels} loading={loading}>
|
||||||
<Table.Column title={locale.serviceName} dataIndex="serviceName" />
|
<Table.Column title={locale.serviceName} dataIndex="serviceName" />
|
||||||
<Table.Column title={locale.groupName} dataIndex="groupName" />
|
<Table.Column title={locale.groupName} dataIndex="groupName" />
|
||||||
|
<Table.Column title={locale.nameSpace} dataIndex="nameSpace" />
|
||||||
<Table.Column
|
<Table.Column
|
||||||
title={locale.sourceCluster}
|
title={locale.sourceCluster}
|
||||||
dataIndex="sourceClusterId"
|
dataIndex="sourceClusterId"
|
||||||
|
|
|
@ -41,6 +41,7 @@ const I18N_CONF = {
|
||||||
nameSpace: '命名空间',
|
nameSpace: '命名空间',
|
||||||
serviceName: '服务名',
|
serviceName: '服务名',
|
||||||
groupName: '分组',
|
groupName: '分组',
|
||||||
|
nameSpace: '命名空间',
|
||||||
sourceCluster: '源集群',
|
sourceCluster: '源集群',
|
||||||
destCluster: '目标集群',
|
destCluster: '目标集群',
|
||||||
instancesCount: '实例数',
|
instancesCount: '实例数',
|
||||||
|
@ -63,6 +64,8 @@ const I18N_CONF = {
|
||||||
serviceNamePlaceholder: '请输入服务名',
|
serviceNamePlaceholder: '请输入服务名',
|
||||||
groupName: '分组名',
|
groupName: '分组名',
|
||||||
groupNamePlaceholder: '请输入分组名',
|
groupNamePlaceholder: '请输入分组名',
|
||||||
|
nameSpace: '命名空间',
|
||||||
|
nameSpacePlaceholder: '请输入命名空间',
|
||||||
sourceCluster: '源集群',
|
sourceCluster: '源集群',
|
||||||
destCluster: '目标集群',
|
destCluster: '目标集群',
|
||||||
version: '版本',
|
version: '版本',
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
package com.alibaba.nacossync.constant;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/11/18.
|
||||||
|
*/
|
||||||
|
public enum ShardingLogTypeEnum {
|
||||||
|
|
||||||
|
ADD("add", "新增"),
|
||||||
|
|
||||||
|
DELETE("DELETE", "删除");
|
||||||
|
|
||||||
|
private String type;
|
||||||
|
private String desc;
|
||||||
|
|
||||||
|
ShardingLogTypeEnum(String type, String desc) {
|
||||||
|
this.type = type;
|
||||||
|
this.desc = desc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setType(String type) {
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDesc() {
|
||||||
|
return desc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDesc(String desc) {
|
||||||
|
this.desc = desc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean contains(String type) {
|
||||||
|
|
||||||
|
for (ShardingLogTypeEnum shardingLogTypeEnum : ShardingLogTypeEnum.values()) {
|
||||||
|
|
||||||
|
if (shardingLogTypeEnum.getType().equals(type)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -12,14 +12,11 @@
|
||||||
*/
|
*/
|
||||||
package com.alibaba.nacossync.extension;
|
package com.alibaba.nacossync.extension;
|
||||||
|
|
||||||
import static com.alibaba.nacossync.util.SkyWalkerUtil.generateSyncKey;
|
|
||||||
|
|
||||||
import com.alibaba.nacos.api.exception.NacosException;
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
|
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
|
||||||
import com.alibaba.nacossync.constant.ClusterTypeEnum;
|
import com.alibaba.nacossync.constant.ClusterTypeEnum;
|
||||||
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
||||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.BeansException;
|
import org.springframework.beans.BeansException;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
@ -27,6 +24,10 @@ import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.ApplicationContextAware;
|
import org.springframework.context.ApplicationContextAware;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import static com.alibaba.nacossync.util.SkyWalkerUtil.generateSyncKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author NacosSync
|
* @author NacosSync
|
||||||
* @version $Id: SyncManagerService.java, v 0.1 2018-09-25 PM5:17 NacosSync Exp $$
|
* @version $Id: SyncManagerService.java, v 0.1 2018-09-25 PM5:17 NacosSync Exp $$
|
||||||
|
@ -42,20 +43,17 @@ public class SyncManagerService implements InitializingBean, ApplicationContextA
|
||||||
private ApplicationContext applicationContext;
|
private ApplicationContext applicationContext;
|
||||||
|
|
||||||
public SyncManagerService(
|
public SyncManagerService(
|
||||||
SkyWalkerCacheServices skyWalkerCacheServices) {
|
SkyWalkerCacheServices skyWalkerCacheServices) {
|
||||||
this.skyWalkerCacheServices = skyWalkerCacheServices;
|
this.skyWalkerCacheServices = skyWalkerCacheServices;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean delete(TaskDO taskDO) throws NacosException {
|
public boolean delete(TaskDO taskDO) throws NacosException {
|
||||||
|
|
||||||
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
|
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean sync(TaskDO taskDO) {
|
public boolean sync(TaskDO taskDO) {
|
||||||
|
|
||||||
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO);
|
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -80,5 +78,4 @@ public class SyncManagerService implements InitializingBean, ApplicationContextA
|
||||||
|
|
||||||
return syncServiceMap.get(generateSyncKey(sourceClusterType, destClusterType));
|
return syncServiceMap.get(generateSyncKey(sourceClusterType, destClusterType));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,9 +12,6 @@
|
||||||
*/
|
*/
|
||||||
package com.alibaba.nacossync.extension.impl;
|
package com.alibaba.nacossync.extension.impl;
|
||||||
|
|
||||||
import static com.alibaba.nacossync.util.StringUtils.convertDubboFullPathForZk;
|
|
||||||
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;
|
|
||||||
|
|
||||||
import com.alibaba.nacos.api.naming.NamingService;
|
import com.alibaba.nacos.api.naming.NamingService;
|
||||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||||
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
||||||
|
@ -28,17 +25,13 @@ import com.alibaba.nacossync.extension.SyncService;
|
||||||
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
||||||
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
|
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
|
||||||
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
|
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
|
||||||
|
import com.alibaba.nacossync.extension.impl.extend.NacosSyncToZookeeperServicesSharding;
|
||||||
|
import com.alibaba.nacossync.extension.impl.extend.Sharding;
|
||||||
import com.alibaba.nacossync.monitor.MetricsManager;
|
import com.alibaba.nacossync.monitor.MetricsManager;
|
||||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||||
import com.alibaba.nacossync.util.DubboConstants;
|
import com.alibaba.nacossync.util.DubboConstants;
|
||||||
|
import com.alibaba.nacossync.util.ExpirySet;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import java.io.UnsupportedEncodingException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
|
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
|
||||||
|
@ -47,6 +40,16 @@ import org.apache.curator.utils.CloseableUtils;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import static com.alibaba.nacossync.util.StringUtils.convertDubboFullPathForZk;
|
||||||
|
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Nacos 同步 Zk 数据
|
* Nacos 同步 Zk 数据
|
||||||
*
|
*
|
||||||
|
@ -90,9 +93,16 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
|
|
||||||
private final ZookeeperServerHolder zookeeperServerHolder;
|
private final ZookeeperServerHolder zookeeperServerHolder;
|
||||||
|
|
||||||
|
private static ExpirySet<String> serviceNameSet = new ExpirySet<String>();
|
||||||
|
|
||||||
|
private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
||||||
|
|
||||||
|
@Resource(type = NacosSyncToZookeeperServicesSharding.class)
|
||||||
|
private Sharding sharding;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public NacosSyncToZookeeperServiceImpl(SkyWalkerCacheServices skyWalkerCacheServices,
|
public NacosSyncToZookeeperServiceImpl(SkyWalkerCacheServices skyWalkerCacheServices,
|
||||||
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder) {
|
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder) {
|
||||||
this.skyWalkerCacheServices = skyWalkerCacheServices;
|
this.skyWalkerCacheServices = skyWalkerCacheServices;
|
||||||
this.nacosServerHolder = nacosServerHolder;
|
this.nacosServerHolder = nacosServerHolder;
|
||||||
this.zookeeperServerHolder = zookeeperServerHolder;
|
this.zookeeperServerHolder = zookeeperServerHolder;
|
||||||
|
@ -101,9 +111,9 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
@Override
|
@Override
|
||||||
public boolean delete(TaskDO taskDO) {
|
public boolean delete(TaskDO taskDO) {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
NamingService sourceNamingService =
|
NamingService sourceNamingService =
|
||||||
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
|
//nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());//
|
||||||
|
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());//fix with no nameSpaceName
|
||||||
EventListener eventListener = nacosListenerMap.remove(taskDO.getTaskId());
|
EventListener eventListener = nacosListenerMap.remove(taskDO.getTaskId());
|
||||||
PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(taskDO.getTaskId());
|
PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(taskDO.getTaskId());
|
||||||
sourceNamingService.unsubscribe(taskDO.getServiceName(), eventListener);
|
sourceNamingService.unsubscribe(taskDO.getServiceName(), eventListener);
|
||||||
|
@ -113,6 +123,7 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
for (String instanceUrl : instanceUrlSet) {
|
for (String instanceUrl : instanceUrlSet) {
|
||||||
client.delete().quietly().forPath(instanceUrl);
|
client.delete().quietly().forPath(instanceUrl);
|
||||||
}
|
}
|
||||||
|
sharding.stop(taskDO);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("delete task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
|
log.error("delete task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
|
||||||
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
|
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
|
||||||
|
@ -123,37 +134,7 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean sync(TaskDO taskDO) {
|
public boolean sync(TaskDO taskDO) {
|
||||||
try {
|
sharding.start(taskDO);
|
||||||
NamingService sourceNamingService =
|
|
||||||
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
|
|
||||||
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
|
|
||||||
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
|
|
||||||
if (event instanceof NamingEvent) {
|
|
||||||
try {
|
|
||||||
|
|
||||||
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
|
|
||||||
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);
|
|
||||||
|
|
||||||
// 获取之前的备份 删除无效实例
|
|
||||||
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
|
|
||||||
// 替换当前备份为最新备份
|
|
||||||
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
|
|
||||||
// 尝试恢复因为zk客户端意外断开导致的实例数据
|
|
||||||
tryToCompensate(taskDO, sourceNamingService, sourceInstances);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
|
|
||||||
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("sync task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
|
|
||||||
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,13 +145,13 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
pathCache.getListenable().addListener((zkClient, zkEvent) -> {
|
pathCache.getListenable().addListener((zkClient, zkEvent) -> {
|
||||||
if (zkEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
|
if (zkEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
|
||||||
List<Instance> allInstances =
|
List<Instance> allInstances =
|
||||||
sourceNamingService.getAllInstances(taskDO.getServiceName());
|
sourceNamingService.getAllInstances(taskDO.getServiceName());
|
||||||
for (Instance instance : allInstances) {
|
for (Instance instance : allInstances) {
|
||||||
String instanceUrl = buildSyncInstance(instance, taskDO);
|
String instanceUrl = buildSyncInstance(instance, taskDO);
|
||||||
String zkInstancePath = zkEvent.getData().getPath();
|
String zkInstancePath = zkEvent.getData().getPath();
|
||||||
if (zkInstancePath.equals(instanceUrl)) {
|
if (zkInstancePath.equals(instanceUrl)) {
|
||||||
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
|
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
|
||||||
.forPath(zkInstancePath);
|
.forPath(zkInstancePath);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -182,9 +163,9 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteInvalidInstances(TaskDO taskDO, CuratorFramework client, Set<String> newInstanceUrlSet)
|
private void deleteInvalidInstances(TaskDO taskDO, CuratorFramework client, Set<String> newInstanceUrlSet)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Set<String> instanceBackup =
|
Set<String> instanceBackup =
|
||||||
instanceBackupMap.getOrDefault(taskDO.getTaskId(), Sets.newHashSet());
|
instanceBackupMap.getOrDefault(taskDO.getTaskId(), Sets.newHashSet());
|
||||||
for (String instanceUrl : instanceBackup) {
|
for (String instanceUrl : instanceBackup) {
|
||||||
if (newInstanceUrlSet.contains(instanceUrl)) {
|
if (newInstanceUrlSet.contains(instanceUrl)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -194,15 +175,17 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private HashSet<String> getWaitingToAddInstance(TaskDO taskDO, CuratorFramework client,
|
private HashSet<String> getWaitingToAddInstance(TaskDO taskDO, CuratorFramework client,
|
||||||
List<Instance> sourceInstances) throws Exception {
|
List<Instance> sourceInstances) throws Exception {
|
||||||
HashSet<String> waitingToAddInstance = new HashSet<>();
|
HashSet<String> waitingToAddInstance = new HashSet<>();
|
||||||
for (Instance instance : sourceInstances) {
|
for (Instance instance : sourceInstances) {
|
||||||
if (needSync(instance.getMetadata())) {
|
if (needSync(instance.getMetadata())) {
|
||||||
|
log.info("nacos->zk ,real sync service :{},and instance :{}", instance.getServiceName(), instance.getIp());
|
||||||
String instanceUrl = buildSyncInstance(instance, taskDO);
|
String instanceUrl = buildSyncInstance(instance, taskDO);
|
||||||
if (null == client.checkExists().forPath(instanceUrl)) {
|
if (null != client.checkExists().forPath(instanceUrl)) {
|
||||||
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
|
client.delete().quietly().forPath(instanceUrl);
|
||||||
.forPath(instanceUrl);
|
|
||||||
}
|
}
|
||||||
|
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
|
||||||
|
.forPath(instanceUrl);
|
||||||
waitingToAddInstance.add(instanceUrl);
|
waitingToAddInstance.add(instanceUrl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -214,11 +197,11 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
metaData.putAll(instance.getMetadata());
|
metaData.putAll(instance.getMetadata());
|
||||||
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
|
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
|
||||||
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
|
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
|
||||||
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
|
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
|
||||||
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
|
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
|
||||||
|
|
||||||
String servicePath = monitorPath.computeIfAbsent(taskDO.getTaskId(),
|
String servicePath = monitorPath.computeIfAbsent(taskDO.getTaskId(),
|
||||||
key -> convertDubboProvidersPath(metaData.get(DubboConstants.INTERFACE_KEY)));
|
key -> convertDubboProvidersPath(metaData.get(DubboConstants.INTERFACE_KEY)));
|
||||||
|
|
||||||
return convertDubboFullPathForZk(metaData, servicePath, instance.getIp(), instance.getPort());
|
return convertDubboFullPathForZk(metaData, servicePath, instance.getIp(), instance.getPort());
|
||||||
}
|
}
|
||||||
|
@ -234,7 +217,7 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
return pathChildrenCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {
|
return pathChildrenCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {
|
||||||
try {
|
try {
|
||||||
PathChildrenCache pathChildrenCache = new PathChildrenCache(
|
PathChildrenCache pathChildrenCache = new PathChildrenCache(
|
||||||
zookeeperServerHolder.get(taskDO.getDestClusterId(), ""), monitorPath.get(key), false);
|
zookeeperServerHolder.get(taskDO.getDestClusterId(), ""), monitorPath.get(key), false);
|
||||||
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
|
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
|
||||||
return pathChildrenCache;
|
return pathChildrenCache;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -246,4 +229,74 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private class SyncThread implements Runnable {
|
||||||
|
|
||||||
|
NamingService sourceNamingService;
|
||||||
|
|
||||||
|
TaskDO taskDO;
|
||||||
|
|
||||||
|
CuratorFramework client;
|
||||||
|
|
||||||
|
SyncThread(NamingService sourceNamingService, TaskDO taskDO, CuratorFramework client) {
|
||||||
|
this.sourceNamingService = sourceNamingService;
|
||||||
|
this.taskDO = taskDO;
|
||||||
|
this.client = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
|
||||||
|
//List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
|
||||||
|
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(), taskDO.getGroupName());//fix with no group
|
||||||
|
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);
|
||||||
|
|
||||||
|
// 获取之前的备份 删除无效实例
|
||||||
|
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
|
||||||
|
// 替换当前备份为最新备份
|
||||||
|
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
|
||||||
|
// 尝试恢复因为zk客户端意外断开导致的实例数据
|
||||||
|
tryToCompensate(taskDO, sourceNamingService, filterNeedSync(sourceInstances));
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
|
||||||
|
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
//serviceNameSet.remove(((NamingEvent) event).getServiceName());//如果考虑高实时性 可以手动remove 这样时间窗口的大小就不固定 依赖处理速度 窗口大小作为兜底
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Instance> filterNeedSync(List<Instance> sourceInstances) {
|
||||||
|
Iterator<Instance> iterator = sourceInstances.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
if (!needSync(iterator.next().getMetadata())) {
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sourceInstances;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean addSyncService(TaskDO taskDO) {
|
||||||
|
try {
|
||||||
|
NamingService sourceNamingService =
|
||||||
|
//nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
|
||||||
|
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());//fix with no nameSpaceName
|
||||||
|
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
|
||||||
|
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
|
||||||
|
if (event instanceof NamingEvent) {
|
||||||
|
if (serviceNameSet.set(((NamingEvent) event).getServiceName())) {// add event merge
|
||||||
|
EXECUTOR.execute(new SyncThread(sourceNamingService, taskDO, client));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("sync task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
|
||||||
|
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,41 +12,24 @@
|
||||||
*/
|
*/
|
||||||
package com.alibaba.nacossync.extension.impl;
|
package com.alibaba.nacossync.extension.impl;
|
||||||
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.ALL_SERVICE_NAME_PATTERN;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.DUBBO_PATH_FORMAT;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.DUBBO_ROOT_PATH;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.GROUP_KEY;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_IP_KEY;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_PORT_KEY;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.INTERFACE_KEY;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.PROTOCOL_KEY;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.VERSION_KEY;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.WEIGHT_KEY;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.ZOOKEEPER_SEPARATOR;
|
|
||||||
import static com.alibaba.nacossync.util.DubboConstants.createServiceName;
|
|
||||||
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
|
|
||||||
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;
|
|
||||||
|
|
||||||
import com.alibaba.nacos.api.exception.NacosException;
|
import com.alibaba.nacos.api.exception.NacosException;
|
||||||
import com.alibaba.nacos.api.naming.NamingService;
|
import com.alibaba.nacos.api.naming.NamingService;
|
||||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||||
|
import com.alibaba.nacos.client.naming.NacosNamingService;
|
||||||
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
|
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
|
||||||
import com.alibaba.nacossync.constant.ClusterTypeEnum;
|
import com.alibaba.nacossync.constant.ClusterTypeEnum;
|
||||||
import com.alibaba.nacossync.constant.MetricsStatisticsType;
|
import com.alibaba.nacossync.constant.MetricsStatisticsType;
|
||||||
|
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
|
||||||
import com.alibaba.nacossync.constant.SkyWalkerConstants;
|
import com.alibaba.nacossync.constant.SkyWalkerConstants;
|
||||||
import com.alibaba.nacossync.extension.SyncService;
|
import com.alibaba.nacossync.extension.SyncService;
|
||||||
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
||||||
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
|
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
|
||||||
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
|
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
|
||||||
|
import com.alibaba.nacossync.extension.impl.extend.Sharding;
|
||||||
|
import com.alibaba.nacossync.extension.impl.extend.ZookeeperSyncToNacosServiceSharding;
|
||||||
import com.alibaba.nacossync.monitor.MetricsManager;
|
import com.alibaba.nacossync.monitor.MetricsManager;
|
||||||
|
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.function.Predicate;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
@ -55,6 +38,17 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
|
||||||
import org.apache.curator.utils.CloseableUtils;
|
import org.apache.curator.utils.CloseableUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static com.alibaba.nacossync.util.DubboConstants.*;
|
||||||
|
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
|
||||||
|
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author paderlol
|
* @author paderlol
|
||||||
* @version 1.0
|
* @version 1.0
|
||||||
|
@ -82,6 +76,12 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
|
|
||||||
private final SkyWalkerCacheServices skyWalkerCacheServices;
|
private final SkyWalkerCacheServices skyWalkerCacheServices;
|
||||||
|
|
||||||
|
@Resource(type = ZookeeperSyncToNacosServiceSharding.class)
|
||||||
|
private Sharding sharding;
|
||||||
|
|
||||||
|
//排除/dobbo下面的所有非服务节点
|
||||||
|
private static final List<String> IGNORED_DUBBO_PATH = Stream.of("mapping", "metadata", "yellow").collect(Collectors.toList());
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public ZookeeperSyncToNacosServiceImpl(ZookeeperServerHolder zookeeperServerHolder,
|
public ZookeeperSyncToNacosServiceImpl(ZookeeperServerHolder zookeeperServerHolder,
|
||||||
NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) {
|
NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) {
|
||||||
|
@ -96,7 +96,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
if (treeCacheMap.containsKey(taskDO.getTaskId())) {
|
if (treeCacheMap.containsKey(taskDO.getTaskId())) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
TreeCache treeCache = getTreeCache(taskDO);
|
TreeCache treeCache = getTreeCache(taskDO);
|
||||||
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
|
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
|
||||||
// 初次执行任务统一注册所有实例
|
// 初次执行任务统一注册所有实例
|
||||||
|
@ -104,10 +103,16 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
//注册ZK监听
|
//注册ZK监听
|
||||||
Objects.requireNonNull(treeCache).getListenable().addListener((client, event) -> {
|
Objects.requireNonNull(treeCache).getListenable().addListener((client, event) -> {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
String path = event.getData().getPath();
|
String path = event.getData().getPath();
|
||||||
|
if (!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
Map<String, String> queryParam = parseQueryString(path);
|
Map<String, String> queryParam = parseQueryString(path);
|
||||||
|
//add sharding
|
||||||
|
if (!isProcess(taskDO, destNamingService, queryParam.get(INTERFACE_KEY)))
|
||||||
|
return;
|
||||||
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
|
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
|
||||||
|
log.info("sync sharding Zookeeper to Nacos serviceName:{},local servicesName :{}", queryParam.get(INTERFACE_KEY), sharding.getLocalServices(null));
|
||||||
processEvent(taskDO, destNamingService, event, path, queryParam);
|
processEvent(taskDO, destNamingService, event, path, queryParam);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -126,9 +131,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
|
|
||||||
private void processEvent(TaskDO taskDO, NamingService destNamingService, TreeCacheEvent event, String path,
|
private void processEvent(TaskDO taskDO, NamingService destNamingService, TreeCacheEvent event, String path,
|
||||||
Map<String, String> queryParam) throws NacosException {
|
Map<String, String> queryParam) throws NacosException {
|
||||||
if(!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
|
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
|
||||||
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
|
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
|
||||||
|
@ -138,15 +140,18 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
case NODE_UPDATED:
|
case NODE_UPDATED:
|
||||||
|
|
||||||
destNamingService.registerInstance(
|
destNamingService.registerInstance(
|
||||||
getServiceNameFromCache(serviceName, queryParam), instance);
|
getServiceNameFromCache(serviceName, queryParam), instance);
|
||||||
|
//getServiceNameFromCache(serviceName, queryParam, instance), instance);
|
||||||
|
log.info("syn add service : {} ,instance:{}", serviceName, instance);
|
||||||
break;
|
break;
|
||||||
case NODE_REMOVED:
|
case NODE_REMOVED:
|
||||||
|
|
||||||
destNamingService.deregisterInstance(
|
destNamingService.deregisterInstance(
|
||||||
getServiceNameFromCache(serviceName, queryParam),
|
getServiceNameFromCache(serviceName, queryParam),
|
||||||
ipAndPortParam.get(INSTANCE_IP_KEY),
|
ipAndPortParam.get(INSTANCE_IP_KEY),
|
||||||
Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
|
Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
|
||||||
nacosServiceNameMap.remove(serviceName);
|
nacosServiceNameMap.remove(serviceName);
|
||||||
|
log.info("syn delete service : {} ,instance:{}", serviceName, instance);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
@ -154,26 +159,34 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerAllInstances(TaskDO taskDO, NamingService destNamingService) throws Exception {
|
private void registerAllInstances(TaskDO taskDO, NamingService destNamingService) throws Exception {
|
||||||
CuratorFramework zk = zookeeperServerHolder.get(taskDO.getSourceClusterId(), "");
|
CuratorFramework zk = zookeeperServerHolder.get(taskDO.getSourceClusterId(), "");
|
||||||
if(!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
|
sharding.start(taskDO);//幂等 可重复添加
|
||||||
registerALLInstances0(taskDO, destNamingService, zk, taskDO.getServiceName());
|
if (!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
|
||||||
|
sharding.doSharding(null, new ArrayList<>(Arrays.asList(taskDO.getServiceName())));
|
||||||
|
TreeSet<String> shardingServices = sharding.getLocalServices(null);
|
||||||
|
if (shardingServices.contains(taskDO.getServiceName())) {
|
||||||
|
registerALLInstances0(taskDO, destNamingService, zk, taskDO.getServiceName());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// 同步全部
|
// 同步全部
|
||||||
List<String> serviceList = zk.getChildren().forPath(DUBBO_ROOT_PATH);
|
List<String> serviceList = zk.getChildren().forPath(DUBBO_ROOT_PATH);
|
||||||
for(String serviceName : serviceList) {
|
sharding.doSharding(null, filterNoProviderPath(serviceList));
|
||||||
registerALLInstances0(taskDO, destNamingService, zk, serviceName);
|
TreeSet<String> shardingServices = sharding.getLocalServices(null);
|
||||||
|
for (String serviceName : serviceList) {
|
||||||
|
if (shardingServices.contains(serviceName) && !IGNORED_DUBBO_PATH.contains(serviceName))//add
|
||||||
|
registerALLInstances0(taskDO, destNamingService, zk, serviceName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerALLInstances0(TaskDO taskDO, NamingService destNamingService, CuratorFramework zk,
|
private void registerALLInstances0(TaskDO taskDO, NamingService destNamingService, CuratorFramework zk,
|
||||||
String serviceName) throws Exception {
|
String serviceName) throws Exception {
|
||||||
String path = String.format(DUBBO_PATH_FORMAT, serviceName);
|
String path = String.format(DUBBO_PATH_FORMAT, serviceName);
|
||||||
if(zk.getChildren()==null) {
|
if (zk.getChildren() == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
List<String> providers = zk.getChildren().forPath(path);
|
List<String> providers = zk.getChildren().forPath(path);
|
||||||
for(String provider : providers) {
|
for (String provider : providers) {
|
||||||
Map<String, String> queryParam = parseQueryString(provider);
|
Map<String, String> queryParam = parseQueryString(provider);
|
||||||
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
|
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
|
||||||
Map<String, String> ipAndPortParam = parseIpAndPortString(path + ZOOKEEPER_SEPARATOR + provider);
|
Map<String, String> ipAndPortParam = parseIpAndPortString(path + ZOOKEEPER_SEPARATOR + provider);
|
||||||
|
@ -186,14 +199,14 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean delete(TaskDO taskDO) {
|
public boolean delete(TaskDO taskDO) {
|
||||||
if (taskDO.getServiceName() == null) {
|
if (taskDO.getServiceName() == null) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
|
||||||
CloseableUtils.closeQuietly(treeCacheMap.get(taskDO.getTaskId()));
|
CloseableUtils.closeQuietly(treeCacheMap.get(taskDO.getTaskId()));
|
||||||
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
|
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
|
||||||
if(!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
|
if (!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
|
||||||
if (nacosServiceNameMap.containsKey(taskDO.getServiceName())) {
|
if (nacosServiceNameMap.containsKey(taskDO.getServiceName())) {
|
||||||
List<Instance> allInstances =
|
List<Instance> allInstances =
|
||||||
destNamingService.getAllInstances(nacosServiceNameMap.get(taskDO.getServiceName()));
|
destNamingService.getAllInstances(nacosServiceNameMap.get(taskDO.getServiceName()));
|
||||||
|
@ -208,15 +221,15 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Set<String> serviceNames = nacosServiceNameMap.keySet();
|
Set<String> serviceNames = nacosServiceNameMap.keySet();
|
||||||
for(String serviceName : serviceNames) {
|
for (String serviceName : serviceNames) {
|
||||||
|
|
||||||
if (nacosServiceNameMap.containsKey(serviceName)) {
|
if (nacosServiceNameMap.containsKey(serviceName)) {
|
||||||
List<Instance> allInstances =
|
List<Instance> allInstances =
|
||||||
destNamingService.getAllInstances(serviceName);
|
destNamingService.getAllInstances(serviceName);
|
||||||
for (Instance instance : allInstances) {
|
for (Instance instance : allInstances) {
|
||||||
if (needDelete(instance.getMetadata(), taskDO)) {
|
if (needDelete(instance.getMetadata(), taskDO)) {
|
||||||
destNamingService.deregisterInstance(instance.getServiceName(), instance.getIp(),
|
destNamingService.deregisterInstance(instance.getServiceName(), instance.getIp(),
|
||||||
instance.getPort());
|
instance.getPort());
|
||||||
}
|
}
|
||||||
nacosServiceNameMap.remove(serviceName);
|
nacosServiceNameMap.remove(serviceName);
|
||||||
|
|
||||||
|
@ -226,8 +239,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("delete task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
|
log.error("delete task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
|
||||||
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
|
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
|
||||||
|
@ -243,8 +254,8 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
return treeCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {
|
return treeCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {
|
||||||
try {
|
try {
|
||||||
TreeCache treeCache =
|
TreeCache treeCache =
|
||||||
new TreeCache(zookeeperServerHolder.get(taskDO.getSourceClusterId(), ""),
|
new TreeCache(zookeeperServerHolder.get(taskDO.getSourceClusterId(), ""),
|
||||||
DUBBO_ROOT_PATH);
|
DUBBO_ROOT_PATH);
|
||||||
treeCache.start();
|
treeCache.start();
|
||||||
return treeCache;
|
return treeCache;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -261,20 +272,20 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
*/
|
*/
|
||||||
protected boolean isMatch(TaskDO taskDO, Map<String, String> queryParam) {
|
protected boolean isMatch(TaskDO taskDO, Map<String, String> queryParam) {
|
||||||
Predicate<TaskDO> isVersionEq = (task) -> StringUtils.isBlank(taskDO.getVersion())
|
Predicate<TaskDO> isVersionEq = (task) -> StringUtils.isBlank(taskDO.getVersion())
|
||||||
|| StringUtils.equals(task.getVersion(), queryParam.get(VERSION_KEY));
|
|| StringUtils.equals(task.getVersion(), queryParam.get(VERSION_KEY));
|
||||||
Predicate<TaskDO> isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName())
|
Predicate<TaskDO> isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName()) || StringUtils.isBlank(queryParam.get(GROUP_KEY)) //fix
|
||||||
|| StringUtils.equals(task.getGroupName(), queryParam.get(GROUP_KEY));
|
|| StringUtils.equals(task.getGroupName(), queryParam.get(GROUP_KEY));
|
||||||
return isVersionEq.and(isGroupEq).test(taskDO);
|
return isVersionEq.and(isGroupEq).test(taskDO);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* create Nacos service instance
|
* create Nacos service instance
|
||||||
*
|
*
|
||||||
* @param queryParam dubbo metadata
|
* @param queryParam dubbo metadata
|
||||||
* @param ipAndPortMap dubbo ip and address
|
* @param ipAndPortMap dubbo ip and address
|
||||||
*/
|
*/
|
||||||
protected Instance buildSyncInstance(Map<String, String> queryParam, Map<String, String> ipAndPortMap,
|
protected Instance buildSyncInstance(Map<String, String> queryParam, Map<String, String> ipAndPortMap,
|
||||||
TaskDO taskDO) {
|
TaskDO taskDO) {
|
||||||
Instance temp = new Instance();
|
Instance temp = new Instance();
|
||||||
temp.setIp(ipAndPortMap.get(INSTANCE_IP_KEY));
|
temp.setIp(ipAndPortMap.get(INSTANCE_IP_KEY));
|
||||||
temp.setPort(Integer.parseInt(ipAndPortMap.get(INSTANCE_PORT_KEY)));
|
temp.setPort(Integer.parseInt(ipAndPortMap.get(INSTANCE_PORT_KEY)));
|
||||||
|
@ -286,7 +297,7 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
metaData.put(PROTOCOL_KEY, ipAndPortMap.get(PROTOCOL_KEY));
|
metaData.put(PROTOCOL_KEY, ipAndPortMap.get(PROTOCOL_KEY));
|
||||||
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
|
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
|
||||||
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
|
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
|
||||||
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
|
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
|
||||||
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
|
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
|
||||||
temp.setMetadata(metaData);
|
temp.setMetadata(metaData);
|
||||||
return temp;
|
return temp;
|
||||||
|
@ -296,10 +307,76 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
||||||
* cteate Dubbo service name
|
* cteate Dubbo service name
|
||||||
*
|
*
|
||||||
* @param serviceName dubbo service name
|
* @param serviceName dubbo service name
|
||||||
* @param queryParam dubbo metadata
|
* @param queryParam dubbo metadata
|
||||||
*/
|
*/
|
||||||
protected String getServiceNameFromCache(String serviceName, Map<String, String> queryParam) {
|
protected String getServiceNameFromCache(String serviceName, Map<String, String> queryParam) {
|
||||||
return nacosServiceNameMap.computeIfAbsent(serviceName, (key) -> createServiceName(queryParam));
|
return nacosServiceNameMap.computeIfAbsent(serviceName, (key) -> createServiceName(queryParam));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<String> filterNoProviderPath(List<String> sourceInstances) {
|
||||||
|
Iterator<String> iterator = sourceInstances.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
if (IGNORED_DUBBO_PATH.contains(iterator.next())) {
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sourceInstances;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 取消service下的instance注册,防止在server变化的时候,server之前sharding的service被分配到其他server上,这里需要手动将这部分instance下线;
|
||||||
|
* (这里没有直接调用Nacos的deregisterInstance是因为在当前分布式下存在时序问题,可能导致该任务误删除其他server刚注册上的instance,这里使用停止发送心跳的方法,让instance自己下线)
|
||||||
|
*
|
||||||
|
* @param namingService
|
||||||
|
* @param serviceNames
|
||||||
|
*/
|
||||||
|
private void deregisterService(NamingService namingService, Queue<ShardingLog> serviceNames, TaskDO taskDO) {
|
||||||
|
log.info("zk->nacos current deal with serviceNames:" + sharding.getLocalServices(null));
|
||||||
|
log.info("zk->nacos current change serviceNames count:" + serviceNames.size());
|
||||||
|
while (!serviceNames.isEmpty()) {
|
||||||
|
ShardingLog shardingLog = serviceNames.poll();
|
||||||
|
if (!shardingLog.getType().equals(ShardingLogTypeEnum.DELETE.getType())) {
|
||||||
|
log.info("zk->nacos current add serviceName:{},will skip...", shardingLog.getServiceName());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
List<Instance> allInstances =
|
||||||
|
namingService.getAllInstances(nacosServiceNameMap.get(shardingLog.getServiceName()));
|
||||||
|
for (Instance instance : allInstances) {
|
||||||
|
if (needDelete(instance.getMetadata(), taskDO)) {
|
||||||
|
log.info("zk->nacos current will stop beat:" + instance.getIp() + instance.getPort() + " ,key:" + instance.getServiceName());
|
||||||
|
((NacosNamingService) namingService).getBeatReactor().removeBeatInfo(instance.getServiceName(), instance.getIp(), instance.getPort());
|
||||||
|
}
|
||||||
|
nacosServiceNameMap.remove(shardingLog.getServiceName());
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("deregisterService faild ,cause by:{}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断是否本机处理的service
|
||||||
|
*
|
||||||
|
* @param taskDO
|
||||||
|
* @param destNamingService
|
||||||
|
* @param serviceName
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private boolean isProcess(TaskDO taskDO, NamingService destNamingService, String serviceName) {
|
||||||
|
try {
|
||||||
|
if (IGNORED_DUBBO_PATH.contains(serviceName))
|
||||||
|
return false;
|
||||||
|
sharding.doSharding(null, new ArrayList<>(Arrays.asList(serviceName)));
|
||||||
|
deregisterService(destNamingService, sharding.getChangeService(), taskDO);
|
||||||
|
if (sharding.getLocalServices(null).contains(serviceName)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("zk->nacos sharding faild ,taskid:{}", taskDO.getId(), e);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,210 @@
|
||||||
|
package com.alibaba.nacossync.extension.impl.extend;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.api.naming.NamingService;
|
||||||
|
import com.alibaba.nacos.client.naming.utils.NetUtils;
|
||||||
|
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
|
||||||
|
import com.alibaba.nacossync.extension.SyncManagerService;
|
||||||
|
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
|
||||||
|
import com.alibaba.nacossync.extension.impl.NacosSyncToZookeeperServiceImpl;
|
||||||
|
import com.alibaba.nacossync.extension.sharding.ConsistentHashServiceSharding;
|
||||||
|
import com.alibaba.nacossync.extension.sharding.ServiceSharding;
|
||||||
|
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||||
|
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||||
|
import com.alibaba.nacossync.util.DubboConstants;
|
||||||
|
import com.alibaba.nacossync.util.SkyWalkerUtil;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.BeanUtils;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/10/29.
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class NacosSyncToZookeeperServicesSharding implements Sharding {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NacosServerHolder nacosServerHolder;
|
||||||
|
|
||||||
|
public static final int DEFAULT_SERVICE_PAGENO = 1;
|
||||||
|
|
||||||
|
public static final int DEFAULT_SERVICE_PAGE_SIZE = 10000;
|
||||||
|
|
||||||
|
private volatile String serviceListMd5;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SyncManagerService syncManagerService;
|
||||||
|
|
||||||
|
@Lazy
|
||||||
|
@Resource(type = ConsistentHashServiceSharding.class)
|
||||||
|
private ServiceSharding serviceSharding;
|
||||||
|
|
||||||
|
private static final String SHARDING_KEY_NAME = NacosSyncToZookeeperServicesSharding.class.getName();
|
||||||
|
|
||||||
|
private static final long DEFAULT_SERVICES_CHANGE_THREAD_DELAY = 10;
|
||||||
|
|
||||||
|
private static final long DEFAULT_SERVICES_CHANGE_THREAD_INTERVAL = 5;
|
||||||
|
|
||||||
|
//add cache taskDO
|
||||||
|
private Map<String, TaskDO> taskDOMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Value("${server.port}")
|
||||||
|
private String serverPort;
|
||||||
|
|
||||||
|
|
||||||
|
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
Thread thread = new Thread(r);
|
||||||
|
thread.setDaemon(true);
|
||||||
|
thread.setName(" com.alibaba.nacossync.sharding.getServiceName");
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
protected boolean servicesIschanged(TaskDO taskDO) throws Exception {
|
||||||
|
NamingService sourceNamingService =
|
||||||
|
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
|
||||||
|
List<String> serviceNames = sourceNamingService.getServicesOfServer(DEFAULT_SERVICE_PAGENO, DEFAULT_SERVICE_PAGE_SIZE, SkyWalkerUtil.getGroupName(taskDO.getGroupName())).getData();
|
||||||
|
Collections.sort(serviceNames);
|
||||||
|
String md5 = SkyWalkerUtil.StringToMd5(serviceNames.toString());
|
||||||
|
if (!md5.equals(serviceListMd5)) {
|
||||||
|
serviceListMd5 = md5;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized void reSubscribeService(TaskDO taskDO) {
|
||||||
|
log.error("reSubscribe start");
|
||||||
|
if (Objects.isNull(taskDO)) return;
|
||||||
|
try {
|
||||||
|
NamingService sourceNamingService =
|
||||||
|
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
|
||||||
|
List<String> serviceNames = sourceNamingService.getServicesOfServer(DEFAULT_SERVICE_PAGENO, DEFAULT_SERVICE_PAGE_SIZE, taskDO.getGroupName()).getData();//如果使用同一个groupName暂时没问题,如果配置了多个group,需要升级sdk1.4+,支持按照*的group查询
|
||||||
|
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("reSubscribe faild,task id:{}", taskDO.getId(), e);
|
||||||
|
}
|
||||||
|
if (!serviceSharding.getChangeServices(SHARDING_KEY_NAME).isEmpty()) {
|
||||||
|
try {
|
||||||
|
syncChangedServices();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("reSubscribe -->delete service faild,task id:{}", taskDO.getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//暂时不支持多source_cluster_id,多nammespace维度(默认取第一个task的source_cluster_id和namespace)
|
||||||
|
@Override
|
||||||
|
public void start(TaskDO taskDO) {
|
||||||
|
taskDOMap.putIfAbsent(taskDO.getServiceName(), taskDO);
|
||||||
|
if (!serviceSharding.addServerChange(SHARDING_KEY_NAME, this)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
executorService.scheduleWithFixedDelay(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
if (servicesIschanged(taskDO)) {
|
||||||
|
reSubscribeService(taskDO);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("schedule reSubscribe service thread faild ,task id :", taskDO.getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, DEFAULT_SERVICES_CHANGE_THREAD_DELAY, DEFAULT_SERVICES_CHANGE_THREAD_INTERVAL, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onServerChange() {
|
||||||
|
if (taskDOMap.size() > 0) {
|
||||||
|
for (TaskDO taskDO : taskDOMap.values()) {//任意取一个taskDo
|
||||||
|
reSubscribeService(taskDO);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Queue<ShardingLog> getChangeService() {
|
||||||
|
return serviceSharding.getChangeServices(SHARDING_KEY_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doSharding(String key, List<String> serviceNames) {
|
||||||
|
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TreeSet<String> getLocalServices(String key) {
|
||||||
|
return serviceSharding.getLocalServices(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void syncAddedServices(String serviceName) {
|
||||||
|
if (taskDOMap.containsKey(DubboConstants.ALL_SERVICE_NAME_PATTERN)) {//如果有配置为* 的 则不用处理单独配置serviceName的task
|
||||||
|
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(DubboConstants.ALL_SERVICE_NAME_PATTERN), serviceName);
|
||||||
|
((NacosSyncToZookeeperServiceImpl) syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId())).addSyncService(taskDO);
|
||||||
|
log.info("reSubscribe ,{} is add", serviceName);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (taskDOMap.containsKey(serviceName)) {//如果有配置变更的serviceName,而且sharding到本server则处理
|
||||||
|
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(serviceName), serviceName);
|
||||||
|
((NacosSyncToZookeeperServiceImpl) syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId())).addSyncService(taskDO);
|
||||||
|
log.info("reSubscribe ,{} is add", serviceName);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void syncRemovedServices(String serviceName) {
|
||||||
|
if (taskDOMap.containsKey(DubboConstants.ALL_SERVICE_NAME_PATTERN)) {
|
||||||
|
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(DubboConstants.ALL_SERVICE_NAME_PATTERN), serviceName);
|
||||||
|
syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
|
||||||
|
log.info("reSubscribe ,{} is remove", serviceName);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (taskDOMap.containsKey(serviceName)) {
|
||||||
|
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(serviceName), serviceName);
|
||||||
|
syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
|
||||||
|
log.info("reSubscribe ,{} is remove", serviceName);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void syncChangedServices() {
|
||||||
|
log.info("reSubscribe ,local ip: {},current sharding service:{}", NetUtils.localIP() + ":" + serverPort, serviceSharding.getLocalServices(SHARDING_KEY_NAME).toString());
|
||||||
|
while (!serviceSharding.getChangeServices(SHARDING_KEY_NAME).isEmpty()) {
|
||||||
|
ShardingLog shardingLog = serviceSharding.getChangeServices(SHARDING_KEY_NAME).poll();
|
||||||
|
if (shardingLog.getType().equals(ShardingLogTypeEnum.ADD.getType())) {
|
||||||
|
syncAddedServices(shardingLog.getServiceName());
|
||||||
|
}
|
||||||
|
if (shardingLog.getType().equals(ShardingLogTypeEnum.DELETE.getType())) {
|
||||||
|
syncRemovedServices(shardingLog.getServiceName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(TaskDO taskDO) {
|
||||||
|
if (taskDOMap.containsKey(taskDO.getServiceName())) {
|
||||||
|
taskDOMap.remove(taskDO.getServiceName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private TaskDO buildNewTaskDo(TaskDO taskDO, String serviceName) {
|
||||||
|
TaskDO taskDO1 = new TaskDO();
|
||||||
|
BeanUtils.copyProperties(taskDO, taskDO1);
|
||||||
|
taskDO1.setTaskId(serviceName);//需要一个key替换以前的taskid很多封装维度,暂时使用serviceName
|
||||||
|
taskDO1.setServiceName(serviceName);
|
||||||
|
return taskDO1;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package com.alibaba.nacossync.extension.impl.extend;
|
||||||
|
|
||||||
|
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||||
|
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/10/30.
|
||||||
|
*/
|
||||||
|
public interface Sharding {
|
||||||
|
|
||||||
|
public void onServerChange();
|
||||||
|
|
||||||
|
public void start(TaskDO taskDO);
|
||||||
|
|
||||||
|
public void stop(TaskDO taskDO);
|
||||||
|
|
||||||
|
public void doSharding(String key, List<String> serviceNames);
|
||||||
|
|
||||||
|
public TreeSet<String> getLocalServices(String key);
|
||||||
|
|
||||||
|
public Queue<ShardingLog> getChangeService();
|
||||||
|
}
|
|
@ -0,0 +1,84 @@
|
||||||
|
package com.alibaba.nacossync.extension.impl.extend;
|
||||||
|
|
||||||
|
import com.alibaba.nacossync.extension.sharding.ConsistentHashServiceSharding;
|
||||||
|
import com.alibaba.nacossync.extension.sharding.ServiceSharding;
|
||||||
|
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||||
|
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||||
|
import com.alibaba.nacossync.util.SkyWalkerUtil;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/10/30.
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class ZookeeperSyncToNacosServiceSharding implements Sharding {
|
||||||
|
|
||||||
|
private static final String SHARDING_KEY_NAME = ZookeeperSyncToNacosServiceSharding.class.getName();
|
||||||
|
|
||||||
|
private volatile String serviceListMd5;
|
||||||
|
|
||||||
|
private volatile boolean serverChange = false;
|
||||||
|
|
||||||
|
@Lazy
|
||||||
|
@Resource(type = ConsistentHashServiceSharding.class)
|
||||||
|
private ServiceSharding serviceSharding;
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onServerChange() {
|
||||||
|
serverChange = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(TaskDO taskDO) {
|
||||||
|
serviceSharding.addServerChange(SHARDING_KEY_NAME, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Queue<ShardingLog> getChangeService() {
|
||||||
|
return serviceSharding.getChangeServices(SHARDING_KEY_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doSharding(String key, List<String> serviceNames) {
|
||||||
|
try {
|
||||||
|
if (servicesIschanged(serviceNames) || serverChange) {
|
||||||
|
log.info("zk ->nacos reshading start");
|
||||||
|
serverChange = false;
|
||||||
|
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("zk ->nacos reshading faild.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TreeSet<String> getLocalServices(String key) {
|
||||||
|
return serviceSharding.getLocalServices(SHARDING_KEY_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean servicesIschanged(List<String> serviceNames) throws Exception {//zk区分不了是service变化还是instance变化
|
||||||
|
Collections.sort(serviceNames);
|
||||||
|
String md5 = SkyWalkerUtil.StringToMd5(serviceNames.toString());
|
||||||
|
if (!md5.equals(serviceListMd5)) {
|
||||||
|
serviceListMd5 = md5;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(TaskDO taskDO) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,148 @@
|
||||||
|
package com.alibaba.nacossync.extension.sharding;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.api.naming.listener.Event;
|
||||||
|
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||||
|
import com.alibaba.nacos.client.naming.utils.NetUtils;
|
||||||
|
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
|
||||||
|
import com.alibaba.nacossync.extension.impl.extend.Sharding;
|
||||||
|
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||||
|
import com.alibaba.nacossync.util.SkyWalkerUtil;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/10/27.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public abstract class AbstractServiceSharding implements ServiceSharding, InitializingBean {
|
||||||
|
|
||||||
|
protected volatile List<String> servers = new LinkedList<String>();
|
||||||
|
|
||||||
|
private Map<String, ConcurrentLinkedQueue<ShardingLog>> localServicesChangeMap = new ConcurrentHashMap<String, ConcurrentLinkedQueue<ShardingLog>>();
|
||||||
|
|
||||||
|
private volatile Map<String, TreeSet<String>> localServicesMap = new ConcurrentHashMap<String, TreeSet<String>>();
|
||||||
|
|
||||||
|
private final static String LOCAL_IP = NetUtils.localIP();
|
||||||
|
|
||||||
|
private volatile String serverListMd5;
|
||||||
|
|
||||||
|
private Map<String, Sharding> serverListens = new ConcurrentHashMap<String, Sharding>();
|
||||||
|
|
||||||
|
@Value("${server.port}")
|
||||||
|
private String serverPort;
|
||||||
|
|
||||||
|
@Lazy
|
||||||
|
@Resource(type = NacosServersManager.class)
|
||||||
|
private ServersManager serversManager;
|
||||||
|
|
||||||
|
protected List<String> getServers() {
|
||||||
|
return servers;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void listenServer() {
|
||||||
|
try {
|
||||||
|
serversManager.subscribeServers(new EventListener() {
|
||||||
|
@Override
|
||||||
|
public void onEvent(Event event) {
|
||||||
|
try {
|
||||||
|
shadingServers();
|
||||||
|
for (Sharding sharding : serverListens.values()) {
|
||||||
|
sharding.onServerChange();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("subscribe servers faild.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("subscribe servers faild.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void shadingServers() throws Exception {
|
||||||
|
List<String> serversList = serversManager.getServers();
|
||||||
|
Collections.sort(serversList);
|
||||||
|
String md5 = SkyWalkerUtil.StringToMd5(serversList.toString());
|
||||||
|
if (!md5.equals(serverListMd5)) {
|
||||||
|
servers = serversList;
|
||||||
|
serverListMd5 = md5;
|
||||||
|
doSharding();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//need fix: 暂时同步话解决
|
||||||
|
protected void shadingServices(String key, List<String> serviceNames) {
|
||||||
|
if (!localServicesMap.containsKey(key)) {
|
||||||
|
TreeSet<String> localServicesSet = new TreeSet<String>();
|
||||||
|
localServicesMap.putIfAbsent(key, localServicesSet);
|
||||||
|
}
|
||||||
|
if (!localServicesChangeMap.containsKey(key)) {
|
||||||
|
ConcurrentLinkedQueue<ShardingLog> removeQueue = new ConcurrentLinkedQueue<ShardingLog>();
|
||||||
|
localServicesChangeMap.putIfAbsent(key, removeQueue);
|
||||||
|
}
|
||||||
|
TreeSet<String> localServices = localServicesMap.get(key);
|
||||||
|
try {
|
||||||
|
for (String serviceName : serviceNames) {
|
||||||
|
if (getShardingServer(serviceName).equals(LOCAL_IP + ":" + serverPort)) {
|
||||||
|
if (!localServices.contains(serviceName)) {
|
||||||
|
localServicesMap.get(key).add(serviceName);
|
||||||
|
localServicesChangeMap.get(key).offer(new ShardingLog(serviceName, ShardingLogTypeEnum.ADD.getType()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (localServices.contains(serviceName)) {
|
||||||
|
localServicesMap.get(key).remove(serviceName);
|
||||||
|
localServicesChangeMap.get(key).offer(new ShardingLog(serviceName, ShardingLogTypeEnum.DELETE.getType()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("shading services faild.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//need fix:按照service维度做sharding,但是在service维度存在zk->nacos nacos->zk两种service,而目前避免环的处理在instance维度的metadata中,如果每次做sharding都去判断instance消耗太大,而且也不能完全避免service中存在多种源的instance,故目前做法是按照zk和nacos注册上的所有
|
||||||
|
//serviceName List做sharding,可能存在sharding不均衡问题,如导致大部分的service都落在一个node上的可能
|
||||||
|
@Override
|
||||||
|
public void sharding(String key, List<String> serviceNames) {
|
||||||
|
try {
|
||||||
|
shadingServices(key, serviceNames);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("sharding faild. sharding key is:{}", key, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean addServerChange(String key, Sharding sharding) {
|
||||||
|
return serverListens.putIfAbsent(key, sharding) == null ? true : false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TreeSet<String> getLocalServices(String key) {
|
||||||
|
return localServicesMap.get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void doSharding();
|
||||||
|
|
||||||
|
protected abstract String getShardingServer(String key);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterPropertiesSet() throws Exception {
|
||||||
|
listenServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Queue<ShardingLog> getChangeServices(String key) {
|
||||||
|
return localServicesChangeMap.get(key);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,78 @@
|
||||||
|
package com.alibaba.nacossync.extension.sharding;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.common.utils.StringUtils;
|
||||||
|
import com.alibaba.nacossync.util.SkyWalkerUtil;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/10/27.
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@Lazy
|
||||||
|
public class ConsistentHashServiceSharding extends AbstractServiceSharding {
|
||||||
|
|
||||||
|
public static final String HASH_NODES = "-hash.vn.nodes";
|
||||||
|
|
||||||
|
private List<String> nodes = new LinkedList<String>();
|
||||||
|
|
||||||
|
private SortedMap<Integer, String> virtualNodes = new TreeMap<Integer, String>();
|
||||||
|
|
||||||
|
private static final int VIRTUAL_COUNT = 100;
|
||||||
|
|
||||||
|
public ConsistentHashServiceSharding() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doSharding() {
|
||||||
|
List<String> servers = getServers();
|
||||||
|
nodes.clear();
|
||||||
|
virtualNodes.clear();
|
||||||
|
for (String node : servers) {
|
||||||
|
nodes.add(node);
|
||||||
|
}
|
||||||
|
for (String node : nodes) {
|
||||||
|
for (int i = 0; i < VIRTUAL_COUNT; i++) {
|
||||||
|
String virtualNodeName = node + HASH_NODES + String.valueOf(i);
|
||||||
|
virtualNodes.put(getHash(virtualNodeName), virtualNodeName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getShardingServer(String key) {
|
||||||
|
int hash = getHash(SkyWalkerUtil.StringToMd5(key));
|
||||||
|
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
|
||||||
|
String virtualNode;
|
||||||
|
if (subMap.isEmpty()) {
|
||||||
|
Integer i = virtualNodes.firstKey();
|
||||||
|
virtualNode = virtualNodes.get(i);
|
||||||
|
} else {
|
||||||
|
Integer i = subMap.firstKey();
|
||||||
|
virtualNode = subMap.get(i);
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotBlank(virtualNode)) {
|
||||||
|
return virtualNode.substring(0, virtualNode.indexOf("-"));
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getHash(String str) {
|
||||||
|
final int p = 16777619;
|
||||||
|
int hash = (int) 2166136261L;
|
||||||
|
for (int i = 0; i < str.length(); i++)
|
||||||
|
hash = (hash ^ str.charAt(i)) * p;
|
||||||
|
hash += hash << 13;
|
||||||
|
hash ^= hash >> 7;
|
||||||
|
hash += hash << 3;
|
||||||
|
hash ^= hash >> 17;
|
||||||
|
hash += hash << 5;
|
||||||
|
if (hash < 0)
|
||||||
|
hash = Math.abs(hash);
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
package com.alibaba.nacossync.extension.sharding;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||||
|
import com.alibaba.nacos.api.naming.NamingFactory;
|
||||||
|
import com.alibaba.nacos.api.naming.NamingService;
|
||||||
|
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||||
|
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||||
|
import com.alibaba.nacos.client.naming.utils.NetUtils;
|
||||||
|
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
|
||||||
|
import com.alibaba.nacossync.util.StringUtils;
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/10/27.
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
@Lazy
|
||||||
|
public class NacosServersManager implements ServersManager<EventListener>, InitializingBean {
|
||||||
|
|
||||||
|
private NamingService namingService;
|
||||||
|
|
||||||
|
@Value("${server.port}")
|
||||||
|
private String serverPort;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SkyWalkerCacheServices skyWalkerCacheServices;
|
||||||
|
|
||||||
|
//不动现有业务表和逻辑基础上,指定一个固定的key--"SHARDINGKEY" 生成md5值 作为默认连接的nacos路径
|
||||||
|
private static final String DEFAULT_SHARDING_NACOS_KEY = "3ac01a8c7501f121ab01efb920aa4764";
|
||||||
|
|
||||||
|
private static final String DEFAULT_SHARDING_NACOS_NAMESPACES = "public";
|
||||||
|
|
||||||
|
private static final String DEFAULT_SHARDING_NACOS_GOURPID = "shadinggroup";
|
||||||
|
|
||||||
|
private static final String DEFAULT_SHARDING_NACOS_SERVICENAME = "com.dmall.sharding";
|
||||||
|
|
||||||
|
@Value("${sharding.nacos.url}")
|
||||||
|
private String shardingNacosUrl;
|
||||||
|
|
||||||
|
@Value("${sharding.nacos.namespace}")
|
||||||
|
private String shardingNacosnameSpace;
|
||||||
|
|
||||||
|
@Value("${sharding.nacos.groupname}")
|
||||||
|
private String shardingNacosGroupName;
|
||||||
|
|
||||||
|
@Value("${sharding.nacos.servicename}")
|
||||||
|
private String shardingNacosServiceName;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getServers() throws Exception {
|
||||||
|
List<Instance> instanceList = namingService.getAllInstances(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID);
|
||||||
|
List<String> serverList = new LinkedList<String>();
|
||||||
|
for (Instance instance : instanceList) {
|
||||||
|
serverList.add(instance.getIp() + ":" + instance.getPort());
|
||||||
|
}
|
||||||
|
return serverList;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void subscribeServers(EventListener listener) throws Exception {
|
||||||
|
namingService.subscribe(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void register(String ip, int port) throws Exception {
|
||||||
|
namingService.registerInstance(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID, ip, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterPropertiesSet() throws Exception {
|
||||||
|
try {
|
||||||
|
log.info("start init nacos servers.");
|
||||||
|
if (StringUtils.isEmpty(shardingNacosUrl)) {
|
||||||
|
shardingNacosUrl = DEFAULT_SHARDING_NACOS_KEY;
|
||||||
|
}
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.setProperty(PropertyKeyConst.SERVER_ADDR, getNacosUrl());
|
||||||
|
properties.setProperty(PropertyKeyConst.NAMESPACE, StringUtils.isEmpty(shardingNacosnameSpace) ? DEFAULT_SHARDING_NACOS_NAMESPACES : shardingNacosnameSpace);
|
||||||
|
namingService = NamingFactory.createNamingService(properties);
|
||||||
|
register(NetUtils.localIP(), Integer.parseInt(serverPort));
|
||||||
|
log.info("init nacos servers sucess.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("init nacos faild .", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getNacosUrl() {
|
||||||
|
if (!StringUtils.isIPV4AndPorts(shardingNacosUrl, ",")) {
|
||||||
|
List<String> allClusterConnectKey = skyWalkerCacheServices
|
||||||
|
.getAllClusterConnectKey(StringUtils.isEmpty(shardingNacosUrl) ? DEFAULT_SHARDING_NACOS_KEY : shardingNacosUrl);
|
||||||
|
return Joiner.on(",").join(allClusterConnectKey);
|
||||||
|
}
|
||||||
|
return shardingNacosUrl;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package com.alibaba.nacossync.extension.sharding;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/10/27.
|
||||||
|
*/
|
||||||
|
public interface ServersManager<T> {
|
||||||
|
|
||||||
|
public List<String> getServers() throws Exception;
|
||||||
|
|
||||||
|
public void subscribeServers(T listener) throws Exception;
|
||||||
|
|
||||||
|
public void register(String ip, int port) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.alibaba.nacossync.extension.sharding;
|
||||||
|
|
||||||
|
import com.alibaba.nacossync.extension.impl.extend.Sharding;
|
||||||
|
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/10/27.
|
||||||
|
*/
|
||||||
|
public interface ServiceSharding {
|
||||||
|
|
||||||
|
public void sharding(String key, List<String> serviceNames);
|
||||||
|
|
||||||
|
public TreeSet<String> getLocalServices(String key);
|
||||||
|
|
||||||
|
public boolean addServerChange(String name, Sharding sharding);
|
||||||
|
|
||||||
|
public Queue<ShardingLog> getChangeServices(String key);
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
package com.alibaba.nacossync.pojo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/11/18.
|
||||||
|
*/
|
||||||
|
public class ShardingLog {
|
||||||
|
|
||||||
|
private String serviceName;
|
||||||
|
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
public ShardingLog() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public ShardingLog(String serviceName, String type) {
|
||||||
|
this.serviceName = serviceName;
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getServiceName() {
|
||||||
|
return serviceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setServiceName(String serviceName) {
|
||||||
|
this.serviceName = serviceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setType(String type) {
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,5 +31,6 @@ public class TaskModel {
|
||||||
private String serviceName;
|
private String serviceName;
|
||||||
private String groupName;
|
private String groupName;
|
||||||
private String taskStatus;
|
private String taskStatus;
|
||||||
|
private String nameSpace;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ public class TaskDetailProcessor implements Processor<TaskDetailQueryRequest, Ta
|
||||||
taskModel.setSourceClusterId(taskDO.getSourceClusterId());
|
taskModel.setSourceClusterId(taskDO.getSourceClusterId());
|
||||||
taskModel.setTaskStatus(taskDO.getTaskStatus());
|
taskModel.setTaskStatus(taskDO.getTaskStatus());
|
||||||
taskModel.setTaskId(taskDO.getTaskId());
|
taskModel.setTaskId(taskDO.getTaskId());
|
||||||
|
taskModel.setNameSpace(taskDO.getNameSpace());
|
||||||
|
|
||||||
taskDetailQueryResult.setTaskModel(taskModel);
|
taskDetailQueryResult.setTaskModel(taskModel);
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,7 @@ public class TaskListQueryProcessor implements Processor<TaskListQueryRequest, T
|
||||||
taskModel.setServiceName(taskDO.getServiceName());
|
taskModel.setServiceName(taskDO.getServiceName());
|
||||||
taskModel.setGroupName(taskDO.getGroupName());
|
taskModel.setGroupName(taskDO.getGroupName());
|
||||||
taskModel.setTaskStatus(taskDO.getTaskStatus());
|
taskModel.setTaskStatus(taskDO.getTaskStatus());
|
||||||
|
taskModel.setNameSpace(taskDO.getNameSpace());
|
||||||
taskList.add(taskModel);
|
taskList.add(taskModel);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
package com.alibaba.nacossync.util;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by maj on 2020/11/2.
|
||||||
|
*/
|
||||||
|
public class ExpirySet<T> {
|
||||||
|
|
||||||
|
private Set<T> set = new CopyOnWriteArraySet();
|
||||||
|
|
||||||
|
private Map<T, Long> expiryMap = new ConcurrentHashMap<T, Long>();
|
||||||
|
|
||||||
|
private long timeOut = 10;//默认10s窗口期
|
||||||
|
|
||||||
|
public ExpirySet() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public ExpirySet(long timeout) {
|
||||||
|
this.timeOut = timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean set(T key) {
|
||||||
|
if (!set.add(key)) {
|
||||||
|
if (getIntervalBySecond(expiryMap.get(key), new Date().getTime()) > timeOut) {
|
||||||
|
expiryMap.putIfAbsent(key, new Date().getTime());
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
expiryMap.putIfAbsent(key, (new Date().getTime()));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean remove(T key) {
|
||||||
|
return set.remove(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIntervalBySecond(long beforeTime, long currentTime) {
|
||||||
|
return (currentTime - beforeTime) / 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -33,14 +33,14 @@ import java.util.Enumeration;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author NacosSync
|
* @author NacosSync
|
||||||
* @version $Id: SkyWalkerUtil.java, v 0.1 2018-09-26 AM12:10 NacosSync Exp $$
|
* @version $Id: SkyWalkerUtil.java, v 0.1 2018-09-26 AM12:10 NacosSync Exp $$
|
||||||
*/
|
*/
|
||||||
public class SkyWalkerUtil {
|
public class SkyWalkerUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* Gets the string md5
|
* Gets the string md5
|
||||||
|
*
|
||||||
* @param value
|
* @param value
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
|
@ -69,6 +69,7 @@ public class SkyWalkerUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The rules of generating taskId
|
* The rules of generating taskId
|
||||||
|
*
|
||||||
* @param addTaskRequest
|
* @param addTaskRequest
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
|
@ -116,6 +117,7 @@ public class SkyWalkerUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Avoid getting a return address
|
* Avoid getting a return address
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
|
@ -153,4 +155,11 @@ public class SkyWalkerUtil {
|
||||||
|
|
||||||
return UUID.randomUUID().toString();
|
return UUID.randomUUID().toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String getGroupName(String groupName) {
|
||||||
|
if (StringUtils.isEmpty(groupName)) {
|
||||||
|
return "DEFAULT_GROUP";
|
||||||
|
}
|
||||||
|
return groupName;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@ import com.google.common.base.Joiner;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.URLDecoder;
|
import java.net.URLDecoder;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
|
@ -38,13 +37,13 @@ public final class StringUtils {
|
||||||
.compile("([_.a-zA-Z0-9][-_.a-zA-Z0-9]*)[=](.*)");
|
.compile("([_.a-zA-Z0-9][-_.a-zA-Z0-9]*)[=](.*)");
|
||||||
private static final Pattern IP_PORT_PATTERN = Pattern
|
private static final Pattern IP_PORT_PATTERN = Pattern
|
||||||
.compile(".*/(.*)://(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
|
.compile(".*/(.*)://(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
|
||||||
private static final Pattern DUBBO_PROVIDER_PATTERN = Pattern
|
private static final Pattern DUBBO_PROVIDER_PATTERN = Pattern
|
||||||
.compile("/dubbo/(.*)/providers/(.*)");
|
.compile("/dubbo/(.*)/providers/(.*)");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* parse key-value pair.
|
* parse key-value pair.
|
||||||
*
|
*
|
||||||
* @param str string.
|
* @param str string.
|
||||||
* @param itemSeparator item separator.
|
* @param itemSeparator item separator.
|
||||||
* @return key-value map;
|
* @return key-value map;
|
||||||
*/
|
*/
|
||||||
|
@ -119,22 +118,47 @@ public final class StringUtils {
|
||||||
return String.format(DUBBO_PATH_FORMAT, interfaceName);
|
return String.format(DUBBO_PATH_FORMAT, interfaceName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String convertDubboFullPathForZk(Map<String, String> metaData, String providersPath, String ip,
|
public static String convertDubboFullPathForZk(Map<String, String> metaData, String providersPath, String ip,
|
||||||
int port) {
|
int port) {
|
||||||
try {
|
try {
|
||||||
String urlParam = Joiner.on("&").withKeyValueSeparator("=").join(metaData);
|
String urlParam = Joiner.on("&").withKeyValueSeparator("=").join(metaData);
|
||||||
String instanceUrl = String.format(DUBBO_URL_FORMAT, metaData.get(PROTOCOL_KEY), ip, port,
|
String instanceUrl = String.format(DUBBO_URL_FORMAT, metaData.get(PROTOCOL_KEY), ip, port,
|
||||||
metaData.get(INTERFACE_KEY), urlParam);
|
metaData.get(INTERFACE_KEY), urlParam);
|
||||||
|
|
||||||
return Joiner.on(ZOOKEEPER_SEPARATOR).join(providersPath, URLEncoder.encode(instanceUrl, "UTF-8"));
|
return Joiner.on(ZOOKEEPER_SEPARATOR).join(providersPath, URLEncoder.encode(instanceUrl, "UTF-8"));
|
||||||
} catch (UnsupportedEncodingException e) {
|
} catch (UnsupportedEncodingException e) {
|
||||||
log.warn("convert Dubbo full path", e);
|
log.warn("convert Dubbo full path", e);
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isDubboProviderPath(String path) {
|
public static boolean isDubboProviderPath(String path) {
|
||||||
return DUBBO_PROVIDER_PATTERN.matcher(path).matches();
|
return DUBBO_PROVIDER_PATTERN.matcher(path).matches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean isIPV4AndPort(String addr) {
|
||||||
|
if (StringUtils.isEmpty(addr)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String rexp = "^(([1-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5])))\\.)(([0-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5])))\\.){2}([1-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5]))):([0-9]|[1-9]\\d|[1-9]\\d{2}|[1-9]\\d{3}|[1-5]\\d{4}|6[0-4]\\d{3}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5])$";
|
||||||
|
|
||||||
|
Pattern pat = Pattern.compile(rexp);
|
||||||
|
|
||||||
|
Matcher mat = pat.matcher(addr);
|
||||||
|
|
||||||
|
return mat.find();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isIPV4AndPorts(String ips, String reg) {
|
||||||
|
if (StringUtils.isEmpty(ips)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String[] ipvs = ips.split(reg);
|
||||||
|
for (String ipv : ipvs) {
|
||||||
|
if (!isIPV4AndPort(ipv))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,3 +13,12 @@ spring.datasource.password=root
|
||||||
|
|
||||||
management.endpoints.web.exposure.include=*
|
management.endpoints.web.exposure.include=*
|
||||||
management.endpoint.health.show-details=always
|
management.endpoint.health.show-details=always
|
||||||
|
|
||||||
|
##以下配置为使用sharding功能时,使用nacos维持nacos-sync的节点状态时用到的namespace,gourp,serviceName,如果不配置则使用(public-->DEFAULT_GROUP---->com.default.service);
|
||||||
|
#(建议配置,不要和业务耦合在一个空间下,否则会导致做全量同步的时候,这部分节点也会同步)
|
||||||
|
#url为nacos的连接地址,可以配置成ip+port方式,也可以通过web配置-->集群配置配置在cluster表中,这里填写cluster_id(如果不配置则默认一个md5值)
|
||||||
|
sharding.nacos.url=3ac01a8c7501f121ab01efb920aa4764
|
||||||
|
#nanespace建议参考nacos社区,需要手动提前配置,不会自动创建,而且不是填写定义的名字而是md5值,默认public除外
|
||||||
|
sharding.nacos.namespace=6ab2bbda-4b84-42d0-8392-99aa1446cffb
|
||||||
|
sharding.nacos.groupname=shadinggroup
|
||||||
|
sharding.nacos.servicename=com.dmall.sharding
|
Loading…
Reference in New Issue