mirror of https://github.com/tikv/client-java.git
PDClient: Add function to call pause checker API (#277)
This commit is contained in:
parent
28380512f3
commit
d543ae73c0
5
pom.xml
5
pom.xml
|
|
@ -202,6 +202,11 @@
|
|||
<version>3.9</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
<version>4.5.13</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.prometheus</groupId>
|
||||
<artifactId>simpleclient</artifactId>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
package org.tikv.common;
|
||||
|
||||
public enum PDChecker {
|
||||
Learner,
|
||||
Replica,
|
||||
Rule,
|
||||
Split,
|
||||
Merge,
|
||||
JointState,
|
||||
Priority;
|
||||
|
||||
public String apiName() {
|
||||
switch (this) {
|
||||
case Learner:
|
||||
return "learner";
|
||||
case Replica:
|
||||
return "replica";
|
||||
case Rule:
|
||||
return "rule";
|
||||
case Split:
|
||||
return "split";
|
||||
case Merge:
|
||||
return "merge";
|
||||
case JointState:
|
||||
return "joint-state";
|
||||
case Priority:
|
||||
return "priority";
|
||||
}
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
}
|
||||
|
|
@ -21,6 +21,9 @@ import static org.tikv.common.pd.PDError.buildFromPdpbError;
|
|||
import static org.tikv.common.pd.PDUtils.addrToUri;
|
||||
import static org.tikv.common.pd.PDUtils.uriToAddr;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
|
@ -34,7 +37,9 @@ import io.grpc.Metadata;
|
|||
import io.grpc.stub.MetadataUtils;
|
||||
import io.prometheus.client.Histogram;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
|
@ -46,6 +51,11 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.tikv.common.codec.Codec.BytesCodec;
|
||||
|
|
@ -93,12 +103,15 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
|||
implements ReadOnlyPDClient {
|
||||
private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
|
||||
private static final long MIN_TRY_UPDATE_DURATION = 50;
|
||||
private static final int PAUSE_CHECKER_TIMEOUT = 300; // in seconds
|
||||
private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; // in seconds
|
||||
private final Logger logger = LoggerFactory.getLogger(PDClient.class);
|
||||
private RequestHeader header;
|
||||
private TsoRequest tsoReq;
|
||||
private volatile PDClientWrapper pdClientWrapper;
|
||||
private ScheduledExecutorService service;
|
||||
private ScheduledExecutorService tiflashReplicaService;
|
||||
private final HashMap<PDChecker, ScheduledExecutorService> pauseCheckerService = new HashMap<>();
|
||||
private List<URI> pdAddrs;
|
||||
private Client etcdClient;
|
||||
private ConcurrentMap<Long, Double> tiflashReplicaMap;
|
||||
|
|
@ -144,6 +157,70 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
|
|||
return new TiTimestamp(timestamp.getPhysical(), timestamp.getLogical());
|
||||
}
|
||||
|
||||
public synchronized void keepPauseChecker(PDChecker checker) {
|
||||
if (!this.pauseCheckerService.containsKey(checker)) {
|
||||
ScheduledExecutorService newService =
|
||||
Executors.newSingleThreadScheduledExecutor(
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat(String.format("PDClient-pause-%s-pool-%%d", checker.name()))
|
||||
.setDaemon(true)
|
||||
.build());
|
||||
newService.scheduleAtFixedRate(
|
||||
() -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT),
|
||||
0,
|
||||
KEEP_CHECKER_PAUSE_PERIOD,
|
||||
TimeUnit.SECONDS);
|
||||
this.pauseCheckerService.put(checker, newService);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void stopKeepPauseChecker(PDChecker checker) {
|
||||
if (this.pauseCheckerService.containsKey(checker)) {
|
||||
this.pauseCheckerService.get(checker).shutdown();
|
||||
this.pauseCheckerService.remove(checker);
|
||||
}
|
||||
}
|
||||
|
||||
public void resumeChecker(PDChecker checker) {
|
||||
pauseChecker(checker, 0);
|
||||
}
|
||||
|
||||
private void pauseChecker(PDChecker checker, int timeout) {
|
||||
String verb = timeout == 0 ? "resume" : "pause";
|
||||
URI url = pdAddrs.get(0);
|
||||
String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
|
||||
HashMap<String, Integer> arguments = new HashMap<>();
|
||||
arguments.put("delay", timeout);
|
||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
JsonMapper jsonMapper = new JsonMapper();
|
||||
byte[] body = jsonMapper.writeValueAsBytes(arguments);
|
||||
HttpPost post = new HttpPost(api);
|
||||
post.setEntity(new ByteArrayEntity(body));
|
||||
try (CloseableHttpResponse resp = client.execute(post)) {
|
||||
if (resp.getStatusLine().getStatusCode() != 200) {
|
||||
logger.error("failed to {} checker.", verb);
|
||||
}
|
||||
logger.info("checker {} {}d", checker.apiName(), verb);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error(String.format("failed to %s checker.", verb), e);
|
||||
}
|
||||
}
|
||||
|
||||
public Boolean isCheckerPaused(PDChecker checker) {
|
||||
URI url = pdAddrs.get(0);
|
||||
String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
|
||||
try {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
HashMap<String, Boolean> status =
|
||||
mapper.readValue(new URL(api), new TypeReference<HashMap<String, Boolean>>() {});
|
||||
return status.get("paused");
|
||||
} catch (Exception e) {
|
||||
logger.error(String.format("failed to get %s checker status.", checker.apiName()), e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends request to pd to scatter region.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -0,0 +1,44 @@
|
|||
package org.tikv.common;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class PDClientIntegrationTest {
|
||||
private TiSession session;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
TiConfiguration conf = TiConfiguration.createRawDefault();
|
||||
conf.setTest(true);
|
||||
session = TiSession.create(conf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (session != null) {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPauseCheck() throws Exception {
|
||||
try (PDClient client = session.getPDClient()) {
|
||||
PDChecker checker = PDChecker.Merge;
|
||||
for (int i = 0; i < 2; i++) {
|
||||
client.keepPauseChecker(checker);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(client.isCheckerPaused(checker));
|
||||
|
||||
client.stopKeepPauseChecker(checker);
|
||||
Thread.sleep(1000);
|
||||
|
||||
client.resumeChecker(checker);
|
||||
assertFalse(client.isCheckerPaused(checker));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue