Compare commits

...

33 Commits

Author SHA1 Message Date
paderlol 0c35e096c2
Merge pull request #384 from nacos-group/develop
Optimize some codes
2024-11-30 15:23:35 +08:00
paderlol 142a36f608
Merge pull request #383 from paderlol/develop
Optimize some codes
2024-11-30 15:22:07 +08:00
paderlol b18afb95c1
Optimize some codes 2024-11-30 15:20:40 +08:00
paderlol aa87b24e1a
Merge pull request #381 from nacos-group/develop
Develop
2024-10-29 10:55:13 +08:00
paderlol fdc51ba605
Merge pull request #380 from paderlol/develop
Write unit tests for all utils classes
2024-10-29 10:53:56 +08:00
paderlol ce6249142f
Write unit tests for all utils classes 2024-10-29 10:52:01 +08:00
paderlol 26c8b307d7
Merge pull request #378 from nacos-group/develop
Develop
2024-09-28 13:29:35 +08:00
paderlol 6585b217bc
Merge pull request #377 from paderlol/develop
1. 对代码中依赖注入方式全部修改成构造函数注入
2024-09-28 13:28:32 +08:00
paderlol b609c729c3
1. 对代码中依赖注入方式全部修改成构造函数注入 2024-09-28 13:26:07 +08:00
paderlol 432ced4e0e
Merge pull request #375 from paderlol/develop
优化部分代码
2024-08-29 16:49:31 +08:00
paderlol 116116c8c4
1. 优化部分集群新增部分冗余代码
2. 升级Java JDK到17
2024-08-29 16:48:58 +08:00
paderlol ca08bd7d6e
1. 优化部分集群新增部分冗余代码
2. 升级Java JDK到17
2024-08-29 16:43:05 +08:00
paderlol 81610e505a
Merge pull request #373 from paderlol/add_validation_cluster
修复有任务运行时可以删除集群信息的问题 #371
2024-07-31 15:57:00 +08:00
paderlol 17439be724
修复有任务运行时可以删除集群信息的问题 #371 2024-07-31 15:45:05 +08:00
paderlol 91999fbdcb
Merge pull request #368 from nacos-group/develop
优化以及修复小错误
2024-07-20 16:44:16 +08:00
paderlol 6645b00f24
Develop (#367)
* 1. 优化部分代码

* 整理代码:
 1. 移除枚举类冗余方法以及SkyWalkerCacheServices中未使用方法
 2. 部分服务类改用构造注入
 3. Entity对象重写equals和hashCode
2024-07-20 16:33:19 +08:00
paderlol acb7b49551
Merge pull request #360 from MajorHe1/patch-1
[#ISSUE 359]Update SkyWalkerCacheServices.java, fix operationId empty-judge problem
2024-07-20 14:26:31 +08:00
MajorHe1 ba416d1496
[#ISSUE 359]Update SkyWalkerCacheServices.java, fix operationId empty-judge problem 2024-06-13 20:05:49 +08:00
paderlol ff5e484997
Merge pull request #357 from nacos-group/develop
Develop
2024-06-10 22:19:41 +08:00
paderlol b900c28347
1. 优化部分代码 (#356) 2024-06-10 22:18:13 +08:00
paderlol 5d17242ffc
Develop (#355)
* Feat/sync support2.x#mutiple thread sync02 (#304)

* update port

* Multithreading sync

* solve conflict

* imple SyncService

* adapter deregister

* optimization some code

* fix deregister instance equals logic

Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: paderlol <huangmnlove@163.com>

* Optimize the code for assigning tasks. (#320)

* Develop (#321)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix #305 (#322)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code (#323)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic (#324)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

* #346 #350:nacos 2 nacos 同步关闭后心跳没有停止问题 ,nacos-sync删除目标节点问题 (#347)

* update port

* fix #297 (#298)

Co-authored-by: yangchun2 <yangchun2@joyy.com>

* Revert "fix #297 (#298)" (#318)

This reverts commit a9df169b5a.

* 0.4.9-pre (#325)

* Feat/sync support2.x#mutiple thread sync02 (#304)

* update port

* Multithreading sync

* solve conflict

* imple SyncService

* adapter deregister

* optimization some code

* fix deregister instance equals logic

Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: paderlol <huangmnlove@163.com>

* Optimize the code for assigning tasks. (#320)

* Develop (#321)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix #305 (#322)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code (#323)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic (#324)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

---------

Co-authored-by: chenhao26 <35129699+chenhao26-nineteen@users.noreply.github.com>
Co-authored-by: Oliver <wqdyxnbd@163.com>

* fix:nacos 2 nacos 同步关闭后心跳没有停止问题

* fix:nacos 2 nacos 同步关闭后心跳没有停止问题

* fix:集群信息保存 cluster_level 为 null

* fix:
1、注册时根据中心化逻辑判断同步,但是删除时逻辑不一致问题
2、如果停止同步nameservice为空问题

* 问题在于对destInstances列表的更新方式。在Java中,方法参数是按值传递的。这意味着当你传递一个对象到方法中时,实际上传递的是对象引用的副本。因此,如果你在方法内部改变了这个引用指向的对象(例如,将其指向一个新的对象),这个改变不会影响到原始的对象引用。

在你的代码中,destInstances = newDestInstance;这一行只是改变了destInstances引用在方法内部的指向,而不会改变方法外部传入的destInstances列表对象。这意味着,尽管你筛选出了需要反注册的实例,但这个改变不会反映到方法调用者那里。

---------

Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: paderlol <huangmnlove@163.com>
Co-authored-by: 杨春 <chun@kpromise.top>
Co-authored-by: yangchun2 <yangchun2@joyy.com>
Co-authored-by: chenhao26 <35129699+chenhao26-nineteen@users.noreply.github.com>

* 0.5.0 (#351)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

* 1. 重新设计全量 Nacos 同步 Nacos
2. 修复Nacos Instance equals无效导致出现无法注册成功问题
3. 升级Nacos Sync JDK/Spring Boot版本
4. 保底同步从改成并发同步
5. 增加部分注释

* Develop (#353)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

* 1. 重新设计全量 Nacos 同步 Nacos
2. 修复Nacos Instance equals无效导致出现无法注册成功问题
3. 升级Nacos Sync JDK/Spring Boot版本
4. 保底同步从改成并发同步
5. 增加部分注释

* 1. 优化部分代码 (#354)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

* 1. 重新设计全量 Nacos 同步 Nacos
2. 修复Nacos Instance equals无效导致出现无法注册成功问题
3. 升级Nacos Sync JDK/Spring Boot版本
4. 保底同步从改成并发同步
5. 增加部分注释

* 1. 优化部分代码

---------

Co-authored-by: chenhao26 <35129699+chenhao26-nineteen@users.noreply.github.com>
Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: 龙竹 <34528665+dragonTalon@users.noreply.github.com>
Co-authored-by: 杨春 <chun@kpromise.top>
Co-authored-by: yangchun2 <yangchun2@joyy.com>
2024-06-10 22:04:01 +08:00
paderlol 39357660ab
Merge branch 'master' into develop 2024-06-10 22:02:39 +08:00
paderlol 49abb7c286
1. 优化部分代码 (#354)
* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

* 1. 重新设计全量 Nacos 同步 Nacos
2. 修复Nacos Instance equals无效导致出现无法注册成功问题
3. 升级Nacos Sync JDK/Spring Boot版本
4. 保底同步从改成并发同步
5. 增加部分注释

* 1. 优化部分代码
2024-06-10 22:01:12 +08:00
paderlol 5789d4d774
Develop (#353)
* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

* 1. 重新设计全量 Nacos 同步 Nacos
2. 修复Nacos Instance equals无效导致出现无法注册成功问题
3. 升级Nacos Sync JDK/Spring Boot版本
4. 保底同步从改成并发同步
5. 增加部分注释
2024-05-19 00:15:50 +08:00
paderlol 2fdf44d308
Merge branch 'master' into develop 2024-05-19 00:08:04 +08:00
paderlol 5941bfb9bc
0.5.0 (#351)
* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

* 1. 重新设计全量 Nacos 同步 Nacos
2. 修复Nacos Instance equals无效导致出现无法注册成功问题
3. 升级Nacos Sync JDK/Spring Boot版本
4. 保底同步从改成并发同步
5. 增加部分注释
2024-05-18 23:50:23 +08:00
龙竹 0c24dcb0a2
#346 #350:nacos 2 nacos 同步关闭后心跳没有停止问题 ,nacos-sync删除目标节点问题 (#347)
* update port

* fix #297 (#298)

Co-authored-by: yangchun2 <yangchun2@joyy.com>

* Revert "fix #297 (#298)" (#318)

This reverts commit a9df169b5a.

* 0.4.9-pre (#325)

* Feat/sync support2.x#mutiple thread sync02 (#304)

* update port

* Multithreading sync

* solve conflict

* imple SyncService

* adapter deregister

* optimization some code

* fix deregister instance equals logic

Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: paderlol <huangmnlove@163.com>

* Optimize the code for assigning tasks. (#320)

* Develop (#321)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix #305 (#322)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code (#323)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic (#324)

* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.

---------

Co-authored-by: chenhao26 <35129699+chenhao26-nineteen@users.noreply.github.com>
Co-authored-by: Oliver <wqdyxnbd@163.com>

* fix:nacos 2 nacos 同步关闭后心跳没有停止问题

* fix:nacos 2 nacos 同步关闭后心跳没有停止问题

* fix:集群信息保存 cluster_level 为 null

* fix:
1、注册时根据中心化逻辑判断同步,但是删除时逻辑不一致问题
2、如果停止同步nameservice为空问题

* 问题在于对destInstances列表的更新方式。在Java中,方法参数是按值传递的。这意味着当你传递一个对象到方法中时,实际上传递的是对象引用的副本。因此,如果你在方法内部改变了这个引用指向的对象(例如,将其指向一个新的对象),这个改变不会影响到原始的对象引用。

在你的代码中,destInstances = newDestInstance;这一行只是改变了destInstances引用在方法内部的指向,而不会改变方法外部传入的destInstances列表对象。这意味着,尽管你筛选出了需要反注册的实例,但这个改变不会反映到方法调用者那里。

---------

Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: paderlol <huangmnlove@163.com>
Co-authored-by: 杨春 <chun@kpromise.top>
Co-authored-by: yangchun2 <yangchun2@joyy.com>
Co-authored-by: chenhao26 <35129699+chenhao26-nineteen@users.noreply.github.com>
2024-05-15 09:53:45 +08:00
paderlol be65db1892
Refactoring the Nacos Sync to Consul Logic (#324)
* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.

* Refactoring the Nacos Sync to Consul Logic.
2023-05-15 17:25:22 +08:00
paderlol eb93fdd52a
Fix cyclic dependency code (#323)
* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305

* Fix cyclic dependency code.
2023-05-15 17:21:53 +08:00
paderlol e7e52acfa7
Fix #305 (#322)
* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308

* Fix .#305
2023-05-15 16:51:11 +08:00
paderlol 02838da959
Develop (#321)
* Optimize the code for assigning tasks.

* Adds prefix to the input string if it doesn't already have it.#308
2023-05-15 16:13:39 +08:00
paderlol 0e73a0864c
Optimize the code for assigning tasks. (#320) 2023-05-15 15:58:39 +08:00
chenhao26 98efe26906
Feat/sync support2.x#mutiple thread sync02 (#304)
* update port

* Multithreading sync

* solve conflict

* imple SyncService

* adapter deregister

* optimization some code

* fix deregister instance equals logic

Co-authored-by: Oliver <wqdyxnbd@163.com>
Co-authored-by: paderlol <huangmnlove@163.com>
2022-08-01 08:53:33 +08:00
66 changed files with 1267 additions and 930 deletions

View File

@ -47,6 +47,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!-- 默认使用HikariCP连接池 -->
<dependency>
@ -164,9 +165,17 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
<source>17</source>
<target>17</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>

View File

@ -17,9 +17,11 @@
package com.alibaba.nacossync;
import com.alibaba.nacossync.util.BatchTaskExecutor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
/**
* @author NacosSync
@ -30,6 +32,14 @@ public class NacosSyncMain {
public static void main(String[] args) {
SpringApplication.run(NacosSyncMain.class, args);
ConfigurableApplicationContext context = SpringApplication.run(NacosSyncMain.class, args);
// Register shutdown callback using Spring Boot's context lifecycle
context.registerShutdownHook();
context.addApplicationListener(event -> {
if (event instanceof org.springframework.context.event.ContextClosedEvent) {
BatchTaskExecutor.shutdown();
}
});
}
}

View File

@ -44,14 +44,18 @@ import com.alibaba.nacossync.template.processor.ConfigQueryProcessor;
@RestController
public class SystemConfigApi {
@Autowired
private ConfigQueryProcessor configQueryProcessor;
private final ConfigQueryProcessor configQueryProcessor;
@Autowired
private ConfigDeleteProcessor configDeleteProcessor;
private final ConfigDeleteProcessor configDeleteProcessor;
@Autowired
private ConfigAddProcessor configAddProcessor;
private final ConfigAddProcessor configAddProcessor;
public SystemConfigApi(ConfigQueryProcessor configQueryProcessor, ConfigDeleteProcessor configDeleteProcessor,
ConfigAddProcessor configAddProcessor) {
this.configQueryProcessor = configQueryProcessor;
this.configDeleteProcessor = configDeleteProcessor;
this.configAddProcessor = configAddProcessor;
}
@RequestMapping(path = "/v1/systemconfig/list", method = RequestMethod.GET)
public ConfigQueryResult tasks(ConfigQueryRequest configQueryRequest) {

View File

@ -26,7 +26,6 @@ import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@ -43,17 +42,19 @@ import java.util.concurrent.ThreadLocalRandom;
@Service
public class SkyWalkerCacheServices {
@Autowired
private ClusterAccessService clusterAccessService;
private static final Map<String, FinishedTask> FINISHED_TASK_MAP = new ConcurrentHashMap<>();
@Autowired
private ObjectMapper objectMapper;
private final ClusterAccessService clusterAccessService;
private static final Map<String, FinishedTask> finishedTaskMap = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper;
public SkyWalkerCacheServices(ClusterAccessService clusterAccessService, ObjectMapper objectMapper) {
this.clusterAccessService = clusterAccessService;
this.objectMapper = objectMapper;
}
public String getClusterConnectKey(String clusterId) {
List<String> allClusterConnectKey = getAllClusterConnectKey(clusterId);
return allClusterConnectKey.get(ThreadLocalRandom.current().nextInt(allClusterConnectKey.size()));
}
@ -84,37 +85,31 @@ public class SkyWalkerCacheServices {
FinishedTask finishedTask = new FinishedTask();
finishedTask.setOperationId(operationId);
finishedTaskMap.put(operationId, finishedTask);
FINISHED_TASK_MAP.put(operationId, finishedTask);
}
public FinishedTask getFinishedTask(TaskDO taskDO) {
String operationId = SkyWalkerUtil.getOperationId(taskDO);
if (StringUtils.hasLength(operationId)) {
if (!StringUtils.hasLength(operationId)) {
return null;
}
return finishedTaskMap.get(operationId);
return FINISHED_TASK_MAP.get(operationId);
}
public FinishedTask getFinishedTask(String operationId) {
if (StringUtils.hasLength(operationId)) {
return null;
}
return finishedTaskMap.get(operationId);
}
public FinishedTask removeFinishedTask(String operationId) {
if (StringUtils.hasLength(operationId)) {
return null;
public void removeFinishedTask(String operationId) {
if (!StringUtils.hasLength(operationId)) {
return;
}
return finishedTaskMap.remove(operationId);
FINISHED_TASK_MAP.remove(operationId);
}
public Map<String, FinishedTask> getFinishedTaskMap() {
return finishedTaskMap;
return FINISHED_TASK_MAP;
}

View File

@ -12,6 +12,8 @@
*/
package com.alibaba.nacossync.constant;
import lombok.Getter;
import java.util.ArrayList;
import java.util.List;
@ -19,6 +21,7 @@ import java.util.List;
* @author NacosSync
* @version $Id: ClusterTypeEnum.java, v 0.1 2018-09-25 下午4:38 NacosSync Exp $$
*/
@Getter
public enum ClusterTypeEnum {
CS("CS", "configserver集群"),
@ -32,18 +35,16 @@ public enum ClusterTypeEnum {
ZK("ZK", "zookeeper集群");
private String code;
private final String code;
private String desc;
ClusterTypeEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
public static List<String> getClusterTypeCodes() {
List<String> list = new ArrayList<String>();
List<String> list = new ArrayList<>();
for (ClusterTypeEnum clusterTypeEnum : ClusterTypeEnum.values()) {
list.add(clusterTypeEnum.getCode());
@ -51,41 +52,6 @@ public enum ClusterTypeEnum {
return list;
}
/**
* Getter method for property <tt>code</tt>.
*
* @return property value of code
*/
public String getCode() {
return code;
}
/**
* Setter method for property <tt>code </tt>.
*
* @param code value to be assigned to property code
*/
public void setCode(String code) {
this.code = code;
}
/**
* Getter method for property <tt>desc</tt>.
*
* @return property value of desc
*/
public String getDesc() {
return desc;
}
/**
* Setter method for property <tt>desc </tt>.
*
* @param desc value to be assigned to property desc
*/
public void setDesc(String desc) {
this.desc = desc;
}
public static boolean contains(String clusterType) {

View File

@ -3,10 +3,13 @@
*/
package com.alibaba.nacossync.constant;
import lombok.Getter;
/**
* @author NacosSync
* @version $Id: MetricsStatisticsType.java, v 0.1 2019年02月28日 下午2:17 NacosSync Exp $
*/
@Getter
public enum MetricsStatisticsType {
CACHE_SIZE("nacosSync.finished.taskMap.cacheSize", "任务执行完成缓存列表数"),
@ -28,15 +31,10 @@ public enum MetricsStatisticsType {
/**
* metricsName
*/
private String metricsName;
private String desc;
private final String metricsName;
MetricsStatisticsType(String code, String desc) {
this.metricsName = code;
this.desc = desc;
}
public String getMetricsName() {
return metricsName;
}
}

View File

@ -16,46 +16,23 @@
*/
package com.alibaba.nacossync.constant;
import lombok.Getter;
/**
* @author NacosSync
* @version $Id: ResultCodeEnum.java, v 0.1 2018-09-25 PM4:38 NacosSync Exp $$
*/
@Getter
public enum ResultCodeEnum {
SUCCESS("SUCCESS", "请求成功", "请求成功"),
SYSTEM_ERROR("SYSTEM_ERROR", "系统异常", "系统异常");
private String code;
private String errorMessage;
private String detail;
private final String code;
ResultCodeEnum(String code, String errorMessage, String detail) {
this.code = code;
this.errorMessage = errorMessage;
this.detail = detail;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public String getDetail() {
return detail;
}
public void setDetail(String detail) {
this.detail = detail;
}
}

View File

@ -24,10 +24,10 @@ public class SkyWalkerConstants {
public static final String UNDERLINE = "_";
public static final String DEST_CLUSTERID_KEY = "destClusterId";
public static final String DEST_CLUSTER_ID_KEY = "destClusterId";
public static final String GROUP_NAME = "groupName";
public static final String SYNC_SOURCE_KEY = "syncSource";
public static final String SOURCE_CLUSTERID_KEY = "sourceClusterId";
public static final String SOURCE_CLUSTER_ID_KEY = "sourceClusterId";
public static final String MANAGEMENT_PORT_KEY="management.port";
public static final String MANAGEMENT_CONTEXT_PATH_KEY="management.context-path";

View File

@ -16,6 +16,8 @@
*/
package com.alibaba.nacossync.constant;
import lombok.Getter;
/**
* @author NacosSync
* @version $Id: TaskStatusEnum.java, v 0.1 2018-09-26 上午2:38 NacosSync Exp $$
@ -31,49 +33,16 @@ public enum TaskStatusEnum {
*/
DELETE("DELETE", "任务需要被删除");
private String code;
private String desc;
@Getter
private final String code;
private final String desc;
TaskStatusEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
/**
* Getter method for property <tt>code</tt>.
*
* @return property value of code
*/
public String getCode() {
return code;
}
/**
* Setter method for property <tt>code </tt>.
*
* @param code value to be assigned to property code
*/
public void setCode(String code) {
this.code = code;
}
/**
* Getter method for property <tt>desc</tt>.
*
* @return property value of desc
*/
public String getDesc() {
return desc;
}
/**
* Setter method for property <tt>desc </tt>.
*
* @param desc value to be assigned to property desc
*/
public void setDesc(String desc) {
this.desc = desc;
}
public static boolean contains(String code) {

View File

@ -16,10 +16,10 @@
*/
package com.alibaba.nacossync.dao;
import com.alibaba.nacossync.dao.repository.ClusterRepository;
import com.alibaba.nacossync.pojo.QueryCondition;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
@ -27,9 +27,6 @@ import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import com.alibaba.nacossync.dao.repository.ClusterRepository;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
@ -44,8 +41,11 @@ import java.util.List;
@Slf4j
public class ClusterAccessService implements PageQueryService<ClusterDO> {
@Autowired
private ClusterRepository clusterRepository;
private final ClusterRepository clusterRepository;
public ClusterAccessService(ClusterRepository clusterRepository) {
this.clusterRepository = clusterRepository;
}
public ClusterDO insert(ClusterDO clusterDO) {

View File

@ -14,11 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.dao;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.dao.repository.TaskRepository;
import com.alibaba.nacossync.pojo.QueryCondition;
import org.springframework.beans.factory.annotation.Autowired;
import com.alibaba.nacossync.pojo.model.TaskDO;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
@ -26,9 +28,6 @@ import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import com.alibaba.nacossync.dao.repository.TaskRepository;
import com.alibaba.nacossync.pojo.model.TaskDO;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
@ -42,8 +41,11 @@ import java.util.List;
@Service
public class TaskAccessService implements PageQueryService<TaskDO> {
@Autowired
private TaskRepository taskRepository;
private final TaskRepository taskRepository;
public TaskAccessService(TaskRepository taskRepository) {
this.taskRepository = taskRepository;
}
public TaskDO findByTaskId(String taskId) {
@ -56,8 +58,9 @@ public class TaskAccessService implements PageQueryService<TaskDO> {
/**
* batch delete tasks by taskIds
* @author yongchao9
*
* @param taskIds
* @author yongchao9
*/
public void deleteTaskInBatch(List<String> taskIds) {
List<TaskDO> tds = taskRepository.findAllByTaskIdIn(taskIds);
@ -75,12 +78,17 @@ public class TaskAccessService implements PageQueryService<TaskDO> {
}
public int countByDestClusterIdOrSourceClusterId(String destClusterId, String sourceClusterId) {
return taskRepository.countByDestClusterIdOrSourceClusterId(destClusterId, sourceClusterId);
}
private Predicate getPredicate(CriteriaBuilder criteriaBuilder, List<Predicate> predicates) {
Predicate[] p = new Predicate[predicates.size()];
return criteriaBuilder.and(predicates.toArray(p));
}
private List<Predicate> getPredicates(Root<TaskDO> root, CriteriaBuilder criteriaBuilder, QueryCondition queryCondition) {
private List<Predicate> getPredicates(Root<TaskDO> root, CriteriaBuilder criteriaBuilder,
QueryCondition queryCondition) {
List<Predicate> predicates = new ArrayList<>();
predicates.add(criteriaBuilder.like(root.get("serviceName"), "%" + queryCondition.getServiceName() + "%"));
@ -105,11 +113,9 @@ public class TaskAccessService implements PageQueryService<TaskDO> {
}
private Page<TaskDO> getTaskDOS(QueryCondition queryCondition, Pageable pageable) {
return taskRepository.findAll(
(Specification<TaskDO>) (root, criteriaQuery, criteriaBuilder) -> {
return taskRepository.findAll((Specification<TaskDO>) (root, criteriaQuery, criteriaBuilder) -> {
List<Predicate> predicates = getPredicates(root,
criteriaBuilder, queryCondition);
List<Predicate> predicates = getPredicates(root, criteriaBuilder, queryCondition);
return getPredicate(criteriaBuilder, predicates);

View File

@ -16,13 +16,13 @@
*/
package com.alibaba.nacossync.dao.repository;
import javax.transaction.Transactional;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.repository.CrudRepository;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import javax.transaction.Transactional;
/**
* @author NacosSync
@ -34,6 +34,6 @@ public interface ClusterRepository extends CrudRepository<ClusterDO, Integer>, J
ClusterDO findByClusterId(String clusterId);
@Transactional
int deleteByClusterId(String clusterId);
void deleteByClusterId(String clusterId);
}

View File

@ -34,18 +34,15 @@ public interface TaskRepository extends CrudRepository<TaskDO, Integer>, JpaRepo
TaskDO findByTaskId(String taskId);
@Transactional
int deleteByTaskId(String taskId);
void deleteByTaskId(String taskId);
List<TaskDO> findAllByTaskIdIn(List<String> taskIds);
List<TaskDO> getAllByWorkerIp(String workerIp);
/**
* query service is alluse ns leven sync data
* @param serviceName
* @return
*/
List<TaskDO> findAllByServiceNameEqualsIgnoreCase(String serviceName);
List<TaskDO> findAllByServiceNameNotIgnoreCase(String serviceName);
int countByDestClusterIdOrSourceClusterId(String destClusterId,String sourceClusterId);
}

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.event.listener;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
@ -22,10 +23,10 @@ import com.alibaba.nacossync.event.DeleteTaskEvent;
import com.alibaba.nacossync.event.SyncTaskEvent;
import com.alibaba.nacossync.extension.SyncManagerService;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.google.common.base.Stopwatch;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@ -38,17 +39,21 @@ import javax.annotation.PostConstruct;
@Service
public class EventListener {
@Autowired
private MetricsManager metricsManager;
private final MetricsManager metricsManager;
@Autowired
private SyncManagerService syncManagerService;
private final SyncManagerService syncManagerService;
@Autowired
private EventBus eventBus;
private final EventBus eventBus;
@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;
private final SkyWalkerCacheServices skyWalkerCacheServices;
public EventListener(MetricsManager metricsManager, SyncManagerService syncManagerService, EventBus eventBus,
SkyWalkerCacheServices skyWalkerCacheServices) {
this.metricsManager = metricsManager;
this.syncManagerService = syncManagerService;
this.eventBus = eventBus;
this.skyWalkerCacheServices = skyWalkerCacheServices;
}
@PostConstruct
public void register() {
@ -56,35 +61,36 @@ public class EventListener {
}
@Subscribe
public void listenerSyncTaskEvent(SyncTaskEvent syncTaskEvent) {
public void sync(SyncTaskEvent syncTaskEvent) {
try {
long start = System.currentTimeMillis();
Stopwatch stopwatch = Stopwatch.createStarted();
if (syncManagerService.sync(syncTaskEvent.getTaskDO(), null)) {
skyWalkerCacheServices.addFinishedTask(syncTaskEvent.getTaskDO());
metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, System.currentTimeMillis() - start);
metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, stopwatch.elapsed().toMillis());
} else {
log.warn("listenerSyncTaskEvent sync failure");
log.warn("syncTaskEvent process error");
}
} catch (Exception e) {
log.warn("listenerSyncTaskEvent process error", e);
log.warn("syncTaskEvent process error", e);
}
}
@Subscribe
public void listenerDeleteTaskEvent(DeleteTaskEvent deleteTaskEvent) {
public void delete(DeleteTaskEvent deleteTaskEvent) {
try {
long start = System.currentTimeMillis();
Stopwatch stopwatch = Stopwatch.createStarted();
if (syncManagerService.delete(deleteTaskEvent.getTaskDO())) {
skyWalkerCacheServices.removeFinishedTask(deleteTaskEvent.getTaskDO().getOperationId());
metricsManager.record(MetricsStatisticsType.DELETE_TASK_RT, System.currentTimeMillis() - start);
metricsManager.record(MetricsStatisticsType.DELETE_TASK_RT, stopwatch.elapsed().toMillis());
} else {
log.warn("listenerDeleteTaskEvent delete failure");
log.warn("deleteTaskEvent delete failure");
}
} catch (Exception e) {
log.warn("listenerDeleteTaskEvent process error", e);
log.warn("deleteTaskEvent delete failure.", e);
}
}
}

View File

@ -39,7 +39,7 @@ public class SyncManagerService implements InitializingBean, ApplicationContextA
protected final SkyWalkerCacheServices skyWalkerCacheServices;
private ConcurrentHashMap<String, SyncService> syncServiceMap = new ConcurrentHashMap<String, SyncService>();
private final ConcurrentHashMap<String, SyncService> syncServiceMap = new ConcurrentHashMap<String, SyncService>();
private ApplicationContext applicationContext;

View File

@ -14,9 +14,10 @@ package com.alibaba.nacossync.extension;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.pojo.model.TaskDO;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
/**
* @author NacosSync
* @version $Id: SyncManagerService.java, v 0.1 2018-09-25 下午5:17 NacosSync Exp $$
@ -32,7 +33,6 @@ public interface SyncService {
boolean delete(TaskDO taskDO);
/**
* execute sync
*
* @param taskDO
* @param index
@ -45,7 +45,7 @@ public interface SyncService {
*/
default boolean needSync(Map<String, String> sourceMetaData) {
boolean syncTag = StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SYNC_INSTANCE_TAG));
boolean blank = StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY));
boolean blank = StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY));
return syncTag && blank;
}
@ -54,7 +54,7 @@ public interface SyncService {
* cluster ID of the task
*/
default boolean needDelete(Map<String, String> destMetaData, TaskDO taskDO) {
return StringUtils.equals(destMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY),
return StringUtils.equals(destMetaData.get(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY),
taskDO.getSourceClusterId());
}

View File

@ -1,15 +0,0 @@
package com.alibaba.nacossync.extension.client;
import lombok.Data;
@Data
public class InstanceQueryModel {
private String sourceClusterId;
private String destClusterId;
private String serviceName;
private String groupName;
private String version;
private int pageNo;
private int pageSize;
}

View File

@ -1,12 +0,0 @@
package com.alibaba.nacossync.extension.client;
import com.alibaba.nacossync.pojo.view.TaskModel;
import java.util.List;
public interface SyncQueryClient {
List<TaskModel> getAllInstance(InstanceQueryModel instanceQueryModel);
}

View File

@ -1,54 +0,0 @@
package com.alibaba.nacossync.extension.client.impl;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacossync.extension.client.InstanceQueryModel;
import com.alibaba.nacossync.extension.client.SyncQueryClient;
import com.alibaba.nacossync.extension.holder.NacosServerHolder;
import com.alibaba.nacossync.pojo.view.TaskModel;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class NacosSyncQueryClientImpl implements SyncQueryClient {
private final NacosServerHolder nacosServerHolder;
public NacosSyncQueryClientImpl(
NacosServerHolder nacosServerHolder) {
this.nacosServerHolder = nacosServerHolder;
}
@Override
public List<TaskModel> getAllInstance(InstanceQueryModel instanceQueryModel) {
NamingService namingService = nacosServerHolder
.get(instanceQueryModel.getSourceClusterId());
try {
ListView<String> servicesOfServer = namingService
.getServicesOfServer(instanceQueryModel.getPageNo(),
instanceQueryModel.getPageSize());
return servicesOfServer.getData().stream()
.map(serviceName -> buildTaskModel(instanceQueryModel, serviceName))
.collect(Collectors.toList());
} catch (NacosException e) {
log.error("When using nacos client failure query tasks", e);
return Collections.emptyList();
}
}
private TaskModel buildTaskModel(InstanceQueryModel instanceQueryModel, String serviceName) {
TaskModel taskModel = new TaskModel();
taskModel.setServiceName(serviceName);
taskModel.setSourceClusterId(instanceQueryModel.getSourceClusterId());
taskModel.setDestClusterId(instanceQueryModel.getDestClusterId());
return taskModel;
}
}

View File

@ -29,11 +29,11 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
public class EurekaBeatReactor {
private ScheduledExecutorService executorService;
private final ScheduledExecutorService executorService;
private volatile long clientBeatInterval = 5 * 1000;
private static final long CLIENT_BEAT_INTERVAL = 5 * 1000;
private final Map<String, InstanceInfo> eurekaBeat = new ConcurrentHashMap<>();
private EurekaHttpClient eurekaHttpClient;
private final EurekaHttpClient eurekaHttpClient;
public EurekaBeatReactor(EurekaHttpClient eurekaHttpClient) {
this.eurekaHttpClient = eurekaHttpClient;
@ -71,7 +71,7 @@ public class EurekaBeatReactor {
} catch (Exception e) {
log.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
} finally {
executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
executorService.schedule(this, CLIENT_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -26,8 +26,8 @@ import java.util.Objects;
* @date 2019-06-26
*/
public class EurekaNamingService {
private EurekaHttpClient eurekaHttpClient;
private EurekaBeatReactor beatReactor;
private final EurekaHttpClient eurekaHttpClient;
private final EurekaBeatReactor beatReactor;
public EurekaNamingService(EurekaHttpClient eurekaHttpClient) {

View File

@ -25,7 +25,7 @@ import java.util.function.Consumer;
*/
@Service
public class SpecialSyncEventBus {
private ConcurrentHashMap<String, SpecialSyncEvent> specialSyncEventRegistry = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, SpecialSyncEvent> specialSyncEventRegistry = new ConcurrentHashMap<>();
public void subscribe(TaskDO taskDO, Consumer<TaskDO> syncAction) {
SpecialSyncEvent specialSyncEvent = new SpecialSyncEvent();

View File

@ -21,7 +21,6 @@ import com.alibaba.nacossync.pojo.model.TaskDO;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@ -34,8 +33,11 @@ import java.util.function.Consumer;
@Service
@Slf4j
public class SpecialSyncEventListener {
@Autowired
private EventBus eventBus;
private final EventBus eventBus;
public SpecialSyncEventListener(EventBus eventBus) {
this.eventBus = eventBus;
}
@PostConstruct
public void init() {

View File

@ -27,7 +27,8 @@ import java.util.function.Supplier;
@Slf4j
public abstract class AbstractServerHolderImpl<T> implements Holder<T> {
protected final Map<String, T> serviceMap = new ConcurrentHashMap<>();
private final Map<String, T> serviceMap = new ConcurrentHashMap<>();
@Autowired
protected SkyWalkerCacheServices skyWalkerCacheServices;

View File

@ -32,13 +32,13 @@ import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.health.model.HealthService;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
/**
* Consul 同步 Nacos
@ -50,8 +50,7 @@ import org.springframework.beans.factory.annotation.Autowired;
@NacosSyncService(sourceCluster = ClusterTypeEnum.CONSUL, destinationCluster = ClusterTypeEnum.NACOS)
public class ConsulSyncToNacosServiceImpl implements SyncService {
@Autowired
private MetricsManager metricsManager;
private final MetricsManager metricsManager;
private final ConsulServerHolder consulServerHolder;
private final SkyWalkerCacheServices skyWalkerCacheServices;
@ -60,14 +59,15 @@ public class ConsulSyncToNacosServiceImpl implements SyncService {
private final SpecialSyncEventBus specialSyncEventBus;
@Autowired
public ConsulSyncToNacosServiceImpl(ConsulServerHolder consulServerHolder,
SkyWalkerCacheServices skyWalkerCacheServices, NacosServerHolder nacosServerHolder,
SpecialSyncEventBus specialSyncEventBus) {
SpecialSyncEventBus specialSyncEventBus, MetricsManager metricsManager) {
this.consulServerHolder = consulServerHolder;
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.nacosServerHolder = nacosServerHolder;
this.specialSyncEventBus = specialSyncEventBus;
this.metricsManager = metricsManager;
}
@Override
@ -146,10 +146,10 @@ public class ConsulSyncToNacosServiceImpl implements SyncService {
temp.setIp(instance.getService().getAddress());
temp.setPort(instance.getService().getPort());
Map<String, String> metaData = new HashMap<>(ConsulUtils.transferMetadata(instance.getService().getTags()));
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.DEST_CLUSTER_ID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, taskDO.getSourceClusterId());
temp.setMetadata(metaData);
return temp;
}

View File

@ -29,13 +29,14 @@ import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.NacosUtils;
import com.netflix.appinfo.InstanceInfo;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* eureka
*
@ -182,10 +183,10 @@ public class EurekaSyncToNacosServiceImpl implements SyncService {
temp.setHealthy(true);
Map<String, String> metaData = new HashMap<>(instance.getMetadata());
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.DEST_CLUSTER_ID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, taskDO.getSourceClusterId());
temp.setMetadata(metaData);
return temp;
}

View File

@ -44,6 +44,8 @@ import java.util.stream.Collectors;
public class NacosSyncToConsulServiceImpl extends AbstractNacosSync {
private static final String DELIMITER = "=";
private final SkyWalkerCacheServices skyWalkerCacheServices;
private final ConsulServerHolder consulServerHolder;
@ -57,7 +59,7 @@ public class NacosSyncToConsulServiceImpl extends AbstractNacosSync {
@Override
public String composeInstanceKey(String ip, int port) {
return ip + ":" + port;
return String.join(":", ip, String.valueOf(port));
}
@Override
@ -76,7 +78,7 @@ public class NacosSyncToConsulServiceImpl extends AbstractNacosSync {
if (needDelete(ConsulUtils.transferMetadata(healthService.getService().getTags()), taskDO)) {
consulClient.agentServiceDeregister(
URLEncoder.encode(healthService.getService().getId(), StandardCharsets.UTF_8.name()));
URLEncoder.encode(healthService.getService().getId(), StandardCharsets.UTF_8));
}
}
}
@ -94,7 +96,7 @@ public class NacosSyncToConsulServiceImpl extends AbstractNacosSync {
&& !invalidInstanceKeys.contains(composeInstanceKey(healthService.getService().getAddress(),
healthService.getService().getPort()))) {
consulClient.agentServiceDeregister(
URLEncoder.encode(healthService.getService().getId(), StandardCharsets.UTF_8.toString()));
URLEncoder.encode(healthService.getService().getId(), StandardCharsets.UTF_8));
}
}
}
@ -107,11 +109,11 @@ public class NacosSyncToConsulServiceImpl extends AbstractNacosSync {
newService.setId(instance.getInstanceId());
List<String> tags = Lists.newArrayList();
tags.addAll(instance.getMetadata().entrySet().stream()
.map(entry -> String.join("=", entry.getKey(), entry.getValue())).collect(Collectors.toList()));
tags.add(String.join("=", SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId()));
tags.add(String.join("=", SkyWalkerConstants.SYNC_SOURCE_KEY,
.map(entry -> String.join(DELIMITER, entry.getKey(), entry.getValue())).collect(Collectors.toList()));
tags.add(String.join(DELIMITER, SkyWalkerConstants.DEST_CLUSTER_ID_KEY, taskDO.getDestClusterId()));
tags.add(String.join(DELIMITER, SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode()));
tags.add(String.join("=", SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId()));
tags.add(String.join(DELIMITER, SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, taskDO.getSourceClusterId()));
newService.setTags(tags);
return newService;
}

View File

@ -95,10 +95,10 @@ public class NacosSyncToEurekaServiceImpl extends AbstractNacosSync {
DataCenterInfo dataCenterInfo = new MyDataCenterInfo(DataCenterInfo.Name.MyOwn);
final Map<String, String> instanceMetadata = instance.getMetadata();
HashMap<String, String> metadata = new HashMap<>(16);
metadata.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metadata.put(SkyWalkerConstants.DEST_CLUSTER_ID_KEY, taskDO.getDestClusterId());
metadata.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metadata.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
metadata.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, taskDO.getSourceClusterId());
metadata.putAll(instanceMetadata);
String homePageUrl = obtainHomePageUrl(instance, instanceMetadata);
String serviceName = taskDO.getServiceName();

View File

@ -47,7 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.alibaba.nacossync.constant.SkyWalkerConstants.SOURCE_CLUSTERID_KEY;
import static com.alibaba.nacossync.constant.SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY;
import static com.alibaba.nacossync.util.NacosUtils.getGroupNameOrDefault;
/**
@ -343,7 +343,7 @@ public class NacosSyncToNacosServiceImpl implements SyncService, InitializingBea
private boolean hasSync(Instance instance, String sourceClusterId) {
if (instance.getMetadata() != null) {
String sourceClusterKey = instance.getMetadata().get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY);
String sourceClusterKey = instance.getMetadata().get(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY);
return sourceClusterKey != null && sourceClusterKey.equals(sourceClusterId);
}
return false;
@ -355,7 +355,7 @@ public class NacosSyncToNacosServiceImpl implements SyncService, InitializingBea
* if the target cluster still has instances synchronized with the source cluster,
* perform unregistration.
*
* @param destNamingService
* @param destNamingService Destination cluster naming service
* @throws NacosException
*/
private void processDeRegisterInstances(TaskDO taskDO, NamingService destNamingService) throws NacosException {
@ -379,7 +379,7 @@ public class NacosSyncToNacosServiceImpl implements SyncService, InitializingBea
private List<Instance> filterInstancesForRemoval(List<Instance> destInstances, String sourceClusterId) {
return destInstances.stream().filter(instance -> !instance.getMetadata().isEmpty())
.filter(instance -> needDeregister(instance.getMetadata().get(SOURCE_CLUSTERID_KEY), sourceClusterId))
.filter(instance -> needDeregister(instance.getMetadata().get(SOURCE_CLUSTER_ID_KEY), sourceClusterId))
.collect(Collectors.toList());
}
@ -398,16 +398,16 @@ public class NacosSyncToNacosServiceImpl implements SyncService, InitializingBea
}
// Central cluster, as long as the instance is not from the target cluster,
// it needs to be synchronized (extended functionality)
return !destClusterId.equals(sourceMetaData.get(SOURCE_CLUSTERID_KEY));
return !destClusterId.equals(sourceMetaData.get(SOURCE_CLUSTER_ID_KEY));
}
private Instance buildSyncInstance(Instance instance, TaskDO taskDO) {
Instance temp = getInstance(instance);
temp.addMetadata(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
temp.addMetadata(SkyWalkerConstants.DEST_CLUSTER_ID_KEY, taskDO.getDestClusterId());
temp.addMetadata(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
temp.addMetadata(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
temp.addMetadata(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, taskDO.getSourceClusterId());
//The flag is a synchronous instance
temp.addMetadata(SkyWalkerConstants.SYNC_INSTANCE_TAG,
taskDO.getSourceClusterId() + "@@" + taskDO.getVersion());

View File

@ -35,7 +35,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@ -60,8 +59,7 @@ import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;
@NacosSyncService(sourceCluster = ClusterTypeEnum.NACOS, destinationCluster = ClusterTypeEnum.ZK)
public class NacosSyncToZookeeperServiceImpl implements SyncService {
@Autowired
private MetricsManager metricsManager;
private final MetricsManager metricsManager;
/**
* @description The Nacos listener map.
@ -93,12 +91,13 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
private final ZookeeperServerHolder zookeeperServerHolder;
@Autowired
public NacosSyncToZookeeperServiceImpl(SkyWalkerCacheServices skyWalkerCacheServices,
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder) {
NacosServerHolder nacosServerHolder, ZookeeperServerHolder zookeeperServerHolder, MetricsManager metricsManager) {
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.nacosServerHolder = nacosServerHolder;
this.zookeeperServerHolder = zookeeperServerHolder;
this.metricsManager = metricsManager;
}
@Override
@ -141,11 +140,11 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true);
Set<String> newInstanceUrlSet = getWaitingToAddInstance(taskDO, client, sourceInstances);
// 获取之前的备份 删除无效实例
// fetch the instance backup
deleteInvalidInstances(taskDO, client, newInstanceUrlSet);
// 替换当前备份为最新备份
// replace the instance backup
instanceBackupMap.put(taskDO.getTaskId(), newInstanceUrlSet);
// 尝试恢复因为zk客户端意外断开导致的实例数据
// try to compensate for the removed instance
tryToCompensate(taskDO, sourceNamingService, sourceInstances);
} catch (Exception e) {
log.error("event process fail, taskId:{}", taskDO.getTaskId(), e);
@ -236,10 +235,10 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
protected String buildSyncInstance(Instance instance, TaskDO taskDO) throws UnsupportedEncodingException {
Map<String, String> metaData = new HashMap<>(instance.getMetadata());
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.DEST_CLUSTER_ID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, taskDO.getSourceClusterId());
String servicePath = monitorPath.computeIfAbsent(taskDO.getTaskId(),
key -> convertDubboProvidersPath(metaData.get(DubboConstants.INTERFACE_KEY)));
@ -249,10 +248,10 @@ public class NacosSyncToZookeeperServiceImpl implements SyncService {
/**
* 获取zk path child 监听缓存类
* fetch zk path cache
*
* @param taskDO 任务对象
* @return zk节点操作缓存对象
* @param taskDO task instance
* @return zk path cache
*/
private PathChildrenCache getPathCache(TaskDO taskDO) {
return pathChildrenCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {

View File

@ -32,7 +32,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.utils.CloseableUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Collections;
@ -71,18 +70,17 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
private static final String DEFAULT_WEIGHT = "1.0";
@Autowired
private MetricsManager metricsManager;
private final MetricsManager metricsManager;
/**
* Listener cache of Zookeeper format taskId -> PathChildrenCache instance
*/
private Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
private final Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
/**
* service name cache
*/
private Map<String, String> nacosServiceNameMap = new ConcurrentHashMap<>();
private final Map<String, String> nacosServiceNameMap = new ConcurrentHashMap<>();
private final ZookeeperServerHolder zookeeperServerHolder;
@ -90,12 +88,14 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
private final SkyWalkerCacheServices skyWalkerCacheServices;
@Autowired
public ZookeeperSyncToNacosServiceImpl(ZookeeperServerHolder zookeeperServerHolder,
NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) {
NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices,
MetricsManager metricsManager) {
this.zookeeperServerHolder = zookeeperServerHolder;
this.nacosServerHolder = nacosServerHolder;
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.metricsManager = metricsManager;
}
@Override
@ -292,9 +292,9 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
private Map<String, String> buildMetadata(Map<String, String> queryParam, Map<String, String> ipAndPortMap, TaskDO taskDO) {
Map<String, String> metaData = new HashMap<>(queryParam);
metaData.put(PROTOCOL_KEY, ipAndPortMap.get(PROTOCOL_KEY));
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.DEST_CLUSTER_ID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY, skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, taskDO.getSourceClusterId());
return metaData;
}

View File

@ -23,14 +23,18 @@ import java.util.concurrent.TimeUnit;
@Service
public class MetricsManager implements CommandLineRunner {
@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;
private final SkyWalkerCacheServices skyWalkerCacheServices;
@Autowired
private ClusterAccessService clusterAccessService;
private final ClusterAccessService clusterAccessService;
@Autowired
private TaskAccessService taskAccessService;
private final TaskAccessService taskAccessService;
public MetricsManager(SkyWalkerCacheServices skyWalkerCacheServices, ClusterAccessService clusterAccessService,
TaskAccessService taskAccessService) {
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.clusterAccessService = clusterAccessService;
this.taskAccessService = taskAccessService;
}
/**
* Callback used to run the bean.

View File

@ -17,17 +17,28 @@
package com.alibaba.nacossync.pojo.model;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.hibernate.proxy.HibernateProxy;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import javax.persistence.*;
import lombok.Data;
import java.util.Objects;
/**
* @author NacosSync
* @version $Id: EnvDO.java, v 0.1 2018-09-25 PM 4:17 NacosSync Exp $$
*/
@Data
@Getter
@Setter
@ToString
@RequiredArgsConstructor
@Entity
@Table(name = "cluster")
public class ClusterDO implements Serializable {
@ -70,4 +81,30 @@ public class ClusterDO implements Serializable {
private Integer clusterLevel;
@Override
public final boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null) {
return false;
}
Class<?> oEffectiveClass =
o instanceof HibernateProxy ? ((HibernateProxy) o).getHibernateLazyInitializer().getPersistentClass()
: o.getClass();
Class<?> thisEffectiveClass =
this instanceof HibernateProxy ? ((HibernateProxy) this).getHibernateLazyInitializer()
.getPersistentClass() : this.getClass();
if (thisEffectiveClass != oEffectiveClass) {
return false;
}
ClusterDO clusterDO = (ClusterDO) o;
return getId() != null && Objects.equals(getId(), clusterDO.getId());
}
@Override
public final int hashCode() {
return this instanceof HibernateProxy ? ((HibernateProxy) this).getHibernateLazyInitializer()
.getPersistentClass().hashCode() : getClass().hashCode();
}
}

View File

@ -16,15 +16,28 @@
*/
package com.alibaba.nacossync.pojo.model;
import javax.persistence.*;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.hibernate.proxy.HibernateProxy;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import java.util.Objects;
/**
* @author NacosSync
* @version $Id: SystemConfig.java, v 0.1 2018-09-26 上午1:48 NacosSync Exp $$
*/
@Data
@Getter
@Setter
@ToString
@RequiredArgsConstructor
@Entity
@Table(name = "system_config")
public class SystemConfigDO {
@ -35,4 +48,30 @@ public class SystemConfigDO {
private String configValue;
private String configDesc;
@Override
public final boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null) {
return false;
}
Class<?> oEffectiveClass =
o instanceof HibernateProxy ? ((HibernateProxy) o).getHibernateLazyInitializer().getPersistentClass()
: o.getClass();
Class<?> thisEffectiveClass =
this instanceof HibernateProxy ? ((HibernateProxy) this).getHibernateLazyInitializer()
.getPersistentClass() : this.getClass();
if (thisEffectiveClass != oEffectiveClass) {
return false;
}
SystemConfigDO that = (SystemConfigDO) o;
return getId() != null && Objects.equals(getId(), that.getId());
}
@Override
public final int hashCode() {
return this instanceof HibernateProxy ? ((HibernateProxy) this).getHibernateLazyInitializer()
.getPersistentClass().hashCode() : getClass().hashCode();
}
}

View File

@ -12,16 +12,29 @@
*/
package com.alibaba.nacossync.pojo.model;
import lombok.Data;
import javax.persistence.*;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.hibernate.proxy.HibernateProxy;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Objects;
/**
* @author NacosSync
* @version $Id: TaskDo.java, v 0.1 2018-09-24 PM11:53 NacosSync Exp $$
*/
@Data
@Getter
@Setter
@ToString
@RequiredArgsConstructor
@Entity
@Table(name = "task")
public class TaskDO implements Serializable {
@ -71,4 +84,30 @@ public class TaskDO implements Serializable {
*/
private String operationId;
@Override
public final boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null) {
return false;
}
Class<?> oEffectiveClass =
o instanceof HibernateProxy ? ((HibernateProxy) o).getHibernateLazyInitializer().getPersistentClass()
: o.getClass();
Class<?> thisEffectiveClass =
this instanceof HibernateProxy ? ((HibernateProxy) this).getHibernateLazyInitializer()
.getPersistentClass() : this.getClass();
if (thisEffectiveClass != oEffectiveClass) {
return false;
}
TaskDO taskDO = (TaskDO) o;
return getId() != null && Objects.equals(getId(), taskDO.getId());
}
@Override
public final int hashCode() {
return this instanceof HibernateProxy ? ((HibernateProxy) this).getHibernateLazyInitializer()
.getPersistentClass().hashCode() : getClass().hashCode();
}
}

View File

@ -17,10 +17,10 @@
package com.alibaba.nacossync.pojo.request;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import java.util.List;
import lombok.Data;
import java.util.List;
/**
* @author NacosSync
* @version $Id: AddClusterRequest.java, v 0.1 2018-09-25 PM 10:27 NacosSync Exp $$

View File

@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.pojo.view;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import java.io.Serializable;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import lombok.Data;
import java.io.Serializable;
/**
* @author NacosSync
* @version $Id: ClusterModel.java, v 0.1 2018-09-25 下午11:09 NacosSync Exp $$
@ -29,14 +31,17 @@ import lombok.Data;
public class ClusterModel implements Serializable {
private String clusterId;
/**
* json format["192.168.1:8080","192.168.2?key=1"]
*/
private String connectKeyList;
/**
* cluster name, egcluster of ShangHaiedas-sh
*/
private String clusterName;
/**
* cluster type, eg cluster of CS,cluster of Nacos,
*
@ -45,5 +50,17 @@ public class ClusterModel implements Serializable {
private String clusterType;
private String namespace;
private String userName;
public static ClusterModel from(ClusterDO clusterDO) {
ClusterModel clusterModel = new ClusterModel();
clusterModel.setClusterId(clusterDO.getClusterId());
clusterModel.setConnectKeyList(clusterDO.getConnectKeyList());
clusterModel.setClusterType(clusterDO.getClusterType());
clusterModel.setClusterName(clusterDO.getClusterName());
clusterModel.setNamespace(clusterDO.getNamespace());
clusterModel.setUserName(clusterDO.getUserName());
return clusterModel;
}
}

View File

@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.pojo.view;
import com.alibaba.nacossync.pojo.model.TaskDO;
import lombok.Data;
/**
@ -26,10 +28,25 @@ import lombok.Data;
public class TaskModel {
private String taskId;
private String sourceClusterId;
private String destClusterId;
private String serviceName;
private String groupName;
private String taskStatus;
public static TaskModel from(TaskDO taskDO) {
TaskModel taskModel = new TaskModel();
taskModel.setDestClusterId(taskDO.getDestClusterId());
taskModel.setGroupName(taskDO.getGroupName());
taskModel.setServiceName(taskDO.getServiceName());
taskModel.setSourceClusterId(taskDO.getSourceClusterId());
taskModel.setTaskStatus(taskDO.getTaskStatus());
taskModel.setTaskId(taskDO.getTaskId());
return taskModel;
}
}

View File

@ -45,8 +45,7 @@ public class SkyWalkerTemplate {
}
private static <T extends BaseResult> void initExceptionResult(T result, Throwable e) {
if (e instanceof SkyWalkerException) {
SkyWalkerException skyWalkerException = (SkyWalkerException) e;
if (e instanceof SkyWalkerException skyWalkerException) {
if (null != skyWalkerException.getResultCode()) {
result.setResultCode(skyWalkerException.getResultCode().getCode());
}

View File

@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.template.processor;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.dao.ClusterAccessService;
import com.alibaba.nacossync.exception.SkyWalkerException;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.alibaba.nacossync.pojo.request.ClusterAddRequest;
import com.alibaba.nacossync.pojo.result.ClusterAddResult;
@ -28,7 +28,6 @@ import com.alibaba.nacossync.util.SkyWalkerUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
@ -39,18 +38,19 @@ import org.springframework.stereotype.Service;
@Service
public class ClusterAddProcessor implements Processor<ClusterAddRequest, ClusterAddResult> {
@Autowired
private MetricsManager metricsManager;
@Autowired
private ClusterAccessService clusterAccessService;
private final ClusterAccessService clusterAccessService;
@Autowired
private ObjectMapper objectMapper;
private final ObjectMapper objectMapper;
public ClusterAddProcessor(ClusterAccessService clusterAccessService, ObjectMapper objectMapper) {
this.clusterAccessService = clusterAccessService;
this.objectMapper = objectMapper;
}
@Override
public void process(ClusterAddRequest clusterAddRequest, ClusterAddResult clusterAddResult,
Object... others) throws Exception {
public void process(ClusterAddRequest clusterAddRequest, ClusterAddResult clusterAddResult, Object... others)
throws Exception {
ClusterDO clusterDO = new ClusterDO();
if (null == clusterAddRequest.getConnectKeyList() || clusterAddRequest.getConnectKeyList().isEmpty()) {
@ -58,8 +58,8 @@ public class ClusterAddProcessor implements Processor<ClusterAddRequest, Cluster
throw new SkyWalkerException("集群列表不能为空!");
}
if (StringUtils.isBlank(clusterAddRequest.getClusterName()) || StringUtils
.isBlank(clusterAddRequest.getClusterType())) {
if (StringUtils.isBlank(clusterAddRequest.getClusterName()) || StringUtils.isBlank(
clusterAddRequest.getClusterType())) {
throw new SkyWalkerException("集群名字或者类型不能为空!");
}

View File

@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.template.processor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.nacossync.dao.ClusterAccessService;
import com.alibaba.nacossync.pojo.result.ClusterDeleteResult;
import com.alibaba.nacossync.dao.TaskAccessService;
import com.alibaba.nacossync.exception.SkyWalkerException;
import com.alibaba.nacossync.pojo.request.ClusterDeleteRequest;
import com.alibaba.nacossync.pojo.result.ClusterDeleteResult;
import com.alibaba.nacossync.template.Processor;
import org.springframework.stereotype.Service;
/**
* @author NacosSync
@ -31,12 +32,23 @@ import com.alibaba.nacossync.template.Processor;
@Service
public class ClusterDeleteProcessor implements Processor<ClusterDeleteRequest, ClusterDeleteResult> {
@Autowired
private ClusterAccessService clusterAccessService;
private final ClusterAccessService clusterAccessService;
private final TaskAccessService taskAccessService;
public ClusterDeleteProcessor(ClusterAccessService clusterAccessService, TaskAccessService taskAccessService) {
this.clusterAccessService = clusterAccessService;
this.taskAccessService = taskAccessService;
}
@Override
public void process(ClusterDeleteRequest clusterDeleteRequest,
ClusterDeleteResult clusterDeleteResult, Object... others) throws Exception {
public void process(ClusterDeleteRequest clusterDeleteRequest, ClusterDeleteResult clusterDeleteResult,
Object... others) throws Exception {
int count = taskAccessService.countByDestClusterIdOrSourceClusterId(clusterDeleteRequest.getClusterId(),
clusterDeleteRequest.getClusterId());
if (count > 0) {
throw new SkyWalkerException(String.format("集群下有%d个任务请先删除任务", count));
}
clusterAccessService.deleteByClusterId(clusterDeleteRequest.getClusterId());

View File

@ -14,45 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.template.processor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.nacossync.dao.ClusterAccessService;
import com.alibaba.nacossync.pojo.result.ClusterDetailQueryResult;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.alibaba.nacossync.pojo.request.ClusterDetailQueryRequest;
import com.alibaba.nacossync.pojo.result.ClusterDetailQueryResult;
import com.alibaba.nacossync.pojo.view.ClusterModel;
import com.alibaba.nacossync.template.Processor;
import org.springframework.stereotype.Service;
/**
* @author NacosSync
* @version $Id: ClusterDetailQueryProcessor.java, v 0.1 2018-09-30 PM2:39 NacosSync Exp $$
*/
@Service
public class ClusterDetailQueryProcessor
implements
Processor<ClusterDetailQueryRequest, ClusterDetailQueryResult> {
@Autowired
private ClusterAccessService clusterAccessService;
public class ClusterDetailQueryProcessor implements Processor<ClusterDetailQueryRequest, ClusterDetailQueryResult> {
private final ClusterAccessService clusterAccessService;
public ClusterDetailQueryProcessor(ClusterAccessService clusterAccessService) {
this.clusterAccessService = clusterAccessService;
}
@Override
public void process(ClusterDetailQueryRequest clusterDetailQueryRequest,
ClusterDetailQueryResult clusterDetailQueryResult, Object... others)
throws Exception {
ClusterDetailQueryResult clusterDetailQueryResult, Object... others) throws Exception {
ClusterDO clusterDO = clusterAccessService.findByClusterId(clusterDetailQueryRequest
.getClusterId());
ClusterDO clusterDO = clusterAccessService.findByClusterId(clusterDetailQueryRequest.getClusterId());
ClusterModel clusterModel = new ClusterModel();
clusterModel.setClusterId(clusterDO.getClusterId());
clusterModel.setConnectKeyList(clusterDO.getConnectKeyList());
clusterModel.setClusterType(clusterDO.getClusterType());
clusterModel.setClusterName(clusterDO.getClusterName());
clusterModel.setNamespace(clusterDO.getNamespace());
clusterModel.setUserName(clusterDO.getUserName());
clusterDetailQueryResult.setClusterModel(clusterModel);
clusterDetailQueryResult.setClusterModel(ClusterModel.from(clusterDO));
}
}

View File

@ -14,38 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.template.processor;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.nacossync.dao.ClusterAccessService;
import com.alibaba.nacossync.pojo.QueryCondition;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.alibaba.nacossync.pojo.request.ClusterListQueryRequest;
import com.alibaba.nacossync.pojo.result.ClusterListQueryResult;
import com.alibaba.nacossync.pojo.view.ClusterModel;
import com.alibaba.nacossync.template.Processor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.stereotype.Service;
import com.alibaba.nacossync.dao.ClusterAccessService;
import com.alibaba.nacossync.pojo.result.ClusterListQueryResult;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.alibaba.nacossync.pojo.request.ClusterListQueryRequest;
import com.alibaba.nacossync.pojo.view.ClusterModel;
import com.alibaba.nacossync.template.Processor;
import java.util.List;
/**
* @author NacosSync
* @version $Id: ClusterListQueryProcessor.java, v 0.1 2018-09-30 PM2:33 NacosSync Exp $$
*/
@Service
public class ClusterListQueryProcessor implements
Processor<ClusterListQueryRequest, ClusterListQueryResult> {
public class ClusterListQueryProcessor implements Processor<ClusterListQueryRequest, ClusterListQueryResult> {
@Autowired
private ClusterAccessService clusterAccessService;
private final ClusterAccessService clusterAccessService;
public ClusterListQueryProcessor(ClusterAccessService clusterAccessService) {
this.clusterAccessService = clusterAccessService;
}
@Override
public void process(ClusterListQueryRequest clusterListQueryRequest,
ClusterListQueryResult clusterListQueryResult, Object... others) {
public void process(ClusterListQueryRequest clusterListQueryRequest, ClusterListQueryResult clusterListQueryResult,
Object... others) {
Page<ClusterDO> clusterDOS;
@ -62,19 +63,7 @@ public class ClusterListQueryProcessor implements
clusterListQueryRequest.getPageSize());
}
List<ClusterModel> clusterModels = new ArrayList<>();
clusterDOS.forEach(clusterDO -> {
ClusterModel clusterModel = new ClusterModel();
clusterModel.setClusterId(clusterDO.getClusterId());
clusterModel.setClusterName(clusterDO.getClusterName());
clusterModel.setClusterType(clusterDO.getClusterType());
clusterModel.setConnectKeyList(clusterDO.getConnectKeyList());
clusterModel.setNamespace(clusterDO.getNamespace());
clusterModels.add(clusterModel);
});
List<ClusterModel> clusterModels = clusterDOS.stream().map(ClusterModel::from).toList();
clusterListQueryResult.setClusterModels(clusterModels);
clusterListQueryResult.setTotalPage(clusterDOS.getTotalPages());
clusterListQueryResult.setCurrentSize(clusterModels.size());

View File

@ -16,19 +16,12 @@
*/
package com.alibaba.nacossync.template.processor;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.nacossync.dao.TaskAccessService;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.pojo.request.TaskDeleteInBatchRequest;
import com.alibaba.nacossync.pojo.result.BaseResult;
import com.alibaba.nacossync.template.Processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
@ -40,20 +33,15 @@ import lombok.extern.slf4j.Slf4j;
@Service
public class TaskDeleteInBatchProcessor implements Processor<TaskDeleteInBatchRequest, BaseResult> {
@Autowired
private TaskAccessService taskAccessService;
private final TaskAccessService taskAccessService;
public TaskDeleteInBatchProcessor(TaskAccessService taskAccessService) {
this.taskAccessService = taskAccessService;
}
@Override
public void process(TaskDeleteInBatchRequest taskBatchDeleteRequest, BaseResult baseResult,
Object... others) {
//
// String[] taskIds= taskBatchDeleteRequest.getTaskIds();
// List<TaskDO> taskDOs = new ArrayList<TaskDO>();
// for (String taskId : taskIds) {
// TaskDO taskDO = new TaskDO();
// taskDO.setTaskId(taskId);
// taskDOs.add(taskDO);
// }
taskAccessService.deleteTaskInBatch(taskBatchDeleteRequest.getTaskIds());
}
}

View File

@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.template.processor;
import com.alibaba.nacossync.dao.TaskAccessService;
import com.alibaba.nacossync.exception.SkyWalkerException;
import com.alibaba.nacossync.pojo.result.TaskDetailQueryResult;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.pojo.request.TaskDetailQueryRequest;
import com.alibaba.nacossync.pojo.result.TaskDetailQueryResult;
import com.alibaba.nacossync.pojo.view.TaskModel;
import com.alibaba.nacossync.template.Processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
@ -35,12 +35,15 @@ import org.springframework.stereotype.Service;
@Service
public class TaskDetailProcessor implements Processor<TaskDetailQueryRequest, TaskDetailQueryResult> {
@Autowired
private TaskAccessService taskAccessService;
private final TaskAccessService taskAccessService;
public TaskDetailProcessor(TaskAccessService taskAccessService) {
this.taskAccessService = taskAccessService;
}
@Override
public void process(TaskDetailQueryRequest taskDetailQueryRequest, TaskDetailQueryResult taskDetailQueryResult, Object... others)
throws Exception {
public void process(TaskDetailQueryRequest taskDetailQueryRequest, TaskDetailQueryResult taskDetailQueryResult,
Object... others) throws Exception {
TaskDO taskDO = taskAccessService.findByTaskId(taskDetailQueryRequest.getTaskId());
@ -48,15 +51,6 @@ public class TaskDetailProcessor implements Processor<TaskDetailQueryRequest, Ta
throw new SkyWalkerException("taskDo is null,taskId :" + taskDetailQueryRequest.getTaskId());
}
TaskModel taskModel = new TaskModel();
taskModel.setDestClusterId(taskDO.getDestClusterId());
taskModel.setGroupName(taskDO.getGroupName());
taskModel.setServiceName(taskDO.getServiceName());
taskModel.setSourceClusterId(taskDO.getSourceClusterId());
taskModel.setTaskStatus(taskDO.getTaskStatus());
taskModel.setTaskId(taskDO.getTaskId());
taskDetailQueryResult.setTaskModel(taskModel);
taskDetailQueryResult.setTaskModel(TaskModel.from(taskDO));
}
}

View File

@ -14,25 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.template.processor;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.nacossync.dao.TaskAccessService;
import com.alibaba.nacossync.pojo.QueryCondition;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.pojo.request.TaskListQueryRequest;
import com.alibaba.nacossync.pojo.result.TaskListQueryResult;
import com.alibaba.nacossync.pojo.view.TaskModel;
import com.alibaba.nacossync.template.Processor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.stereotype.Service;
import com.alibaba.nacossync.dao.TaskAccessService;
import com.alibaba.nacossync.pojo.result.TaskListQueryResult;
import com.alibaba.nacossync.pojo.request.TaskListQueryRequest;
import com.alibaba.nacossync.pojo.view.TaskModel;
import com.alibaba.nacossync.template.Processor;
import java.util.List;
/**
* @author NacosSync
@ -42,12 +40,15 @@ import com.alibaba.nacossync.template.Processor;
@Slf4j
public class TaskListQueryProcessor implements Processor<TaskListQueryRequest, TaskListQueryResult> {
@Autowired
private TaskAccessService taskAccessService;
private final TaskAccessService taskAccessService;
public TaskListQueryProcessor(TaskAccessService taskAccessService) {
this.taskAccessService = taskAccessService;
}
@Override
public void process(TaskListQueryRequest taskListQueryRequest,
TaskListQueryResult taskListQueryResult, Object... others) {
public void process(TaskListQueryRequest taskListQueryRequest, TaskListQueryResult taskListQueryResult,
Object... others) {
Page<TaskDO> taskDOPage;
@ -64,18 +65,7 @@ public class TaskListQueryProcessor implements Processor<TaskListQueryRequest, T
}
List<TaskModel> taskList = new ArrayList<>();
taskDOPage.forEach(taskDO -> {
TaskModel taskModel = new TaskModel();
taskModel.setTaskId(taskDO.getTaskId());
taskModel.setDestClusterId(taskDO.getDestClusterId());
taskModel.setSourceClusterId(taskDO.getSourceClusterId());
taskModel.setServiceName(taskDO.getServiceName());
taskModel.setGroupName(taskDO.getGroupName());
taskModel.setTaskStatus(taskDO.getTaskStatus());
taskList.add(taskModel);
});
List<TaskModel> taskList = taskDOPage.stream().map(TaskModel::from).toList();
taskListQueryResult.setTaskModels(taskList);
taskListQueryResult.setTotalPage(taskDOPage.getTotalPages());

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.template.processor;
import com.alibaba.nacossync.constant.TaskStatusEnum;
@ -25,7 +26,6 @@ import com.alibaba.nacossync.pojo.result.BaseResult;
import com.alibaba.nacossync.template.Processor;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
@ -35,29 +35,29 @@ import org.springframework.stereotype.Service;
@Slf4j
@Service
public class TaskUpdateProcessor implements Processor<TaskUpdateRequest, BaseResult> {
@Autowired
private TaskAccessService taskAccessService;
private final TaskAccessService taskAccessService;
public TaskUpdateProcessor(TaskAccessService taskAccessService) {
this.taskAccessService = taskAccessService;
}
@Override
public void process(TaskUpdateRequest taskUpdateRequest, BaseResult baseResult,
Object... others) throws Exception {
public void process(TaskUpdateRequest taskUpdateRequest, BaseResult baseResult, Object... others) throws Exception {
TaskDO taskDO = taskAccessService.findByTaskId(taskUpdateRequest.getTaskId());
if (!TaskStatusEnum.contains(taskUpdateRequest.getTaskStatus())) {
throw new SkyWalkerException(
"taskUpdateRequest.getTaskStatus() is not exist , value is :"
+ taskUpdateRequest.getTaskStatus());
"taskUpdateRequest.getTaskStatus() is not exist , value is :" + taskUpdateRequest.getTaskStatus());
}
if (null == taskDO) {
throw new SkyWalkerException("taskDo is null ,taskId is :"
+ taskUpdateRequest.getTaskId());
throw new SkyWalkerException("taskDo is null ,taskId is :" + taskUpdateRequest.getTaskId());
}
taskDO.setTaskStatus(taskUpdateRequest.getTaskStatus());
taskDO.setOperationId(SkyWalkerUtil.generateOperationId());
taskAccessService.addTask(taskDO);

View File

@ -120,7 +120,7 @@ public class CheckRunningStatusAllNacosThread implements Runnable {
// Build the list of sub-tasks pending removal
List<TaskDO> servicesPendingRemoval = serviceNameSet.stream()
.map(serviceName -> buildSubTaskDO(task, serviceName))
.collect(Collectors.toUnmodifiableList());
.toList();
// Handle the removal of the pending sub-tasks
handleRemoval(servicesPendingRemoval, serviceNameSet);
@ -154,13 +154,13 @@ public class CheckRunningStatusAllNacosThread implements Runnable {
List<TaskDO> servicesPendingInsertion = serviceNameList.stream()
.filter(serviceName -> !serviceSet.contains(serviceName))
.map(serviceName -> buildSubTaskDO(task, serviceName))
.collect(Collectors.toUnmodifiableList());
.toList();
// Determine the services that need to be removed (those in the current service set but not in the service name list)
List<TaskDO> servicesPendingRemoval = serviceSet.stream()
.filter(serviceName -> !serviceNameList.contains(serviceName))
.map(serviceName -> buildSubTaskDO(task, serviceName))
.collect(Collectors.toUnmodifiableList());
.toList();
// If all lists are empty, there is nothing to process

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.timer;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
@ -21,15 +22,15 @@ import com.alibaba.nacossync.dao.TaskAccessService;
import com.alibaba.nacossync.pojo.FinishedTask;
import com.alibaba.nacossync.pojo.model.TaskDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* @author NacosSync
@ -39,20 +40,29 @@ import java.util.concurrent.TimeUnit;
@Service
public class CleanExceedOperationIdTimer implements CommandLineRunner {
@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;
private static final long INITIAL_DELAY = 0;
@Autowired
private TaskAccessService taskAccessService;
private static final long PERIOD = 12;
@Autowired
private ScheduledExecutorService scheduledExecutorService;
private final SkyWalkerCacheServices skyWalkerCacheServices;
private final TaskAccessService taskAccessService;
private final ScheduledExecutorService scheduledExecutorService;
public CleanExceedOperationIdTimer(SkyWalkerCacheServices skyWalkerCacheServices,
TaskAccessService taskAccessService, ScheduledExecutorService scheduledExecutorService) {
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.taskAccessService = taskAccessService;
this.scheduledExecutorService = scheduledExecutorService;
}
@Override
public void run(String... args) {
/** Clean up the OperationId cache once every 12 hours */
scheduledExecutorService.scheduleWithFixedDelay(new CleanExceedOperationIdThread(), 0, 12,
scheduledExecutorService.scheduleWithFixedDelay(new CleanExceedOperationIdThread(), INITIAL_DELAY, PERIOD,
TimeUnit.HOURS);
log.info("CleanExceedOperationIdTimer has started successfully");
}
@ -63,19 +73,9 @@ public class CleanExceedOperationIdTimer implements CommandLineRunner {
try {
Map<String, FinishedTask> finishedTaskMap = skyWalkerCacheServices
.getFinishedTaskMap();
Iterable<TaskDO> taskDOS = taskAccessService.findAll();
Set<String> operationIds = getDbOperations(taskDOS);
for (String operationId : finishedTaskMap.keySet()) {
if (!operationIds.contains(operationId)) {
finishedTaskMap.remove(operationId);
}
}
Map<String, FinishedTask> finishedTaskMap = skyWalkerCacheServices.getFinishedTaskMap();
Set<String> operationIds = getDbOperations(taskAccessService.findAll());
finishedTaskMap.keySet().removeIf(operationId -> !operationIds.contains(operationId));
} catch (Exception e) {
log.warn("CleanExceedOperationIdThread Exception", e);
@ -84,9 +84,8 @@ public class CleanExceedOperationIdTimer implements CommandLineRunner {
}
private Set<String> getDbOperations(Iterable<TaskDO> taskDOS) {
Set<String> operationIds = new HashSet<>();
taskDOS.forEach(taskDO -> operationIds.add(taskDO.getOperationId()));
return operationIds;
return StreamSupport.stream(taskDOS.spliterator(), false).map(TaskDO::getOperationId)
.collect(Collectors.toSet());
}
}
}

View File

@ -27,7 +27,6 @@ import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
@ -42,33 +41,44 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class QuerySyncTaskTimer implements CommandLineRunner {
@Autowired
private MetricsManager metricsManager;
@Autowired
private SkyWalkerCacheServices skyWalkerCacheServices;
private static final int INITIAL_DELAY = 0;
@Autowired
private TaskAccessService taskAccessService;
private static final int DELAY = 3000;
@Autowired
private EventBus eventBus;
private final MetricsManager metricsManager;
@Autowired
private ScheduledExecutorService scheduledExecutorService;
private final SkyWalkerCacheServices skyWalkerCacheServices;
@Autowired
private NacosServerHolder nacosServerHolder;
private final TaskAccessService taskAccessService;
private final EventBus eventBus;
private final ScheduledExecutorService scheduledExecutorService;
private final NacosServerHolder nacosServerHolder;
public QuerySyncTaskTimer(MetricsManager metricsManager, SkyWalkerCacheServices skyWalkerCacheServices,
TaskAccessService taskAccessService, EventBus eventBus, ScheduledExecutorService scheduledExecutorService,
NacosServerHolder nacosServerHolder) {
this.metricsManager = metricsManager;
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.taskAccessService = taskAccessService;
this.eventBus = eventBus;
this.scheduledExecutorService = scheduledExecutorService;
this.nacosServerHolder = nacosServerHolder;
}
@Override
public void run(String... args) {
/** Fetch the task list from the database every 3 seconds */
scheduledExecutorService.scheduleWithFixedDelay(new CheckRunningStatusThread(), 0, 3000,
scheduledExecutorService.scheduleWithFixedDelay(new CheckRunningStatusThread(), INITIAL_DELAY, DELAY,
TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleWithFixedDelay(new CheckRunningStatusAllNacosThread(metricsManager,
taskAccessService, nacosServerHolder, eventBus), 0, 3000,
taskAccessService, nacosServerHolder, eventBus), INITIAL_DELAY, DELAY,
TimeUnit.MILLISECONDS);
log.info("QuerySyncTaskTimer has started successfully");
}
private class CheckRunningStatusThread implements Runnable {
@ -76,7 +86,7 @@ public class QuerySyncTaskTimer implements CommandLineRunner {
@Override
public void run() {
Long start = System.currentTimeMillis();
long start = System.currentTimeMillis();
try {
List<TaskDO> taskDOS = taskAccessService.findAllByServiceNameNotEqualAll();
@ -91,13 +101,13 @@ public class QuerySyncTaskTimer implements CommandLineRunner {
if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) {
eventBus.post(new SyncTaskEvent(taskDO));
log.info("从数据库中查询到一个同步任务,发出一个同步事件:" + taskDO);
log.info("从数据库中查询到一个同步任务,发出一个同步事件:{}", taskDO);
}
if (TaskStatusEnum.DELETE.getCode().equals(taskDO.getTaskStatus())) {
eventBus.post(new DeleteTaskEvent(taskDO));
log.info("从数据库中查询到一个删除任务,发出一个同步事件:" + taskDO);
log.info("从数据库中查询到一个删除任务,发出一个同步事件:{}", taskDO);
}
});

View File

@ -17,7 +17,6 @@ import com.alibaba.nacossync.extension.event.SpecialSyncEvent;
import com.alibaba.nacossync.extension.event.SpecialSyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
@ -33,20 +32,25 @@ import java.util.concurrent.TimeUnit;
@Service
public class SpecialSyncEventTimer implements CommandLineRunner {
@Autowired
private SpecialSyncEventBus specialSyncEventBus;
private final SpecialSyncEventBus specialSyncEventBus;
@Autowired
private EventBus eventBus;
private final EventBus eventBus;
@Autowired
private ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService scheduledExecutorService;
public SpecialSyncEventTimer(SpecialSyncEventBus specialSyncEventBus, EventBus eventBus,
ScheduledExecutorService scheduledExecutorService) {
this.specialSyncEventBus = specialSyncEventBus;
this.eventBus = eventBus;
this.scheduledExecutorService = scheduledExecutorService;
}
@Override
public void run(String... args) throws Exception {
scheduledExecutorService.scheduleWithFixedDelay(new SpecialSyncEventTimer.SpecialSyncEventThread(), 0, 3000,
TimeUnit.MILLISECONDS);
log.info("SpecialSyncEventTimer has started successfully");
}
private class SpecialSyncEventThread implements Runnable {
@ -58,9 +62,9 @@ public class SpecialSyncEventTimer implements CommandLineRunner {
allSpecialSyncEvent.stream()
.filter(specialSyncEvent -> TaskStatusEnum.SYNC.getCode()
.equals(specialSyncEvent.getTaskDO().getTaskStatus()))
.forEach(specialSyncEvent -> eventBus.post(specialSyncEvent));
.forEach(eventBus::post);
} catch (Exception e) {
log.warn("SpecialSyncEventThread Exception", e);
log.error("Exception occurred while processing special sync events", e);
}
}

View File

@ -8,11 +8,9 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.Consumer;
@Slf4j
public class BatchTaskExecutor {
@ -20,39 +18,49 @@ public class BatchTaskExecutor {
private static final ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM);
/**
* 批量操作方法
* Batch operation method
*
* @param items 任务列表
* @param operation 要执行的操作
* @param items Task list
* @param operation Operation to be executed
*/
public static void batchOperation(List<TaskDO> items, Consumer<TaskDO> operation) {
Stopwatch stopwatch = Stopwatch.createStarted();
List<Tuple<Integer, List<TaskDO>>> taskGroupList = averageAssign(items, MAX_THREAD_NUM);
// 创建一个包含所有任务的 CompletableFuture
// Create a CompletableFuture for each task group
CompletableFuture<?>[] futures = taskGroupList.stream().map(tuple -> CompletableFuture.runAsync(() -> {
for (TaskDO taskDO : tuple.getT2()) {
operation.accept(taskDO);
try {
// Add timeout control for each task to avoid long-running tasks
CompletableFuture.runAsync(() -> operation.accept(taskDO), executorService)
.orTimeout(5, TimeUnit.SECONDS) // Task timeout set to 5 seconds
.exceptionally(ex -> {
log.error("Task execution timed out: {}", taskDO.getServiceName(), ex);
return null;
}).join();
} catch (Exception e) {
log.error("Error occurred during task execution: {}", taskDO.getServiceName(), e);
}
}
}, executorService)).toArray(CompletableFuture[]::new);
try {
// 等待所有任务完成
// Wait for all tasks to complete
CompletableFuture.allOf(futures).join();
} catch (Exception e) {
log.error("Error occurred during sync operation", e);
} finally {
log.info("Total sync tasks: {}, Execution time: {} ms", items.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
log.debug("Total sync tasks: {}, Execution time: {} ms", items.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
/**
* 将一个List均分成n个list, 主要通过偏移量来实现的
*
* @param source 源集合
* @param limit 最大值
* @return 均分后的列表
* Divide a list into n sublists, mainly implemented by offset
* @param source collection to be divided
* @param limit maximum value
* @return list after division
* @param <T> object type
*/
private static <T> List<Tuple<Integer, List<T>>> averageAssign(List<T> source, int limit) {
if (CollectionUtils.isEmpty(source)) {
@ -60,7 +68,7 @@ public class BatchTaskExecutor {
}
int size = source.size();
int listCount = (int) Math.ceil((double) size / limit); // Calculate the number of sublist
int listCount = (int) Math.ceil((double) size / limit); // Calculate the number of sublists
int remainder = size % listCount; // Calculate the number of remaining elements after even distribution
List<Tuple<Integer, List<T>>> result = new ArrayList<>(listCount); // Initialize the result list with the expected size
@ -73,4 +81,24 @@ public class BatchTaskExecutor {
return result;
}
/**
* Shutdown the executor service to avoid resource leakage
*/
public static void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
log.error("Executor service did not terminate");
}
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

View File

@ -1,15 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.alibaba.nacossync.util;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
@ -20,16 +8,21 @@ import java.util.Map;
import java.util.stream.Collectors;
/**
* @author paderlol
* @date: 2019-04-25 00:01
* Utility class for handling Consul metadata.
*/
public class ConsulUtils {
public static Map<String, String> transferMetadata(List<String> tags) {
Map<String, String> metadata = new HashMap<>();
if (!CollectionUtils.isEmpty(tags)) {
return tags.stream().filter(tag -> tag.split("=", -1).length == 2).map(tag -> tag.split("=", -1))
.collect(Collectors.toMap(tagSplitArray -> tagSplitArray[0], tagSplitArray -> tagSplitArray[1]));
if (CollectionUtils.isEmpty(tags)) {
return new HashMap<>();
}
return metadata;
return tags.stream()
.map(tag -> tag.split("=", -1))
.filter(tagArray -> tagArray.length == 2)
.collect(Collectors.toMap(
tagArray -> tagArray[0],
tagArray -> tagArray[1],
(existing, replacement) -> existing // In case of duplicate keys, keep the existing value
));
}
}

View File

@ -20,7 +20,6 @@ import org.apache.commons.lang3.StringUtils;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
/**
* @author paderlol
@ -51,43 +50,49 @@ public final class DubboConstants {
public static final String ALL_SERVICE_NAME_PATTERN = "*";
/**
* Creates a service name based on Dubbo version compatibility.
* if Dubbo version greater than 2.7.2, service name is providers:interface:version:
* if Dubbo version less than 2.7.2, service name is providers:interface:version
* @param queryParam
* @return
*
* @param queryParam the query parameters that include keys such as interface, version, group, etc.
* @return the constructed service name string
*/
public static String createServiceName(Map<String, String> queryParam) {
String group = queryParam.get(GROUP_KEY);
String release = queryParam.get(RELEASE_KEY);
Predicate<String> isBlankGroup = StringUtils::isBlank;
Predicate<String> isNotBlankRelease = StringUtils::isNotBlank;
String serviceName = Joiner.on(SEPARATOR_KEY).skipNulls().join(CATALOG_KEY, queryParam.get(INTERFACE_KEY),
String baseServiceName = Joiner.on(SEPARATOR_KEY).skipNulls().join(CATALOG_KEY, queryParam.get(INTERFACE_KEY),
queryParam.get(VERSION_KEY), group);
//TODO The code here is to deal with service metadata format problems caused by dubbo version incompatibility
if (isBlankGroup.test(group) && isNotBlankRelease.test(release)) {
if (StringUtils.isBlank(group) && StringUtils.isNotBlank(release)) {
List<String> versions = Splitter.on(RELEASE_SEPARATOR_KEY).splitToList(release);
if (!CollectionUtils.isEmpty(versions) && versions.size() >= DUBBO_VERSION_INDEX) {
String firstVersion = versions.get(0);
String secondVersion = versions.get(1);
if (DUBBO_VERSION_INDEX == Integer.parseInt(firstVersion)) {
if (MIDDLE_DUBBO_VERSION_INDEX <= versions.size()) {
String thirdVersion = versions.get(2);
BigDecimal bigDecimal =
new BigDecimal(Joiner.on(RELEASE_SEPARATOR_KEY).join(secondVersion, thirdVersion));
if (bigDecimal.compareTo(COMPARE_NUMBER) > 0) {
serviceName = serviceName.concat(SEPARATOR_KEY);
}
} else if (versions.size() == DUBBO_VERSION_INDEX && Integer.parseInt(secondVersion) > 7) {
serviceName = serviceName.concat(SEPARATOR_KEY);
}
} else if (MIN_DUBBO_VERSION < Integer.parseInt(firstVersion)) {
serviceName = serviceName.concat(SEPARATOR_KEY);
BigDecimal bigDecimal = new BigDecimal(Joiner.on(RELEASE_SEPARATOR_KEY).join(secondVersion,
versions.size() > 2 ? versions.get(2) : "0"));
if (isVersionRequiresSeparator(firstVersion, secondVersion, bigDecimal)) {
baseServiceName = baseServiceName.concat(SEPARATOR_KEY);
}
}
}
return serviceName;
return baseServiceName;
}
/**
* Checks if the version requires a separator to be appended to the service name.
*
* @param firstVersion the major version
* @param secondVersion the minor version
* @param bigDecimal the version number as BigDecimal
* @return true if separator should be added, otherwise false
*/
private static boolean isVersionRequiresSeparator(String firstVersion, String secondVersion, BigDecimal bigDecimal) {
int majorVersion = Integer.parseInt(firstVersion);
int minorVersion = Integer.parseInt(secondVersion);
return (DUBBO_VERSION_INDEX == majorVersion && (MIDDLE_DUBBO_VERSION_INDEX <= minorVersion ||
bigDecimal.compareTo(COMPARE_NUMBER) > 0)) || (MIN_DUBBO_VERSION < majorVersion);
}
}

View File

@ -1,19 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacossync.util;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
@ -22,135 +6,139 @@ import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.pojo.request.ClusterAddRequest;
import com.alibaba.nacossync.pojo.request.TaskAddRequest;
import com.google.common.base.Joiner;
import org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Enumeration;
import java.util.UUID;
/**
* @author NacosSync
* @version $Id: SkyWalkerUtil.java, v 0.1 2018-09-26 AM12:10 NacosSync Exp $$
* Utility class for various operations in SkyWalker.
*/
public class SkyWalkerUtil {
private static final String SEPARATOR = ":";
private static final String MD5_ALGORITHM = "MD5";
/**
* Generates an MD5 hash for the given string.
*
* Gets the string md5
* @param value
* @return
* @param value The string to be encrypted.
* @return The encrypted string, or an empty string if encryption fails.
*/
public static String StringToMd5(String value) {
{
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
md5.update(value.getBytes("UTF-8"));
byte[] encryption = md5.digest();
StringBuffer strBuf = new StringBuffer();
for (int i = 0; i < encryption.length; i++) {
if (Integer.toHexString(0xff & encryption[i]).length() == 1) {
strBuf.append("0").append(Integer.toHexString(0xff & encryption[i]));
} else {
strBuf.append(Integer.toHexString(0xff & encryption[i]));
public static String stringToMd5(String value) {
if (StringUtils.isBlank(value)) {
return "";
}
try {
MessageDigest md5 = MessageDigest.getInstance(MD5_ALGORITHM);
byte[] encryption = md5.digest(value.getBytes(StandardCharsets.UTF_8));
StringBuilder strBuf = new StringBuilder();
for (byte b : encryption) {
strBuf.append(String.format("%02x", b));
}
return strBuf.toString();
} catch (NoSuchAlgorithmException e) {
return "";
} catch (UnsupportedEncodingException e) {
return "";
}
}
}
/**
* The rules of generating taskId
* @param addTaskRequest
* @return
* Generates a task ID based on the given TaskAddRequest.
*
* @param addTaskRequest The TaskAddRequest containing task details.
* @return The generated task ID.
*/
public static String generateTaskId(TaskAddRequest addTaskRequest) {
return generateTaskId(addTaskRequest.getServiceName(), addTaskRequest.getGroupName(),
addTaskRequest.getSourceClusterId(), addTaskRequest.getDestClusterId());
}
/**
* The rules of generating taskId
* Generates a task ID based on the given parameters.
*
* @return
* @param serviceName The service name.
* @param groupName The group name.
* @param sourceClusterId The source cluster ID.
* @param destClusterId The destination cluster ID.
* @return The generated task ID.
*/
public static String generateTaskId(String serviceName, String groupName,
String sourceClusterId, String destClusterId) {
StringBuilder sb = new StringBuilder();
sb.append(serviceName);
sb.append(SkyWalkerConstants.UNDERLINE);
sb.append(groupName);
sb.append(SkyWalkerConstants.UNDERLINE);
sb.append(sourceClusterId);
sb.append(SkyWalkerConstants.UNDERLINE);
sb.append(destClusterId);
return SkyWalkerUtil.StringToMd5(sb.toString());
String rawId = String.join(SkyWalkerConstants.UNDERLINE, serviceName, groupName, sourceClusterId, destClusterId);
return stringToMd5(rawId);
}
/**
* 生成集群clusterId的规则
* Generates a cluster ID based on the given ClusterAddRequest.
*
* @param addClusterRequest
* @return
* @param addClusterRequest The ClusterAddRequest containing cluster details.
* @return The generated cluster ID.
*/
public static String generateClusterId(ClusterAddRequest addClusterRequest) {
StringBuilder sb = new StringBuilder();
sb.append(addClusterRequest.getClusterName());
sb.append(SkyWalkerConstants.UNDERLINE);
sb.append(addClusterRequest.getClusterType());
return SkyWalkerUtil.StringToMd5(sb.toString());
String rawId = String.join(SkyWalkerConstants.UNDERLINE, addClusterRequest.getClusterName(), addClusterRequest.getClusterType());
return stringToMd5(rawId);
}
/**
* Avoid getting a return address
* @return
* @throws Exception
* Gets the local IP address, avoiding loopback addresses.
*
* @return The local IP address.
* @throws Exception If an error occurs while fetching the IP address.
*/
public static String getLocalIp() throws Exception {
InetAddress addr = InetAddress.getLocalHost();
String localIp = addr.getHostAddress();
if (addr.isLoopbackAddress()) {
if (!addr.isLoopbackAddress()) {
return addr.getHostAddress();
}
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface in = interfaces.nextElement();
Enumeration<InetAddress> addrs = in.getInetAddresses();
NetworkInterface networkInterface = interfaces.nextElement();
Enumeration<InetAddress> addrs = networkInterface.getInetAddresses();
while (addrs.hasMoreElements()) {
InetAddress address = addrs.nextElement();
if (!address.isLoopbackAddress() && address instanceof Inet4Address) {
localIp = address.getHostAddress();
return address.getHostAddress();
}
}
}
}
return localIp;
return addr.getHostAddress();
}
/**
* Generates a synchronization key based on source and destination cluster types.
*
* @param sourceClusterType The source cluster type.
* @param destClusterType The destination cluster type.
* @return The generated synchronization key.
*/
public static String generateSyncKey(ClusterTypeEnum sourceClusterType, ClusterTypeEnum destClusterType) {
return Joiner.on(":").join(sourceClusterType.getCode(), destClusterType.getCode());
return Joiner.on(SEPARATOR).join(sourceClusterType.getCode(), destClusterType.getCode());
}
/**
* Gets the operation ID from the given TaskDO.
*
* @param taskDO The TaskDO containing the operation ID.
* @return The operation ID.
*/
public static String getOperationId(TaskDO taskDO) {
return taskDO.getOperationId();
}
/**
* Generates a unique operation ID.
*
* @return The generated operation ID.
*/
public static String generateOperationId() {
return UUID.randomUUID().toString();
}
}

View File

@ -12,6 +12,17 @@
*/
package com.alibaba.nacossync.util;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.alibaba.nacossync.util.DubboConstants.DUBBO_PATH_FORMAT;
import static com.alibaba.nacossync.util.DubboConstants.DUBBO_URL_FORMAT;
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_IP_KEY;
@ -20,17 +31,6 @@ import static com.alibaba.nacossync.util.DubboConstants.INTERFACE_KEY;
import static com.alibaba.nacossync.util.DubboConstants.PROTOCOL_KEY;
import static com.alibaba.nacossync.util.DubboConstants.ZOOKEEPER_SEPARATOR;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
/**
* @author paderlol
* @date: 2018-12-25 21:08
@ -56,9 +56,9 @@ public final class StringUtils {
*/
private static Map<String, String> parseKeyValuePair(String str, String itemSeparator) {
String[] tmp = str.split(itemSeparator);
Map<String, String> map = new HashMap<String, String>(tmp.length);
for (int i = 0; i < tmp.length; i++) {
Matcher matcher = KVP_PATTERN.matcher(tmp[i]);
Map<String, String> map = new HashMap<>(tmp.length);
for (String s : tmp) {
Matcher matcher = KVP_PATTERN.matcher(s);
if (!matcher.matches()) {
continue;
}
@ -74,19 +74,14 @@ public final class StringUtils {
* @return Parameters instance.
*/
public static Map<String, String> parseQueryString(String qs) {
try {
String decodePath = URLDecoder.decode(qs, "UTF-8");
String decodePath = URLDecoder.decode(qs, StandardCharsets.UTF_8);
if (isEmpty(decodePath)) {
return new HashMap<>();
}
decodePath = substringAfter(decodePath, "?");
return parseKeyValuePair(decodePath, "&");
} catch (UnsupportedEncodingException e) {
log.warn("parse query string failed", e);
return Maps.newHashMap();
}
}
/**
@ -101,8 +96,7 @@ public final class StringUtils {
public static Map<String, String> parseIpAndPortString(String path) {
try {
String decodePath = URLDecoder.decode(path, "UTF-8");
String decodePath = URLDecoder.decode(path, StandardCharsets.UTF_8);
Matcher matcher = IP_PORT_PATTERN.matcher(decodePath);
// extract the ones that match the rules
Map<String, String> instanceMap = new HashMap<>(3);
@ -116,10 +110,6 @@ public final class StringUtils {
break;
}
return instanceMap;
} catch (UnsupportedEncodingException e) {
log.warn("parse query string failed", e);
return Maps.newHashMap();
}
}
@ -169,16 +159,11 @@ public final class StringUtils {
public static String convertDubboFullPathForZk(Map<String, String> metaData, String providersPath, String ip,
int port) {
try {
String urlParam = Joiner.on("&").withKeyValueSeparator("=").join(metaData);
String instanceUrl = String.format(DUBBO_URL_FORMAT, metaData.get(PROTOCOL_KEY), ip, port,
metaData.get(INTERFACE_KEY), urlParam);
return Joiner.on(ZOOKEEPER_SEPARATOR).join(providersPath, URLEncoder.encode(instanceUrl, "UTF-8"));
} catch (UnsupportedEncodingException e) {
log.warn("convert Dubbo full path", e);
return "";
}
return Joiner.on(ZOOKEEPER_SEPARATOR).join(providersPath, URLEncoder.encode(instanceUrl, StandardCharsets.UTF_8));
}

View File

@ -91,7 +91,7 @@ public class ConsulSyncToNacosServiceImplTest {
public void mockSync(TaskDO taskDO) throws Exception {
Instance instance = mock(Instance.class);
Map<String, String> metadata = Maps.newHashMap();
metadata.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, TEST_SOURCE_CLUSTER_ID);
metadata.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, TEST_SOURCE_CLUSTER_ID);
HealthService healthServiceUp = buildHealthService(TEST_INSTANCE_ADDRESS, 8080, Maps.newHashMap());
HealthService healthServiceDown = buildHealthService(TEST_INSTANCE_ADDRESS, 8081, metadata);
List<HealthService> healthServiceList = Lists.newArrayList(healthServiceUp, healthServiceDown);
@ -129,7 +129,7 @@ public class ConsulSyncToNacosServiceImplTest {
when(taskDO.getDestClusterId()).thenReturn(TEST_DEST_CLUSTER_ID);
doReturn(destNamingService).when(nacosServerHolder).get(anyString());
Map<String, String> metadata = Maps.newHashMap();
metadata.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, TEST_SOURCE_CLUSTER_ID);
metadata.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, TEST_SOURCE_CLUSTER_ID);
List<Instance> allInstances = Lists.newArrayList(instance);
doReturn(allInstances).when(destNamingService).getAllInstances(anyString());
doReturn(metadata).when(instance).getMetadata();

View File

@ -111,7 +111,7 @@ public class EurekaSyncToNacosServiceImplTest {
List<InstanceInfo> eurekaInstances = Lists.newArrayList();
doReturn(eurekaInstances).when(eurekaNamingService).getApplications(anyString());
Map<String, String> metadata = Maps.newHashMap();
metadata.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, TEST_SOURCE_CLUSTER_ID);
metadata.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, TEST_SOURCE_CLUSTER_ID);
List<Instance> allInstances = Lists.newArrayList(instance);
doReturn(allInstances).when(destNamingService).getAllInstances(anyString());
doReturn(metadata).when(instance).getMetadata();

View File

@ -90,7 +90,7 @@ public class NacosSyncToNacosServiceImplTest {
doReturn(sourceNamingService).when(nacosServerHolder).get(anyString());
doNothing().when(sourceNamingService).unsubscribe(any(), any());
Map<String, String> metadata = Maps.newHashMap();
metadata.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, TEST_SOURCE_CLUSTER_ID);
metadata.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, TEST_SOURCE_CLUSTER_ID);
List<Instance> allInstances = Lists.newArrayList(instance);
doReturn(allInstances).when(sourceNamingService).getAllInstances(anyString());
doReturn(metadata).when(instance).getMetadata();

View File

@ -107,7 +107,7 @@ public class ZookeeperSyncToNacosServiceImplTest {
doNothing().when(treeCache).close();
Instance instance = mock(Instance.class);
Map<String, String> metadata = Maps.newHashMap();
metadata.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, TEST_SOURCE_CLUSTER_ID);
metadata.put(SkyWalkerConstants.SOURCE_CLUSTER_ID_KEY, TEST_SOURCE_CLUSTER_ID);
List<Instance> allInstances = Lists.newArrayList(instance);
doReturn(allInstances).when(destNamingService).getAllInstances(anyString());

View File

@ -0,0 +1,60 @@
package com.alibaba.nacossync.utils;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.BatchTaskExecutor;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class BatchTaskExecutorTest {
/**
* Test the batch operation to ensure all tasks are executed.
*/
@Test
public void testBatchOperation() {
// Prepare a list of tasks
List<TaskDO> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
TaskDO task = new TaskDO();
task.setServiceName("Service" + i);
tasks.add(task);
}
// Create a counter to track the number of executed tasks
AtomicInteger counter = new AtomicInteger(0);
// Execute batch operation
BatchTaskExecutor.batchOperation(tasks, task -> counter.incrementAndGet());
// Verify that all tasks have been executed
assertEquals(tasks.size(), counter.get());
}
/**
* Test the averageAssign method to ensure the correct distribution of tasks.
*/
/**
* Test the batch operation with an empty list to ensure no errors occur.
*/
@Test
public void testBatchOperationWithEmptyList() {
// Prepare an empty list of tasks
List<TaskDO> tasks = new ArrayList<>();
// Create a counter to track the number of executed tasks
AtomicInteger counter = new AtomicInteger(0);
// Execute batch operation
BatchTaskExecutor.batchOperation(tasks, task -> counter.incrementAndGet());
// Verify that no tasks have been executed
assertEquals(0, counter.get());
}
}

View File

@ -0,0 +1,81 @@
package com.alibaba.nacossync.utils;
import com.alibaba.nacossync.util.ConsulUtils;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ConsulUtilsTest {
/**
* Test the transferMetadata method with valid tags containing key-value pairs.
* This test ensures that all valid tags are correctly parsed into the resulting metadata map.
*/
@Test
public void testTransferMetadata_withValidTags() {
// Test with valid tags containing key-value pairs
List<String> tags = Arrays.asList("key1=value1", "key2=value2");
Map<String, String> expectedMetadata = new HashMap<>();
expectedMetadata.put("key1", "value1");
expectedMetadata.put("key2", "value2");
Map<String, String> actualMetadata = ConsulUtils.transferMetadata(tags);
// Assert that the actual metadata matches the expected metadata
assertEquals(expectedMetadata, actualMetadata);
}
/**
* Test the transferMetadata method with invalid tags that do not contain key-value pairs.
* This test ensures that only valid key-value pairs are included in the resulting metadata map.
*/
@Test
public void testTransferMetadata_withInvalidTags() {
// Test with tags that do not contain key-value pairs
List<String> tags = Arrays.asList("key1", "key2=value2", "invalidTag");
Map<String, String> expectedMetadata = new HashMap<>();
expectedMetadata.put("key2", "value2");
Map<String, String> actualMetadata = ConsulUtils.transferMetadata(tags);
// Assert that the actual metadata matches the expected metadata
assertEquals(expectedMetadata, actualMetadata);
}
/**
* Test the transferMetadata method with an empty list of tags.
* This test ensures that an empty list results in an empty metadata map.
*/
@Test
public void testTransferMetadata_withEmptyTags() {
// Test with an empty list of tags
List<String> tags = Collections.emptyList();
Map<String, String> expectedMetadata = new HashMap<>();
Map<String, String> actualMetadata = ConsulUtils.transferMetadata(tags);
// Assert that the actual metadata is empty
assertEquals(expectedMetadata, actualMetadata);
}
/**
* Test the transferMetadata method with null tags.
* This test ensures that a null input results in an empty metadata map.
*/
@Test
public void testTransferMetadata_withNullTags() {
// Test with null tags
Map<String, String> expectedMetadata = new HashMap<>();
Map<String, String> actualMetadata = ConsulUtils.transferMetadata(null);
// Assert that the actual metadata is empty
assertEquals(expectedMetadata, actualMetadata);
}
}

View File

@ -0,0 +1,62 @@
package com.alibaba.nacossync.utils;
import com.alibaba.nacossync.util.DubboConstants;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static com.alibaba.nacossync.util.DubboConstants.GROUP_KEY;
import static com.alibaba.nacossync.util.DubboConstants.INTERFACE_KEY;
import static com.alibaba.nacossync.util.DubboConstants.RELEASE_KEY;
import static com.alibaba.nacossync.util.DubboConstants.VERSION_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class DubboConstantsTest {
@Test
public void testCreateServiceName_withValidParameters() {
Map<String, String> queryParams = new HashMap<>();
queryParams.put(INTERFACE_KEY, "com.example.Service");
queryParams.put(VERSION_KEY, "1.0.0");
queryParams.put(GROUP_KEY, "testGroup");
String expectedServiceName = "providers:com.example.Service:1.0.0:testGroup";
String actualServiceName = DubboConstants.createServiceName(queryParams);
assertEquals(expectedServiceName, actualServiceName);
}
@Test
public void testCreateServiceName_withBlankGroup() {
Map<String, String> queryParams = new HashMap<>();
queryParams.put(INTERFACE_KEY, "com.example.Service");
queryParams.put(VERSION_KEY, "1.0.0");
queryParams.put(RELEASE_KEY, "2.7.3");
String expectedServiceName = "providers:com.example.Service:1.0.0:";
String actualServiceName = DubboConstants.createServiceName(queryParams);
assertEquals(expectedServiceName, actualServiceName);
}
@Test
public void testCreateServiceName_withNoGroupOrRelease() {
Map<String, String> queryParams = new HashMap<>();
queryParams.put(INTERFACE_KEY, "com.example.Service");
queryParams.put(VERSION_KEY, "1.0.0");
String expectedServiceName = "providers:com.example.Service:1.0.0";
String actualServiceName = DubboConstants.createServiceName(queryParams);
assertEquals(expectedServiceName, actualServiceName);
}
@Test
public void testCreateServiceName_withComplexRelease() {
Map<String, String> queryParams = new HashMap<>();
queryParams.put(INTERFACE_KEY, "com.example.Service");
queryParams.put(VERSION_KEY, "1.0.0");
queryParams.put(RELEASE_KEY, "2.8.1");
String expectedServiceName = "providers:com.example.Service:1.0.0:";
String actualServiceName = DubboConstants.createServiceName(queryParams);
assertEquals(expectedServiceName, actualServiceName);
}
}

View File

@ -0,0 +1,39 @@
package com.alibaba.nacossync.utils;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacossync.util.NacosUtils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class NacosUtilsTest {
/**
* Test getGroupNameOrDefault with a non-blank group name.
*/
@Test
public void testGetGroupNameOrDefault_withNonBlankGroupName() {
String groupName = "testGroup";
String result = NacosUtils.getGroupNameOrDefault(groupName);
assertEquals("testGroup", result);
}
/**
* Test getGroupNameOrDefault with a blank group name.
*/
@Test
public void testGetGroupNameOrDefault_withBlankGroupName() {
String groupName = " ";
String result = NacosUtils.getGroupNameOrDefault(groupName);
assertEquals(Constants.DEFAULT_GROUP, result);
}
/**
* Test getGroupNameOrDefault with a null group name.
*/
@Test
public void testGetGroupNameOrDefault_withNullGroupName() {
String result = NacosUtils.getGroupNameOrDefault(null);
assertEquals(Constants.DEFAULT_GROUP, result);
}
}

View File

@ -1,22 +1,118 @@
package com.alibaba.nacossync.utils;
import org.junit.Assert;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.pojo.request.ClusterAddRequest;
import com.alibaba.nacossync.pojo.request.TaskAddRequest;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import org.junit.Test;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import java.net.InetAddress;
import java.net.UnknownHostException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
/**
* @author NacosSync
* @version $Id: SkyWalkerUtilTest.java, v 0.1 2018-09-26 下午3:47 NacosSync Exp $$
*/
public class SkyWalkerUtilTest {
/**
* Test the MD5 hashing function with a sample string.
*/
@Test
public void skyWalkerUtilTest() throws Exception {
String ip = SkyWalkerUtil.getLocalIp();
Assert.assertNotEquals("127.0.0.1", ip);
public void testStringToMd5() {
String value = "testValue";
String expectedMd5 = SkyWalkerUtil.stringToMd5(value);
assertFalse(expectedMd5.isEmpty());
assertEquals(32, expectedMd5.length());
}
/**
* Test the task ID generation using a TaskAddRequest object.
*/
@Test
public void testGenerateTaskId_withTaskAddRequest() {
TaskAddRequest request = new TaskAddRequest();
request.setServiceName("testService");
request.setGroupName("testGroup");
request.setSourceClusterId("sourceCluster");
request.setDestClusterId("destCluster");
String taskId = SkyWalkerUtil.generateTaskId(request);
String expectedMd5 = SkyWalkerUtil.stringToMd5("testService_testGroup_sourceCluster_destCluster");
assertEquals(expectedMd5, taskId);
}
/**
* Test the task ID generation using individual parameters.
*/
@Test
public void testGenerateTaskId_withParameters() {
String serviceName = "testService";
String groupName = "testGroup";
String sourceClusterId = "sourceCluster";
String destClusterId = "destCluster";
String taskId = SkyWalkerUtil.generateTaskId(serviceName, groupName, sourceClusterId, destClusterId);
String expectedMd5 = SkyWalkerUtil.stringToMd5("testService_testGroup_sourceCluster_destCluster");
assertEquals(expectedMd5, taskId);
}
/**
* Test the cluster ID generation using a ClusterAddRequest object.
*/
@Test
public void testGenerateClusterId() {
ClusterAddRequest request = new ClusterAddRequest();
request.setClusterName("testCluster");
request.setClusterType("Nacos");
String clusterId = SkyWalkerUtil.generateClusterId(request);
String expectedMd5 = SkyWalkerUtil.stringToMd5("testCluster_Nacos");
assertEquals(expectedMd5, clusterId);
}
/**
* Test the retrieval of the local IP address.
* This ensures that the local IP address is not null and is a valid IP address.
*/
@Test
public void testGetLocalIp() throws Exception {
String localIp = SkyWalkerUtil.getLocalIp();
assertNotNull(localIp);
assertFalse(localIp.isEmpty());
try {
InetAddress ip = InetAddress.getByName(localIp);
assertNotNull(ip);
} catch (UnknownHostException e) {
fail("The IP address is invalid: " + e.getMessage());
}
}
/**
* Test the synchronization key generation using source and destination cluster types.
*/
@Test
public void testGenerateSyncKey() {
ClusterTypeEnum sourceClusterType = ClusterTypeEnum.NACOS;
ClusterTypeEnum destClusterType = ClusterTypeEnum.ZK;
String syncKey = SkyWalkerUtil.generateSyncKey(sourceClusterType, destClusterType);
String expectedSyncKey = sourceClusterType.getCode() + ":" + destClusterType.getCode();
assertEquals(expectedSyncKey, syncKey);
}
/**
* Test the operation ID generation to ensure it is unique each time.
*/
@Test
public void testGenerateOperationId() {
String operationId1 = SkyWalkerUtil.generateOperationId();
String operationId2 = SkyWalkerUtil.generateOperationId();
assertNotNull(operationId1);
assertNotNull(operationId2);
assertNotEquals(operationId1, operationId2);
}
}

15
pom.xml
View File

@ -33,13 +33,13 @@
<curator.version>4.3.0</curator.version>
<mockito.version>1.10.19</mockito.version>
<nacos-client.verison>1.4.7</nacos-client.verison>
<lombok.verison>1.18.2</lombok.verison>
<lombok.verison>1.18.34</lombok.verison>
<consul-api.verison>1.3.1</consul-api.verison>
<commoms-lang3.verison>3.12.0</commoms-lang3.verison>
<guava.verison>33.2.0-jre</guava.verison>
<versions-maven-plugin.version>2.2</versions-maven-plugin.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<maven-enforcer-plugin.version>1.4.1</maven-enforcer-plugin.version>
<maven-source-plugin.version>3.0.1</maven-source-plugin.version>
@ -181,6 +181,8 @@
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
@ -384,4 +386,11 @@
</plugin>
</plugins>
</reporting>
<repositories>
<repository>
<id>maven_central</id>
<name>Maven Central</name>
<url>https://repo.maven.apache.org/maven2/</url>
</repository>
</repositories>
</project>