Compare commits

...

20 Commits

Author SHA1 Message Date
paderlol 144d5f7a99
Merge pull request #213 from nacos-group/develop
0.4.4
2021-02-25 22:28:24 +08:00
paderlol 8188868e16
Merge pull request #210 from nacos-group/develop
Fix delete sync task issue #168
2021-02-22 23:29:34 +08:00
paderlol 1713fd8d96
Merge pull request #207 from nacos-group/develop
Develop
2021-02-11 21:17:29 +08:00
paderlol fa1155911d
Merge pull request #192 from cdmaji/refactor_feature_fix_nacostozk
Fix the issue caused by a server going offline abnormally during nacos->zk sync
2020-12-18 19:08:31 +08:00
maj 637cbbe327 Fix the issue in nacos->zk sync where, after a server went offline abnormally, the server taking over could not create the ZK registration node 2020-12-16 19:47:43 +08:00
maj 7abca3647d Adjust the virtual ring size 2020-12-15 11:07:43 +08:00
maj 9371be3854 format code 2020-12-08 16:30:56 +08:00
maj 07770923c5 format code 2020-12-08 15:29:53 +08:00
maj 4e8138d3a1 format code 2020-12-08 15:23:17 +08:00
maj c65e98972d Improve serviceName sharding handling 2020-12-07 11:13:42 +08:00
maj 8b9e7859db fix reviewed 2020-11-23 14:13:45 +08:00
maj 925a0e5435 Improve the consistent hashing algorithm 2020-11-18 19:21:08 +08:00
maj dd7ac4ae0d 1. Fix the add/remove ordering issue during sharding
2. Add sharding sync blocking for nacos->zk
2020-11-18 16:58:58 +08:00
maj ce5011e666 Refine parts of the code 2020-11-12 15:55:09 +08:00
maj 4527321855 Frontend page supports namespace maintenance 2020-11-12 10:48:16 +08:00
maj 0b638f82be Frontend page supports namespace maintenance 2020-11-12 10:38:23 +08:00
maj 0067c76bd5 1. add sharding
2. add nacos->zk *
3. deal with others
2020-11-11 19:03:16 +08:00
cdmaji 6d8c265167
Merge pull request #2 from nacos-group/refactor_feature
pull Refactor feature
2020-11-11 17:00:54 +08:00
cdmaji 3d7d43827b
Merge pull request #1 from nacos-group/master
update
2020-11-11 16:41:37 +08:00
paderlol ff9f7addfc
Merge pull request #186 from nacos-group/develop
Develop
2020-11-10 22:20:15 +08:00
24 changed files with 1150 additions and 142 deletions

View File

@ -19,6 +19,7 @@ class AddSyncDialog extends React.Component {
visible: false,
destClusterId: '',
groupName: '',
nameSpace: '',
serviceName: '',
sourceClusterId: '',
version: '',
@ -31,8 +32,8 @@ class AddSyncDialog extends React.Component {
}
save() {
const { destClusterId, groupName, serviceName, sourceClusterId, version } = this.state;
add({ destClusterId, groupName, serviceName, sourceClusterId, version })
const { destClusterId, groupName, nameSpace, serviceName, sourceClusterId, version } = this.state;
add({ destClusterId, groupName, nameSpace, serviceName, sourceClusterId, version })
.then(() => {
this.props.turnPage(1);
this.close();
@ -75,6 +76,11 @@ class AddSyncDialog extends React.Component {
placeholder={locale.groupNamePlaceholder}
onChange={groupName => this.setState({ groupName })}
/>
</FormItem>
<FormItem label={`${locale.nameSpace}:`}>
<Input
placeholder={locale.nameSpacePlaceholder}
onChange={nameSpace => this.setState({ nameSpace })}
/>
</FormItem>
{
sourceCluster.clusterType === 'ZK' && (

View File

@ -118,6 +118,7 @@ class ServiceSync extends React.Component {
<Table dataSource={taskModels} loading={loading}>
<Table.Column title={locale.serviceName} dataIndex="serviceName" />
<Table.Column title={locale.groupName} dataIndex="groupName" />
<Table.Column title={locale.nameSpace} dataIndex="nameSpace" />
<Table.Column
title={locale.sourceCluster}
dataIndex="sourceClusterId"

View File

@ -40,6 +40,7 @@ const I18N_CONF = {
addSync: 'New Sync',
serviceName: 'Service Name',
groupName: 'Group',
nameSpace: 'Namespace',
sourceCluster: 'Source Cluster',
destCluster: 'Dest Cluster',
instancesCount: 'Instances Count',
@ -60,6 +61,8 @@ const I18N_CONF = {
serviceNamePlaceholder: 'Please enter service name',
groupName: 'Group Name',
groupNamePlaceholder: 'Please enter group name',
nameSpace: 'Namespace',
nameSpacePlaceholder: 'Please enter namespace',
sourceCluster: 'Source Cluster',
destCluster: 'Dest Cluster',
version: 'Version',

View File

@ -40,6 +40,7 @@ const I18N_CONF = {
addSync: '新增同步',
serviceName: '服务名',
groupName: '分组',
nameSpace: '命名空间',
sourceCluster: '源集群',
destCluster: '目标集群',
instancesCount: '实例数',
@ -60,6 +61,8 @@ const I18N_CONF = {
serviceNamePlaceholder: '请输入服务名',
groupName: '分组名',
groupNamePlaceholder: '请输入分组名',
nameSpace: '命名空间',
nameSpacePlaceholder: '请输入命名空间',
sourceCluster: '源集群',
destCluster: '目标集群',
version: '版本',

View File

@ -0,0 +1,46 @@
package com.alibaba.nacossync.constant;
/**
* Created by maj on 2020/11/18.
*/
public enum ShardingLogTypeEnum {
ADD("add", "新增"),
DELETE("DELETE", "删除");
private String type;
private String desc;
ShardingLogTypeEnum(String type, String desc) {
this.type = type;
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public static boolean contains(String type) {
for (ShardingLogTypeEnum shardingLogTypeEnum : ShardingLogTypeEnum.values()) {
if (shardingLogTypeEnum.getType().equals(type)) {
return true;
}
}
return false;
}
}

View File

@ -12,14 +12,11 @@
*/
package com.alibaba.nacossync.extension;
import static com.alibaba.nacossync.util.SkyWalkerUtil.generateSyncKey;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.pojo.model.TaskDO;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
@ -27,6 +24,10 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
import static com.alibaba.nacossync.util.SkyWalkerUtil.generateSyncKey;
/**
* @author NacosSync
* @version $Id: SyncManagerService.java, v 0.1 2018-09-25 PM5:17 NacosSync Exp $$
@ -42,20 +43,17 @@ public class SyncManagerService implements InitializingBean, ApplicationContextA
private ApplicationContext applicationContext;
public SyncManagerService(
SkyWalkerCacheServices skyWalkerCacheServices) {
SkyWalkerCacheServices skyWalkerCacheServices) {
this.skyWalkerCacheServices = skyWalkerCacheServices;
}
public boolean delete(TaskDO taskDO) throws NacosException {
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
}
public boolean sync(TaskDO taskDO) {
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO);
}
@Override
@ -80,5 +78,4 @@ public class SyncManagerService implements InitializingBean, ApplicationContextA
return syncServiceMap.get(generateSyncKey(sourceClusterType, destClusterType));
}
}

View File

@ -12,9 +12,6 @@
*/
package com.alibaba.nacossync.extension.impl;
import static com.alibaba.nacossync.util.StringUtils.convertDubboFullPathForZk;
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
@ -28,17 +25,13 @@ import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
import com.alibaba.nacossync.extension.impl.extend.NacosSyncToZookeeperServicesSharding;
import com.alibaba.nacossync.extension.impl.extend.Sharding;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.DubboConstants;
import com.alibaba.nacossync.util.ExpirySet;
import com.google.common.collect.Sets;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@ -47,6 +40,16 @@ import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.alibaba.nacossync.util.StringUtils.convertDubboFullPathForZk;
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;
/**
* Syncs data from Nacos to ZK
*
@ -90,9 +93,16 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
private final ZookeeperServerHolder zookeeperServerHolder;
private static ExpirySet<String> serviceNameSet = new ExpirySet<String>();
private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@Resource(type = NacosSyncToZookeeperServicesSharding.class)
private Sharding sharding;
@Autowired
public NacosSyncToZookeeperServiceImpl(SkyWalkerCacheServices skyWalkerCacheServices,
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder) {
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder) {
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.nacosServerHolder = nacosServerHolder;
this.zookeeperServerHolder = zookeeperServerHolder;
@ -101,9 +111,9 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
@Override
public boolean delete(TaskDO taskDO) {
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
//nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());//
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());//fix with no nameSpaceName
EventListener eventListener = nacosListenerMap.remove(taskDO.getTaskId());
PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(taskDO.getTaskId());
sourceNamingService.unsubscribe(taskDO.getServiceName(), eventListener);
@ -113,6 +123,7 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
for (String instanceUrl : instanceUrlSet) {
client.delete().quietly().forPath(instanceUrl);
}
sharding.stop(taskDO);
} catch (Exception e) {
log.error("delete task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
@ -123,37 +134,7 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
@Override
public boolean sync(TaskDO taskDO) {
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
if (event instanceof NamingEvent) {
try {
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);
// Retrieve the previous backup and delete invalid instances
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
// Replace the current backup with the latest one
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
// Try to recover instance data lost due to an unexpected ZK client disconnection
tryToCompensate(taskDO, sourceNamingService, sourceInstances);
} catch (Exception e) {
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
}
}
});
sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
} catch (Exception e) {
log.error("sync task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
sharding.start(taskDO);
return true;
}
@ -164,13 +145,13 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
pathCache.getListenable().addListener((zkClient, zkEvent) -> {
if (zkEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
List<Instance> allInstances =
sourceNamingService.getAllInstances(taskDO.getServiceName());
sourceNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
String instanceUrl = buildSyncInstance(instance, taskDO);
String zkInstancePath = zkEvent.getData().getPath();
if (zkInstancePath.equals(instanceUrl)) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(zkInstancePath);
.forPath(zkInstancePath);
break;
}
}
@ -182,9 +163,9 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
}
private void deleteInvalidInstances(TaskDO taskDO, CuratorFramework client, Set<String> newInstanceUrlSet)
throws Exception {
throws Exception {
Set<String> instanceBackup =
instanceBackupMap.getOrDefault(taskDO.getTaskId(), Sets.newHashSet());
instanceBackupMap.getOrDefault(taskDO.getTaskId(), Sets.newHashSet());
for (String instanceUrl : instanceBackup) {
if (newInstanceUrlSet.contains(instanceUrl)) {
continue;
@ -194,15 +175,17 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
}
private HashSet<String> getWaitingToAddInstance(TaskDO taskDO, CuratorFramework client,
List<Instance> sourceInstances) throws Exception {
List<Instance> sourceInstances) throws Exception {
HashSet<String> waitingToAddInstance = new HashSet<>();
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
log.info("nacos->zk ,real sync service :{},and instance :{}", instance.getServiceName(), instance.getIp());
String instanceUrl = buildSyncInstance(instance, taskDO);
if (null == client.checkExists().forPath(instanceUrl)) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(instanceUrl);
if (null != client.checkExists().forPath(instanceUrl)) {
client.delete().quietly().forPath(instanceUrl);
}
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(instanceUrl);
waitingToAddInstance.add(instanceUrl);
}
}
@ -214,11 +197,11 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
metaData.putAll(instance.getMetadata());
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
String servicePath = monitorPath.computeIfAbsent(taskDO.getTaskId(),
key -> convertDubboProvidersPath(metaData.get(DubboConstants.INTERFACE_KEY)));
key -> convertDubboProvidersPath(metaData.get(DubboConstants.INTERFACE_KEY)));
return convertDubboFullPathForZk(metaData, servicePath, instance.getIp(), instance.getPort());
}
@ -234,7 +217,7 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
return pathChildrenCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {
try {
PathChildrenCache pathChildrenCache = new PathChildrenCache(
zookeeperServerHolder.get(taskDO.getDestClusterId(), ""), monitorPath.get(key), false);
zookeeperServerHolder.get(taskDO.getDestClusterId(), ""), monitorPath.get(key), false);
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
return pathChildrenCache;
} catch (Exception e) {
@ -246,4 +229,74 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
}
private class SyncThread implements Runnable {
NamingService sourceNamingService;
TaskDO taskDO;
CuratorFramework client;
SyncThread(NamingService sourceNamingService, TaskDO taskDO, CuratorFramework client) {
this.sourceNamingService = sourceNamingService;
this.taskDO = taskDO;
this.client = client;
}
@Override
public void run() {
try {
//List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(), taskDO.getGroupName());//fix with no group
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);
// Retrieve the previous backup and delete invalid instances
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
// Replace the current backup with the latest one
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
// Try to recover instance data lost due to an unexpected ZK client disconnection
tryToCompensate(taskDO, sourceNamingService, filterNeedSync(sourceInstances));
} catch (Exception e) {
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
} finally {
//serviceNameSet.remove(((NamingEvent) event).getServiceName());// for higher real-time behavior this could be removed manually, making the window size variable and dependent on processing speed, with the fixed window size as a fallback
}
}
}
private List<Instance> filterNeedSync(List<Instance> sourceInstances) {
Iterator<Instance> iterator = sourceInstances.iterator();
while (iterator.hasNext()) {
if (!needSync(iterator.next().getMetadata())) {
iterator.remove();
}
}
return sourceInstances;
}
public boolean addSyncService(TaskDO taskDO) {
try {
NamingService sourceNamingService =
//nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());//fix with no nameSpaceName
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
if (event instanceof NamingEvent) {
if (serviceNameSet.set(((NamingEvent) event).getServiceName())) {// add event merge
EXECUTOR.execute(new SyncThread(sourceNamingService, taskDO, client));
}
}
});
sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
} catch (Exception e) {
log.error("sync task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
return true;
}
}

View File

@ -12,41 +12,24 @@
*/
package com.alibaba.nacossync.extension.impl;
import static com.alibaba.nacossync.util.DubboConstants.ALL_SERVICE_NAME_PATTERN;
import static com.alibaba.nacossync.util.DubboConstants.DUBBO_PATH_FORMAT;
import static com.alibaba.nacossync.util.DubboConstants.DUBBO_ROOT_PATH;
import static com.alibaba.nacossync.util.DubboConstants.GROUP_KEY;
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_IP_KEY;
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_PORT_KEY;
import static com.alibaba.nacossync.util.DubboConstants.INTERFACE_KEY;
import static com.alibaba.nacossync.util.DubboConstants.PROTOCOL_KEY;
import static com.alibaba.nacossync.util.DubboConstants.VERSION_KEY;
import static com.alibaba.nacossync.util.DubboConstants.WEIGHT_KEY;
import static com.alibaba.nacossync.util.DubboConstants.ZOOKEEPER_SEPARATOR;
import static com.alibaba.nacossync.util.DubboConstants.createServiceName;
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.NacosNamingService;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.MetricsStatisticsType;
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.extension.SyncService;
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
import com.alibaba.nacossync.extension.impl.extend.Sharding;
import com.alibaba.nacossync.extension.impl.extend.ZookeeperSyncToNacosServiceSharding;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.ShardingLog;
import com.alibaba.nacossync.pojo.model.TaskDO;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
@ -55,6 +38,17 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.utils.CloseableUtils;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.alibaba.nacossync.util.DubboConstants.*;
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;
/**
* @author paderlol
* @version 1.0
@ -82,6 +76,12 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
private final SkyWalkerCacheServices skyWalkerCacheServices;
@Resource(type = ZookeeperSyncToNacosServiceSharding.class)
private Sharding sharding;
// Exclude all non-service nodes under /dubbo
private static final List<String> IGNORED_DUBBO_PATH = Stream.of("mapping", "metadata", "yellow").collect(Collectors.toList());
@Autowired
public ZookeeperSyncToNacosServiceImpl(ZookeeperServerHolder zookeeperServerHolder,
NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) {
@ -96,7 +96,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
if (treeCacheMap.containsKey(taskDO.getTaskId())) {
return true;
}
TreeCache treeCache = getTreeCache(taskDO);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
// On the first execution of the task, register all instances in one pass
@ -104,10 +103,16 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
// Register the ZK listener
Objects.requireNonNull(treeCache).getListenable().addListener((client, event) -> {
try {
String path = event.getData().getPath();
if (!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) {
return;
}
Map<String, String> queryParam = parseQueryString(path);
//add sharding
if (!isProcess(taskDO, destNamingService, queryParam.get(INTERFACE_KEY)))
return;
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
log.info("sync sharding Zookeeper to Nacos serviceName:{},local servicesName :{}", queryParam.get(INTERFACE_KEY), sharding.getLocalServices(null));
processEvent(taskDO, destNamingService, event, path, queryParam);
}
} catch (Exception e) {
@ -126,9 +131,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
private void processEvent(TaskDO taskDO, NamingService destNamingService, TreeCacheEvent event, String path,
Map<String, String> queryParam) throws NacosException {
if(!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) {
return;
}
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
@ -138,15 +140,18 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
case NODE_UPDATED:
destNamingService.registerInstance(
getServiceNameFromCache(serviceName, queryParam), instance);
getServiceNameFromCache(serviceName, queryParam), instance);
//getServiceNameFromCache(serviceName, queryParam, instance), instance);
log.info("syn add service : {} ,instance:{}", serviceName, instance);
break;
case NODE_REMOVED:
destNamingService.deregisterInstance(
getServiceNameFromCache(serviceName, queryParam),
ipAndPortParam.get(INSTANCE_IP_KEY),
Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
getServiceNameFromCache(serviceName, queryParam),
ipAndPortParam.get(INSTANCE_IP_KEY),
Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
nacosServiceNameMap.remove(serviceName);
log.info("syn delete service : {} ,instance:{}", serviceName, instance);
break;
default:
break;
@ -154,26 +159,34 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
}
private void registerAllInstances(TaskDO taskDO, NamingService destNamingService) throws Exception {
CuratorFramework zk = zookeeperServerHolder.get(taskDO.getSourceClusterId(), "");
if(!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
registerALLInstances0(taskDO, destNamingService, zk, taskDO.getServiceName());
CuratorFramework zk = zookeeperServerHolder.get(taskDO.getSourceClusterId(), "");
sharding.start(taskDO); // idempotent; safe to call repeatedly
if (!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
sharding.doSharding(null, new ArrayList<>(Arrays.asList(taskDO.getServiceName())));
TreeSet<String> shardingServices = sharding.getLocalServices(null);
if (shardingServices.contains(taskDO.getServiceName())) {
registerALLInstances0(taskDO, destNamingService, zk, taskDO.getServiceName());
}
} else {
// Sync all services
List<String> serviceList = zk.getChildren().forPath(DUBBO_ROOT_PATH);
for(String serviceName : serviceList) {
registerALLInstances0(taskDO, destNamingService, zk, serviceName);
sharding.doSharding(null, filterNoProviderPath(serviceList));
TreeSet<String> shardingServices = sharding.getLocalServices(null);
for (String serviceName : serviceList) {
if (shardingServices.contains(serviceName) && !IGNORED_DUBBO_PATH.contains(serviceName))//add
registerALLInstances0(taskDO, destNamingService, zk, serviceName);
}
}
}
private void registerALLInstances0(TaskDO taskDO, NamingService destNamingService, CuratorFramework zk,
String serviceName) throws Exception {
String serviceName) throws Exception {
String path = String.format(DUBBO_PATH_FORMAT, serviceName);
if(zk.getChildren()==null) {
if (zk.getChildren() == null) {
return;
}
List<String> providers = zk.getChildren().forPath(path);
for(String provider : providers) {
for (String provider : providers) {
Map<String, String> queryParam = parseQueryString(provider);
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
Map<String, String> ipAndPortParam = parseIpAndPortString(path + ZOOKEEPER_SEPARATOR + provider);
@ -186,14 +199,14 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
@Override
public boolean delete(TaskDO taskDO) {
if (taskDO.getServiceName() == null) {
if (taskDO.getServiceName() == null) {
return true;
}
try {
CloseableUtils.closeQuietly(treeCacheMap.get(taskDO.getTaskId()));
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
if(!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
if (!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
if (nacosServiceNameMap.containsKey(taskDO.getServiceName())) {
List<Instance> allInstances =
destNamingService.getAllInstances(nacosServiceNameMap.get(taskDO.getServiceName()));
@ -208,15 +221,15 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
}
} else {
Set<String> serviceNames = nacosServiceNameMap.keySet();
for(String serviceName : serviceNames) {
for (String serviceName : serviceNames) {
if (nacosServiceNameMap.containsKey(serviceName)) {
List<Instance> allInstances =
destNamingService.getAllInstances(serviceName);
destNamingService.getAllInstances(serviceName);
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(instance.getServiceName(), instance.getIp(),
instance.getPort());
instance.getPort());
}
nacosServiceNameMap.remove(serviceName);
@ -226,8 +239,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
}
} catch (Exception e) {
log.error("delete task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
@ -243,8 +254,8 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
return treeCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {
try {
TreeCache treeCache =
new TreeCache(zookeeperServerHolder.get(taskDO.getSourceClusterId(), ""),
DUBBO_ROOT_PATH);
new TreeCache(zookeeperServerHolder.get(taskDO.getSourceClusterId(), ""),
DUBBO_ROOT_PATH);
treeCache.start();
return treeCache;
} catch (Exception e) {
@ -261,20 +272,20 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
*/
protected boolean isMatch(TaskDO taskDO, Map<String, String> queryParam) {
Predicate<TaskDO> isVersionEq = (task) -> StringUtils.isBlank(taskDO.getVersion())
|| StringUtils.equals(task.getVersion(), queryParam.get(VERSION_KEY));
Predicate<TaskDO> isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName())
|| StringUtils.equals(task.getGroupName(), queryParam.get(GROUP_KEY));
|| StringUtils.equals(task.getVersion(), queryParam.get(VERSION_KEY));
Predicate<TaskDO> isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName()) || StringUtils.isBlank(queryParam.get(GROUP_KEY)) //fix
|| StringUtils.equals(task.getGroupName(), queryParam.get(GROUP_KEY));
return isVersionEq.and(isGroupEq).test(taskDO);
}
/**
* create Nacos service instance
*
* @param queryParam dubbo metadata
* @param queryParam dubbo metadata
* @param ipAndPortMap dubbo ip and address
*/
protected Instance buildSyncInstance(Map<String, String> queryParam, Map<String, String> ipAndPortMap,
TaskDO taskDO) {
TaskDO taskDO) {
Instance temp = new Instance();
temp.setIp(ipAndPortMap.get(INSTANCE_IP_KEY));
temp.setPort(Integer.parseInt(ipAndPortMap.get(INSTANCE_PORT_KEY)));
@ -286,7 +297,7 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
metaData.put(PROTOCOL_KEY, ipAndPortMap.get(PROTOCOL_KEY));
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
temp.setMetadata(metaData);
return temp;
@ -296,10 +307,76 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
* cteate Dubbo service name
*
* @param serviceName dubbo service name
* @param queryParam dubbo metadata
* @param queryParam dubbo metadata
*/
protected String getServiceNameFromCache(String serviceName, Map<String, String> queryParam) {
return nacosServiceNameMap.computeIfAbsent(serviceName, (key) -> createServiceName(queryParam));
}
private List<String> filterNoProviderPath(List<String> sourceInstances) {
Iterator<String> iterator = sourceInstances.iterator();
while (iterator.hasNext()) {
if (IGNORED_DUBBO_PATH.contains(iterator.next())) {
iterator.remove();
}
}
return sourceInstances;
}
/**
* Deregister the instances under a service. When the server list changes, services previously sharded to this server may be reassigned to other servers, so those instances must be taken offline manually.
* (Nacos's deregisterInstance is not called directly because, in the current distributed setup, an ordering issue could cause this task to mistakenly delete instances another server has just registered; instead, heartbeat sending is stopped so the instances go offline on their own.)
*
* @param namingService
* @param serviceNames
*/
private void deregisterService(NamingService namingService, Queue<ShardingLog> serviceNames, TaskDO taskDO) {
log.info("zk->nacos current deal with serviceNames" + sharding.getLocalServices(null));
log.info("zk->nacos current change serviceNames count" + serviceNames.size());
while (!serviceNames.isEmpty()) {
ShardingLog shardingLog = serviceNames.poll();
if (!shardingLog.getType().equals(ShardingLogTypeEnum.DELETE.getType())) {
log.info("zk->nacos current add serviceName{},will skip...", shardingLog.getServiceName());
continue;
}
try {
List<Instance> allInstances =
namingService.getAllInstances(nacosServiceNameMap.get(shardingLog.getServiceName()));
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
log.info("zk->nacos current will stop beat" + instance.getIp() + instance.getPort() + " ,key:" + instance.getServiceName());
((NacosNamingService) namingService).getBeatReactor().removeBeatInfo(instance.getServiceName(), instance.getIp(), instance.getPort());
}
nacosServiceNameMap.remove(shardingLog.getServiceName());
}
} catch (Exception e) {
log.error("deregisterService faild ,cause by:{}", e);
}
}
}
/**
* Determines whether the service is handled by the local node
*
* @param taskDO
* @param destNamingService
* @param serviceName
* @return
*/
private boolean isProcess(TaskDO taskDO, NamingService destNamingService, String serviceName) {
try {
if (IGNORED_DUBBO_PATH.contains(serviceName))
return false;
sharding.doSharding(null, new ArrayList<>(Arrays.asList(serviceName)));
deregisterService(destNamingService, sharding.getChangeService(), taskDO);
if (sharding.getLocalServices(null).contains(serviceName)) {
return true;
}
} catch (Exception e) {
log.error("zk->nacos sharding faild ,taskid:{}", taskDO.getId(), e);
}
return false;
}
}

View File

@ -0,0 +1,210 @@
package com.alibaba.nacossync.extension.impl.extend;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.client.naming.utils.NetUtils;
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
import com.alibaba.nacossync.extension.SyncManagerService;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.extension.impl.NacosSyncToZookeeperServiceImpl;
import com.alibaba.nacossync.extension.sharding.ConsistentHashServiceSharding;
import com.alibaba.nacossync.extension.sharding.ServiceSharding;
import com.alibaba.nacossync.pojo.ShardingLog;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.DubboConstants;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
/**
* Created by maj on 2020/10/29.
*/
@Service
@Slf4j
public class NacosSyncToZookeeperServicesSharding implements Sharding {
@Autowired
private NacosServerHolder nacosServerHolder;
public static final int DEFAULT_SERVICE_PAGENO = 1;
public static final int DEFAULT_SERVICE_PAGE_SIZE = 10000;
private volatile String serviceListMd5;
@Autowired
private SyncManagerService syncManagerService;
@Lazy
@Resource(type = ConsistentHashServiceSharding.class)
private ServiceSharding serviceSharding;
private static final String SHARDING_KEY_NAME = NacosSyncToZookeeperServicesSharding.class.getName();
private static final long DEFAULT_SERVICES_CHANGE_THREAD_DELAY = 10;
private static final long DEFAULT_SERVICES_CHANGE_THREAD_INTERVAL = 5;
//add cache taskDO
private Map<String, TaskDO> taskDOMap = new ConcurrentHashMap<>();
@Value("${server.port}")
private String serverPort;
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName(" com.alibaba.nacossync.sharding.getServiceName");
return thread;
}
});
protected boolean servicesIschanged(TaskDO taskDO) throws Exception {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
List<String> serviceNames = sourceNamingService.getServicesOfServer(DEFAULT_SERVICE_PAGENO, DEFAULT_SERVICE_PAGE_SIZE, SkyWalkerUtil.getGroupName(taskDO.getGroupName())).getData();
Collections.sort(serviceNames);
String md5 = SkyWalkerUtil.StringToMd5(serviceNames.toString());
if (!md5.equals(serviceListMd5)) {
serviceListMd5 = md5;
return true;
}
return false;
}
protected synchronized void reSubscribeService(TaskDO taskDO) {
log.error("reSubscribe start");
if (Objects.isNull(taskDO)) return;
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
List<String> serviceNames = sourceNamingService.getServicesOfServer(DEFAULT_SERVICE_PAGENO, DEFAULT_SERVICE_PAGE_SIZE, taskDO.getGroupName()).getData(); // fine for now when a single groupName is used; if multiple groups are configured, the SDK needs to be upgraded to 1.4+ to support wildcard (*) group queries
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
} catch (Exception e) {
log.error("reSubscribe faild,task id:{}", taskDO.getId(), e);
}
if (!serviceSharding.getChangeServices(SHARDING_KEY_NAME).isEmpty()) {
try {
syncChangedServices();
} catch (Exception e) {
log.error("reSubscribe -->delete service faild,task id:{}", taskDO.getId(), e);
}
}
}
// Multiple source_cluster_id / namespace dimensions are not supported yet; by default the source_cluster_id and namespace of the first task are used
@Override
public void start(TaskDO taskDO) {
taskDOMap.putIfAbsent(taskDO.getServiceName(), taskDO);
if (!serviceSharding.addServerChange(SHARDING_KEY_NAME, this)) {
return;
}
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
if (servicesIschanged(taskDO)) {
reSubscribeService(taskDO);
}
} catch (Exception e) {
log.error("schedule reSubscribe service thread faild ,task id :", taskDO.getId(), e);
}
}
}, DEFAULT_SERVICES_CHANGE_THREAD_DELAY, DEFAULT_SERVICES_CHANGE_THREAD_INTERVAL, TimeUnit.SECONDS);
}
@Override
public void onServerChange() {
if (taskDOMap.size() > 0) {
for (TaskDO taskDO : taskDOMap.values()) { // take any one taskDO
reSubscribeService(taskDO);
return;
}
}
}
@Override
public Queue<ShardingLog> getChangeService() {
return serviceSharding.getChangeServices(SHARDING_KEY_NAME);
}
@Override
public void doSharding(String key, List<String> serviceNames) {
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
}
@Override
public TreeSet<String> getLocalServices(String key) {
return serviceSharding.getLocalServices(key);
}
private void syncAddedServices(String serviceName) {
if (taskDOMap.containsKey(DubboConstants.ALL_SERVICE_NAME_PATTERN)) { // if a task is configured with *, tasks configured with individual serviceNames need no separate handling
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(DubboConstants.ALL_SERVICE_NAME_PATTERN), serviceName);
((NacosSyncToZookeeperServiceImpl) syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId())).addSyncService(taskDO);
log.info("reSubscribe ,{} is add", serviceName);
return;
}
if (taskDOMap.containsKey(serviceName)) { // if the changed serviceName is configured and is sharded to this server, handle it
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(serviceName), serviceName);
((NacosSyncToZookeeperServiceImpl) syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId())).addSyncService(taskDO);
log.info("reSubscribe ,{} is add", serviceName);
}
}
private void syncRemovedServices(String serviceName) {
if (taskDOMap.containsKey(DubboConstants.ALL_SERVICE_NAME_PATTERN)) {
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(DubboConstants.ALL_SERVICE_NAME_PATTERN), serviceName);
syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
log.info("reSubscribe ,{} is remove", serviceName);
return;
}
if (taskDOMap.containsKey(serviceName)) {
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(serviceName), serviceName);
syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
log.info("reSubscribe ,{} is remove", serviceName);
}
}
private void syncChangedServices() {
log.info("reSubscribe ,local ip: {},current sharding service:{}", NetUtils.localIP() + ":" + serverPort, serviceSharding.getLocalServices(SHARDING_KEY_NAME).toString());
while (!serviceSharding.getChangeServices(SHARDING_KEY_NAME).isEmpty()) {
ShardingLog shardingLog = serviceSharding.getChangeServices(SHARDING_KEY_NAME).poll();
if (shardingLog.getType().equals(ShardingLogTypeEnum.ADD.getType())) {
syncAddedServices(shardingLog.getServiceName());
}
if (shardingLog.getType().equals(ShardingLogTypeEnum.DELETE.getType())) {
syncRemovedServices(shardingLog.getServiceName());
}
}
}
@Override
public void stop(TaskDO taskDO) {
if (taskDOMap.containsKey(taskDO.getServiceName())) {
taskDOMap.remove(taskDO.getServiceName());
}
}
private TaskDO buildNewTaskDo(TaskDO taskDO, String serviceName) {
TaskDO taskDO1 = new TaskDO();
BeanUtils.copyProperties(taskDO, taskDO1);
taskDO1.setTaskId(serviceName); // a key is needed to replace the previous taskId, which many wrappers are keyed on; serviceName is used for now
taskDO1.setServiceName(serviceName);
return taskDO1;
}
}

View File

@ -0,0 +1,26 @@
package com.alibaba.nacossync.extension.impl.extend;
import com.alibaba.nacossync.pojo.ShardingLog;
import com.alibaba.nacossync.pojo.model.TaskDO;
import java.util.List;
import java.util.Queue;
import java.util.TreeSet;
/**
* Created by maj on 2020/10/30.
*/
public interface Sharding {
public void onServerChange();
public void start(TaskDO taskDO);
public void stop(TaskDO taskDO);
public void doSharding(String key, List<String> serviceNames);
public TreeSet<String> getLocalServices(String key);
public Queue<ShardingLog> getChangeService();
}

View File

@ -0,0 +1,84 @@
package com.alibaba.nacossync.extension.impl.extend;
import com.alibaba.nacossync.extension.sharding.ConsistentHashServiceSharding;
import com.alibaba.nacossync.extension.sharding.ServiceSharding;
import com.alibaba.nacossync.pojo.ShardingLog;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.TreeSet;
/**
* Created by maj on 2020/10/30.
*/
@Service
@Slf4j
public class ZookeeperSyncToNacosServiceSharding implements Sharding {
private static final String SHARDING_KEY_NAME = ZookeeperSyncToNacosServiceSharding.class.getName();
private volatile String serviceListMd5;
private volatile boolean serverChange = false;
@Lazy
@Resource(type = ConsistentHashServiceSharding.class)
private ServiceSharding serviceSharding;
@Override
public void onServerChange() {
serverChange = true;
}
@Override
public void start(TaskDO taskDO) {
serviceSharding.addServerChange(SHARDING_KEY_NAME, this);
}
@Override
public Queue<ShardingLog> getChangeService() {
return serviceSharding.getChangeServices(SHARDING_KEY_NAME);
}
@Override
public void doSharding(String key, List<String> serviceNames) {
try {
if (servicesIschanged(serviceNames) || serverChange) {
log.info("zk ->nacos reshading start");
serverChange = false;
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
}
} catch (Exception e) {
log.error("zk ->nacos reshading faild.", e);
}
}
@Override
public TreeSet<String> getLocalServices(String key) {
return serviceSharding.getLocalServices(SHARDING_KEY_NAME);
}
protected boolean servicesIschanged(List<String> serviceNames) throws Exception { // ZK cannot distinguish a service change from an instance change
Collections.sort(serviceNames);
String md5 = SkyWalkerUtil.StringToMd5(serviceNames.toString());
if (!md5.equals(serviceListMd5)) {
serviceListMd5 = md5;
return true;
}
return false;
}
@Override
public void stop(TaskDO taskDO) {
}
}

View File

@ -0,0 +1,148 @@
package com.alibaba.nacossync.extension.sharding;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.client.naming.utils.NetUtils;
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
import com.alibaba.nacossync.extension.impl.extend.Sharding;
import com.alibaba.nacossync.pojo.ShardingLog;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Created by maj on 2020/10/27.
*/
@Slf4j
public abstract class AbstractServiceSharding implements ServiceSharding, InitializingBean {
protected volatile List<String> servers = new LinkedList<String>();
private Map<String, ConcurrentLinkedQueue<ShardingLog>> localServicesChangeMap = new ConcurrentHashMap<String, ConcurrentLinkedQueue<ShardingLog>>();
private volatile Map<String, TreeSet<String>> localServicesMap = new ConcurrentHashMap<String, TreeSet<String>>();
private final static String LOCAL_IP = NetUtils.localIP();
private volatile String serverListMd5;
private Map<String, Sharding> serverListens = new ConcurrentHashMap<String, Sharding>();
@Value("${server.port}")
private String serverPort;
@Lazy
@Resource(type = NacosServersManager.class)
private ServersManager serversManager;
protected List<String> getServers() {
return servers;
}
protected void listenServer() {
try {
serversManager.subscribeServers(new EventListener() {
@Override
public void onEvent(Event event) {
try {
shadingServers();
for (Sharding sharding : serverListens.values()) {
sharding.onServerChange();
}
} catch (Exception e) {
log.error("subscribe servers faild.", e);
}
}
});
} catch (Exception e) {
log.error("subscribe servers faild.", e);
}
}
protected void shadingServers() throws Exception {
List<String> serversList = serversManager.getServers();
Collections.sort(serversList);
String md5 = SkyWalkerUtil.StringToMd5(serversList.toString());
if (!md5.equals(serverListMd5)) {
servers = serversList;
serverListMd5 = md5;
doSharding();
}
}
// need fix: handled synchronously for now
protected void shadingServices(String key, List<String> serviceNames) {
if (!localServicesMap.containsKey(key)) {
TreeSet<String> localServicesSet = new TreeSet<String>();
localServicesMap.putIfAbsent(key, localServicesSet);
}
if (!localServicesChangeMap.containsKey(key)) {
ConcurrentLinkedQueue<ShardingLog> removeQueue = new ConcurrentLinkedQueue<ShardingLog>();
localServicesChangeMap.putIfAbsent(key, removeQueue);
}
TreeSet<String> localServices = localServicesMap.get(key);
try {
for (String serviceName : serviceNames) {
if (getShardingServer(serviceName).equals(LOCAL_IP + ":" + serverPort)) {
if (!localServices.contains(serviceName)) {
localServicesMap.get(key).add(serviceName);
localServicesChangeMap.get(key).offer(new ShardingLog(serviceName, ShardingLogTypeEnum.ADD.getType()));
}
} else {
if (localServices.contains(serviceName)) {
localServicesMap.get(key).remove(serviceName);
localServicesChangeMap.get(key).offer(new ShardingLog(serviceName, ShardingLogTypeEnum.DELETE.getType()));
}
}
}
} catch (Exception e) {
log.error("shading services faild.", e);
}
}
//need fix: sharding is done at the service level, but that level mixes zk->nacos and nacos->zk services, while the current loop avoidance lives in instance-level metadata; checking instances on every sharding pass would be too costly and still could not fully rule out a service containing instances from multiple sources, so for now sharding is done over the full list of
//serviceNames registered in both ZK and Nacos, which may lead to unbalanced sharding, e.g. most services landing on a single node
@Override
public void sharding(String key, List<String> serviceNames) {
try {
shadingServices(key, serviceNames);
} catch (Exception e) {
log.error("sharding faild. sharding key is:{}", key, e);
}
}
@Override
public boolean addServerChange(String key, Sharding sharding) {
return serverListens.putIfAbsent(key, sharding) == null ? true : false;
}
@Override
public TreeSet<String> getLocalServices(String key) {
return localServicesMap.get(key);
}
protected abstract void doSharding();
protected abstract String getShardingServer(String key);
@Override
public void afterPropertiesSet() throws Exception {
listenServer();
}
@Override
public Queue<ShardingLog> getChangeServices(String key) {
return localServicesChangeMap.get(key);
}
}

View File

@ -0,0 +1,78 @@
package com.alibaba.nacossync.extension.sharding;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.*;
/**
* Created by maj on 2020/10/27.
*/
@Service
@Lazy
public class ConsistentHashServiceSharding extends AbstractServiceSharding {
public static final String HASH_NODES = "-hash.vn.nodes";
private List<String> nodes = new LinkedList<String>();
private SortedMap<Integer, String> virtualNodes = new TreeMap<Integer, String>();
private static final int VIRTUAL_COUNT = 100;
public ConsistentHashServiceSharding() {
super();
}
@Override
public void doSharding() {
List<String> servers = getServers();
nodes.clear();
virtualNodes.clear();
for (String node : servers) {
nodes.add(node);
}
for (String node : nodes) {
for (int i = 0; i < VIRTUAL_COUNT; i++) {
String virtualNodeName = node + HASH_NODES + String.valueOf(i);
virtualNodes.put(getHash(virtualNodeName), virtualNodeName);
}
}
}
@Override
public String getShardingServer(String key) {
int hash = getHash(SkyWalkerUtil.StringToMd5(key));
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
String virtualNode;
if (subMap.isEmpty()) {
Integer i = virtualNodes.firstKey();
virtualNode = virtualNodes.get(i);
} else {
Integer i = subMap.firstKey();
virtualNode = subMap.get(i);
}
if (StringUtils.isNotBlank(virtualNode)) {
return virtualNode.substring(0, virtualNode.indexOf("-"));
}
return null;
}
private int getHash(String str) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < str.length(); i++)
hash = (hash ^ str.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
if (hash < 0)
hash = Math.abs(hash);
return hash;
}
}
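
To illustrate how the ring above distributes services across nacos-sync nodes, the following is a minimal, self-contained sketch (not part of the diff; server addresses and service names are invented, and the ring maps virtual-node hashes directly to servers instead of storing virtual-node names as ConsistentHashServiceSharding does; the original also MD5-hashes the key via SkyWalkerUtil.StringToMd5 before hashing):

import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class RingSketch {
    private static final int VIRTUAL_COUNT = 100; // same virtual-node count as the class above
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    RingSketch(List<String> servers) {
        // place 100 virtual nodes per server on the ring
        for (String server : servers) {
            for (int i = 0; i < VIRTUAL_COUNT; i++) {
                ring.put(fnvHash(server + "-hash.vn.nodes" + i), server);
            }
        }
    }

    String serverFor(String serviceName) {
        // walk clockwise: first virtual node at or after the key's hash, wrapping to the start
        SortedMap<Integer, String> tail = ring.tailMap(fnvHash(serviceName));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    // FNV-1a style hash with the same mixing steps as getHash(...) above
    private static int fnvHash(String str) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < str.length(); i++) {
            hash = (hash ^ str.charAt(i)) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return Math.abs(hash);
    }

    public static void main(String[] args) {
        // hypothetical nacos-sync nodes and Dubbo service names
        RingSketch ring = new RingSketch(Arrays.asList("10.0.0.1:8083", "10.0.0.2:8083"));
        for (String service : Arrays.asList("com.foo.OrderService", "com.foo.UserService")) {
            System.out.println(service + " -> " + ring.serverFor(service));
        }
    }
}

Because each server contributes many virtual nodes, adding or removing one nacos-sync node only remaps the services that hashed to that node's virtual nodes, which is what shadingServers() relies on when the server list MD5 changes.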

View File

@ -0,0 +1,106 @@
package com.alibaba.nacossync.extension.sharding;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.utils.NetUtils;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.util.StringUtils;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
/**
* Created by maj on 2020/10/27.
*/
@Service
@Slf4j
@Lazy
public class NacosServersManager implements ServersManager<EventListener>, InitializingBean {
private NamingService namingService;
@Value("${server.port}")
private String serverPort;
@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;
// Without touching existing business tables or logic, a fixed key ("SHARDINGKEY") is hashed to an MD5 value and used as the default Nacos connection path
private static final String DEFAULT_SHARDING_NACOS_KEY = "3ac01a8c7501f121ab01efb920aa4764";
private static final String DEFAULT_SHARDING_NACOS_NAMESPACES = "public";
private static final String DEFAULT_SHARDING_NACOS_GOURPID = "shadinggroup";
private static final String DEFAULT_SHARDING_NACOS_SERVICENAME = "com.dmall.sharding";
@Value("${sharding.nacos.url}")
private String shardingNacosUrl;
@Value("${sharding.nacos.namespace}")
private String shardingNacosnameSpace;
@Value("${sharding.nacos.groupname}")
private String shardingNacosGroupName;
@Value("${sharding.nacos.servicename}")
private String shardingNacosServiceName;
@Override
public List<String> getServers() throws Exception {
List<Instance> instanceList = namingService.getAllInstances(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID);
List<String> serverList = new LinkedList<String>();
for (Instance instance : instanceList) {
serverList.add(instance.getIp() + ":" + instance.getPort());
}
return serverList;
}
@Override
public void subscribeServers(EventListener listener) throws Exception {
namingService.subscribe(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID, listener);
}
@Override
public void register(String ip, int port) throws Exception {
namingService.registerInstance(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID, ip, port);
}
@Override
public void afterPropertiesSet() throws Exception {
try {
log.info("start init nacos servers.");
if (StringUtils.isEmpty(shardingNacosUrl)) {
shardingNacosUrl = DEFAULT_SHARDING_NACOS_KEY;
}
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, getNacosUrl());
properties.setProperty(PropertyKeyConst.NAMESPACE, StringUtils.isEmpty(shardingNacosnameSpace) ? DEFAULT_SHARDING_NACOS_NAMESPACES : shardingNacosnameSpace);
namingService = NamingFactory.createNamingService(properties);
register(NetUtils.localIP(), Integer.parseInt(serverPort));
log.info("init nacos servers sucess.");
} catch (Exception e) {
log.info("init nacos faild .", e);
}
}
private String getNacosUrl() {
if (!StringUtils.isIPV4AndPorts(shardingNacosUrl, ",")) {
List<String> allClusterConnectKey = skyWalkerCacheServices
.getAllClusterConnectKey(StringUtils.isEmpty(shardingNacosUrl) ? DEFAULT_SHARDING_NACOS_KEY : shardingNacosUrl);
return Joiner.on(",").join(allClusterConnectKey);
}
return shardingNacosUrl;
}
}

View File

@ -0,0 +1,17 @@
package com.alibaba.nacossync.extension.sharding;
import java.util.List;
/**
* Created by maj on 2020/10/27.
*/
public interface ServersManager<T> {
public List<String> getServers() throws Exception;
public void subscribeServers(T listener) throws Exception;
public void register(String ip, int port) throws Exception;
}

View File

@ -0,0 +1,22 @@
package com.alibaba.nacossync.extension.sharding;
import com.alibaba.nacossync.extension.impl.extend.Sharding;
import com.alibaba.nacossync.pojo.ShardingLog;
import java.util.List;
import java.util.Queue;
import java.util.TreeSet;
/**
* Created by maj on 2020/10/27.
*/
public interface ServiceSharding {
public void sharding(String key, List<String> serviceNames);
public TreeSet<String> getLocalServices(String key);
public boolean addServerChange(String name, Sharding sharding);
public Queue<ShardingLog> getChangeServices(String key);
}

View File

@ -0,0 +1,35 @@
package com.alibaba.nacossync.pojo;
/**
* Created by maj on 2020/11/18.
*/
public class ShardingLog {
private String serviceName;
private String type;
public ShardingLog() {
}
public ShardingLog(String serviceName, String type) {
this.serviceName = serviceName;
this.type = type;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}

View File

@ -31,5 +31,6 @@ public class TaskModel {
private String serviceName;
private String groupName;
private String taskStatus;
private String nameSpace;
}

View File

@ -56,6 +56,7 @@ public class TaskDetailProcessor implements Processor<TaskDetailQueryRequest, Ta
taskModel.setSourceClusterId(taskDO.getSourceClusterId());
taskModel.setTaskStatus(taskDO.getTaskStatus());
taskModel.setTaskId(taskDO.getTaskId());
taskModel.setNameSpace(taskDO.getNameSpace());
taskDetailQueryResult.setTaskModel(taskModel);
}

View File

@ -74,6 +74,7 @@ public class TaskListQueryProcessor implements Processor<TaskListQueryRequest, T
taskModel.setServiceName(taskDO.getServiceName());
taskModel.setGroupName(taskDO.getGroupName());
taskModel.setTaskStatus(taskDO.getTaskStatus());
taskModel.setNameSpace(taskDO.getNameSpace());
taskList.add(taskModel);
});

View File

@ -0,0 +1,51 @@
package com.alibaba.nacossync.util;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* Created by maj on 2020/11/2.
*/
public class ExpirySet<T> {
private Set<T> set = new CopyOnWriteArraySet();
private Map<T, Long> expiryMap = new ConcurrentHashMap<T, Long>();
private long timeOut = 10; // default 10-second window
public ExpirySet() {
}
public ExpirySet(long timeout) {
this.timeOut = timeout;
}
public boolean set(T key) {
if (!set.add(key)) {
if (getIntervalBySecond(expiryMap.get(key), new Date().getTime()) > timeOut) {
expiryMap.putIfAbsent(key, new Date().getTime());
return true;
} else {
return false;
}
} else {
expiryMap.putIfAbsent(key, (new Date().getTime()));
return true;
}
}
public boolean remove(T key) {
return set.remove(key);
}
public long getIntervalBySecond(long beforeTime, long currentTime) {
return (currentTime - beforeTime) / 1000;
}
}
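
ExpirySet acts as a per-service event-merge window for the NamingEvent listener in NacosSyncToZookeeperServiceImpl: repeated events for the same service within the window schedule only one SyncThread. A small demo of that behavior, assuming the ExpirySet class added above is on the classpath (service name invented, window shortened to 2 seconds):

import com.alibaba.nacossync.util.ExpirySet;
import java.util.concurrent.TimeUnit;

public class ExpirySetDemo {
    public static void main(String[] args) throws InterruptedException {
        // 2-second window instead of the default 10s, to keep the demo short
        ExpirySet<String> window = new ExpirySet<>(2);

        System.out.println(window.set("com.foo.OrderService")); // true  -> a sync would be scheduled
        System.out.println(window.set("com.foo.OrderService")); // false -> merged into the pending sync

        TimeUnit.SECONDS.sleep(3);                               // wait past the window
        System.out.println(window.set("com.foo.OrderService")); // true  -> a new sync may run
    }
}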

View File

@ -33,14 +33,14 @@ import java.util.Enumeration;
import java.util.UUID;
/**
* @author NacosSync
* @version $Id: SkyWalkerUtil.java, v 0.1 2018-09-26 AM12:10 NacosSync Exp $$
*/
* @author NacosSync
* @version $Id: SkyWalkerUtil.java, v 0.1 2018-09-26 AM12:10 NacosSync Exp $$
*/
public class SkyWalkerUtil {
/**
*
* Gets the string md5
*
* @param value
* @return
*/
@ -69,6 +69,7 @@ public class SkyWalkerUtil {
/**
* The rules of generating taskId
*
* @param addTaskRequest
* @return
*/
@ -116,6 +117,7 @@ public class SkyWalkerUtil {
/**
* Avoid getting a return address
*
* @return
* @throws Exception
*/
@ -153,4 +155,11 @@ public class SkyWalkerUtil {
return UUID.randomUUID().toString();
}
public static String getGroupName(String groupName) {
if (StringUtils.isEmpty(groupName)) {
return "DEFAULT_GROUP";
}
return groupName;
}
}

View File

@ -16,7 +16,6 @@ import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
@ -38,13 +37,13 @@ public final class StringUtils {
.compile("([_.a-zA-Z0-9][-_.a-zA-Z0-9]*)[=](.*)");
private static final Pattern IP_PORT_PATTERN = Pattern
.compile(".*/(.*)://(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
private static final Pattern DUBBO_PROVIDER_PATTERN = Pattern
private static final Pattern DUBBO_PROVIDER_PATTERN = Pattern
.compile("/dubbo/(.*)/providers/(.*)");
/**
* parse key-value pair.
*
* @param str string.
* @param str string.
* @param itemSeparator item separator.
* @return key-value map;
*/
@ -119,22 +118,47 @@ public final class StringUtils {
return String.format(DUBBO_PATH_FORMAT, interfaceName);
}
public static String convertDubboFullPathForZk(Map<String, String> metaData, String providersPath, String ip,
int port) {
try {
String urlParam = Joiner.on("&").withKeyValueSeparator("=").join(metaData);
String instanceUrl = String.format(DUBBO_URL_FORMAT, metaData.get(PROTOCOL_KEY), ip, port,
metaData.get(INTERFACE_KEY), urlParam);
public static String convertDubboFullPathForZk(Map<String, String> metaData, String providersPath, String ip,
int port) {
try {
String urlParam = Joiner.on("&").withKeyValueSeparator("=").join(metaData);
String instanceUrl = String.format(DUBBO_URL_FORMAT, metaData.get(PROTOCOL_KEY), ip, port,
metaData.get(INTERFACE_KEY), urlParam);
return Joiner.on(ZOOKEEPER_SEPARATOR).join(providersPath, URLEncoder.encode(instanceUrl, "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.warn("convert Dubbo full path", e);
return "";
}
return Joiner.on(ZOOKEEPER_SEPARATOR).join(providersPath, URLEncoder.encode(instanceUrl, "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.warn("convert Dubbo full path", e);
return "";
}
}
}
public static boolean isDubboProviderPath(String path) {
return DUBBO_PROVIDER_PATTERN.matcher(path).matches();
}
public static boolean isDubboProviderPath(String path) {
return DUBBO_PROVIDER_PATTERN.matcher(path).matches();
}
public static boolean isIPV4AndPort(String addr) {
if (StringUtils.isEmpty(addr)) {
return false;
}
String rexp = "^(([1-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5])))\\.)(([0-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5])))\\.){2}([1-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5]))):([0-9]|[1-9]\\d|[1-9]\\d{2}|[1-9]\\d{3}|[1-5]\\d{4}|6[0-4]\\d{3}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5])$";
Pattern pat = Pattern.compile(rexp);
Matcher mat = pat.matcher(addr);
return mat.find();
}
public static boolean isIPV4AndPorts(String ips, String reg) {
if (StringUtils.isEmpty(ips)) {
return false;
}
String[] ipvs = ips.split(reg);
for (String ipv : ipvs) {
if (!isIPV4AndPort(ipv))
return false;
}
return true;
}
}

View File

@ -13,3 +13,12 @@ spring.datasource.password=root
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
## The following settings are the namespace, group, and serviceName used when the sharding feature relies on Nacos to track nacos-sync node state; if not configured, public --> DEFAULT_GROUP --> com.default.service is used
# (Configuring this is recommended; do not share a namespace with business services, otherwise these nodes will also be picked up during a full sync)
# url is the Nacos connection address; it can be set as ip+port, or configured via the web UI --> Cluster Config (stored in the cluster table), in which case the cluster_id goes here (defaults to an MD5 value if not configured)
sharding.nacos.url=3ac01a8c7501f121ab01efb920aa4764
# namespace: refer to the Nacos community docs; it must be created manually in advance (it is not created automatically), and the value entered is the generated namespace ID rather than the defined name (the default public namespace excepted)
sharding.nacos.namespace=6ab2bbda-4b84-42d0-8392-99aa1446cffb
sharding.nacos.groupname=shadinggroup
sharding.nacos.servicename=com.dmall.sharding
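
Since the url comment above notes that the address can also be given directly as ip+port (getNacosUrl()/isIPV4AndPorts() accept a comma-separated list), a hypothetical explicit-address form, with invented addresses, would look like:

# hypothetical explicit-address form; comma-separated ip:port entries
sharding.nacos.url=192.168.1.10:8848,192.168.1.11:8848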