Compare commits
20 Commits
Author | SHA1 | Date |
---|---|---|
|
144d5f7a99 | |
|
8188868e16 | |
|
1713fd8d96 | |
|
fa1155911d | |
|
637cbbe327 | |
|
7abca3647d | |
|
9371be3854 | |
|
07770923c5 | |
|
4e8138d3a1 | |
|
c65e98972d | |
|
8b9e7859db | |
|
925a0e5435 | |
|
dd7ac4ae0d | |
|
ce5011e666 | |
|
4527321855 | |
|
0b638f82be | |
|
0067c76bd5 | |
|
6d8c265167 | |
|
3d7d43827b | |
|
ff9f7addfc |
|
@ -19,6 +19,7 @@ class AddSyncDialog extends React.Component {
|
|||
visible: false,
|
||||
destClusterId: '',
|
||||
groupName: '',
|
||||
nameSpace: '',
|
||||
serviceName: '',
|
||||
sourceClusterId: '',
|
||||
version: '',
|
||||
|
@ -31,8 +32,8 @@ class AddSyncDialog extends React.Component {
|
|||
}
|
||||
|
||||
save() {
|
||||
const { destClusterId, groupName, serviceName, sourceClusterId, version } = this.state;
|
||||
add({ destClusterId, groupName, serviceName, sourceClusterId, version })
|
||||
const { destClusterId, groupName, nameSpace, serviceName, sourceClusterId, version } = this.state;
|
||||
add({ destClusterId, groupName, nameSpace, serviceName, sourceClusterId, version })
|
||||
.then(() => {
|
||||
this.props.turnPage(1);
|
||||
this.close();
|
||||
|
@ -75,6 +76,11 @@ class AddSyncDialog extends React.Component {
|
|||
placeholder={locale.groupNamePlaceholder}
|
||||
onChange={groupName => this.setState({ groupName })}
|
||||
/>
|
||||
</FormItem> <FormItem label={`${locale.nameSpace}:`}>
|
||||
<Input
|
||||
placeholder={locale.nameSpacePlaceholder}
|
||||
onChange={nameSpace => this.setState({ nameSpace })}
|
||||
/>
|
||||
</FormItem>
|
||||
{
|
||||
sourceCluster.clusterType === 'ZK' && (
|
||||
|
|
|
@ -118,6 +118,7 @@ class ServiceSync extends React.Component {
|
|||
<Table dataSource={taskModels} loading={loading}>
|
||||
<Table.Column title={locale.serviceName} dataIndex="serviceName" />
|
||||
<Table.Column title={locale.groupName} dataIndex="groupName" />
|
||||
<Table.Column title={locale.nameSpace} dataIndex="nameSpace" />
|
||||
<Table.Column
|
||||
title={locale.sourceCluster}
|
||||
dataIndex="sourceClusterId"
|
||||
|
|
|
@ -40,6 +40,7 @@ const I18N_CONF = {
|
|||
addSync: 'New Sync',
|
||||
serviceName: 'Service Name',
|
||||
groupName: 'Group',
|
||||
nameSpace: 'Namespace',
|
||||
sourceCluster: 'Source Cluster',
|
||||
destCluster: 'Dest Cluster',
|
||||
instancesCount: 'Instances Count',
|
||||
|
@ -60,6 +61,8 @@ const I18N_CONF = {
|
|||
serviceNamePlaceholder: 'Please enter service name',
|
||||
groupName: 'Group Name',
|
||||
groupNamePlaceholder: 'Please enter group name',
|
||||
nameSpace: 'Namespace',
|
||||
nameSpacePlaceholder: 'Please enter namespace',
|
||||
sourceCluster: 'Source Cluster',
|
||||
destCluster: 'Dest Cluster',
|
||||
version: 'Version',
|
||||
|
|
|
@ -40,6 +40,7 @@ const I18N_CONF = {
|
|||
addSync: '新增同步',
|
||||
serviceName: '服务名',
|
||||
groupName: '分组',
|
||||
nameSpace: '命名空间',
|
||||
sourceCluster: '源集群',
|
||||
destCluster: '目标集群',
|
||||
instancesCount: '实例数',
|
||||
|
@ -60,6 +61,8 @@ const I18N_CONF = {
|
|||
serviceNamePlaceholder: '请输入服务名',
|
||||
groupName: '分组名',
|
||||
groupNamePlaceholder: '请输入分组名',
|
||||
nameSpace: '命名空间',
|
||||
nameSpacePlaceholder: '请输入命名空间',
|
||||
sourceCluster: '源集群',
|
||||
destCluster: '目标集群',
|
||||
version: '版本',
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
package com.alibaba.nacossync.constant;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/11/18.
|
||||
*/
|
||||
public enum ShardingLogTypeEnum {
|
||||
|
||||
ADD("add", "新增"),
|
||||
|
||||
DELETE("DELETE", "删除");
|
||||
|
||||
private String type;
|
||||
private String desc;
|
||||
|
||||
ShardingLogTypeEnum(String type, String desc) {
|
||||
this.type = type;
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public String getDesc() {
|
||||
return desc;
|
||||
}
|
||||
|
||||
public void setDesc(String desc) {
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
public static boolean contains(String type) {
|
||||
|
||||
for (ShardingLogTypeEnum shardingLogTypeEnum : ShardingLogTypeEnum.values()) {
|
||||
|
||||
if (shardingLogTypeEnum.getType().equals(type)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -12,14 +12,11 @@
|
|||
*/
|
||||
package com.alibaba.nacossync.extension;
|
||||
|
||||
import static com.alibaba.nacossync.util.SkyWalkerUtil.generateSyncKey;
|
||||
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
|
||||
import com.alibaba.nacossync.constant.ClusterTypeEnum;
|
||||
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
|
@ -27,6 +24,10 @@ import org.springframework.context.ApplicationContext;
|
|||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static com.alibaba.nacossync.util.SkyWalkerUtil.generateSyncKey;
|
||||
|
||||
/**
|
||||
* @author NacosSync
|
||||
* @version $Id: SyncManagerService.java, v 0.1 2018-09-25 PM5:17 NacosSync Exp $$
|
||||
|
@ -47,15 +48,12 @@ public class SyncManagerService implements InitializingBean, ApplicationContextA
|
|||
}
|
||||
|
||||
public boolean delete(TaskDO taskDO) throws NacosException {
|
||||
|
||||
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
|
||||
|
||||
}
|
||||
|
||||
public boolean sync(TaskDO taskDO) {
|
||||
|
||||
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,5 +78,4 @@ public class SyncManagerService implements InitializingBean, ApplicationContextA
|
|||
|
||||
return syncServiceMap.get(generateSyncKey(sourceClusterType, destClusterType));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -12,9 +12,6 @@
|
|||
*/
|
||||
package com.alibaba.nacossync.extension.impl;
|
||||
|
||||
import static com.alibaba.nacossync.util.StringUtils.convertDubboFullPathForZk;
|
||||
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;
|
||||
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
||||
|
@ -28,17 +25,13 @@ import com.alibaba.nacossync.extension.SyncService;
|
|||
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
||||
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
|
||||
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
|
||||
import com.alibaba.nacossync.extension.impl.extend.NacosSyncToZookeeperServicesSharding;
|
||||
import com.alibaba.nacossync.extension.impl.extend.Sharding;
|
||||
import com.alibaba.nacossync.monitor.MetricsManager;
|
||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||
import com.alibaba.nacossync.util.DubboConstants;
|
||||
import com.alibaba.nacossync.util.ExpirySet;
|
||||
import com.google.common.collect.Sets;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
|
||||
|
@ -47,6 +40,16 @@ import org.apache.curator.utils.CloseableUtils;
|
|||
import org.apache.zookeeper.CreateMode;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static com.alibaba.nacossync.util.StringUtils.convertDubboFullPathForZk;
|
||||
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;
|
||||
|
||||
/**
|
||||
* Nacos 同步 Zk 数据
|
||||
*
|
||||
|
@ -90,6 +93,13 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
|||
|
||||
private final ZookeeperServerHolder zookeeperServerHolder;
|
||||
|
||||
private static ExpirySet<String> serviceNameSet = new ExpirySet<String>();
|
||||
|
||||
private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
|
||||
@Resource(type = NacosSyncToZookeeperServicesSharding.class)
|
||||
private Sharding sharding;
|
||||
|
||||
@Autowired
|
||||
public NacosSyncToZookeeperServiceImpl(SkyWalkerCacheServices skyWalkerCacheServices,
|
||||
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder) {
|
||||
|
@ -101,9 +111,9 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
|||
@Override
|
||||
public boolean delete(TaskDO taskDO) {
|
||||
try {
|
||||
|
||||
NamingService sourceNamingService =
|
||||
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
|
||||
//nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());//
|
||||
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());//fix with no nameSpaceName
|
||||
EventListener eventListener = nacosListenerMap.remove(taskDO.getTaskId());
|
||||
PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(taskDO.getTaskId());
|
||||
sourceNamingService.unsubscribe(taskDO.getServiceName(), eventListener);
|
||||
|
@ -113,6 +123,7 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
|||
for (String instanceUrl : instanceUrlSet) {
|
||||
client.delete().quietly().forPath(instanceUrl);
|
||||
}
|
||||
sharding.stop(taskDO);
|
||||
} catch (Exception e) {
|
||||
log.error("delete task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
|
||||
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
|
||||
|
@ -123,37 +134,7 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
|||
|
||||
@Override
|
||||
public boolean sync(TaskDO taskDO) {
|
||||
try {
|
||||
NamingService sourceNamingService =
|
||||
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
|
||||
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
|
||||
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
|
||||
if (event instanceof NamingEvent) {
|
||||
try {
|
||||
|
||||
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
|
||||
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);
|
||||
|
||||
// 获取之前的备份 删除无效实例
|
||||
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
|
||||
// 替换当前备份为最新备份
|
||||
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
|
||||
// 尝试恢复因为zk客户端意外断开导致的实例数据
|
||||
tryToCompensate(taskDO, sourceNamingService, sourceInstances);
|
||||
} catch (Exception e) {
|
||||
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
|
||||
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
|
||||
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
|
||||
} catch (Exception e) {
|
||||
log.error("sync task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
|
||||
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
|
||||
return false;
|
||||
}
|
||||
sharding.start(taskDO);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -198,11 +179,13 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
|||
HashSet<String> waitingToAddInstance = new HashSet<>();
|
||||
for (Instance instance : sourceInstances) {
|
||||
if (needSync(instance.getMetadata())) {
|
||||
log.info("nacos->zk ,real sync service :{},and instance :{}", instance.getServiceName(), instance.getIp());
|
||||
String instanceUrl = buildSyncInstance(instance, taskDO);
|
||||
if (null == client.checkExists().forPath(instanceUrl)) {
|
||||
if (null != client.checkExists().forPath(instanceUrl)) {
|
||||
client.delete().quietly().forPath(instanceUrl);
|
||||
}
|
||||
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
|
||||
.forPath(instanceUrl);
|
||||
}
|
||||
waitingToAddInstance.add(instanceUrl);
|
||||
}
|
||||
}
|
||||
|
@ -246,4 +229,74 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
|
|||
}
|
||||
|
||||
|
||||
private class SyncThread implements Runnable {
|
||||
|
||||
NamingService sourceNamingService;
|
||||
|
||||
TaskDO taskDO;
|
||||
|
||||
CuratorFramework client;
|
||||
|
||||
SyncThread(NamingService sourceNamingService, TaskDO taskDO, CuratorFramework client) {
|
||||
this.sourceNamingService = sourceNamingService;
|
||||
this.taskDO = taskDO;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
//List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName());
|
||||
List<Instance> sourceInstances = sourceNamingService.getAllInstances(taskDO.getServiceName(), taskDO.getGroupName());//fix with no group
|
||||
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);
|
||||
|
||||
// 获取之前的备份 删除无效实例
|
||||
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
|
||||
// 替换当前备份为最新备份
|
||||
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
|
||||
// 尝试恢复因为zk客户端意外断开导致的实例数据
|
||||
tryToCompensate(taskDO, sourceNamingService, filterNeedSync(sourceInstances));
|
||||
} catch (Exception e) {
|
||||
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
|
||||
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
|
||||
|
||||
} finally {
|
||||
//serviceNameSet.remove(((NamingEvent) event).getServiceName());//如果考虑高实时性 可以手动remove 这样时间窗口的大小就不固定 依赖处理速度 窗口大小作为兜底
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<Instance> filterNeedSync(List<Instance> sourceInstances) {
|
||||
Iterator<Instance> iterator = sourceInstances.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
if (!needSync(iterator.next().getMetadata())) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
return sourceInstances;
|
||||
}
|
||||
|
||||
public boolean addSyncService(TaskDO taskDO) {
|
||||
try {
|
||||
NamingService sourceNamingService =
|
||||
//nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
|
||||
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());//fix with no nameSpaceName
|
||||
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
|
||||
nacosListenerMap.putIfAbsent(taskDO.getTaskId(), event -> {
|
||||
if (event instanceof NamingEvent) {
|
||||
if (serviceNameSet.set(((NamingEvent) event).getServiceName())) {// add event merge
|
||||
EXECUTOR.execute(new SyncThread(sourceNamingService, taskDO, client));
|
||||
}
|
||||
}
|
||||
});
|
||||
sourceNamingService.subscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
|
||||
} catch (Exception e) {
|
||||
log.error("sync task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
|
||||
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -12,41 +12,24 @@
|
|||
*/
|
||||
package com.alibaba.nacossync.extension.impl;
|
||||
|
||||
import static com.alibaba.nacossync.util.DubboConstants.ALL_SERVICE_NAME_PATTERN;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.DUBBO_PATH_FORMAT;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.DUBBO_ROOT_PATH;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.GROUP_KEY;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_IP_KEY;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_PORT_KEY;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.INTERFACE_KEY;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.PROTOCOL_KEY;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.VERSION_KEY;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.WEIGHT_KEY;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.ZOOKEEPER_SEPARATOR;
|
||||
import static com.alibaba.nacossync.util.DubboConstants.createServiceName;
|
||||
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
|
||||
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;
|
||||
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.client.naming.NacosNamingService;
|
||||
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
|
||||
import com.alibaba.nacossync.constant.ClusterTypeEnum;
|
||||
import com.alibaba.nacossync.constant.MetricsStatisticsType;
|
||||
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
|
||||
import com.alibaba.nacossync.constant.SkyWalkerConstants;
|
||||
import com.alibaba.nacossync.extension.SyncService;
|
||||
import com.alibaba.nacossync.extension.annotation.NacosSyncService;
|
||||
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
|
||||
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
|
||||
import com.alibaba.nacossync.extension.impl.extend.Sharding;
|
||||
import com.alibaba.nacossync.extension.impl.extend.ZookeeperSyncToNacosServiceSharding;
|
||||
import com.alibaba.nacossync.monitor.MetricsManager;
|
||||
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Predicate;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
@ -55,6 +38,17 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
|
|||
import org.apache.curator.utils.CloseableUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static com.alibaba.nacossync.util.DubboConstants.*;
|
||||
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
|
||||
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;
|
||||
|
||||
/**
|
||||
* @author paderlol
|
||||
* @version 1.0
|
||||
|
@ -82,6 +76,12 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
|
||||
private final SkyWalkerCacheServices skyWalkerCacheServices;
|
||||
|
||||
@Resource(type = ZookeeperSyncToNacosServiceSharding.class)
|
||||
private Sharding sharding;
|
||||
|
||||
//排除/dobbo下面的所有非服务节点
|
||||
private static final List<String> IGNORED_DUBBO_PATH = Stream.of("mapping", "metadata", "yellow").collect(Collectors.toList());
|
||||
|
||||
@Autowired
|
||||
public ZookeeperSyncToNacosServiceImpl(ZookeeperServerHolder zookeeperServerHolder,
|
||||
NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) {
|
||||
|
@ -96,7 +96,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
if (treeCacheMap.containsKey(taskDO.getTaskId())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
TreeCache treeCache = getTreeCache(taskDO);
|
||||
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
|
||||
// 初次执行任务统一注册所有实例
|
||||
|
@ -104,10 +103,16 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
//注册ZK监听
|
||||
Objects.requireNonNull(treeCache).getListenable().addListener((client, event) -> {
|
||||
try {
|
||||
|
||||
String path = event.getData().getPath();
|
||||
if (!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) {
|
||||
return;
|
||||
}
|
||||
Map<String, String> queryParam = parseQueryString(path);
|
||||
//add sharding
|
||||
if (!isProcess(taskDO, destNamingService, queryParam.get(INTERFACE_KEY)))
|
||||
return;
|
||||
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
|
||||
log.info("sync sharding Zookeeper to Nacos serviceName:{},local servicesName :{}", queryParam.get(INTERFACE_KEY), sharding.getLocalServices(null));
|
||||
processEvent(taskDO, destNamingService, event, path, queryParam);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -126,9 +131,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
|
||||
private void processEvent(TaskDO taskDO, NamingService destNamingService, TreeCacheEvent event, String path,
|
||||
Map<String, String> queryParam) throws NacosException {
|
||||
if(!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
|
||||
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
|
||||
|
@ -139,6 +141,8 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
|
||||
destNamingService.registerInstance(
|
||||
getServiceNameFromCache(serviceName, queryParam), instance);
|
||||
//getServiceNameFromCache(serviceName, queryParam, instance), instance);
|
||||
log.info("syn add service : {} ,instance:{}", serviceName, instance);
|
||||
break;
|
||||
case NODE_REMOVED:
|
||||
|
||||
|
@ -147,6 +151,7 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
ipAndPortParam.get(INSTANCE_IP_KEY),
|
||||
Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
|
||||
nacosServiceNameMap.remove(serviceName);
|
||||
log.info("syn delete service : {} ,instance:{}", serviceName, instance);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
@ -155,12 +160,20 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
|
||||
private void registerAllInstances(TaskDO taskDO, NamingService destNamingService) throws Exception {
|
||||
CuratorFramework zk = zookeeperServerHolder.get(taskDO.getSourceClusterId(), "");
|
||||
sharding.start(taskDO);//幂等 可重复添加
|
||||
if (!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
|
||||
sharding.doSharding(null, new ArrayList<>(Arrays.asList(taskDO.getServiceName())));
|
||||
TreeSet<String> shardingServices = sharding.getLocalServices(null);
|
||||
if (shardingServices.contains(taskDO.getServiceName())) {
|
||||
registerALLInstances0(taskDO, destNamingService, zk, taskDO.getServiceName());
|
||||
}
|
||||
} else {
|
||||
// 同步全部
|
||||
List<String> serviceList = zk.getChildren().forPath(DUBBO_ROOT_PATH);
|
||||
sharding.doSharding(null, filterNoProviderPath(serviceList));
|
||||
TreeSet<String> shardingServices = sharding.getLocalServices(null);
|
||||
for (String serviceName : serviceList) {
|
||||
if (shardingServices.contains(serviceName) && !IGNORED_DUBBO_PATH.contains(serviceName))//add
|
||||
registerALLInstances0(taskDO, destNamingService, zk, serviceName);
|
||||
}
|
||||
}
|
||||
|
@ -226,8 +239,6 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
}
|
||||
|
||||
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("delete task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
|
||||
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
|
||||
|
@ -262,7 +273,7 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
protected boolean isMatch(TaskDO taskDO, Map<String, String> queryParam) {
|
||||
Predicate<TaskDO> isVersionEq = (task) -> StringUtils.isBlank(taskDO.getVersion())
|
||||
|| StringUtils.equals(task.getVersion(), queryParam.get(VERSION_KEY));
|
||||
Predicate<TaskDO> isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName())
|
||||
Predicate<TaskDO> isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName()) || StringUtils.isBlank(queryParam.get(GROUP_KEY)) //fix
|
||||
|| StringUtils.equals(task.getGroupName(), queryParam.get(GROUP_KEY));
|
||||
return isVersionEq.and(isGroupEq).test(taskDO);
|
||||
}
|
||||
|
@ -302,4 +313,70 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
|
|||
return nacosServiceNameMap.computeIfAbsent(serviceName, (key) -> createServiceName(queryParam));
|
||||
}
|
||||
|
||||
private List<String> filterNoProviderPath(List<String> sourceInstances) {
|
||||
Iterator<String> iterator = sourceInstances.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
if (IGNORED_DUBBO_PATH.contains(iterator.next())) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
return sourceInstances;
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消service下的instance注册,防止在server变化的时候,server之前sharding的service被分配到其他server上,这里需要手动将这部分instance下线;
|
||||
* (这里没有直接调用Nacos的deregisterInstance是因为在当前分布式下存在时序问题,可能导致该任务误删除其他server刚注册上的instance,这里使用停止发送心跳的方法,让instance自己下线)
|
||||
*
|
||||
* @param namingService
|
||||
* @param serviceNames
|
||||
*/
|
||||
private void deregisterService(NamingService namingService, Queue<ShardingLog> serviceNames, TaskDO taskDO) {
|
||||
log.info("zk->nacos current deal with serviceNames:" + sharding.getLocalServices(null));
|
||||
log.info("zk->nacos current change serviceNames count:" + serviceNames.size());
|
||||
while (!serviceNames.isEmpty()) {
|
||||
ShardingLog shardingLog = serviceNames.poll();
|
||||
if (!shardingLog.getType().equals(ShardingLogTypeEnum.DELETE.getType())) {
|
||||
log.info("zk->nacos current add serviceName:{},will skip...", shardingLog.getServiceName());
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
List<Instance> allInstances =
|
||||
namingService.getAllInstances(nacosServiceNameMap.get(shardingLog.getServiceName()));
|
||||
for (Instance instance : allInstances) {
|
||||
if (needDelete(instance.getMetadata(), taskDO)) {
|
||||
log.info("zk->nacos current will stop beat:" + instance.getIp() + instance.getPort() + " ,key:" + instance.getServiceName());
|
||||
((NacosNamingService) namingService).getBeatReactor().removeBeatInfo(instance.getServiceName(), instance.getIp(), instance.getPort());
|
||||
}
|
||||
nacosServiceNameMap.remove(shardingLog.getServiceName());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("deregisterService faild ,cause by:{}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否本机处理的service
|
||||
*
|
||||
* @param taskDO
|
||||
* @param destNamingService
|
||||
* @param serviceName
|
||||
* @return
|
||||
*/
|
||||
private boolean isProcess(TaskDO taskDO, NamingService destNamingService, String serviceName) {
|
||||
try {
|
||||
if (IGNORED_DUBBO_PATH.contains(serviceName))
|
||||
return false;
|
||||
sharding.doSharding(null, new ArrayList<>(Arrays.asList(serviceName)));
|
||||
deregisterService(destNamingService, sharding.getChangeService(), taskDO);
|
||||
if (sharding.getLocalServices(null).contains(serviceName)) {
|
||||
return true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("zk->nacos sharding faild ,taskid:{}", taskDO.getId(), e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
package com.alibaba.nacossync.extension.impl.extend;
|
||||
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.client.naming.utils.NetUtils;
|
||||
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
|
||||
import com.alibaba.nacossync.extension.SyncManagerService;
|
||||
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
|
||||
import com.alibaba.nacossync.extension.impl.NacosSyncToZookeeperServiceImpl;
|
||||
import com.alibaba.nacossync.extension.sharding.ConsistentHashServiceSharding;
|
||||
import com.alibaba.nacossync.extension.sharding.ServiceSharding;
|
||||
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||
import com.alibaba.nacossync.util.DubboConstants;
|
||||
import com.alibaba.nacossync.util.SkyWalkerUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/10/29.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class NacosSyncToZookeeperServicesSharding implements Sharding {
|
||||
|
||||
@Autowired
|
||||
private NacosServerHolder nacosServerHolder;
|
||||
|
||||
public static final int DEFAULT_SERVICE_PAGENO = 1;
|
||||
|
||||
public static final int DEFAULT_SERVICE_PAGE_SIZE = 10000;
|
||||
|
||||
private volatile String serviceListMd5;
|
||||
|
||||
@Autowired
|
||||
private SyncManagerService syncManagerService;
|
||||
|
||||
@Lazy
|
||||
@Resource(type = ConsistentHashServiceSharding.class)
|
||||
private ServiceSharding serviceSharding;
|
||||
|
||||
private static final String SHARDING_KEY_NAME = NacosSyncToZookeeperServicesSharding.class.getName();
|
||||
|
||||
private static final long DEFAULT_SERVICES_CHANGE_THREAD_DELAY = 10;
|
||||
|
||||
private static final long DEFAULT_SERVICES_CHANGE_THREAD_INTERVAL = 5;
|
||||
|
||||
//add cache taskDO
|
||||
private Map<String, TaskDO> taskDOMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Value("${server.port}")
|
||||
private String serverPort;
|
||||
|
||||
|
||||
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = new Thread(r);
|
||||
thread.setDaemon(true);
|
||||
thread.setName(" com.alibaba.nacossync.sharding.getServiceName");
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
|
||||
protected boolean servicesIschanged(TaskDO taskDO) throws Exception {
|
||||
NamingService sourceNamingService =
|
||||
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
|
||||
List<String> serviceNames = sourceNamingService.getServicesOfServer(DEFAULT_SERVICE_PAGENO, DEFAULT_SERVICE_PAGE_SIZE, SkyWalkerUtil.getGroupName(taskDO.getGroupName())).getData();
|
||||
Collections.sort(serviceNames);
|
||||
String md5 = SkyWalkerUtil.StringToMd5(serviceNames.toString());
|
||||
if (!md5.equals(serviceListMd5)) {
|
||||
serviceListMd5 = md5;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected synchronized void reSubscribeService(TaskDO taskDO) {
|
||||
log.error("reSubscribe start");
|
||||
if (Objects.isNull(taskDO)) return;
|
||||
try {
|
||||
NamingService sourceNamingService =
|
||||
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
|
||||
List<String> serviceNames = sourceNamingService.getServicesOfServer(DEFAULT_SERVICE_PAGENO, DEFAULT_SERVICE_PAGE_SIZE, taskDO.getGroupName()).getData();//如果使用同一个groupName暂时没问题,如果配置了多个group,需要升级sdk1.4+,支持按照*的group查询
|
||||
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
|
||||
} catch (Exception e) {
|
||||
log.error("reSubscribe faild,task id:{}", taskDO.getId(), e);
|
||||
}
|
||||
if (!serviceSharding.getChangeServices(SHARDING_KEY_NAME).isEmpty()) {
|
||||
try {
|
||||
syncChangedServices();
|
||||
} catch (Exception e) {
|
||||
log.error("reSubscribe -->delete service faild,task id:{}", taskDO.getId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//暂时不支持多source_cluster_id,多nammespace维度(默认取第一个task的source_cluster_id和namespace)
|
||||
@Override
|
||||
public void start(TaskDO taskDO) {
|
||||
taskDOMap.putIfAbsent(taskDO.getServiceName(), taskDO);
|
||||
if (!serviceSharding.addServerChange(SHARDING_KEY_NAME, this)) {
|
||||
return;
|
||||
}
|
||||
executorService.scheduleWithFixedDelay(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (servicesIschanged(taskDO)) {
|
||||
reSubscribeService(taskDO);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("schedule reSubscribe service thread faild ,task id :", taskDO.getId(), e);
|
||||
}
|
||||
}
|
||||
}, DEFAULT_SERVICES_CHANGE_THREAD_DELAY, DEFAULT_SERVICES_CHANGE_THREAD_INTERVAL, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onServerChange() {
|
||||
if (taskDOMap.size() > 0) {
|
||||
for (TaskDO taskDO : taskDOMap.values()) {//任意取一个taskDo
|
||||
reSubscribeService(taskDO);
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue<ShardingLog> getChangeService() {
|
||||
return serviceSharding.getChangeServices(SHARDING_KEY_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doSharding(String key, List<String> serviceNames) {
|
||||
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeSet<String> getLocalServices(String key) {
|
||||
return serviceSharding.getLocalServices(key);
|
||||
}
|
||||
|
||||
|
||||
private void syncAddedServices(String serviceName) {
|
||||
if (taskDOMap.containsKey(DubboConstants.ALL_SERVICE_NAME_PATTERN)) {//如果有配置为* 的 则不用处理单独配置serviceName的task
|
||||
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(DubboConstants.ALL_SERVICE_NAME_PATTERN), serviceName);
|
||||
((NacosSyncToZookeeperServiceImpl) syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId())).addSyncService(taskDO);
|
||||
log.info("reSubscribe ,{} is add", serviceName);
|
||||
return;
|
||||
}
|
||||
if (taskDOMap.containsKey(serviceName)) {//如果有配置变更的serviceName,而且sharding到本server则处理
|
||||
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(serviceName), serviceName);
|
||||
((NacosSyncToZookeeperServiceImpl) syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId())).addSyncService(taskDO);
|
||||
log.info("reSubscribe ,{} is add", serviceName);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void syncRemovedServices(String serviceName) {
|
||||
if (taskDOMap.containsKey(DubboConstants.ALL_SERVICE_NAME_PATTERN)) {
|
||||
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(DubboConstants.ALL_SERVICE_NAME_PATTERN), serviceName);
|
||||
syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
|
||||
log.info("reSubscribe ,{} is remove", serviceName);
|
||||
return;
|
||||
}
|
||||
if (taskDOMap.containsKey(serviceName)) {
|
||||
TaskDO taskDO = buildNewTaskDo(taskDOMap.get(serviceName), serviceName);
|
||||
syncManagerService.getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).delete(taskDO);
|
||||
log.info("reSubscribe ,{} is remove", serviceName);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void syncChangedServices() {
|
||||
log.info("reSubscribe ,local ip: {},current sharding service:{}", NetUtils.localIP() + ":" + serverPort, serviceSharding.getLocalServices(SHARDING_KEY_NAME).toString());
|
||||
while (!serviceSharding.getChangeServices(SHARDING_KEY_NAME).isEmpty()) {
|
||||
ShardingLog shardingLog = serviceSharding.getChangeServices(SHARDING_KEY_NAME).poll();
|
||||
if (shardingLog.getType().equals(ShardingLogTypeEnum.ADD.getType())) {
|
||||
syncAddedServices(shardingLog.getServiceName());
|
||||
}
|
||||
if (shardingLog.getType().equals(ShardingLogTypeEnum.DELETE.getType())) {
|
||||
syncRemovedServices(shardingLog.getServiceName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(TaskDO taskDO) {
|
||||
if (taskDOMap.containsKey(taskDO.getServiceName())) {
|
||||
taskDOMap.remove(taskDO.getServiceName());
|
||||
}
|
||||
}
|
||||
|
||||
private TaskDO buildNewTaskDo(TaskDO taskDO, String serviceName) {
|
||||
TaskDO taskDO1 = new TaskDO();
|
||||
BeanUtils.copyProperties(taskDO, taskDO1);
|
||||
taskDO1.setTaskId(serviceName);//需要一个key替换以前的taskid很多封装维度,暂时使用serviceName
|
||||
taskDO1.setServiceName(serviceName);
|
||||
return taskDO1;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package com.alibaba.nacossync.extension.impl.extend;
|
||||
|
||||
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/10/30.
|
||||
*/
|
||||
public interface Sharding {
|
||||
|
||||
public void onServerChange();
|
||||
|
||||
public void start(TaskDO taskDO);
|
||||
|
||||
public void stop(TaskDO taskDO);
|
||||
|
||||
public void doSharding(String key, List<String> serviceNames);
|
||||
|
||||
public TreeSet<String> getLocalServices(String key);
|
||||
|
||||
public Queue<ShardingLog> getChangeService();
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
package com.alibaba.nacossync.extension.impl.extend;
|
||||
|
||||
import com.alibaba.nacossync.extension.sharding.ConsistentHashServiceSharding;
|
||||
import com.alibaba.nacossync.extension.sharding.ServiceSharding;
|
||||
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||
import com.alibaba.nacossync.pojo.model.TaskDO;
|
||||
import com.alibaba.nacossync.util.SkyWalkerUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/10/30.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class ZookeeperSyncToNacosServiceSharding implements Sharding {
|
||||
|
||||
private static final String SHARDING_KEY_NAME = ZookeeperSyncToNacosServiceSharding.class.getName();
|
||||
|
||||
private volatile String serviceListMd5;
|
||||
|
||||
private volatile boolean serverChange = false;
|
||||
|
||||
@Lazy
|
||||
@Resource(type = ConsistentHashServiceSharding.class)
|
||||
private ServiceSharding serviceSharding;
|
||||
|
||||
|
||||
@Override
|
||||
public void onServerChange() {
|
||||
serverChange = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(TaskDO taskDO) {
|
||||
serviceSharding.addServerChange(SHARDING_KEY_NAME, this);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Queue<ShardingLog> getChangeService() {
|
||||
return serviceSharding.getChangeServices(SHARDING_KEY_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doSharding(String key, List<String> serviceNames) {
|
||||
try {
|
||||
if (servicesIschanged(serviceNames) || serverChange) {
|
||||
log.info("zk ->nacos reshading start");
|
||||
serverChange = false;
|
||||
serviceSharding.sharding(SHARDING_KEY_NAME, serviceNames);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("zk ->nacos reshading faild.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeSet<String> getLocalServices(String key) {
|
||||
return serviceSharding.getLocalServices(SHARDING_KEY_NAME);
|
||||
}
|
||||
|
||||
protected boolean servicesIschanged(List<String> serviceNames) throws Exception {//zk区分不了是service变化还是instance变化
|
||||
Collections.sort(serviceNames);
|
||||
String md5 = SkyWalkerUtil.StringToMd5(serviceNames.toString());
|
||||
if (!md5.equals(serviceListMd5)) {
|
||||
serviceListMd5 = md5;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(TaskDO taskDO) {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
package com.alibaba.nacossync.extension.sharding;
|
||||
|
||||
import com.alibaba.nacos.api.naming.listener.Event;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.client.naming.utils.NetUtils;
|
||||
import com.alibaba.nacossync.constant.ShardingLogTypeEnum;
|
||||
import com.alibaba.nacossync.extension.impl.extend.Sharding;
|
||||
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||
import com.alibaba.nacossync.util.SkyWalkerUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/10/27.
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractServiceSharding implements ServiceSharding, InitializingBean {
|
||||
|
||||
protected volatile List<String> servers = new LinkedList<String>();
|
||||
|
||||
private Map<String, ConcurrentLinkedQueue<ShardingLog>> localServicesChangeMap = new ConcurrentHashMap<String, ConcurrentLinkedQueue<ShardingLog>>();
|
||||
|
||||
private volatile Map<String, TreeSet<String>> localServicesMap = new ConcurrentHashMap<String, TreeSet<String>>();
|
||||
|
||||
private final static String LOCAL_IP = NetUtils.localIP();
|
||||
|
||||
private volatile String serverListMd5;
|
||||
|
||||
private Map<String, Sharding> serverListens = new ConcurrentHashMap<String, Sharding>();
|
||||
|
||||
@Value("${server.port}")
|
||||
private String serverPort;
|
||||
|
||||
@Lazy
|
||||
@Resource(type = NacosServersManager.class)
|
||||
private ServersManager serversManager;
|
||||
|
||||
protected List<String> getServers() {
|
||||
return servers;
|
||||
}
|
||||
|
||||
protected void listenServer() {
|
||||
try {
|
||||
serversManager.subscribeServers(new EventListener() {
|
||||
@Override
|
||||
public void onEvent(Event event) {
|
||||
try {
|
||||
shadingServers();
|
||||
for (Sharding sharding : serverListens.values()) {
|
||||
sharding.onServerChange();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("subscribe servers faild.", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("subscribe servers faild.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void shadingServers() throws Exception {
|
||||
List<String> serversList = serversManager.getServers();
|
||||
Collections.sort(serversList);
|
||||
String md5 = SkyWalkerUtil.StringToMd5(serversList.toString());
|
||||
if (!md5.equals(serverListMd5)) {
|
||||
servers = serversList;
|
||||
serverListMd5 = md5;
|
||||
doSharding();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//need fix: 暂时同步话解决
|
||||
protected void shadingServices(String key, List<String> serviceNames) {
|
||||
if (!localServicesMap.containsKey(key)) {
|
||||
TreeSet<String> localServicesSet = new TreeSet<String>();
|
||||
localServicesMap.putIfAbsent(key, localServicesSet);
|
||||
}
|
||||
if (!localServicesChangeMap.containsKey(key)) {
|
||||
ConcurrentLinkedQueue<ShardingLog> removeQueue = new ConcurrentLinkedQueue<ShardingLog>();
|
||||
localServicesChangeMap.putIfAbsent(key, removeQueue);
|
||||
}
|
||||
TreeSet<String> localServices = localServicesMap.get(key);
|
||||
try {
|
||||
for (String serviceName : serviceNames) {
|
||||
if (getShardingServer(serviceName).equals(LOCAL_IP + ":" + serverPort)) {
|
||||
if (!localServices.contains(serviceName)) {
|
||||
localServicesMap.get(key).add(serviceName);
|
||||
localServicesChangeMap.get(key).offer(new ShardingLog(serviceName, ShardingLogTypeEnum.ADD.getType()));
|
||||
}
|
||||
} else {
|
||||
if (localServices.contains(serviceName)) {
|
||||
localServicesMap.get(key).remove(serviceName);
|
||||
localServicesChangeMap.get(key).offer(new ShardingLog(serviceName, ShardingLogTypeEnum.DELETE.getType()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("shading services faild.", e);
|
||||
}
|
||||
}
|
||||
|
||||
//need fix:按照service维度做sharding,但是在service维度存在zk->nacos nacos->zk两种service,而目前避免环的处理在instance维度的metadata中,如果每次做sharding都去判断instance消耗太大,而且也不能完全避免service中存在多种源的instance,故目前做法是按照zk和nacos注册上的所有
|
||||
//serviceName List做sharding,可能存在sharding不均衡问题,如导致大部分的service都落在一个node上的可能
|
||||
@Override
|
||||
public void sharding(String key, List<String> serviceNames) {
|
||||
try {
|
||||
shadingServices(key, serviceNames);
|
||||
} catch (Exception e) {
|
||||
log.error("sharding faild. sharding key is:{}", key, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addServerChange(String key, Sharding sharding) {
|
||||
return serverListens.putIfAbsent(key, sharding) == null ? true : false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeSet<String> getLocalServices(String key) {
|
||||
return localServicesMap.get(key);
|
||||
}
|
||||
|
||||
protected abstract void doSharding();
|
||||
|
||||
protected abstract String getShardingServer(String key);
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
listenServer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue<ShardingLog> getChangeServices(String key) {
|
||||
return localServicesChangeMap.get(key);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package com.alibaba.nacossync.extension.sharding;
|
||||
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacossync.util.SkyWalkerUtil;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/10/27.
|
||||
*/
|
||||
@Service
|
||||
@Lazy
|
||||
public class ConsistentHashServiceSharding extends AbstractServiceSharding {
|
||||
|
||||
public static final String HASH_NODES = "-hash.vn.nodes";
|
||||
|
||||
private List<String> nodes = new LinkedList<String>();
|
||||
|
||||
private SortedMap<Integer, String> virtualNodes = new TreeMap<Integer, String>();
|
||||
|
||||
private static final int VIRTUAL_COUNT = 100;
|
||||
|
||||
public ConsistentHashServiceSharding() {
|
||||
super();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void doSharding() {
|
||||
List<String> servers = getServers();
|
||||
nodes.clear();
|
||||
virtualNodes.clear();
|
||||
for (String node : servers) {
|
||||
nodes.add(node);
|
||||
}
|
||||
for (String node : nodes) {
|
||||
for (int i = 0; i < VIRTUAL_COUNT; i++) {
|
||||
String virtualNodeName = node + HASH_NODES + String.valueOf(i);
|
||||
virtualNodes.put(getHash(virtualNodeName), virtualNodeName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getShardingServer(String key) {
|
||||
int hash = getHash(SkyWalkerUtil.StringToMd5(key));
|
||||
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
|
||||
String virtualNode;
|
||||
if (subMap.isEmpty()) {
|
||||
Integer i = virtualNodes.firstKey();
|
||||
virtualNode = virtualNodes.get(i);
|
||||
} else {
|
||||
Integer i = subMap.firstKey();
|
||||
virtualNode = subMap.get(i);
|
||||
}
|
||||
if (StringUtils.isNotBlank(virtualNode)) {
|
||||
return virtualNode.substring(0, virtualNode.indexOf("-"));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private int getHash(String str) {
|
||||
final int p = 16777619;
|
||||
int hash = (int) 2166136261L;
|
||||
for (int i = 0; i < str.length(); i++)
|
||||
hash = (hash ^ str.charAt(i)) * p;
|
||||
hash += hash << 13;
|
||||
hash ^= hash >> 7;
|
||||
hash += hash << 3;
|
||||
hash ^= hash >> 17;
|
||||
hash += hash << 5;
|
||||
if (hash < 0)
|
||||
hash = Math.abs(hash);
|
||||
return hash;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
package com.alibaba.nacossync.extension.sharding;
|
||||
|
||||
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||
import com.alibaba.nacos.api.naming.NamingFactory;
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.client.naming.utils.NetUtils;
|
||||
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
|
||||
import com.alibaba.nacossync.util.StringUtils;
|
||||
import com.google.common.base.Joiner;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/10/27.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
@Lazy
|
||||
public class NacosServersManager implements ServersManager<EventListener>, InitializingBean {
|
||||
|
||||
private NamingService namingService;
|
||||
|
||||
@Value("${server.port}")
|
||||
private String serverPort;
|
||||
|
||||
@Autowired
|
||||
private SkyWalkerCacheServices skyWalkerCacheServices;
|
||||
|
||||
//不动现有业务表和逻辑基础上,指定一个固定的key--"SHARDINGKEY" 生成md5值 作为默认连接的nacos路径
|
||||
private static final String DEFAULT_SHARDING_NACOS_KEY = "3ac01a8c7501f121ab01efb920aa4764";
|
||||
|
||||
private static final String DEFAULT_SHARDING_NACOS_NAMESPACES = "public";
|
||||
|
||||
private static final String DEFAULT_SHARDING_NACOS_GOURPID = "shadinggroup";
|
||||
|
||||
private static final String DEFAULT_SHARDING_NACOS_SERVICENAME = "com.dmall.sharding";
|
||||
|
||||
@Value("${sharding.nacos.url}")
|
||||
private String shardingNacosUrl;
|
||||
|
||||
@Value("${sharding.nacos.namespace}")
|
||||
private String shardingNacosnameSpace;
|
||||
|
||||
@Value("${sharding.nacos.groupname}")
|
||||
private String shardingNacosGroupName;
|
||||
|
||||
@Value("${sharding.nacos.servicename}")
|
||||
private String shardingNacosServiceName;
|
||||
|
||||
@Override
|
||||
public List<String> getServers() throws Exception {
|
||||
List<Instance> instanceList = namingService.getAllInstances(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID);
|
||||
List<String> serverList = new LinkedList<String>();
|
||||
for (Instance instance : instanceList) {
|
||||
serverList.add(instance.getIp() + ":" + instance.getPort());
|
||||
}
|
||||
return serverList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribeServers(EventListener listener) throws Exception {
|
||||
namingService.subscribe(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(String ip, int port) throws Exception {
|
||||
namingService.registerInstance(DEFAULT_SHARDING_NACOS_SERVICENAME, DEFAULT_SHARDING_NACOS_GOURPID, ip, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
try {
|
||||
log.info("start init nacos servers.");
|
||||
if (StringUtils.isEmpty(shardingNacosUrl)) {
|
||||
shardingNacosUrl = DEFAULT_SHARDING_NACOS_KEY;
|
||||
}
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, getNacosUrl());
|
||||
properties.setProperty(PropertyKeyConst.NAMESPACE, StringUtils.isEmpty(shardingNacosnameSpace) ? DEFAULT_SHARDING_NACOS_NAMESPACES : shardingNacosnameSpace);
|
||||
namingService = NamingFactory.createNamingService(properties);
|
||||
register(NetUtils.localIP(), Integer.parseInt(serverPort));
|
||||
log.info("init nacos servers sucess.");
|
||||
} catch (Exception e) {
|
||||
log.info("init nacos faild .", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getNacosUrl() {
|
||||
if (!StringUtils.isIPV4AndPorts(shardingNacosUrl, ",")) {
|
||||
List<String> allClusterConnectKey = skyWalkerCacheServices
|
||||
.getAllClusterConnectKey(StringUtils.isEmpty(shardingNacosUrl) ? DEFAULT_SHARDING_NACOS_KEY : shardingNacosUrl);
|
||||
return Joiner.on(",").join(allClusterConnectKey);
|
||||
}
|
||||
return shardingNacosUrl;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package com.alibaba.nacossync.extension.sharding;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/10/27.
|
||||
*/
|
||||
public interface ServersManager<T> {
|
||||
|
||||
public List<String> getServers() throws Exception;
|
||||
|
||||
public void subscribeServers(T listener) throws Exception;
|
||||
|
||||
public void register(String ip, int port) throws Exception;
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package com.alibaba.nacossync.extension.sharding;
|
||||
|
||||
import com.alibaba.nacossync.extension.impl.extend.Sharding;
|
||||
import com.alibaba.nacossync.pojo.ShardingLog;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/10/27.
|
||||
*/
|
||||
public interface ServiceSharding {
|
||||
|
||||
public void sharding(String key, List<String> serviceNames);
|
||||
|
||||
public TreeSet<String> getLocalServices(String key);
|
||||
|
||||
public boolean addServerChange(String name, Sharding sharding);
|
||||
|
||||
public Queue<ShardingLog> getChangeServices(String key);
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package com.alibaba.nacossync.pojo;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/11/18.
|
||||
*/
|
||||
public class ShardingLog {
|
||||
|
||||
private String serviceName;
|
||||
|
||||
private String type;
|
||||
|
||||
public ShardingLog() {
|
||||
}
|
||||
|
||||
public ShardingLog(String serviceName, String type) {
|
||||
this.serviceName = serviceName;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public String getServiceName() {
|
||||
return serviceName;
|
||||
}
|
||||
|
||||
public void setServiceName(String serviceName) {
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
}
|
|
@ -31,5 +31,6 @@ public class TaskModel {
|
|||
private String serviceName;
|
||||
private String groupName;
|
||||
private String taskStatus;
|
||||
private String nameSpace;
|
||||
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ public class TaskDetailProcessor implements Processor<TaskDetailQueryRequest, Ta
|
|||
taskModel.setSourceClusterId(taskDO.getSourceClusterId());
|
||||
taskModel.setTaskStatus(taskDO.getTaskStatus());
|
||||
taskModel.setTaskId(taskDO.getTaskId());
|
||||
taskModel.setNameSpace(taskDO.getNameSpace());
|
||||
|
||||
taskDetailQueryResult.setTaskModel(taskModel);
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ public class TaskListQueryProcessor implements Processor<TaskListQueryRequest, T
|
|||
taskModel.setServiceName(taskDO.getServiceName());
|
||||
taskModel.setGroupName(taskDO.getGroupName());
|
||||
taskModel.setTaskStatus(taskDO.getTaskStatus());
|
||||
taskModel.setNameSpace(taskDO.getNameSpace());
|
||||
taskList.add(taskModel);
|
||||
});
|
||||
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
package com.alibaba.nacossync.util;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
/**
|
||||
* Created by maj on 2020/11/2.
|
||||
*/
|
||||
public class ExpirySet<T> {
|
||||
|
||||
private Set<T> set = new CopyOnWriteArraySet();
|
||||
|
||||
private Map<T, Long> expiryMap = new ConcurrentHashMap<T, Long>();
|
||||
|
||||
private long timeOut = 10;//默认10s窗口期
|
||||
|
||||
public ExpirySet() {
|
||||
|
||||
}
|
||||
|
||||
public ExpirySet(long timeout) {
|
||||
this.timeOut = timeout;
|
||||
}
|
||||
|
||||
public boolean set(T key) {
|
||||
if (!set.add(key)) {
|
||||
if (getIntervalBySecond(expiryMap.get(key), new Date().getTime()) > timeOut) {
|
||||
expiryMap.putIfAbsent(key, new Date().getTime());
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
expiryMap.putIfAbsent(key, (new Date().getTime()));
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean remove(T key) {
|
||||
return set.remove(key);
|
||||
}
|
||||
|
||||
public long getIntervalBySecond(long beforeTime, long currentTime) {
|
||||
return (currentTime - beforeTime) / 1000;
|
||||
}
|
||||
|
||||
}
|
|
@ -39,8 +39,8 @@ import java.util.UUID;
|
|||
public class SkyWalkerUtil {
|
||||
|
||||
/**
|
||||
*
|
||||
* Gets the string md5
|
||||
*
|
||||
* @param value
|
||||
* @return
|
||||
*/
|
||||
|
@ -69,6 +69,7 @@ public class SkyWalkerUtil {
|
|||
|
||||
/**
|
||||
* The rules of generating taskId
|
||||
*
|
||||
* @param addTaskRequest
|
||||
* @return
|
||||
*/
|
||||
|
@ -116,6 +117,7 @@ public class SkyWalkerUtil {
|
|||
|
||||
/**
|
||||
* Avoid getting a return address
|
||||
*
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
|
@ -153,4 +155,11 @@ public class SkyWalkerUtil {
|
|||
|
||||
return UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
public static String getGroupName(String groupName) {
|
||||
if (StringUtils.isEmpty(groupName)) {
|
||||
return "DEFAULT_GROUP";
|
||||
}
|
||||
return groupName;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import com.google.common.base.Joiner;
|
|||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLDecoder;
|
||||
import java.net.URLEncoder;
|
||||
|
@ -137,4 +136,29 @@ public final class StringUtils {
|
|||
public static boolean isDubboProviderPath(String path) {
|
||||
return DUBBO_PROVIDER_PATTERN.matcher(path).matches();
|
||||
}
|
||||
|
||||
public static boolean isIPV4AndPort(String addr) {
|
||||
if (StringUtils.isEmpty(addr)) {
|
||||
return false;
|
||||
}
|
||||
String rexp = "^(([1-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5])))\\.)(([0-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5])))\\.){2}([1-9]|([1-9]\\d)|(1\\d\\d)|(2([0-4]\\d|5[0-5]))):([0-9]|[1-9]\\d|[1-9]\\d{2}|[1-9]\\d{3}|[1-5]\\d{4}|6[0-4]\\d{3}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5])$";
|
||||
|
||||
Pattern pat = Pattern.compile(rexp);
|
||||
|
||||
Matcher mat = pat.matcher(addr);
|
||||
|
||||
return mat.find();
|
||||
}
|
||||
|
||||
public static boolean isIPV4AndPorts(String ips, String reg) {
|
||||
if (StringUtils.isEmpty(ips)) {
|
||||
return false;
|
||||
}
|
||||
String[] ipvs = ips.split(reg);
|
||||
for (String ipv : ipvs) {
|
||||
if (!isIPV4AndPort(ipv))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,3 +13,12 @@ spring.datasource.password=root
|
|||
|
||||
management.endpoints.web.exposure.include=*
|
||||
management.endpoint.health.show-details=always
|
||||
|
||||
##以下配置为使用sharding功能时,使用nacos维持nacos-sync的节点状态时用到的namespace,gourp,serviceName,如果不配置则使用(public-->DEFAULT_GROUP---->com.default.service);
|
||||
#(建议配置,不要和业务耦合在一个空间下,否则会导致做全量同步的时候,这部分节点也会同步)
|
||||
#url为nacos的连接地址,可以配置成ip+port方式,也可以通过web配置-->集群配置配置在cluster表中,这里填写cluster_id(如果不配置则默认一个md5值)
|
||||
sharding.nacos.url=3ac01a8c7501f121ab01efb920aa4764
|
||||
#nanespace建议参考nacos社区,需要手动提前配置,不会自动创建,而且不是填写定义的名字而是md5值,默认public除外
|
||||
sharding.nacos.namespace=6ab2bbda-4b84-42d0-8392-99aa1446cffb
|
||||
sharding.nacos.groupname=shadinggroup
|
||||
sharding.nacos.servicename=com.dmall.sharding
|
Loading…
Reference in New Issue