Add Network Mapping (#143)

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
This commit is contained in:
birdstorm 2021-03-23 17:42:04 +08:00 committed by GitHub
parent eecae1663e
commit 762153a550
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 144 additions and 21 deletions

View File

@ -47,6 +47,8 @@ public class ConfigUtils {
public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable";
public static final String TIKV_METRICS_PORT = "tikv.metrics.port";
public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
@ -73,6 +75,7 @@ public class ConfigUtils {
public static final boolean DEF_IS_REPLICA_READ = false;
public static final boolean DEF_METRICS_ENABLE = false;
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";

View File

@ -0,0 +1,87 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common;
import static org.tikv.common.pd.PDUtils.addrToUri;
import com.google.common.annotations.Beta;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HostMapping {
private static final String NETWORK_MAPPING_PATH = "/client/url-mapping";
private final Client etcdClient;
private final String networkMappingName;
private final ConcurrentMap<String, String> hostMapping;
private final Logger logger = LoggerFactory.getLogger(HostMapping.class);
public HostMapping(Client etcdClient, String networkMappingName) {
this.etcdClient = etcdClient;
this.networkMappingName = networkMappingName;
this.hostMapping = new ConcurrentHashMap<>();
}
private ByteSequence hostToNetworkMappingKey(String host) {
String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host;
return ByteSequence.from(path, StandardCharsets.UTF_8);
}
@Beta
private String getMappedHostFromPD(String host) {
ByteSequence hostKey = hostToNetworkMappingKey(host);
for (int i = 0; i < 5; i++) {
CompletableFuture<GetResponse> future = etcdClient.getKVClient().get(hostKey);
try {
GetResponse resp = future.get();
List<KeyValue> kvs = resp.getKvs();
if (kvs.size() != 1) {
break;
}
return kvs.get(0).getValue().toString(StandardCharsets.UTF_8);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
logger.info("failed to get mapped Host from PD: " + host, e);
break;
} catch (Exception ignore) {
// ignore
break;
}
}
return host;
}
public URI getMappedURI(URI uri) {
if (networkMappingName.isEmpty()) {
return uri;
}
return addrToUri(
hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD)
+ ":"
+ uri.getPort());
}
}

View File

@ -18,6 +18,8 @@ package org.tikv.common;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.tikv.common.operation.PDErrorHandler.getRegionResponseErrorExtractor;
import static org.tikv.common.pd.PDError.buildFromPdpbError;
import static org.tikv.common.pd.PDUtils.addrToUri;
import static org.tikv.common.pd.PDUtils.uriToAddr;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -52,7 +54,6 @@ import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.NoopHandler;
import org.tikv.common.operation.PDErrorHandler;
import org.tikv.common.pd.PDUtils;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction.BackOffFuncType;
import org.tikv.common.util.BackOffer;
@ -96,6 +97,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
private List<URI> pdAddrs;
private Client etcdClient;
private ConcurrentMap<Long, Double> tiflashReplicaMap;
private HostMapping hostMapping;
public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
Histogram.build()
@ -118,6 +120,10 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return new PDClient(conf, channelFactory);
}
public HostMapping getHostMapping() {
return hostMapping;
}
@Override
public TiTimestamp getTimestamp(BackOffer backOffer) {
Supplier<TsoRequest> request = () -> tsoReq;
@ -383,9 +389,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
return leaderWrapper;
}
private GetMembersResponse getMembers(URI url) {
private GetMembersResponse getMembers(URI uri) {
try {
ManagedChannel probChan = channelFactory.getChannel(url.getHost() + ":" + url.getPort());
ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping);
PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan);
GetMembersRequest request =
GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
@ -414,14 +420,14 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
private boolean createLeaderWrapper(String leaderUrlStr) {
try {
URI newLeader = PDUtils.addrToUrl(leaderUrlStr);
leaderUrlStr = newLeader.getHost() + ":" + newLeader.getPort();
URI newLeader = addrToUri(leaderUrlStr);
leaderUrlStr = uriToAddr(newLeader);
if (leaderWrapper != null && leaderUrlStr.equals(leaderWrapper.getLeaderInfo())) {
return true;
}
// create new Leader
ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr);
ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping);
leaderWrapper =
new LeaderWrapper(
leaderUrlStr,
@ -524,6 +530,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
private void initCluster() {
GetMembersResponse resp = null;
List<URI> pdAddrs = getConf().getPdAddrs();
this.pdAddrs = pdAddrs;
this.etcdClient = Client.builder().endpoints(pdAddrs).build();
this.hostMapping = new HostMapping(this.etcdClient, conf.getNetworkMappingName());
for (URI u : pdAddrs) {
resp = getMembers(u);
if (resp != null) {
@ -534,8 +543,6 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
long clusterId = resp.getHeader().getClusterId();
header = RequestHeader.newBuilder().setClusterId(clusterId).build();
tsoReq = TsoRequest.newBuilder().setHeader(header).setCount(1).build();
this.pdAddrs = pdAddrs;
this.etcdClient = Client.builder().endpoints(pdAddrs).build();
this.tiflashReplicaMap = new ConcurrentHashMap<>();
createLeaderWrapper(resp.getLeader().getClientUrls(0));
service =

View File

@ -52,6 +52,8 @@ public interface ReadOnlyPDClient {
Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id);
HostMapping getHostMapping();
/**
* Get Store by StoreId
*

View File

@ -70,6 +70,7 @@ public class TiConfiguration implements Serializable {
setIfMissing(TIKV_IS_REPLICA_READ, DEF_IS_REPLICA_READ);
setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE);
setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT);
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
}
public static void listAll() {
@ -239,6 +240,8 @@ public class TiConfiguration implements Serializable {
private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE);
private int metricsPort = getInt(TIKV_METRICS_PORT);
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
public enum KVMode {
TXN,
RAW
@ -480,4 +483,8 @@ public class TiConfiguration implements Serializable {
this.metricsPort = metricsPort;
return this;
}
public String getNetworkMappingName() {
return this.networkMappingName;
}
}

View File

@ -20,7 +20,7 @@ import java.net.URI;
import java.util.List;
public class PDUtils {
public static URI addrToUrl(String addr) {
public static URI addrToUri(String addr) {
if (addr.contains("://")) {
return URI.create(addr);
} else {
@ -31,8 +31,12 @@ public class PDUtils {
public static List<URI> addrsToUrls(String[] addrs) {
ImmutableList.Builder<URI> urlsBuilder = new ImmutableList.Builder<>();
for (String addr : addrs) {
urlsBuilder.add(addrToUrl(addr));
urlsBuilder.add(addrToUri(addr));
}
return urlsBuilder.build();
}
public static String uriToAddr(URI uri) {
return uri.getHost() + ":" + uri.getPort();
}
}

View File

@ -86,7 +86,8 @@ public abstract class AbstractRegionStoreClient
}
region = newRegion;
String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress();
ManagedChannel channel = channelFactory.getChannel(addressStr);
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
return true;
@ -95,7 +96,8 @@ public abstract class AbstractRegionStoreClient
@Override
public void onStoreNotMatch(Metapb.Store store) {
String addressStr = store.getAddress();
ManagedChannel channel = channelFactory.getChannel(addressStr);
ManagedChannel channel =
channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping());
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
if (region.getLeader().getStoreId() != store.getId()) {

View File

@ -79,6 +79,10 @@ public class RegionManager {
return cacheInvalidateCallback;
}
public ReadOnlyPDClient getPDClient() {
return this.cache.pdClient;
}
public TiRegion getRegionByKey(ByteString key) {
return getRegionByKey(key, ConcreteBackOffer.newGetBackOff());
}

View File

@ -119,7 +119,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Create region store client on address %s", addressStr));
}
ManagedChannel channel = channelFactory.getChannel(addressStr);
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel);
TikvStub tikvAsyncStub = TikvGrpc.newStub(channel);
@ -1242,7 +1242,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Create region store client on address %s", addressStr));
}
ManagedChannel channel = channelFactory.getChannel(addressStr);
ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping());
TikvBlockingStub blockingStub = TikvGrpc.newBlockingStub(channel);
TikvStub asyncStub = TikvGrpc.newStub(channel);

View File

@ -18,31 +18,38 @@ package org.tikv.common.util;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.tikv.common.HostMapping;
import org.tikv.common.pd.PDUtils;
public class ChannelFactory implements AutoCloseable {
private final int maxFrameSize;
private final Map<String, ManagedChannel> connPool = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ManagedChannel> connPool = new ConcurrentHashMap<>();
public ChannelFactory(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
public ManagedChannel getChannel(String addressStr) {
public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) {
return connPool.computeIfAbsent(
addressStr,
key -> {
URI address;
URI mappedAddr;
try {
address = URI.create("http://" + key);
address = PDUtils.addrToUri(key);
} catch (Exception e) {
throw new IllegalArgumentException("failed to form address " + key);
throw new IllegalArgumentException("failed to form address " + key, e);
}
try {
mappedAddr = hostMapping.getMappedURI(address);
} catch (Exception e) {
throw new IllegalArgumentException("failed to get mapped address " + address, e);
}
// Channel should be lazy without actual connection until first call
// So a coarse grain lock is ok here
return ManagedChannelBuilder.forAddress(address.getHost(), address.getPort())
return ManagedChannelBuilder.forAddress(mappedAddr.getHost(), mappedAddr.getPort())
.maxInboundMessageSize(maxFrameSize)
.usePlaintext(true)
.idleTimeout(60, TimeUnit.SECONDS)

View File

@ -231,7 +231,7 @@ public class RangeSplitter {
this.ranges = ranges;
String host = null;
try {
host = PDUtils.addrToUrl(store.getAddress()).getHost();
host = PDUtils.addrToUri(store.getAddress()).getHost();
} catch (Exception ignored) {
}
this.host = host;