diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 0eb1843b2f..a9dab95f60 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -47,6 +47,8 @@ public class ConfigUtils { public static final String TIKV_METRICS_ENABLE = "tikv.metrics.enable"; public static final String TIKV_METRICS_PORT = "tikv.metrics.port"; + public static final String TIKV_NETWORK_MAPPING_NAME = "tikv.network.mapping"; + public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "600ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; @@ -73,6 +75,7 @@ public class ConfigUtils { public static final boolean DEF_IS_REPLICA_READ = false; public static final boolean DEF_METRICS_ENABLE = false; public static final int DEF_METRICS_PORT = 3140; + public static final String DEF_TIKV_NETWORK_MAPPING_NAME = ""; public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; diff --git a/src/main/java/org/tikv/common/HostMapping.java b/src/main/java/org/tikv/common/HostMapping.java new file mode 100644 index 0000000000..08ae8048c7 --- /dev/null +++ b/src/main/java/org/tikv/common/HostMapping.java @@ -0,0 +1,87 @@ +/* + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.tikv.common; + +import static org.tikv.common.pd.PDUtils.addrToUri; + +import com.google.common.annotations.Beta; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.KeyValue; +import io.etcd.jetcd.kv.GetResponse; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HostMapping { + private static final String NETWORK_MAPPING_PATH = "/client/url-mapping"; + private final Client etcdClient; + private final String networkMappingName; + private final ConcurrentMap hostMapping; + private final Logger logger = LoggerFactory.getLogger(HostMapping.class); + + public HostMapping(Client etcdClient, String networkMappingName) { + this.etcdClient = etcdClient; + this.networkMappingName = networkMappingName; + this.hostMapping = new ConcurrentHashMap<>(); + } + + private ByteSequence hostToNetworkMappingKey(String host) { + String path = NETWORK_MAPPING_PATH + "/" + networkMappingName + "/" + host; + return ByteSequence.from(path, StandardCharsets.UTF_8); + } + + @Beta + private String getMappedHostFromPD(String host) { + ByteSequence hostKey = hostToNetworkMappingKey(host); + for (int i = 0; i < 5; i++) { + CompletableFuture future = etcdClient.getKVClient().get(hostKey); + try { + GetResponse resp = future.get(); + List kvs = resp.getKvs(); + if (kvs.size() != 1) { + break; + } + return kvs.get(0).getValue().toString(StandardCharsets.UTF_8); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + logger.info("failed to get mapped Host from PD: " + host, e); + break; + } catch (Exception ignore) { + // ignore + break; + } + } + return host; + } + + public URI getMappedURI(URI uri) { + if (networkMappingName.isEmpty()) { + return uri; + } + return addrToUri( + hostMapping.computeIfAbsent(uri.getHost(), this::getMappedHostFromPD) + + ":" + + uri.getPort()); + } +} diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e0a2ff8636..e652102bb6 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -18,6 +18,8 @@ package org.tikv.common; import static com.google.common.base.Preconditions.checkNotNull; import static org.tikv.common.operation.PDErrorHandler.getRegionResponseErrorExtractor; import static org.tikv.common.pd.PDError.buildFromPdpbError; +import static org.tikv.common.pd.PDUtils.addrToUri; +import static org.tikv.common.pd.PDUtils.uriToAddr; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -52,7 +54,6 @@ import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.meta.TiTimestamp; import org.tikv.common.operation.NoopHandler; import org.tikv.common.operation.PDErrorHandler; -import org.tikv.common.pd.PDUtils; import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; @@ -96,6 +97,7 @@ public class PDClient extends AbstractGRPCClient private List pdAddrs; private Client etcdClient; private ConcurrentMap tiflashReplicaMap; + private HostMapping hostMapping; public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY = Histogram.build() @@ -118,6 +120,10 @@ public class PDClient extends AbstractGRPCClient return new PDClient(conf, channelFactory); } + public HostMapping getHostMapping() { + return hostMapping; + } + @Override public TiTimestamp getTimestamp(BackOffer backOffer) { Supplier request = () -> tsoReq; @@ -383,9 +389,9 @@ public class PDClient extends AbstractGRPCClient return leaderWrapper; } - private GetMembersResponse getMembers(URI url) { + private GetMembersResponse getMembers(URI uri) { try { - ManagedChannel probChan = channelFactory.getChannel(url.getHost() + ":" + url.getPort()); + ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan); GetMembersRequest request = GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build(); @@ -414,14 +420,14 @@ public class PDClient extends AbstractGRPCClient private boolean createLeaderWrapper(String leaderUrlStr) { try { - URI newLeader = PDUtils.addrToUrl(leaderUrlStr); - leaderUrlStr = newLeader.getHost() + ":" + newLeader.getPort(); + URI newLeader = addrToUri(leaderUrlStr); + leaderUrlStr = uriToAddr(newLeader); if (leaderWrapper != null && leaderUrlStr.equals(leaderWrapper.getLeaderInfo())) { return true; } // create new Leader - ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr); + ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr, hostMapping); leaderWrapper = new LeaderWrapper( leaderUrlStr, @@ -524,6 +530,9 @@ public class PDClient extends AbstractGRPCClient private void initCluster() { GetMembersResponse resp = null; List pdAddrs = getConf().getPdAddrs(); + this.pdAddrs = pdAddrs; + this.etcdClient = Client.builder().endpoints(pdAddrs).build(); + this.hostMapping = new HostMapping(this.etcdClient, conf.getNetworkMappingName()); for (URI u : pdAddrs) { resp = getMembers(u); if (resp != null) { @@ -534,8 +543,6 @@ public class PDClient extends AbstractGRPCClient long clusterId = resp.getHeader().getClusterId(); header = RequestHeader.newBuilder().setClusterId(clusterId).build(); tsoReq = TsoRequest.newBuilder().setHeader(header).setCount(1).build(); - this.pdAddrs = pdAddrs; - this.etcdClient = Client.builder().endpoints(pdAddrs).build(); this.tiflashReplicaMap = new ConcurrentHashMap<>(); createLeaderWrapper(resp.getLeader().getClientUrls(0)); service = diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 4e9c3f5177..e759e03ce4 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -52,6 +52,8 @@ public interface ReadOnlyPDClient { Future getRegionByIDAsync(BackOffer backOffer, long id); + HostMapping getHostMapping(); + /** * Get Store by StoreId * diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 36cba186fd..7d81540b0b 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -70,6 +70,7 @@ public class TiConfiguration implements Serializable { setIfMissing(TIKV_IS_REPLICA_READ, DEF_IS_REPLICA_READ); setIfMissing(TIKV_METRICS_ENABLE, DEF_METRICS_ENABLE); setIfMissing(TIKV_METRICS_PORT, DEF_METRICS_PORT); + setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME); } public static void listAll() { @@ -239,6 +240,8 @@ public class TiConfiguration implements Serializable { private boolean metricsEnable = getBoolean(TIKV_METRICS_ENABLE); private int metricsPort = getInt(TIKV_METRICS_PORT); + private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME); + public enum KVMode { TXN, RAW @@ -480,4 +483,8 @@ public class TiConfiguration implements Serializable { this.metricsPort = metricsPort; return this; } + + public String getNetworkMappingName() { + return this.networkMappingName; + } } diff --git a/src/main/java/org/tikv/common/pd/PDUtils.java b/src/main/java/org/tikv/common/pd/PDUtils.java index c4b2be9ea6..af693705a0 100644 --- a/src/main/java/org/tikv/common/pd/PDUtils.java +++ b/src/main/java/org/tikv/common/pd/PDUtils.java @@ -20,7 +20,7 @@ import java.net.URI; import java.util.List; public class PDUtils { - public static URI addrToUrl(String addr) { + public static URI addrToUri(String addr) { if (addr.contains("://")) { return URI.create(addr); } else { @@ -31,8 +31,12 @@ public class PDUtils { public static List addrsToUrls(String[] addrs) { ImmutableList.Builder urlsBuilder = new ImmutableList.Builder<>(); for (String addr : addrs) { - urlsBuilder.add(addrToUrl(addr)); + urlsBuilder.add(addrToUri(addr)); } return urlsBuilder.build(); } + + public static String uriToAddr(URI uri) { + return uri.getHost() + ":" + uri.getPort(); + } } diff --git a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java index d16727a08a..3494268885 100644 --- a/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java @@ -86,7 +86,8 @@ public abstract class AbstractRegionStoreClient } region = newRegion; String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress(); - ManagedChannel channel = channelFactory.getChannel(addressStr); + ManagedChannel channel = + channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); asyncStub = TikvGrpc.newStub(channel); return true; @@ -95,7 +96,8 @@ public abstract class AbstractRegionStoreClient @Override public void onStoreNotMatch(Metapb.Store store) { String addressStr = store.getAddress(); - ManagedChannel channel = channelFactory.getChannel(addressStr); + ManagedChannel channel = + channelFactory.getChannel(addressStr, regionManager.getPDClient().getHostMapping()); blockingStub = TikvGrpc.newBlockingStub(channel); asyncStub = TikvGrpc.newStub(channel); if (region.getLeader().getStoreId() != store.getId()) { diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 111b8f5de9..4cf6fc668b 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -79,6 +79,10 @@ public class RegionManager { return cacheInvalidateCallback; } + public ReadOnlyPDClient getPDClient() { + return this.cache.pdClient; + } + public TiRegion getRegionByKey(ByteString key) { return getRegionByKey(key, ConcreteBackOffer.newGetBackOff()); } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 89d414afd1..ebcbdca071 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -119,7 +119,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { if (logger.isDebugEnabled()) { logger.debug(String.format("Create region store client on address %s", addressStr)); } - ManagedChannel channel = channelFactory.getChannel(addressStr); + ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); TikvBlockingStub tikvBlockingStub = TikvGrpc.newBlockingStub(channel); TikvStub tikvAsyncStub = TikvGrpc.newStub(channel); @@ -1242,7 +1242,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient { if (logger.isDebugEnabled()) { logger.debug(String.format("Create region store client on address %s", addressStr)); } - ManagedChannel channel = channelFactory.getChannel(addressStr); + ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); TikvBlockingStub blockingStub = TikvGrpc.newBlockingStub(channel); TikvStub asyncStub = TikvGrpc.newStub(channel); diff --git a/src/main/java/org/tikv/common/util/ChannelFactory.java b/src/main/java/org/tikv/common/util/ChannelFactory.java index 60447e2a41..9435e9d69d 100644 --- a/src/main/java/org/tikv/common/util/ChannelFactory.java +++ b/src/main/java/org/tikv/common/util/ChannelFactory.java @@ -18,31 +18,38 @@ package org.tikv.common.util; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.net.URI; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.tikv.common.HostMapping; +import org.tikv.common.pd.PDUtils; public class ChannelFactory implements AutoCloseable { private final int maxFrameSize; - private final Map connPool = new ConcurrentHashMap<>(); + private final ConcurrentHashMap connPool = new ConcurrentHashMap<>(); public ChannelFactory(int maxFrameSize) { this.maxFrameSize = maxFrameSize; } - public ManagedChannel getChannel(String addressStr) { + public ManagedChannel getChannel(String addressStr, HostMapping hostMapping) { return connPool.computeIfAbsent( addressStr, key -> { URI address; + URI mappedAddr; try { - address = URI.create("http://" + key); + address = PDUtils.addrToUri(key); } catch (Exception e) { - throw new IllegalArgumentException("failed to form address " + key); + throw new IllegalArgumentException("failed to form address " + key, e); + } + try { + mappedAddr = hostMapping.getMappedURI(address); + } catch (Exception e) { + throw new IllegalArgumentException("failed to get mapped address " + address, e); } // Channel should be lazy without actual connection until first call // So a coarse grain lock is ok here - return ManagedChannelBuilder.forAddress(address.getHost(), address.getPort()) + return ManagedChannelBuilder.forAddress(mappedAddr.getHost(), mappedAddr.getPort()) .maxInboundMessageSize(maxFrameSize) .usePlaintext(true) .idleTimeout(60, TimeUnit.SECONDS) diff --git a/src/main/java/org/tikv/common/util/RangeSplitter.java b/src/main/java/org/tikv/common/util/RangeSplitter.java index 146fa24151..4c5187a12a 100644 --- a/src/main/java/org/tikv/common/util/RangeSplitter.java +++ b/src/main/java/org/tikv/common/util/RangeSplitter.java @@ -231,7 +231,7 @@ public class RangeSplitter { this.ranges = ranges; String host = null; try { - host = PDUtils.addrToUrl(store.getAddress()).getHost(); + host = PDUtils.addrToUri(store.getAddress()).getHost(); } catch (Exception ignored) { } this.host = host;