The initial version of Raw KV implementation (#14)

* Initial version of Raw Kv client

Note: raw::Client::batch_scan is not implemented yet.

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>

* Document most public code.

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Reduce pub surface

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* fmt/lint

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Add cf to concrete builder types

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>

* Fixed some comments and confusing name

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>

* Change Request from struct to enum

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>

* Change tso_tx/rx channel to bounded

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>

* Fix format issues and improve implementations

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>

* Change to dyn trait syntax

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>

* inline some functions

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Better note on KvPair

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Use 3 PDs in raw example

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Clarify documentation

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Get CI green

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Remove not useful PrivateKey type

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>

* Change CUSTOM_CF to "default" in examples/raw.rs

Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>
This commit is contained in:
Xiaoguang Sun 2019-01-14 06:03:21 +08:00 committed by Ana Hobden
parent 5ed332709e
commit ba8675409d
23 changed files with 4920 additions and 587 deletions

View File

@ -16,6 +16,10 @@ env:
cache: cargo
rust:
os:
- linux
- windows
- osx
matrix:
include:
@ -33,6 +37,8 @@ matrix:
script:
- docker run -d --net=host --name pd --rm pingcap/pd --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"
- docker run -d --net=host --name kv --rm pingcap/tikv --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"
- cargo test --all -- --nocapture
# Validate benches still work.
- cargo bench --all -- --test

View File

@ -17,3 +17,20 @@ serde = "1.0"
serde_derive = "1.0"
quick-error = "1.2"
grpcio = { version = "0.4", features = [ "secure" ] }
protobuf = "~2.0"
tokio-core = "0.1"
tokio-timer = "0.2"
fxhash = "0.2"
lazy_static = "0.2.1"
log = "0.3.9"
[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"
[dependencies.prometheus]
version = "0.4.2"
default-features = false
features = ["push", "process"]
[dev-dependencies]
tempdir = "0.3"

View File

@ -13,66 +13,104 @@
use futures::future::Future;
use std::path::PathBuf;
use tikv_client::*;
use tikv_client::{raw::Client, Config, Key, KvPair, Result, Value};
fn main() {
let config = Config::new(vec!["127.0.0.1:3379"]).with_security(
const KEY: &str = "TiKV";
const VALUE: &str = "Rust";
const CUSTOM_CF: &str = "default";
fn main() -> Result<()> {
// Create a configuration to use for the example.
// Optionally encrypt the traffic.
let config = Config::new(vec![
"192.168.0.100:3379", // Avoid a single point of failure,
"192.168.0.101:3379", // use more than one PD endpoint.
"192.168.0.102:3379",
])
.with_security(
PathBuf::from("/path/to/ca.pem"),
PathBuf::from("/path/to/client.pem"),
PathBuf::from("/path/to/client-key.pem"),
);
let raw = raw::Client::new(&config)
.wait()
.expect("Could not connect to tikv");
let key: Key = b"Company".to_vec().into();
let value: Value = b"PingCAP".to_vec().into();
// When we first create a client we receive a `Connect` structure which must be resolved before
// the client is actually connected and usable.
let unconnnected_client = Client::new(&config);
let client = unconnnected_client.wait()?;
raw.put(key.clone(), value.clone())
.cf("test_cf")
.wait()
.expect("Could not put kv pair to tikv");
println!("Successfully put {:?}:{:?} to tikv", key, value);
// Requests are created from the connected client. These calls return structures which
// implement `Future`. This means the `Future` must be resolved before the action ever takes
// place.
//
// Here we set the key `TiKV` to have the value `Rust` associated with it.
let put_request = client.put(KEY, VALUE);
put_request.wait()?; // Returns a `tikv_client::Error` on failure.
println!("Put key \"{}\", value \"{}\".", KEY, VALUE);
let value = raw
.get(&key)
.cf("test_cf")
.wait()
.expect("Could not get value");
println!("Found val: {:?} for key: {:?}", value, key);
//
// Unlike a standard Rust HashMap all calls take owned values. This is because under the hood
// protobufs must take ownership of the data. If we only took a borrow we'd need to internally
// clone it. This is against Rust API guidelines, so you must manage this yourself.
//
// Above, you saw we can use a `&'static str`, this is primarily for making examples short.
// This type is practical to use for real things, and usage forces an internal copy.
//
// It is best to pass a `Vec<u8>` in terms of explicitness and speed. `String`s and a few other
// types are supported as well, but it all ends up as `Vec<u8>` in the end.
let key: String = String::from(KEY);
let value: Value = client.get(key.clone()).wait()?;
assert_eq!(value.as_ref(), VALUE.as_bytes());
println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY);
raw.delete(&key)
.cf("test_cf")
// You can also set the `ColumnFamily` used by the request.
// This is *advanced usage* and should have some special considerations.
client
.delete(key.clone())
.cf(CUSTOM_CF)
.wait()
.expect("Could not delete value");
println!("Key: {:?} deleted", key);
raw.get(&key)
.cf("test_cf")
client
.get(key)
.cf(CUSTOM_CF)
.wait()
.expect_err("Get returned value for not existing key");
let keys = vec![b"k1".to_vec().into(), b"k2".to_vec().into()];
let pairs: Vec<KvPair> = (1..3)
.map(|i| KvPair::from((Key::from(format!("k{}", i)), Value::from(format!("v{}", i)))))
.collect();
client
.batch_put(pairs.clone())
.wait()
.expect("Could not put pairs");
let values = raw
.batch_get(&keys)
.cf("test_cf")
let keys = vec![Key::from(b"k1".to_vec()), Key::from(b"k2".to_vec())];
let values = client
.batch_get(keys.clone())
.cf(CUSTOM_CF)
.wait()
.expect("Could not get values");
println!("Found values: {:?} for keys: {:?}", values, keys);
let start: Key = b"k1".to_vec().into();
let end: Key = b"k2".to_vec().into();
raw.scan(&start..&end, 10)
.cf("test_cf")
client
.scan(start.clone()..end.clone(), 10)
.cf(CUSTOM_CF)
.key_only()
.wait()
.expect("Could not scan");
let ranges = [&start..&end, &start..&end];
raw.batch_scan(&ranges, 10)
.cf("test_cf")
let ranges = vec![start.clone()..end.clone(), start.clone()..end.clone()];
client
.batch_scan(ranges, 10)
.cf(CUSTOM_CF)
.key_only()
.wait()
.expect("Could not batch scan");
// Cleanly exit.
Ok(())
}

View File

@ -32,7 +32,7 @@ fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
txn.commit().wait().expect("Could not commit transaction");
}
fn get(client: &Client, key: &Key) -> Value {
fn get(client: &Client, key: Key) -> Value {
let txn = client.begin();
txn.get(key).wait().expect("Could not get value")
}
@ -70,7 +70,7 @@ fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
}
fn main() {
let config = Config::new(vec!["127.0.0.1:3379"]).with_security(
let config = Config::new(vec!["127.0.0.1:2379"]).with_security(
PathBuf::from("/path/to/ca.pem"),
PathBuf::from("/path/to/client.pem"),
PathBuf::from("/path/to/client-key.pem"),
@ -88,7 +88,7 @@ fn main() {
// get
let key1: Key = b"key1".to_vec().into();
let value1 = get(&txn, &key1);
let value1 = get(&txn, key1.clone());
println!("{:?}", (key1, value1));
// scan

View File

@ -1 +1 @@
stable
stable

View File

@ -16,62 +16,121 @@ use quick_error::quick_error;
use std::{error, result};
quick_error! {
/// An error originating from the TiKV client or dependencies.
///
/// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/)
/// for errors. *This may change in future versions.*
#[derive(Debug)]
pub enum Error {
/// Wraps a `std::io::Error`.
Io(err: ::std::io::Error) {
from()
cause(err)
description(err.description())
}
/// Wraps a `grpcio::Error`.
Grpc(err: grpcio::Error) {
from()
cause(err)
description(err.description())
}
/// Represents that a futures oneshot channel was cancelled.
Canceled(err: ::futures::sync::oneshot::Canceled) {
from()
cause(err)
description(err.description())
}
/// An unknown error.
///
/// Generally, this is not an expected error. Please report it if encountered.
Other(err: Box<error::Error + Sync + Send>) {
from()
cause(err.as_ref())
description(err.description())
display("unknown error {:?}", err)
}
/// A region was not found for the given key.
RegionForKeyNotFound(key: Vec<u8>) {
description("region is not found")
display("region is not found for key {:?}", key)
}
RegionNotFound(id: u64) {
/// A region was not found.
RegionNotFound(region_id: u64, message: Option<String>) {
description("region is not found")
display("region {:?} is not found", id)
display("region {:?} is not found. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
}
NotLeader(region_id: u64) {
/// The peer is not a leader of the given region.
NotLeader(region_id: u64, message: Option<String>) {
description("peer is not leader")
display("peer is not leader for region {:?}.", region_id)
display("peer is not leader for region {}. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
}
StoreNotMatch {
/// The store does not match.
StoreNotMatch(request_store_id: u64, actual_store_id: u64, message: String) {
description("store not match")
display("store not match")
display("requesting store '{}' when actual store is '{}'. {}", request_store_id, actual_store_id, message)
}
/// The given key is not within the given region.
KeyNotInRegion(key: Vec<u8>, region_id: u64, start_key: Vec<u8>, end_key: Vec<u8>) {
description("region is not found")
display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key)
}
StaleEpoch {
/// A stale epoch.
StaleEpoch(message: Option<String>) {
description("stale epoch")
display("stale epoch")
display("{}", message.as_ref().unwrap_or(&"".to_owned()))
}
ServerIsBusy(reason: String) {
StaleCommand(message: String) {
description("stale command")
display("{}", message)
}
/// The server is too busy.
ServerIsBusy(reason: String, backoff: u64) {
description("server is busy")
display("server is busy: {:?}", reason)
display("server is busy: {:?}. Backoff {} ms", reason, backoff)
}
RaftEntryTooLarge(region_id: u64, entry_size: u64) {
/// The given raft entry is too large for the region.
RaftEntryTooLarge(region_id: u64, entry_size: u64, message: String) {
description("raft entry too large")
display("{:?} bytes raft entry of region {:?} is too large", entry_size, region_id)
display("{:?} bytes raft entry of region {:?} is too large. {}", entry_size, region_id, message)
}
KeyError(message: String) {
description("key error")
display("{}", message)
}
KVError(message: String) {
description("kv error")
display("{}", message)
}
InternalError(message: String) {
description("internal error")
display("{}", message)
}
InvalidKeyRange {
description("invalid key range")
display("Only left closed intervals are supported")
}
Unimplemented {
description("unimplemented feature")
display("Unimplemented feature")
}
EmptyValue {
description("can not set empty value")
display("Can not set empty value")
}
NoSuchKey {
description("key does not exist")
display("Key doest not exist")
}
InvalidOverlappingRanges {
description("ranges can not be overlapping")
display("Ranges can not be overlapping")
}
MaxScanLimitExceeded(limit: u32, max_limit: u32) {
description("limit exceeds max scan limit")
display("Limit {} excceds max scan limit {}", limit, max_limit)
}
}
}
/// A result holding an [`Error`](enum.Error.html).
pub type Result<T> = result::Result<T, Error>;

View File

@ -10,28 +10,109 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#![recursion_limit = "128"]
#![type_length_limit = "1572864"]
use futures::Future;
use serde_derive::*;
use std::ops::Deref;
use std::path::PathBuf;
use std::{
ops::{
Bound, Deref, DerefMut, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo,
RangeToInclusive,
},
path::PathBuf,
time::Duration,
u8::{MAX as U8_MAX, MIN as U8_MIN},
};
pub mod errors;
mod errors;
pub mod raw;
mod rpc;
pub mod transaction;
#[doc(inline)]
pub use crate::errors::Error;
#[doc(inline)]
pub use crate::errors::Result;
/// The key part of a key/value pair.
///
/// In TiKV, keys are an ordered sequence of bytes. This has an advantage over choosing `String` as
/// valid `UTF-8` is not required. This means that the user is permitted to store any data they wish,
/// as long as it can be represented by bytes. (Which is to say, pretty much anything!)
///
/// This is a *wrapper type* that implements `Deref<Target=[u8]>` so it can be used like one transparently.
///
/// This type also implements `From` for many types. With one exception, these are all done without
/// reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal
/// allocation cost.
///
/// This type wraps around an owned value, so it should be treated it like `String` or `Vec<u8>`
/// over a `&str` or `&[u8]`.
///
/// ```rust
/// use tikv_client::Key;
///
/// let static_str: &'static str = "TiKV";
/// let from_static_str = Key::from(static_str);
///
/// let string: String = String::from(static_str);
/// let from_string = Key::from(string);
/// assert_eq!(from_static_str, from_string);
///
/// let vec: Vec<u8> = static_str.as_bytes().to_vec();
/// let from_vec = Key::from(vec);
/// assert_eq!(from_static_str, from_vec);
///
/// let bytes = static_str.as_bytes().to_vec();
/// let from_bytes = Key::from(bytes);
/// assert_eq!(from_static_str, from_bytes);
/// ```
///
/// **But, you should not need to worry about all this:** Many functions which accept a `Key`
/// accept an `Into<Key>`, which means all of the above types can be passed directly to those
/// functions.
#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub struct Key(Vec<u8>);
#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)]
pub struct Value(Vec<u8>);
#[derive(Default, Clone, Eq, PartialEq, Debug)]
pub struct KvPair(Key, Value);
impl Into<Key> for Vec<u8> {
fn into(self) -> Key {
Key(self)
impl Key {
#[inline]
pub fn new(value: Vec<u8>) -> Self {
Key(value)
}
#[inline]
fn into_inner(self) -> Vec<u8> {
self.0
}
#[inline]
fn push(&mut self, v: u8) {
self.0.push(v)
}
#[inline]
fn pop(&mut self) {
self.0.pop();
}
}
impl From<Vec<u8>> for Key {
fn from(v: Vec<u8>) -> Self {
Key(v)
}
}
impl From<String> for Key {
    /// Reuse the `String`'s byte buffer as the key; no copy is made.
    fn from(v: String) -> Key {
        Key(v.into_bytes())
    }
}
/// Copy the bytes of a static string into a new `Key`.
///
/// Unlike the `Vec<u8>`/`String` conversions this one must allocate, since
/// only a borrowed `&'static str` is provided.
//
// The lifetime parameter `<'a>` on the original impl was unused/unconstrained
// and inconsistent with the matching `Value` impl; it has been removed. The
// manual `Vec::new` + `extend_from_slice` is replaced with the equivalent
// `to_vec()`.
impl From<&'static str> for Key {
    fn from(v: &'static str) -> Key {
        Key(v.as_bytes().to_vec())
    }
}
@ -41,68 +122,252 @@ impl AsRef<Key> for Key {
}
}
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl Deref for Key {
type Target = Vec<u8>;
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Into<Value> for Vec<u8> {
fn into(self) -> Value {
Value(self)
impl DerefMut for Key {
fn deref_mut(&mut self) -> &mut [u8] {
&mut self.0
}
}
/// The value part of a key/value pair.
///
/// In TiKV, values are an ordered sequence of bytes. This has an advantage over choosing `String`
/// as valid `UTF-8` is not required. This means that the user is permitted to store any data they wish,
/// as long as it can be represented by bytes. (Which is to say, pretty much anything!)
///
/// This is a *wrapper type* that implements `Deref<Target=[u8]>` so it can be used like one transparently.
///
/// This type also implements `From` for many types. With one exception, these are all done without
/// reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal
/// allocation cost.
///
/// This type wraps around an owned value, so it should be treated it like `String` or `Vec<u8>`
/// over a `&str` or `&[u8]`.
///
/// ```rust
/// use tikv_client::Value;
///
/// let static_str: &'static str = "TiKV";
/// let from_static_str = Value::from(static_str);
///
/// let string: String = String::from(static_str);
/// let from_string = Value::from(string);
/// assert_eq!(from_static_str, from_string);
///
/// let vec: Vec<u8> = static_str.as_bytes().to_vec();
/// let from_vec = Value::from(vec);
/// assert_eq!(from_static_str, from_vec);
///
/// let bytes = static_str.as_bytes().to_vec();
/// let from_bytes = Value::from(bytes);
/// assert_eq!(from_static_str, from_bytes);
/// ```
///
/// **But, you should not need to worry about all this:** Many functions which accept a `Value`
/// accept an `Into<Value>`, which means all of the above types can be passed directly to those
/// functions.
#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)]
pub struct Value(Vec<u8>);
impl Value {
    /// Wrap an owned byte vector as a `Value`; the buffer is reused, not copied.
    #[inline]
    pub fn new(value: Vec<u8>) -> Self {
        Value(value)
    }

    /// Unwrap the underlying bytes (crate-internal helper).
    #[inline]
    fn into_inner(self) -> Vec<u8> {
        self.0
    }
}
impl From<Vec<u8>> for Value {
fn from(v: Vec<u8>) -> Self {
Value(v)
}
}
impl From<String> for Value {
fn from(v: String) -> Value {
Value(v.into_bytes())
}
}
impl From<&'static str> for Value {
    /// Copy the bytes of a static string into a new `Value`. This is the one
    /// conversion that must allocate, since it starts from a borrow.
    fn from(v: &'static str) -> Value {
        Value(v.as_bytes().to_vec())
    }
}
impl Deref for Value {
type Target = Vec<u8>;
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// A key/value pair.
///
/// ```rust
/// # use tikv_client::{Key, Value, KvPair};
/// let key = "key";
/// let value = "value";
/// let constructed = KvPair::new(key, value);
/// let from_tuple = KvPair::from((key, value));
/// assert_eq!(constructed, from_tuple);
/// ```
///
/// Many functions which accept a `KvPair` accept an `Into<KvPair>`, which means all of the above
/// types (Like a `(Key, Value)`) can be passed directly to those functions.
#[derive(Default, Clone, Eq, PartialEq, Debug)]
pub struct KvPair(Key, Value);
impl KvPair {
pub fn new(key: Key, value: Value) -> Self {
KvPair(key, value)
/// Create a new `KvPair`.
#[inline]
pub fn new(key: impl Into<Key>, value: impl Into<Value>) -> Self {
KvPair(key.into(), value.into())
}
/// Immutably borrow the `Key` part of the `KvPair`.
#[inline]
pub fn key(&self) -> &Key {
&self.0
}
/// Immutably borrow the `Value` part of the `KvPair`.
#[inline]
pub fn value(&self) -> &Value {
&self.1
}
}
impl Into<KvPair> for (Key, Value) {
fn into(self) -> KvPair {
KvPair(self.0, self.1)
#[inline]
pub fn into_inner(self) -> (Key, Value) {
(self.0, self.1)
}
#[inline]
pub fn into_key(self) -> Key {
self.0
}
#[inline]
pub fn into_value(self) -> Value {
self.1
}
/// Mutably borrow the `Key` part of the `KvPair`.
#[inline]
pub fn key_mut(&mut self) -> &mut Key {
&mut self.0
}
/// Mutably borrow the `Value` part of the `KvPair`.
#[inline]
pub fn value_mut(&mut self) -> &mut Value {
&mut self.1
}
/// Set the `Key` part of the `KvPair`.
#[inline]
pub fn set_key(&mut self, k: impl Into<Key>) {
self.0 = k.into();
}
/// Set the `Value` part of the `KvPair`.
#[inline]
pub fn set_value(&mut self, v: impl Into<Value>) {
self.1 = v.into();
}
}
/// Allow any `(key-like, value-like)` tuple to be converted into a `KvPair`.
impl<K, V> From<(K, V)> for KvPair
where
    K: Into<Key>,
    V: Into<Value>,
{
    fn from((k, v): (K, V)) -> Self {
        KvPair(k.into(), v.into())
    }
}
/// The configuration for either a [`raw::Client`](raw/struct.Client.html) or a
/// [`transaction::Client`](transaction/struct.Client.html).
///
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for PD
/// must be provided, **not** the TiKV nodes.
///
/// It's important to **include more than one PD endpoint** (include all, if possible!)
/// This helps avoid having a *single point of failure*.
///
/// By default, this client will use an insecure connection instead of one protected by
/// Transport Layer Security (TLS). Your deployment may have chosen to rely on security measures
/// such as a private network, or a VPN layer to provide secure transmission.
///
/// To use a TLS secured connection, use the `with_security` function to set the required
/// parameters.
///
/// TiKV does not currently offer encrypted storage (or encryption-at-rest).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
pub pd_endpoints: Vec<String>,
pub ca_path: Option<PathBuf>,
pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>,
pd_endpoints: Vec<String>,
ca_path: Option<PathBuf>,
cert_path: Option<PathBuf>,
key_path: Option<PathBuf>,
timeout: Duration,
}
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
impl Config {
/// Create a new [`Config`](struct.Config.html) which coordinates with the given PD endpoints.
///
/// It's important to **include more than one PD endpoint** (include all, if possible!)
/// This helps avoid having a *single point of failure*.
///
/// ```rust
/// # use tikv_client::Config;
/// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]);
/// ```
pub fn new(pd_endpoints: impl IntoIterator<Item = impl Into<String>>) -> Self {
Config {
pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(),
ca_path: None,
cert_path: None,
key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT,
}
}
/// Set the certificate authority, certificate, and key locations for the
/// [`Config`](struct.Config.html).
///
/// By default, TiKV connections do not utilize transport layer security. Enable it by setting
/// these values.
///
/// ```rust
/// # use tikv_client::Config;
/// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"])
/// .with_security("root.ca", "internal.cert", "internal.key");
/// ```
pub fn with_security(
mut self,
ca_path: impl Into<PathBuf>,
@ -114,4 +379,160 @@ impl Config {
self.key_path = Some(key_path.into());
self
}
/// Set the request timeout, replacing the default of two seconds
/// (`DEFAULT_REQUEST_TIMEOUT`). Consumes and returns `self` so it can be
/// chained builder-style after `new`/`with_security`.
pub fn timeout(mut self, timeout: Duration) -> Self {
    self.timeout = timeout;
    self
}
}
pub type KvFuture<Resp> = Box<dyn Future<Item = Resp, Error = Error>>;
/// A convenience trait for expressing ranges.
///
/// In TiKV, keys are an ordered sequence of bytes. This means we can have ranges over those
/// bytes. Eg `001` is before `010`.
///
/// This trait has implementations for common range types like `a..b`, `a..=b` where `a` and `b`
/// `impl Into<Key>`. You could implement this for your own types.
///
/// ```rust
/// use tikv_client::{KeyRange, Key};
/// use std::ops::{Range, RangeInclusive, RangeTo, RangeToInclusive, RangeFrom, RangeFull, Bound};
///
/// let explict_range: Range<Key> = Range { start: Key::from("Rust"), end: Key::from("TiKV") };
/// let from_explict_range = explict_range.into_bounds();
///
/// let range: Range<&str> = "Rust".."TiKV";
/// let from_range = range.into_bounds();
/// assert_eq!(from_explict_range, from_range);
///
/// let range: RangeInclusive<&str> = "Rust"..="TiKV";
/// let from_range = range.into_bounds();
/// assert_eq!(
/// (Bound::Included(Key::from("Rust")), Bound::Included(Key::from("TiKV"))),
/// from_range
/// );
///
/// let range_from: RangeFrom<&str> = "Rust"..;
/// let from_range_from = range_from.into_bounds();
/// assert_eq!(
/// (Bound::Included(Key::from("Rust")), Bound::Unbounded),
/// from_range_from,
/// );
///
/// let range_to: RangeTo<&str> = .."TiKV";
/// let from_range_to = range_to.into_bounds();
/// assert_eq!(
/// (Bound::Unbounded, Bound::Excluded(Key::from("TiKV"))),
/// from_range_to,
/// );
///
/// let range_to_inclusive: RangeToInclusive<&str> = ..="TiKV";
/// let from_range_to_inclusive = range_to_inclusive.into_bounds();
/// assert_eq!(
/// (Bound::Unbounded, Bound::Included(Key::from("TiKV"))),
/// from_range_to_inclusive,
/// );
///
/// let range_full: RangeFull = ..;
/// let from_range_full = range_full.into_bounds();
/// assert_eq!(
/// (Bound::Unbounded, Bound::Unbounded),
/// from_range_full
/// );
/// ```
///
/// **But, you should not need to worry about all this:** Many functions accept a `impl KeyRange`
/// which means all of the above types can be passed directly to those functions.
pub trait KeyRange: Sized {
    /// Convert this range into a pair of `Bound<Key>`s.
    fn into_bounds(self) -> (Bound<Key>, Bound<Key>);

    /// Convert this range into `(inclusive start, optional end)` keys.
    /// Fails with `Error::InvalidKeyRange` when the start bound is unbounded
    /// — only left-closed intervals are supported.
    fn into_keys(self) -> Result<(Key, Option<Key>)> {
        range_to_keys(self.into_bounds())
    }
}
/// Normalize a pair of bounds into the `(inclusive start, optional end)` form
/// used by the request layer.
///
/// * An unbounded start is rejected with `Error::InvalidKeyRange`.
/// * An excluded start is made inclusive by incrementing the last byte, or by
///   appending `0` when the key is empty or ends in `0xFF`.
///   NOTE(review): the strict successor of a key under byte ordering is the
///   key with `0x00` appended; incrementing the last byte skips keys such as
///   `b"ab\x00"` — confirm this matches the intended scan semantics.
/// * An excluded end is narrowed by decrementing the last byte, or popping a
///   trailing `0x00`. NOTE(review): for keys not ending in `0x00` this
///   under-approximates the true predecessor — verify against how the end
///   key is interpreted server-side.
/// * An unbounded end maps to `None`.
fn range_to_keys(range: (Bound<Key>, Bound<Key>)) -> Result<(Key, Option<Key>)> {
    let start = match range.0 {
        Bound::Included(v) => v,
        Bound::Excluded(mut v) => {
            // Convert the exclusive start into an inclusive one (see note above).
            match v.last_mut() {
                None | Some(&mut U8_MAX) => v.push(0),
                Some(v) => *v += 1,
            }
            v
        }
        Bound::Unbounded => return Err(Error::InvalidKeyRange),
    };
    let end = match range.1 {
        Bound::Included(v) => Some(v),
        Bound::Excluded(mut v) => Some({
            // Convert the exclusive end into an inclusive one (see note above).
            match v.last_mut() {
                None => (),
                Some(&mut U8_MIN) => v.pop(),
                Some(v) => *v -= 1,
            }
            v
        }),
        Bound::Unbounded => None,
    };
    Ok((start, end))
}
impl<T: Into<Key>> KeyRange for Range<T> {
    /// `start..end` maps to an included start and an excluded end.
    fn into_bounds(self) -> (Bound<Key>, Bound<Key>) {
        let Range { start, end } = self;
        (Bound::Included(start.into()), Bound::Excluded(end.into()))
    }
}
impl<T: Into<Key>> KeyRange for RangeFrom<T> {
    /// `start..` maps to an included start with no upper bound.
    fn into_bounds(self) -> (Bound<Key>, Bound<Key>) {
        (Bound::Included(self.start.into()), Bound::Unbounded)
    }
}
impl KeyRange for RangeFull {
    /// `..` maps to a fully unbounded range. Note: `into_keys` rejects this
    /// range, since its start bound is unbounded.
    fn into_bounds(self) -> (Bound<Key>, Bound<Key>) {
        (Bound::Unbounded, Bound::Unbounded)
    }
}
impl<T: Into<Key>> KeyRange for RangeInclusive<T> {
    /// `start..=end` maps to included bounds on both sides.
    fn into_bounds(self) -> (Bound<Key>, Bound<Key>) {
        // `RangeInclusive` does not expose its fields; destructure via `into_inner`.
        let (start, end) = self.into_inner();
        (Bound::Included(start.into()), Bound::Included(end.into()))
    }
}
impl<T: Into<Key>> KeyRange for RangeTo<T> {
    /// `..end` maps to an unbounded start and an excluded end. Note:
    /// `into_keys` rejects this range, since its start bound is unbounded.
    fn into_bounds(self) -> (Bound<Key>, Bound<Key>) {
        (Bound::Unbounded, Bound::Excluded(self.end.into()))
    }
}
impl<T: Into<Key>> KeyRange for RangeToInclusive<T> {
    /// `..=end` maps to an unbounded start and an included end. Note:
    /// `into_keys` rejects this range, since its start bound is unbounded.
    fn into_bounds(self) -> (Bound<Key>, Bound<Key>) {
        (Bound::Unbounded, Bound::Included(self.end.into()))
    }
}
impl<T: Into<Key>> KeyRange for (Bound<T>, Bound<T>) {
    /// A raw pair of bounds is converted element-wise into key bounds.
    fn into_bounds(self) -> (Bound<Key>, Bound<Key>) {
        (convert_to_bound_key(self.0), convert_to_bound_key(self.1))
    }
}
/// Map a `Bound` over any key-convertible type into a `Bound<Key>`,
/// preserving the bound kind.
fn convert_to_bound_key<K: Into<Key>>(b: Bound<K>) -> Bound<Key> {
    match b {
        Bound::Included(k) => Bound::Included(k.into()),
        Bound::Excluded(k) => Bound::Excluded(k.into()),
        Bound::Unbounded => Bound::Unbounded,
    }
}

1188
src/raw.rs

File diff suppressed because it is too large Load Diff

706
src/rpc/client.rs Normal file
View File

@ -0,0 +1,706 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::HashMap,
fmt,
ops::Deref,
sync::{Arc, RwLock},
time::Duration,
};
use futures::{
future::{self, loop_fn, Either, Loop},
Future,
};
use grpcio::{EnvBuilder, Environment};
use kvproto::kvrpcpb;
use log::*;
use crate::{
raw::ColumnFamily,
rpc::{
pd::{PdClient, PdTimestamp, Region, RegionId, RegionVerId, Store, StoreId},
security::SecurityManager,
tikv::KvClient,
util::HandyRwLock,
},
Config, Error, Key, KvPair, Result, Value,
};
const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";
/// Shared state behind an `RpcClient`: the PD handle, a lazily-filled cache
/// of TiKV connections keyed by store address, and the gRPC environment /
/// security settings used to open new ones.
struct RpcClientInner {
    /// Placement Driver (PD) client used for region/store/timestamp lookups.
    pd: Arc<PdClient>,
    /// TiKV connection cache, keyed by store address; populated by `kv_client`.
    tikv: Arc<RwLock<HashMap<String, Arc<KvClient>>>>,
    /// Shared gRPC environment used for every connection.
    env: Arc<Environment>,
    /// TLS settings applied when dialing PD and TiKV.
    security_mgr: Arc<SecurityManager>,
    /// Request timeout copied from `Config::timeout`.
    timeout: Duration,
}
impl RpcClientInner {
    /// Build the shared client state from `config`: create the gRPC
    /// environment, load TLS material when all three paths are configured,
    /// connect to PD, and start with an empty TiKV connection cache.
    fn connect(config: &Config) -> Result<RpcClientInner> {
        let env = Arc::new(
            EnvBuilder::new()
                .cq_count(CQ_COUNT)
                .name_prefix(thd_name!(CLIENT_PREFIX))
                .build(),
        );
        // TLS is enabled only when the CA, certificate, and key paths are all set;
        // otherwise an insecure default manager is used.
        let security_mgr = Arc::new(
            if let (Some(ca_path), Some(cert_path), Some(key_path)) =
                (&config.ca_path, &config.cert_path, &config.key_path)
            {
                SecurityManager::load(ca_path, cert_path, key_path)?
            } else {
                SecurityManager::default()
            },
        );
        let pd = Arc::new(PdClient::connect(
            Arc::clone(&env),
            &config.pd_endpoints,
            Arc::clone(&security_mgr),
            config.timeout,
        )?);
        let tikv = Default::default();
        Ok(RpcClientInner {
            pd,
            tikv,
            env,
            security_mgr,
            timeout: config.timeout,
        })
    }

    // --- Thin delegations to the PD client. ---

    fn get_all_stores(&self) -> impl Future<Item = Vec<Store>, Error = Error> {
        self.pd.get_all_stores()
    }

    fn get_store_by_id(&self, id: StoreId) -> impl Future<Item = Store, Error = Error> {
        self.pd.get_store(id)
    }

    fn get_region(&self, key: &[u8]) -> impl Future<Item = Region, Error = Error> {
        self.pd.get_region(key)
    }

    fn get_region_by_id(&self, id: RegionId) -> impl Future<Item = Region, Error = Error> {
        self.pd.get_region_by_id(id)
    }

    /// Fetch a timestamp from PD (timestamp oracle).
    fn get_ts(&self) -> impl Future<Item = PdTimestamp, Error = Error> {
        self.pd.get_ts()
    }

    /// Fetch store info by id, logging the (re)load.
    /// NOTE(review): `.map(Into::into)` appears to be an identity conversion
    /// (`Store` into `Store`) — confirm and consider removing.
    fn load_store(&self, id: StoreId) -> impl Future<Item = Store, Error = Error> {
        info!("reload info for store {}", id);
        self.pd.get_store(id).map(Into::into)
    }

    fn load_region(&self, key: &Key) -> impl Future<Item = Region, Error = Error> {
        self.pd.get_region(key.as_ref())
    }

    fn load_region_by_id(&self, id: RegionId) -> impl Future<Item = Region, Error = Error> {
        self.pd.get_region_by_id(id)
    }

    /// Locate the region that owns `key`, wrapped as a `KeyLocation`.
    fn locate_key(&self, key: &Key) -> impl Future<Item = KeyLocation, Error = Error> {
        self.load_region(key).map(KeyLocation::new)
    }

    /// Return a `KvClient` for the store in `context`, reusing a cached
    /// connection when one exists for the store's address.
    ///
    /// NOTE(review): the read-check / connect / write-insert sequence is not
    /// atomic — two racing callers may both connect and the later insert wins;
    /// both connections remain usable.
    fn kv_client(&self, context: RegionContext) -> Result<(RegionContext, Arc<KvClient>)> {
        if let Some(conn) = self.tikv.rl().get(context.address()) {
            return Ok((context, Arc::clone(conn)));
        };
        info!("connect to tikv endpoint: {:?}", context.address());
        let tikv = Arc::clone(&self.tikv);
        KvClient::connect(
            Arc::clone(&self.env),
            context.address(),
            &self.security_mgr,
            self.timeout,
        )
        .map(Arc::new)
        .map(|c| {
            // Cache the fresh connection for subsequent requests to this store.
            tikv.wl()
                .insert(context.address().to_owned(), Arc::clone(&c));
            (context, c)
        })
    }
}
/// Public RPC entry point; a thin handle over the shared `RpcClientInner`
/// state, cheap to hand around via the inner `Arc`.
pub struct RpcClient {
    inner: Arc<RpcClientInner>,
}
impl RpcClient {
/// Establish the connections described by `config` and wrap the shared
/// state in a new `RpcClient`.
pub fn connect(config: &Config) -> Result<RpcClient> {
    RpcClientInner::connect(config).map(|inner| RpcClient {
        inner: Arc::new(inner),
    })
}
/// Group `tasks` by the region that owns each task's key.
///
/// Runs an asynchronous loop (`loop_fn`) over the task list: it locates the
/// region for the task at `index`, then greedily assigns every following
/// consecutive task whose key is still inside that region. When a task falls
/// outside, the loop continues with a fresh lookup for it — so the input does
/// not need to be sorted; sorted input simply needs fewer PD lookups.
fn group_tasks_by_region<Task>(
    &self,
    tasks: Vec<Task>,
) -> impl Future<Item = GroupedTasks<Task>, Error = Error>
where
    Task: GroupingTask,
{
    let result: Option<GroupedTasks<Task>> = None;
    let inner = self.inner();
    loop_fn((0, tasks, result), move |(mut index, tasks, mut result)| {
        if index == tasks.len() {
            // All tasks grouped; finish with what we have.
            Either::A(future::ok(Loop::Break(result)))
        } else {
            let inner = Arc::clone(&inner);
            Either::B(inner.locate_key(tasks[index].key()).map(move |location| {
                // Consume consecutive tasks that fall inside the located region.
                while let Some(item) = tasks.get(index) {
                    if !location.contains(item.key()) {
                        break;
                    }
                    let ver_id = location.ver_id();
                    let item = item.clone();
                    if let Some(ref mut grouped) = result {
                        grouped.add(ver_id, item);
                    } else {
                        // First group: create the container lazily.
                        result = Some(GroupedTasks::new(ver_id, item));
                    }
                    index += 1;
                }
                if index == tasks.len() {
                    Loop::Break(result)
                } else {
                    Loop::Continue((index, tasks, result))
                }
            }))
        }
    })
    // An empty task list never creates a group; normalize `None` to the default.
    .map(|r| r.unwrap_or_default())
}
/// Resolve the region and leader store for `key`, then obtain (or reuse)
/// a `KvClient` connected to that store.
fn region_context(
    inner: Arc<RpcClientInner>,
    key: &Key,
) -> impl Future<Item = (RegionContext, Arc<KvClient>), Error = Error> {
    let inner2 = Arc::clone(&inner);
    inner
        .locate_key(key)
        .and_then(move |location| {
            // A located region is expected to carry its leader peer.
            let peer = location.peer().expect("leader must exist");
            let store_id = peer.get_store_id();
            inner.load_store(store_id).map(|store| RegionContext {
                region: location.into_inner(),
                store,
            })
        })
        .and_then(move |region| inner2.kv_client(region))
}
/// Like `region_context`, but looks the region up by its id rather than by
/// a key it contains.
fn region_context_by_id(
    inner: Arc<RpcClientInner>,
    id: RegionId,
) -> impl Future<Item = (RegionContext, Arc<KvClient>), Error = Error> {
    let inner2 = Arc::clone(&inner);
    inner
        .load_region_by_id(id)
        .and_then(move |region| {
            // A loaded region is expected to carry its leader peer.
            let peer = region.peer().expect("leader must exist");
            let store_id = peer.get_store_id();
            inner
                .load_store(store_id)
                .map(|store| RegionContext { region, store })
        })
        .and_then(move |region| inner2.kv_client(region))
}
/// Build a `RawContext` (region + connected client + optional column family)
/// for the region that owns `key`.
fn raw(
    inner: Arc<RpcClientInner>,
    key: &Key,
    cf: Option<ColumnFamily>,
) -> impl Future<Item = RawContext, Error = Error> {
    Self::region_context(inner, key)
        .and_then(move |(region, client)| Ok(RawContext::new(region, client, cf)))
}
fn txn(inner: Arc<RpcClientInner>, key: &Key) -> impl Future<Item = TxnContext, Error = Error> {
Self::region_context(inner, key).map(move |(region, _client)| TxnContext::new(region))
}
    /// Returns a new handle to the shared client internals.
    #[inline]
    fn inner(&self) -> Arc<RpcClientInner> {
        Arc::clone(&self.inner)
    }
    /// Fetches the value stored at `key` from the raw store.
    ///
    /// An empty response value is treated as "key not found" and surfaced as
    /// `Error::NoSuchKey`.
    pub fn raw_get(
        &self,
        key: Key,
        cf: Option<ColumnFamily>,
    ) -> impl Future<Item = Value, Error = Error> {
        Self::raw(self.inner(), &key, cf)
            .and_then(move |context| context.client().raw_get(context, key))
            .and_then(move |value| {
                if value.is_empty() {
                    Err(Error::NoSuchKey)
                } else {
                    Ok(value)
                }
            })
    }
    /// Fetches the values for `keys`, issuing one batched RPC per region.
    ///
    /// Results of all per-region batches are concatenated; since groups come
    /// from a `HashMap`, cross-region ordering is unspecified.
    pub fn raw_batch_get(
        &self,
        keys: Vec<Key>,
        cf: Option<ColumnFamily>,
    ) -> impl Future<Item = Vec<KvPair>, Error = Error> {
        let inner = self.inner();
        self.group_tasks_by_region(keys)
            .and_then(move |task_groups| {
                let task_groups = task_groups.into_inner();
                let mut tasks = Vec::with_capacity(task_groups.len());
                for (region, keys) in task_groups.into_iter() {
                    let inner = Arc::clone(&inner);
                    let cf = cf.clone();
                    // Re-resolve the region by id to pick up the current
                    // leader/store before issuing the batch.
                    let task = Self::region_context_by_id(inner, region.id)
                        .map(move |(region, client)| RawContext::new(region, client, cf))
                        .and_then(move |context| {
                            context.client().raw_batch_get(context, keys.into_iter())
                        });
                    tasks.push(task);
                }
                future::join_all(tasks)
            })
            .map(|r| r.into_iter().flat_map(|a| a.into_iter()).collect())
    }
pub fn raw_put(
&self,
key: Key,
value: Value,
cf: Option<ColumnFamily>,
) -> impl Future<Item = (), Error = Error> {
if value.is_empty() {
Either::A(future::err(Error::EmptyValue))
} else {
Either::B(
Self::raw(self.inner(), &key, cf)
.and_then(move |context| context.client().raw_put(context, key, value)),
)
}
}
    /// Writes `pairs` to the raw store, issuing one batched RPC per region.
    ///
    /// Fails up front with `Error::EmptyValue` if any pair carries an empty
    /// value (same rationale as `raw_put`).
    pub fn raw_batch_put(
        &self,
        pairs: Vec<KvPair>,
        cf: Option<ColumnFamily>,
    ) -> impl Future<Item = (), Error = Error> {
        if pairs.iter().any(|p| p.value().is_empty()) {
            Either::A(future::err(Error::EmptyValue))
        } else {
            let inner = self.inner();
            Either::B(
                self.group_tasks_by_region(pairs)
                    .and_then(move |task_groups| {
                        let task_groups = task_groups.into_inner();
                        let mut tasks = Vec::with_capacity(task_groups.len());
                        for (region, pairs) in task_groups.into_iter() {
                            let inner = Arc::clone(&inner);
                            let cf = cf.clone();
                            let task = Self::region_context_by_id(inner, region.id)
                                .map(move |(region, client)| RawContext::new(region, client, cf))
                                .and_then(move |context| {
                                    context.client().raw_batch_put(context, pairs)
                                });
                            tasks.push(task);
                        }
                        future::join_all(tasks)
                    })
                    .map(|_| ()),
            )
        }
    }
pub fn raw_delete(
&self,
key: Key,
cf: Option<ColumnFamily>,
) -> impl Future<Item = (), Error = Error> {
Self::raw(self.inner(), &key, cf)
.and_then(move |context| context.client().raw_delete(context, key))
}
    /// Deletes `keys` from the raw store, issuing one batched RPC per region.
    pub fn raw_batch_delete(
        &self,
        keys: Vec<Key>,
        cf: Option<ColumnFamily>,
    ) -> impl Future<Item = (), Error = Error> {
        let inner = self.inner();
        self.group_tasks_by_region(keys)
            .and_then(move |task_groups| {
                let task_groups = task_groups.into_inner();
                let mut tasks = Vec::with_capacity(task_groups.len());
                for (region, keys) in task_groups.into_iter() {
                    let inner = Arc::clone(&inner);
                    let cf = cf.clone();
                    let task = Self::region_context_by_id(inner, region.id)
                        .map(move |(region, client)| RawContext::new(region, client, cf))
                        .and_then(move |context| context.client().raw_batch_delete(context, keys));
                    tasks.push(task);
                }
                future::join_all(tasks)
            })
            .map(|_| ())
    }
    /// Scans up to `limit` key-value pairs starting at `range.0`, walking
    /// across region boundaries as needed.
    ///
    /// `range.1` is an optional upper bound; `key_only` asks TiKV to omit
    /// values.
    pub fn raw_scan(
        &self,
        range: (Key, Option<Key>),
        limit: u32,
        key_only: bool,
        cf: Option<ColumnFamily>,
    ) -> impl Future<Item = Vec<KvPair>, Error = Error> {
        // Per-scan options threaded through the loop via `ScanRegionsContext`.
        struct State {
            limit: u32,
            key_only: bool,
            cf: Option<ColumnFamily>,
        };
        let scan: ScanRegionsContext<Vec<KvPair>, State> = ScanRegionsContext::new(
            range,
            State {
                limit,
                key_only,
                cf,
            },
        );
        let inner = Arc::clone(&self.inner);
        loop_fn((inner, scan), |(inner, scan)| {
            inner
                .locate_key(scan.start_key())
                .and_then(move |location| {
                    let region = location.into_inner();
                    let cf = scan.cf.clone();
                    Self::region_context_by_id(Arc::clone(&inner), region.id)
                        .map(move |(region, client)| {
                            (scan, region.range(), RawContext::new(region, client, cf))
                        })
                        .and_then(move |(mut scan, region_range, context)| {
                            let (start_key, end_key) = scan.range();
                            context
                                .client()
                                .raw_scan(context, start_key, end_key, scan.limit, scan.key_only)
                                .map(move |pairs| (scan, region_range, pairs))
                        })
                        .map(move |(mut scan, region_range, mut pairs)| {
                            let limit = scan.limit;
                            scan.result_mut().append(&mut pairs);
                            // Stop once enough pairs were collected, or when
                            // the requested range / keyspace is exhausted.
                            if scan.result().len() as u32 >= limit {
                                Loop::Break(scan.into_inner())
                            } else {
                                match scan.next(region_range) {
                                    ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
                                    ScanRegionsStatus::Break => Loop::Break(scan.into_inner()),
                                }
                            }
                        })
                })
        })
    }
pub fn raw_batch_scan(
&self,
ranges: Vec<(Key, Option<Key>)>,
_each_limit: u32,
_key_only: bool,
cf: Option<ColumnFamily>,
) -> impl Future<Item = Vec<KvPair>, Error = Error> {
drop(ranges);
drop(cf);
future::err(Error::Unimplemented)
}
    /// Deletes every key in `range`, issuing one delete-range RPC per region.
    ///
    /// NOTE(review): `range.1` is unwrapped with `expect`, so an unbounded
    /// range (`None` end key) panics — confirm callers always pass an end key.
    pub fn raw_delete_range(
        &self,
        range: (Key, Option<Key>),
        cf: Option<ColumnFamily>,
    ) -> impl Future<Item = (), Error = Error> {
        let scan: ScanRegionsContext<(), Option<ColumnFamily>> = ScanRegionsContext::new(range, cf);
        let inner = Arc::clone(&self.inner);
        loop_fn((inner, scan), |(inner, scan)| {
            inner
                .locate_key(scan.start_key())
                .and_then(move |location| {
                    let region = location.into_inner();
                    // `scan` derefs to its state, here the column family.
                    let cf = scan.clone();
                    Self::region_context_by_id(Arc::clone(&inner), region.id)
                        .map(move |(region, client)| {
                            (scan, region.range(), RawContext::new(region, client, cf))
                        })
                        .and_then(move |(mut scan, region_range, context)| {
                            let (start_key, end_key) = scan.range();
                            let start_key = start_key.expect("start key must be specified");
                            let end_key = end_key.expect("end key must be specified");
                            context
                                .client()
                                .raw_delete_range(context, start_key, end_key)
                                .map(move |_| (scan, region_range))
                        })
                        .map(
                            move |(mut scan, region_range)| match scan.next(region_range) {
                                ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
                                ScanRegionsStatus::Break => Loop::Break(()),
                            },
                        )
                })
        })
    }
}
impl fmt::Debug for RpcClient {
    /// Shows only the PD connection; per-store TiKV clients are omitted.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = fmt.debug_struct("tikv-client");
        builder.field("pd", &self.inner.pd);
        builder.finish()
    }
}
/// A region paired with the store that hosts its leader peer.
pub struct RegionContext {
    region: Region,
    store: Store,
}
impl RegionContext {
fn address(&self) -> &str {
self.store.get_address()
}
fn start_key(&self) -> Key {
self.region.start_key().to_vec().into()
}
fn end_key(&self) -> Key {
self.region.end_key().to_vec().into()
}
fn range(&self) -> (Key, Key) {
(self.start_key(), self.end_key())
}
}
impl From<RegionContext> for kvrpcpb::Context {
    /// Converts a resolved region into the request context protobuf expected
    /// by TiKV RPCs (region id, epoch and leader peer).
    fn from(mut ctx: RegionContext) -> kvrpcpb::Context {
        let mut kvctx = kvrpcpb::Context::new();
        kvctx.set_region_id(ctx.region.id);
        kvctx.set_region_epoch(ctx.region.take_region_epoch());
        // Requests must be addressed to the leader peer.
        kvctx.set_peer(ctx.region.peer().expect("leader must exist").into_inner());
        kvctx
    }
}
/// Everything a raw-KV RPC needs: the target region, the store's RPC client
/// and the optional column family.
pub struct RawContext {
    region: RegionContext,
    client: Arc<KvClient>,
    cf: Option<ColumnFamily>,
}
impl RawContext {
fn new(region: RegionContext, client: Arc<KvClient>, cf: Option<ColumnFamily>) -> Self {
RawContext { region, client, cf }
}
fn client(&self) -> Arc<KvClient> {
Arc::clone(&self.client)
}
pub fn into_inner(self) -> (RegionContext, Option<ColumnFamily>) {
(self.region, self.cf)
}
}
/// Region information carried by transactional requests.
pub struct TxnContext {
    region: RegionContext,
}
impl TxnContext {
fn new(region: RegionContext) -> Self {
TxnContext { region }
}
pub fn into_inner(self) -> RegionContext {
self.region
}
}
/// A region returned by a key lookup; adds containment checks on top of
/// `Region` (exposed through `Deref`).
struct KeyLocation(Region);
impl KeyLocation {
    fn new(region: Region) -> Self {
        KeyLocation(region)
    }

    /// Whether `key` lies inside this region's `[start, end)` range.
    ///
    /// An empty end key marks the last region of the keyspace (unbounded).
    /// The upper-bound comparison was inverted (`end < key`), which rejected
    /// every in-range key of a bounded region; it now matches
    /// `Region::contains`.
    fn contains(&self, key: &Key) -> bool {
        let start = self.start_key();
        let end = self.end_key();
        let key: &[u8] = key.as_ref();
        start <= key && (key < end || end.is_empty())
    }

    fn into_inner(self) -> Region {
        self.0
    }
}
impl Deref for KeyLocation {
    type Target = Region;

    // Expose the wrapped region's accessors directly.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// A work item that can be grouped by the region covering its key.
///
/// `Default` is required so `#[derive(Default)]` on `GroupedTasks<Task>`
/// applies for every task type.
trait GroupingTask: Clone + Default + Sized {
    fn key(&self) -> &Key;
}
/// Tasks bucketed by the version id of the region that covers them. The
/// second field is the `RegionVerId` given at construction; it is not
/// exposed by `Deref` or `into_inner`.
#[derive(Default)]
struct GroupedTasks<Task: GroupingTask>(HashMap<RegionVerId, Vec<Task>>, RegionVerId);
impl<Task> GroupedTasks<Task>
where
    Task: GroupingTask,
{
    /// Creates a grouping holding a single task under `ver_id`.
    fn new(ver_id: RegionVerId, task: Task) -> Self {
        let mut map = HashMap::with_capacity(1);
        map.insert(ver_id.clone(), vec![task]);
        GroupedTasks(map, ver_id)
    }

    /// Appends `task` to the bucket for `ver_id`, creating the bucket if
    /// this is its first task.
    fn add(&mut self, ver_id: RegionVerId, task: Task) {
        let bucket = self.0.entry(ver_id).or_insert_with(|| Vec::with_capacity(1));
        bucket.push(task)
    }

    /// Consumes the grouping, yielding the underlying map.
    fn into_inner(self) -> HashMap<RegionVerId, Vec<Task>> {
        self.0
    }
}
impl<Task> Deref for GroupedTasks<Task>
where
    Task: GroupingTask,
{
    type Target = HashMap<RegionVerId, Vec<Task>>;

    // Read-only access to the region -> tasks map.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
// A bare key groups by itself.
impl GroupingTask for Key {
    fn key(&self) -> &Key {
        self
    }
}

// A pair groups by its key component.
impl GroupingTask for KvPair {
    fn key(&self) -> &Key {
        self.key()
    }
}

// A range groups by its start key.
impl GroupingTask for (Key, Option<Key>) {
    fn key(&self) -> &Key {
        &self.0
    }
}
/// Flow-control signal for multi-region scan loops.
enum ScanRegionsStatus {
    /// Keep scanning from the next region.
    Continue,
    /// The requested range (or the keyspace) is exhausted.
    Break,
}
/// State threaded through a `loop_fn` that walks a key range region by
/// region, accumulating a result of type `Res` and carrying caller-specific
/// `State` (exposed through `Deref`).
struct ScanRegionsContext<Res, State>
where
    Res: Default,
    State: Sized,
{
    // `None` while taken by `range()` for an in-flight request.
    start_key: Option<Key>,
    end_key: Option<Key>,
    result: Res,
    state: State,
}
impl<Res, State> ScanRegionsContext<Res, State>
where
    Res: Default,
    State: Sized,
{
    /// Starts a multi-region walk over `range`, carrying extra `state`.
    fn new(range: (Key, Option<Key>), state: State) -> Self {
        ScanRegionsContext {
            start_key: Some(range.0),
            end_key: range.1,
            result: Res::default(),
            state,
        }
    }

    /// Takes the current start key (leaving `None` until `next` resets it)
    /// together with a copy of the optional end key, for handing to an RPC.
    fn range(&mut self) -> (Option<Key>, Option<Key>) {
        (self.start_key.take(), self.end_key.clone())
    }

    /// Current start key.
    ///
    /// # Panics
    ///
    /// Panics if the start key was taken by `range` and not restored by
    /// `next`.
    fn start_key(&self) -> &Key {
        self.start_key.as_ref().unwrap()
    }

    /// Optional upper bound of the walk.
    fn end_key(&self) -> Option<&Key> {
        self.end_key.as_ref()
    }

    /// Advances past the region whose range was `region_range`.
    ///
    /// Breaks when the requested end key falls before the region's end (the
    /// walk finished inside this region) or the region is the last one in
    /// the keyspace (empty end key); otherwise the next iteration starts at
    /// the region's end key.
    fn next(&mut self, region_range: (Key, Key)) -> ScanRegionsStatus {
        {
            let region_end = &region_range.1;
            if self.end_key().map(|x| x < region_end).unwrap_or(false) || region_end.is_empty() {
                return ScanRegionsStatus::Break;
            }
        }
        self.start_key = Some(region_range.1);
        ScanRegionsStatus::Continue
    }

    /// Consumes the context, yielding the accumulated result.
    fn into_inner(self) -> Res {
        self.result
    }

    /// Mutable access to the accumulated result.
    fn result_mut(&mut self) -> &mut Res {
        &mut self.result
    }

    /// Shared access to the accumulated result.
    fn result(&self) -> &Res {
        &self.result
    }
}
impl<Res, State> Deref for ScanRegionsContext<Res, State>
where
    Res: Default,
    State: Sized,
{
    type Target = State;

    // Scan call sites read their extra state (limit, cf, ...) through deref.
    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

68
src/rpc/context.rs Normal file
View File

@ -0,0 +1,68 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use prometheus::{HistogramVec, IntCounterVec};
use crate::{rpc::util::duration_to_sec, Result};
/// Bookkeeping for one outgoing RPC: start time, the Prometheus collectors
/// to report into, and the executor closure that performs the call.
pub struct RequestContext<Executor> {
    start: Instant,
    cmd: &'static str,
    duration: &'static HistogramVec,
    failed_duration: &'static HistogramVec,
    failed_counter: &'static IntCounterVec,
    // `Option` so the executor can be moved out exactly once.
    executor: Option<Executor>,
}
impl<Executor> RequestContext<Executor> {
    /// Counts one attempt of `cmd` on `counter` and starts the latency clock.
    pub fn new(
        cmd: &'static str,
        duration: &'static HistogramVec,
        counter: &'static IntCounterVec,
        failed_duration: &'static HistogramVec,
        failed_counter: &'static IntCounterVec,
        executor: Executor,
    ) -> Self {
        counter.with_label_values(&[cmd]).inc();
        RequestContext {
            start: Instant::now(),
            cmd,
            duration,
            failed_duration,
            failed_counter,
            executor: Some(executor),
        }
    }

    /// Moves the executor out of the context.
    ///
    /// # Panics
    ///
    /// Panics when called more than once.
    pub fn executor(&mut self) -> Executor {
        self.executor
            .take()
            // Fixed grammar of the panic message ("be take once").
            .expect("executor can only be taken once")
    }

    /// Records success or failure metrics for the finished request and
    /// passes `r` through unchanged.
    pub fn done<R>(&self, r: Result<R>) -> Result<R> {
        if r.is_ok() {
            self.duration
                .with_label_values(&[self.cmd])
                .observe(duration_to_sec(self.start.elapsed()));
        } else {
            self.failed_duration
                .with_label_values(&[self.cmd])
                .observe(duration_to_sec(self.start.elapsed()));
            self.failed_counter.with_label_values(&[self.cmd]).inc();
        }
        r
    }
}

22
src/rpc/mod.rs Normal file
View File

@ -0,0 +1,22 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#[macro_use]
mod util;
mod client;
mod context;
mod pd;
mod security;
mod tikv;
pub(crate) use crate::rpc::client::RpcClient;

223
src/rpc/pd/client.rs Normal file
View File

@ -0,0 +1,223 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
fmt,
sync::{Arc, RwLock},
time::Duration,
};
use futures::Future;
use grpcio::{CallOption, Environment};
use kvproto::{metapb, pdpb, pdpb_grpc::PdClient as RpcClient};
use crate::{
rpc::{
pd::{
context::{request_context, PdRequestContext},
leader::LeaderClient,
request::Request,
PdTimestamp, Region, RegionId, Store, StoreId,
},
security::SecurityManager,
util::HandyRwLock,
},
Error, Result,
};
const LEADER_CHANGE_RETRY: usize = 10;
/// A PD response whose header can be inspected for cluster-level errors.
trait PdResponse {
    fn header(&self) -> &pdpb::ResponseHeader;
}

impl PdResponse for pdpb::GetStoreResponse {
    fn header(&self) -> &pdpb::ResponseHeader {
        self.get_header()
    }
}

impl PdResponse for pdpb::GetRegionResponse {
    fn header(&self) -> &pdpb::ResponseHeader {
        self.get_header()
    }
}

impl PdResponse for pdpb::GetAllStoresResponse {
    fn header(&self) -> &pdpb::ResponseHeader {
        self.get_header()
    }
}
/// High-level PD client: wraps the shared leader connection and stamps every
/// request with the cluster id and timeout.
pub struct PdClient {
    cluster_id: u64,
    leader: Arc<RwLock<LeaderClient>>,
    timeout: Duration,
}
impl PdClient {
    /// Connects to the PD cluster at `endpoints` (validating membership and
    /// cluster id) and caches the reported cluster id.
    pub fn connect(
        env: Arc<Environment>,
        endpoints: &[String],
        security_mgr: Arc<SecurityManager>,
        timeout: Duration,
    ) -> Result<PdClient> {
        let leader = LeaderClient::connect(env, endpoints, security_mgr, timeout)?;
        let cluster_id = leader.rl().cluster_id();
        Ok(PdClient {
            cluster_id,
            leader,
            timeout,
        })
    }

    /// The PD member currently recorded as leader.
    fn get_leader(&self) -> pdpb::Member {
        self.leader.rl().members.get_leader().clone()
    }

    /// Looks up the region containing `key` together with its leader peer,
    /// when PD reports one.
    fn get_region_and_leader(
        &self,
        key: &[u8],
    ) -> impl Future<Item = (metapb::Region, Option<metapb::Peer>), Error = Error> {
        let mut req = pd_request!(self.cluster_id, pdpb::GetRegionRequest);
        req.set_region_key(key.to_owned());
        // Keep an owned copy of the key for the error path below.
        let key = req.get_region_key().to_owned();
        self.execute(request_context(
            "get_region",
            move |cli: &RpcClient, opt: _| cli.get_region_async_opt(&req, opt),
        ))
        .and_then(move |mut resp| {
            let region = if resp.has_region() {
                resp.take_region()
            } else {
                return Err(Error::RegionForKeyNotFound(key));
            };
            let leader = if resp.has_leader() {
                Some(resp.take_leader())
            } else {
                None
            };
            Ok((region, leader))
        })
    }

    /// Looks up the region with id `region_id` together with its leader
    /// peer, when PD reports one.
    fn get_region_and_leader_by_id(
        &self,
        region_id: u64,
    ) -> impl Future<Item = (metapb::Region, Option<metapb::Peer>), Error = Error> {
        let mut req = pd_request!(self.cluster_id, pdpb::GetRegionByIDRequest);
        req.set_region_id(region_id);
        self.execute(request_context(
            "get_region_by_id",
            move |cli: &RpcClient, opt: _| cli.get_region_by_id_async_opt(&req, opt),
        ))
        .and_then(move |mut resp| {
            let region = if resp.has_region() {
                resp.take_region()
            } else {
                return Err(Error::RegionNotFound(region_id, None));
            };
            let leader = if resp.has_leader() {
                Some(resp.take_leader())
            } else {
                None
            };
            Ok((region, leader))
        })
    }

    /// Runs `context`'s executor against the current leader, retrying with
    /// reconnects (up to `LEADER_CHANGE_RETRY`) and turning a PD error
    /// header into an `Err`; metrics are finalized via `context.done`.
    fn execute<Resp, Executor, RpcFuture>(
        &self,
        mut context: PdRequestContext<Executor>,
    ) -> impl Future<Item = Resp, Error = Error>
    where
        Resp: PdResponse + Send + fmt::Debug + 'static,
        RpcFuture: Future<Item = Resp, Error = ::grpcio::Error> + Send + 'static,
        Executor: FnMut(&RpcClient, CallOption) -> ::grpcio::Result<RpcFuture> + Send + 'static,
    {
        let timeout = self.timeout;
        let mut executor = context.executor();
        let wrapper = move |cli: &RwLock<LeaderClient>| {
            let option = CallOption::default().timeout(timeout);
            let cli = &cli.rl().client;
            executor(cli, option)
                .unwrap()
                .map_err(Error::Grpc)
                .and_then(|r| {
                    {
                        let header = r.header();
                        if header.has_error() {
                            return Err(internal_err!(header.get_error().get_message()));
                        }
                    }
                    Ok(r)
                })
        };
        Request::new(
            wrapper,
            Arc::clone(&self.leader),
            LeaderClient::reconnect,
            LEADER_CHANGE_RETRY,
        )
        .execute()
        .then(move |r| context.done(r))
    }

    /// Lists every store known to PD.
    pub fn get_all_stores(&self) -> impl Future<Item = Vec<Store>, Error = Error> {
        let req = pd_request!(self.cluster_id, pdpb::GetAllStoresRequest);
        self.execute(request_context(
            "get_all_stores",
            move |cli: &RpcClient, opt: _| cli.get_all_stores_async_opt(&req, opt),
        ))
        .map(|mut resp| resp.take_stores().into_iter().map(Into::into).collect())
    }

    /// Fetches a single store by id.
    pub fn get_store(&self, store_id: StoreId) -> impl Future<Item = Store, Error = Error> {
        let mut req = pd_request!(self.cluster_id, pdpb::GetStoreRequest);
        req.set_store_id(store_id);
        self.execute(request_context(
            "get_store",
            move |cli: &RpcClient, opt: _| cli.get_store_async_opt(&req, opt),
        ))
        .map(|mut resp| resp.take_store().into())
    }

    /// The region covering `key`, with its leader when known.
    pub fn get_region(&self, key: &[u8]) -> impl Future<Item = Region, Error = Error> {
        self.get_region_and_leader(key)
            .map(|x| Region::new(x.0, x.1))
    }

    /// The region with the given id, with its leader when known.
    pub fn get_region_by_id(&self, id: RegionId) -> impl Future<Item = Region, Error = Error> {
        self.get_region_and_leader_by_id(id)
            .map(|x| Region::new(x.0, x.1))
    }

    /// Requests a timestamp from PD's timestamp oracle (batched internally).
    pub fn get_ts(&self) -> impl Future<Item = PdTimestamp, Error = Error> {
        self.leader.wl().get_ts()
    }
}
impl fmt::Debug for PdClient {
    /// Reports the cluster id, current leader member and request timeout.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = fmt.debug_struct("PdClient");
        builder.field("cluster_id", &self.cluster_id);
        builder.field("leader", &self.get_leader());
        builder.field("timeout", &self.timeout);
        builder.finish()
    }
}

90
src/rpc/pd/context.rs Normal file
View File

@ -0,0 +1,90 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Deref, DerefMut};
use lazy_static::*;
use prometheus::*;
use crate::rpc::context::RequestContext;
/// A `RequestContext` wired to the PD-specific Prometheus collectors
/// declared below; all behavior is delegated through `Deref`/`DerefMut`.
pub struct PdRequestContext<Executor> {
    target: RequestContext<Executor>,
}

impl<Executor> Deref for PdRequestContext<Executor> {
    type Target = RequestContext<Executor>;

    fn deref(&self) -> &Self::Target {
        &self.target
    }
}

impl<Executor> DerefMut for PdRequestContext<Executor> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.target
    }
}
/// Starts metric bookkeeping for one PD request named `cmd`, carrying the
/// closure that will perform the RPC.
pub fn request_context<Executor>(
    cmd: &'static str,
    executor: Executor,
) -> PdRequestContext<Executor> {
    PdRequestContext {
        target: RequestContext::new(
            cmd,
            &PD_REQUEST_DURATION_HISTOGRAM_VEC,
            &PD_REQUEST_COUNTER_VEC,
            &PD_FAILED_REQUEST_DURATION_HISTOGRAM_VEC,
            &PD_FAILED_REQUEST_COUNTER_VEC,
            executor,
        ),
    }
}
/// Records `batch_size` in the TSO batch-size histogram and returns it,
/// truncated to `u32`, for use as the request's timestamp count.
pub fn observe_tso_batch(batch_size: usize) -> u32 {
    PD_TSO_BATCH_SIZE_HISTOGRAM.observe(batch_size as f64);
    batch_size as u32
}
lazy_static! {
    /// Latency of successful PD requests, labelled by request type.
    static ref PD_REQUEST_DURATION_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
        "pd_request_duration_seconds",
        "Bucketed histogram of PD requests duration",
        &["type"]
    )
    .unwrap();
    /// Total PD requests issued, labelled by request type.
    static ref PD_REQUEST_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
        "pd_request_total",
        "Total number of requests sent to PD",
        &["type"]
    )
    .unwrap();
    /// Latency of failed PD requests, labelled by request type.
    static ref PD_FAILED_REQUEST_DURATION_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
        "pd_failed_request_duration_seconds",
        "Bucketed histogram of failed PD requests duration",
        &["type"]
    )
    .unwrap();
    /// Total failed PD requests, labelled by request type.
    static ref PD_FAILED_REQUEST_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
        "pd_failed_request_total",
        "Total number of failed requests sent to PD",
        &["type"]
    )
    .unwrap();
    /// Distribution of how many TSO waiters are batched into one request.
    static ref PD_TSO_BATCH_SIZE_HISTOGRAM: Histogram = register_histogram!(
        "pd_tso_batch_size",
        "Bucketed histogram of TSO request batch size"
    )
    .unwrap();
}

434
src/rpc/pd/leader.rs Normal file
View File

@ -0,0 +1,434 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
sync::{Arc, RwLock},
thread::{self, JoinHandle},
time::{Duration, Instant},
};
use futures::{
sync::{
mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender},
oneshot,
},
Future, Sink, Stream,
};
use fxhash::FxHashSet as HashSet;
use grpcio::{CallOption, Environment, WriteFlags};
use kvproto::{pdpb, pdpb_grpc};
use log::*;
use tokio_core::reactor::{Core, Handle as OtherHandle};
use crate::{
rpc::{
pd::{
context::{observe_tso_batch, request_context},
PdTimestamp,
},
security::SecurityManager,
util::HandyRwLock,
},
Error, Result,
};
/// Builds a PD request of type `$type` with its `RequestHeader` populated
/// with `$cluster_id`.
macro_rules! pd_request {
    ($cluster_id:expr, $type:ty) => {{
        let mut request = <$type>::new();
        let mut header = ::kvproto::pdpb::RequestHeader::new();
        header.set_cluster_id($cluster_id);
        request.set_header(header);
        request
    }};
}
/// One-shot channel over which a single allocated timestamp is delivered.
type TsoChannel = oneshot::Sender<PdTimestamp>;

/// Work items processed by the reactor thread.
enum PdTask {
    /// (Re)establish the bidirectional TSO stream.
    Init,
    /// Send one batched TSO request.
    Request,
    /// Distribute a TSO response to the waiters of the in-flight batch.
    Response(Vec<oneshot::Sender<PdTimestamp>>, pdpb::TsoResponse),
}
/// Background event loop that batches TSO requests to PD.
struct PdReactor {
    // Queue feeding the reactor thread; `None` until `start` runs.
    task_tx: Option<UnboundedSender<Option<PdTask>>>,
    tso_tx: Sender<pdpb::TsoRequest>,
    // Held here until `init` moves it into the gRPC sink.
    tso_rx: Option<Receiver<pdpb::TsoRequest>>,
    // Join handle of the reactor thread, once spawned.
    handle: Option<JoinHandle<()>>,
    // Waiters of the TSO request currently in flight.
    tso_pending: Option<Vec<TsoChannel>>,
    // Reusable Vec swapped in and out of `tso_pending`.
    tso_buffer: Option<Vec<TsoChannel>>,
    // Waiters accumulated for the next batch.
    tso_batch: Vec<TsoChannel>,
}
impl Drop for PdReactor {
    // Joins the reactor thread, if one was started.
    // NOTE(review): the join happens before `task_tx` is dropped; confirm
    // the thread is signalled to exit, otherwise this could block.
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            handle.join().unwrap();
        }
    }
}
impl PdReactor {
    /// Creates an idle reactor; `start` must be called before scheduling.
    fn new() -> Self {
        let (tso_tx, tso_rx) = channel(1);
        PdReactor {
            task_tx: None,
            tso_tx,
            tso_rx: Some(tso_rx),
            handle: None,
            tso_buffer: Some(Vec::with_capacity(8)),
            tso_batch: Vec::with_capacity(8),
            tso_pending: None,
        }
    }

    /// Spawns the reactor thread on first use; on later calls (after a
    /// leader reconnect) refreshes the TSO channel pair and re-inits.
    fn start(&mut self, client: Arc<RwLock<LeaderClient>>) {
        if self.handle.is_none() {
            info!("starting pd reactor thread");
            let (task_tx, task_rx) = unbounded();
            task_tx.unbounded_send(Some(PdTask::Init)).unwrap();
            self.task_tx = Some(task_tx);
            self.handle = Some(
                thread::Builder::new()
                    .name("dispatcher thread".to_owned())
                    .spawn(move || Self::poll(&client, task_rx))
                    .unwrap(),
            )
        } else {
            warn!("tso sender and receiver are stale, refreshing...");
            let (tso_tx, tso_rx) = channel(1);
            self.tso_tx = tso_tx;
            self.tso_rx = Some(tso_rx);
            self.schedule(PdTask::Init);
        }
    }

    /// Queues `task` onto the reactor thread.
    ///
    /// # Panics
    ///
    /// Panics if called before `start`.
    fn schedule(&self, task: PdTask) {
        self.task_tx
            .as_ref()
            .unwrap()
            .unbounded_send(Some(task))
            .expect("unbounded send should never fail");
    }

    /// Reactor thread body: dispatches queued tasks on a tokio core until
    /// the stream yields `None`.
    fn poll(client: &Arc<RwLock<LeaderClient>>, rx: UnboundedReceiver<Option<PdTask>>) {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        {
            let f = rx.take_while(|t| Ok(t.is_some())).for_each(|t| {
                Self::dispatch(&client, t.unwrap(), &handle);
                Ok(())
            });
            core.run(f).unwrap();
        }
    }

    /// Wires the bidirectional TSO stream: forwards queued requests into the
    /// gRPC sink and routes responses back as `PdTask::Response`.
    fn init(client: &Arc<RwLock<LeaderClient>>, handle: &OtherHandle) {
        let client = Arc::clone(client);
        let (tx, rx) = client.wl().client.tso().unwrap();
        let tso_rx = client.wl().reactor.tso_rx.take().unwrap();
        handle.spawn(
            tx.sink_map_err(Error::Grpc)
                .send_all(tso_rx.then(|r| match r {
                    Ok(r) => Ok((r, WriteFlags::default())),
                    Err(()) => Err(internal_err!("failed to recv tso requests")),
                }))
                .then(|r| match r {
                    Ok((mut sender, _)) => {
                        // Request stream exhausted: cancel the gRPC call.
                        sender.get_mut().cancel();
                        Ok(())
                    }
                    Err(e) => {
                        error!("failed to send tso requests: {:?}", e);
                        Err(())
                    }
                }),
        );
        handle.spawn(
            rx.for_each(move |resp| {
                let mut client = client.wl();
                let reactor = &mut client.reactor;
                let tso_pending = reactor.tso_pending.take().unwrap();
                reactor.schedule(PdTask::Response(tso_pending, resp));
                if !reactor.tso_batch.is_empty() {
                    // Schedule another tso_batch of request
                    reactor.schedule(PdTask::Request);
                }
                Ok(())
            })
            .map_err(|e| panic!("unexpected error: {:?}", e)),
        );
    }

    /// Drains the waiting batch and sends one TSO request covering it all.
    fn tso_request(client: &Arc<RwLock<LeaderClient>>) {
        let mut client = client.wl();
        let cluster_id = client.cluster_id;
        let reactor = &mut client.reactor;
        let mut tso_batch = reactor.tso_buffer.take().unwrap();
        tso_batch.extend(reactor.tso_batch.drain(..));
        let mut request = pd_request!(cluster_id, pdpb::TsoRequest);
        let batch_size = observe_tso_batch(tso_batch.len());
        request.set_count(batch_size);
        reactor.tso_pending = Some(tso_batch);
        reactor
            .tso_tx
            .try_send(request)
            .expect("channel can never be full");
    }

    /// Fans a TSO response out to its waiters, giving each a distinct
    /// logical offset; the drained Vec is recycled as `tso_buffer`.
    fn tso_response(
        client: &Arc<RwLock<LeaderClient>>,
        mut requests: Vec<TsoChannel>,
        response: &pdpb::TsoResponse,
    ) {
        let timestamp = response.get_timestamp();
        for (offset, request) in requests.drain(..).enumerate() {
            request
                .send(PdTimestamp {
                    physical: timestamp.physical,
                    logical: timestamp.logical + offset as i64,
                })
                .unwrap();
        }
        client.wl().reactor.tso_buffer = Some(requests);
    }

    /// Runs one task on the reactor thread.
    fn dispatch(client: &Arc<RwLock<LeaderClient>>, task: PdTask, handle: &OtherHandle) {
        match task {
            PdTask::Request => Self::tso_request(client),
            PdTask::Response(requests, response) => Self::tso_response(client, requests, &response),
            PdTask::Init => Self::init(client, handle),
        }
    }

    /// Registers a waiter for one timestamp, kicking off a batch request if
    /// none is currently in flight.
    fn get_ts(&mut self) -> impl Future<Item = PdTimestamp, Error = Error> {
        let context = request_context("get_ts", ());
        let (tx, rx) = oneshot::channel::<PdTimestamp>();
        self.tso_batch.push(tx);
        if self.tso_pending.is_none() {
            // Schedule tso request to run.
            self.schedule(PdTask::Request);
        }
        rx.map_err(Error::Canceled).then(move |r| context.done(r))
    }
}
/// Connection to the current PD leader, plus everything required to
/// re-establish it after a leader change.
pub struct LeaderClient {
    pub client: pdpb_grpc::PdClient,
    pub members: pdpb::GetMembersResponse,
    env: Arc<Environment>,
    cluster_id: u64,
    security_mgr: Arc<SecurityManager>,
    // When the connection was last (re)built; throttles reconnects.
    last_update: Instant,
    reactor: PdReactor,
    timeout: Duration,
}
impl LeaderClient {
    /// Cluster id reported by PD at connection time.
    pub fn cluster_id(&self) -> u64 {
        self.cluster_id
    }

    /// Validates `endpoints`, connects to the current PD leader and starts
    /// the TSO reactor thread.
    pub fn connect(
        env: Arc<Environment>,
        endpoints: &[String],
        security_mgr: Arc<SecurityManager>,
        timeout: Duration,
    ) -> Result<Arc<RwLock<LeaderClient>>> {
        let (client, members) = validate_endpoints(&env, endpoints, &security_mgr, timeout)?;
        let cluster_id = members.get_header().get_cluster_id();
        let client = Arc::new(RwLock::new(LeaderClient {
            env,
            client,
            members,
            security_mgr,
            last_update: Instant::now(),
            reactor: PdReactor::new(),
            cluster_id,
            timeout,
        }));
        client.wl().reactor.start(Arc::clone(&client));
        Ok(client)
    }

    /// Requests one timestamp through the reactor's batching machinery.
    pub fn get_ts(&mut self) -> impl Future<Item = PdTimestamp, Error = Error> {
        self.reactor.get_ts()
    }

    // Re-establish connection with PD leader in synchronized fashion.
    pub fn reconnect(leader: &Arc<RwLock<LeaderClient>>, interval: u64) -> Result<()> {
        warn!("updating pd client, blocking the tokio core");
        let ((client, members), start) = {
            let leader = leader.rl();
            // Throttle: skip if the connection was refreshed recently.
            if leader.last_update.elapsed() < Duration::from_secs(interval) {
                // Avoid unnecessary updating.
                return Ok(());
            }
            let start = Instant::now();
            let timeout = leader.timeout;
            (
                try_connect_leader(&leader.env, &leader.security_mgr, &leader.members, timeout)?,
                start,
            )
        };
        {
            let leader_clone = Arc::clone(leader);
            let mut leader = leader.wl();
            leader.client = client;
            leader.members = members;
            leader.last_update = Instant::now();
            // Refresh the reactor's TSO stream against the new leader.
            leader.reactor.start(leader_clone);
        }
        warn!("updating PD client done, spent {:?}", start.elapsed());
        Ok(())
    }
}
/// Probes every configured endpoint once, rejecting duplicates and
/// cluster-id mismatches, then connects to the leader reported by the first
/// responsive member.
pub fn validate_endpoints(
    env: &Arc<Environment>,
    endpoints: &[String],
    security_mgr: &SecurityManager,
    timeout: Duration,
) -> Result<(pdpb_grpc::PdClient, pdpb::GetMembersResponse)> {
    let len = endpoints.len();
    let mut endpoints_set = HashSet::with_capacity_and_hasher(len, Default::default());
    let mut members = None;
    let mut cluster_id = None;
    for ep in endpoints {
        if !endpoints_set.insert(ep) {
            return Err(internal_err!("duplicated PD endpoint {}", ep));
        }
        let (_, resp) = match connect(Arc::clone(&env), security_mgr, ep, timeout) {
            Ok(resp) => resp,
            // Ignore failed PD node.
            Err(e) => {
                error!("PD endpoint {} failed to respond: {:?}", ep, e);
                continue;
            }
        };
        // Check cluster ID.
        let cid = resp.get_header().get_cluster_id();
        if let Some(sample) = cluster_id {
            if sample != cid {
                return Err(internal_err!(
                    "PD response cluster_id mismatch, want {}, got {}",
                    sample,
                    cid
                ));
            }
        } else {
            cluster_id = Some(cid);
        }
        // TODO: check all fields later?
        if members.is_none() {
            members = Some(resp);
        }
    }
    match members {
        Some(members) => {
            let (client, members) = try_connect_leader(&env, security_mgr, &members, timeout)?;
            info!("All PD endpoints are consistent: {:?}", endpoints);
            Ok((client, members))
        }
        _ => Err(internal_err!("PD cluster failed to respond")),
    }
}
fn connect(
env: Arc<Environment>,
security_mgr: &SecurityManager,
addr: &str,
timeout: Duration,
) -> Result<(pdpb_grpc::PdClient, pdpb::GetMembersResponse)> {
let client = security_mgr.connect(env, addr, pdpb_grpc::PdClient::new)?;
let option = CallOption::default().timeout(timeout);
match client.get_members_opt(&pdpb::GetMembersRequest::new(), option) {
Ok(resp) => Ok((client, resp)),
Err(e) => Err(Error::Grpc(e)),
}
}
/// Like `connect`, but additionally checks that the endpoint still belongs
/// to the expected cluster.
fn try_connect(
    env: &Arc<Environment>,
    security_mgr: &SecurityManager,
    addr: &str,
    cluster_id: u64,
    timeout: Duration,
) -> Result<(pdpb_grpc::PdClient, pdpb::GetMembersResponse)> {
    let (client, r) = connect(Arc::clone(&env), security_mgr, addr, timeout)?;
    let new_cluster_id = r.get_header().get_cluster_id();
    if new_cluster_id == cluster_id {
        Ok((client, r))
    } else {
        Err(internal_err!(
            "{} no longer belongs to cluster {}, it is in {}",
            addr,
            cluster_id,
            new_cluster_id
        ))
    }
}
/// Finds and connects to the current PD leader, starting from a previously
/// known member list.
///
/// First asks any reachable member (trying the previous leader last) for an
/// up-to-date member list, then connects to whichever leader that list
/// names.
pub fn try_connect_leader(
    env: &Arc<Environment>,
    security_mgr: &SecurityManager,
    previous: &pdpb::GetMembersResponse,
    timeout: Duration,
) -> Result<(pdpb_grpc::PdClient, pdpb::GetMembersResponse)> {
    let previous_leader = previous.get_leader();
    let members = previous.get_members();
    let cluster_id = previous.get_header().get_cluster_id();
    let mut resp = None;
    // Try to connect to other members, then the previous leader.
    'outer: for m in members
        .into_iter()
        .filter(|m| *m != previous_leader)
        .chain(&[previous_leader.clone()])
    {
        for ep in m.get_client_urls() {
            match try_connect(&env, security_mgr, ep.as_str(), cluster_id, timeout) {
                Ok((_, r)) => {
                    resp = Some(r);
                    break 'outer;
                }
                Err(e) => {
                    error!("failed to connect to {}, {:?}", ep, e);
                    continue;
                }
            }
        }
    }
    // Then try to connect the PD cluster leader.
    if let Some(resp) = resp {
        let leader = resp.get_leader().clone();
        for ep in leader.get_client_urls() {
            let r = try_connect(&env, security_mgr, ep.as_str(), cluster_id, timeout);
            if r.is_ok() {
                return r;
            }
        }
    }
    Err(internal_err!("failed to connect to {:?}", members))
}

180
src/rpc/pd/mod.rs Normal file
View File

@ -0,0 +1,180 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Deref, DerefMut};
use kvproto::{kvrpcpb, metapb};
pub use crate::rpc::pd::client::PdClient;
use crate::{Error, Key, Result};
#[macro_use]
mod leader;
mod client;
mod context;
mod request;
pub type RegionId = u64;
pub type StoreId = u64;
/// Identifies one version of a region: its id plus the epoch values
/// (`conf_ver`, `ver`) reported by PD; used as a grouping/map key.
#[derive(Eq, PartialEq, Hash, Clone, Default, Debug)]
pub struct RegionVerId {
    pub id: RegionId,
    pub conf_ver: u64,
    pub ver: u64,
}
/// A PD region together with its leader peer, when PD reported one.
#[derive(Clone, Default, Debug, PartialEq)]
pub struct Region {
    pub region: metapb::Region,
    pub leader: Option<Peer>,
}

impl Deref for Region {
    type Target = metapb::Region;

    fn deref(&self) -> &Self::Target {
        &self.region
    }
}

impl DerefMut for Region {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.region
    }
}
impl Region {
pub fn new(region: metapb::Region, leader: Option<metapb::Peer>) -> Self {
Region {
region,
leader: leader.map(Peer),
}
}
pub fn switch_peer(&mut self, _to: StoreId) -> Result<()> {
unimplemented!()
}
pub fn contains(&self, key: &Key) -> bool {
let key: &[u8] = key.as_ref();
let start_key = self.region.get_start_key();
let end_key = self.region.get_end_key();
start_key <= key && (end_key > key || end_key.is_empty())
}
pub fn context(&self) -> Result<kvrpcpb::Context> {
self.leader
.as_ref()
.ok_or_else(|| Error::NotLeader(self.region.get_id(), None))
.map(|l| {
let mut ctx = kvrpcpb::Context::default();
ctx.set_region_id(self.region.get_id());
ctx.set_region_epoch(Clone::clone(self.region.get_region_epoch()));
ctx.set_peer(Clone::clone(l));
ctx
})
}
pub fn start_key(&self) -> &[u8] {
self.region.get_start_key()
}
pub fn end_key(&self) -> &[u8] {
self.region.get_end_key()
}
pub fn ver_id(&self) -> RegionVerId {
let region = &self.region;
let epoch = region.get_region_epoch();
RegionVerId {
id: region.get_id(),
conf_ver: epoch.get_conf_ver(),
ver: epoch.get_version(),
}
}
pub fn id(&self) -> RegionId {
self.region.get_id()
}
pub fn peer(&self) -> Result<Peer> {
self.leader
.as_ref()
.map(Clone::clone)
.map(Into::into)
.ok_or_else(|| Error::StaleEpoch(None))
}
pub fn meta(&self) -> metapb::Region {
Clone::clone(&self.region)
}
}
/// Thin newtype wrapper around `metapb::Store`.
///
/// `Deref`/`DerefMut` expose the wrapped protobuf message directly.
#[derive(Clone, Debug, PartialEq)]
pub struct Store(metapb::Store);

impl From<metapb::Store> for Store {
    fn from(store: metapb::Store) -> Store {
        Store(store)
    }
}

impl Deref for Store {
    type Target = metapb::Store;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Store {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
/// Thin newtype wrapper around `metapb::Peer`.
///
/// `Deref`/`DerefMut` expose the wrapped protobuf message directly.
#[derive(Clone, Default, Debug, PartialEq)]
pub struct Peer(metapb::Peer);

impl From<metapb::Peer> for Peer {
    fn from(peer: metapb::Peer) -> Peer {
        Peer(peer)
    }
}

impl Deref for Peer {
    type Target = metapb::Peer;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Peer {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl Peer {
    /// Consumes the wrapper, returning the underlying `metapb::Peer`.
    pub fn into_inner(self) -> metapb::Peer {
        self.0
    }
}
/// A timestamp allocated by PD's timestamp oracle (TSO).
#[derive(Eq, PartialEq, Debug)]
pub struct PdTimestamp {
    // Physical component — presumably wall-clock based; unit not visible
    // here, TODO confirm against the PD protocol definition.
    pub physical: i64,
    // Logical counter disambiguating timestamps within one physical tick.
    pub logical: i64,
}

136
src/rpc/pd/request.rs Normal file
View File

@ -0,0 +1,136 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
result,
sync::{Arc, RwLock},
time::{Duration, Instant},
};
use futures::future::{loop_fn, ok, Either, Future, Loop};
use log::*;
use tokio_timer::timer::Handle;
use crate::{rpc::util::GLOBAL_TIMER_HANDLE, Error, Result};
/// Seconds to wait between reconnection attempts.
pub const RECONNECT_INTERVAL_SEC: u64 = 1; // 1s

/// The context of sending requests: retry/reconnect bookkeeping plus the
/// closures that issue the RPC (`func`) and re-establish the client
/// (`reconnect`).
pub struct Request<Resp, Func, Cli, Reconnect> {
    // Remaining reconnect attempts; the retry loop stops when this hits 0.
    reconnect_count: usize,
    // Requests sent since the last successful (re)connect.
    request_sent: usize,
    // Shared handle to the underlying RPC client.
    client: Arc<RwLock<Cli>>,
    // Timer used to back off after a failed reconnect.
    timer: Handle,
    // The response, once one has been received.
    resp: Option<Result<Resp>>,
    // Issues a single RPC against the client.
    func: Func,
    // Re-establishes the client connection.
    reconnect: Reconnect,
}
/// Maximum requests to attempt on one connection before reconnecting.
const MAX_REQUEST_COUNT: usize = 3;

impl<Resp, Func, Cli, Reconnect, RespFuture> Request<Resp, Func, Cli, Reconnect>
where
    Resp: Send + 'static,
    Func: FnMut(&RwLock<Cli>) -> RespFuture + Send + 'static,
    Cli: Send + Sync + 'static,
    Reconnect: FnMut(&Arc<RwLock<Cli>>, u64) -> Result<()> + Send + 'static,
    RespFuture: Future<Item = Resp, Error = Error> + Send + 'static,
{
    /// Creates a request context that retries at most `retry` times,
    /// reconnecting via `reconnect` when too many requests have failed.
    pub fn new(func: Func, client: Arc<RwLock<Cli>>, reconnect: Reconnect, retry: usize) -> Self {
        Request {
            reconnect_count: retry,
            request_sent: 0,
            client,
            timer: GLOBAL_TIMER_HANDLE.clone(),
            resp: None,
            func,
            reconnect,
        }
    }

    /// Reconnects the client once `MAX_REQUEST_COUNT` requests have been
    /// sent without success; otherwise resolves immediately. After a failed
    /// reconnect, the returned future waits `RECONNECT_INTERVAL_SEC` before
    /// erroring so the retry loop backs off.
    fn reconnect_if_needed(mut self) -> impl Future<Item = Self, Error = Self> + Send {
        debug!("reconnect remains: {}", self.reconnect_count);

        if self.request_sent < MAX_REQUEST_COUNT {
            return Either::A(ok(self));
        }

        // Updating client.
        self.reconnect_count -= 1;

        // FIXME: should not block the core.
        match (self.reconnect)(&self.client, RECONNECT_INTERVAL_SEC) {
            Ok(_) => {
                self.request_sent = 0;
                Either::A(ok(self))
            }
            Err(_) => Either::B(
                self.timer
                    .delay(Instant::now() + Duration::from_secs(RECONNECT_INTERVAL_SEC))
                    .then(|_| Err(self)),
            ),
        }
    }

    /// Issues one RPC via `func`, recording a successful response in
    /// `self.resp`; a failed RPC resolves to `Err(self)` so the loop can
    /// retry.
    fn send_and_receive(mut self) -> impl Future<Item = Self, Error = Self> + Send {
        self.request_sent += 1;
        debug!("request sent: {}", self.request_sent);

        ok(self).and_then(|mut ctx| {
            let req = (ctx.func)(&ctx.client);
            req.then(|resp| match resp {
                Ok(resp) => {
                    ctx.resp = Some(Ok(resp));
                    Ok(ctx)
                }
                Err(err) => {
                    error!("request failed: {:?}", err);
                    Err(ctx)
                }
            })
        })
    }

    /// Decides whether the retry loop is finished: it stops once a response
    /// has been received or all reconnect attempts are used up.
    fn break_or_continue(ctx: result::Result<Self, Self>) -> Result<Loop<Self, Self>> {
        let ctx = match ctx {
            Ok(ctx) | Err(ctx) => ctx,
        };
        let done = ctx.reconnect_count == 0 || ctx.resp.is_some();
        if done {
            Ok(Loop::Break(ctx))
        } else {
            Ok(Loop::Continue(ctx))
        }
    }

    /// Extracts the final response after the loop terminates, mapping the
    /// "never received a response" case to an internal error.
    fn post_loop(ctx: Result<Self>) -> Result<Resp> {
        let ctx = ctx.expect("end loop with Ok(_)");
        ctx.resp
            .unwrap_or_else(|| Err(internal_err!("fail to request")))
    }

    /// Returns a future that resolves once a future returned by the closure
    /// resolves successfully; otherwise it retries (reconnecting as needed)
    /// up to `retry` times.
    pub fn execute(self) -> impl Future<Item = Resp, Error = Error> {
        let ctx = self;
        loop_fn(ctx, |ctx| {
            ctx.reconnect_if_needed()
                .and_then(Self::send_and_receive)
                .then(Self::break_or_continue)
        })
        // Note: the previous trailing `.map_err(|e| e)` was an identity
        // mapping and has been removed.
        .then(Self::post_loop)
    }
}

132
src/rpc/security.rs Normal file
View File

@ -0,0 +1,132 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
fs::File,
io::Read,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use grpcio::{Channel, ChannelBuilder, ChannelCredentialsBuilder, Environment};
use log::*;
use crate::Result;
/// Opens the PEM file at `path`, tagging any failure with `tag` so the
/// error says which credential could not be loaded.
fn check_pem_file(tag: &str, path: &Path) -> Result<File> {
    match File::open(path) {
        Ok(file) => Ok(file),
        Err(e) => Err(internal_err!(
            "failed to open {} to load {}: {:?}",
            path.display(),
            tag,
            e
        )),
    }
}
/// Reads the entire PEM file at `path` into memory, tagging errors with
/// `tag`.
fn load_pem_file(tag: &str, path: &Path) -> Result<Vec<u8>> {
    let mut file = check_pem_file(tag, path)?;
    let mut content = vec![];
    match file.read_to_end(&mut content) {
        Ok(_) => Ok(content),
        Err(e) => Err(internal_err!(
            "failed to load {} from path {}: {:?}",
            tag,
            path.display(),
            e
        )),
    }
}
/// Holds the TLS credentials used when connecting to PD/TiKV.
///
/// The CA and certificate are loaded eagerly by `load`, while only the
/// *path* of the private key is kept; the key bytes are re-read from disk
/// at connect time (see `connect`) — presumably to avoid holding the key
/// in memory long-term, TODO confirm.
#[derive(Default)]
pub struct SecurityManager {
    // CA certificate bytes; empty means connect in plaintext.
    ca: Vec<u8>,
    // Client certificate bytes.
    cert: Vec<u8>,
    // Path to the client private key, loaded lazily in `connect`.
    key: PathBuf,
}
impl SecurityManager {
    /// Loads TLS material: reads the CA and certificate files into memory
    /// and verifies the private key file can be opened (its contents are
    /// only read later, in `connect`).
    pub fn load(
        ca_path: impl AsRef<Path>,
        cert_path: impl AsRef<Path>,
        key_path: impl Into<PathBuf>,
    ) -> Result<SecurityManager> {
        let key_path = key_path.into();
        check_pem_file("private key", &key_path)?;
        Ok(SecurityManager {
            ca: load_pem_file("ca", ca_path.as_ref())?,
            cert: load_pem_file("certificate", cert_path.as_ref())?,
            key: key_path,
        })
    }

    /// Connects to `addr` and passes the resulting channel to `factory` to
    /// build a concrete gRPC client. Uses TLS when a CA was loaded,
    /// plaintext otherwise.
    pub fn connect<Factory, Client>(
        &self,
        env: Arc<Environment>,
        addr: &str,
        factory: Factory,
    ) -> Result<Client>
    where
        Factory: FnOnce(Channel) -> Client,
    {
        info!("connect to rpc server at endpoint: {:?}", addr);
        // grpcio expects a bare host:port, so strip any URL scheme.
        let addr = addr
            .trim_left_matches("http://")
            .trim_left_matches("https://");
        let cb = ChannelBuilder::new(env)
            .keepalive_time(Duration::from_secs(10))
            .keepalive_timeout(Duration::from_secs(3));
        let channel = if self.ca.is_empty() {
            // No CA configured: plain (insecure) channel.
            cb.connect(addr)
        } else {
            // The private key is re-read from disk on every connect rather
            // than cached in the struct.
            let cred = ChannelCredentialsBuilder::new()
                .root_cert(self.ca.clone())
                .cert(self.cert.clone(), load_pem_file("private key", &self.key)?)
                .build();
            cb.secure_connect(addr, cred)
        };
        Ok(factory(channel))
    }
}
#[cfg(test)]
mod tests {
    extern crate tempdir;

    use super::*;

    use std::fs::File;
    use std::io::Write;
    use std::path::PathBuf;

    use self::tempdir::TempDir;

    #[test]
    fn test_security() {
        // Write three one-byte files (ca=0, cert=1, key=2) into a temp dir
        // so we can tell which file each credential was loaded from.
        let temp = TempDir::new("test_cred").unwrap();
        let example_ca = temp.path().join("ca");
        let example_cert = temp.path().join("cert");
        let example_pem = temp.path().join("key");
        for (id, f) in (&[&example_ca, &example_cert, &example_pem])
            .into_iter()
            .enumerate()
        {
            File::create(f).unwrap().write_all(&[id as u8]).unwrap();
        }
        let cert_path: PathBuf = format!("{}", example_cert.display()).into();
        let key_path: PathBuf = format!("{}", example_pem.display()).into();
        let ca_path: PathBuf = format!("{}", example_ca.display()).into();
        let mgr = SecurityManager::load(&ca_path, &cert_path, &key_path).unwrap();
        // ca/cert are loaded eagerly; the key is only read on demand.
        assert_eq!(mgr.ca, vec![0]);
        assert_eq!(mgr.cert, vec![1]);
        let key = load_pem_file("private key", &key_path).unwrap();
        assert_eq!(key, vec![2]);
    }
}

701
src/rpc/tikv/client.rs Normal file
View File

@ -0,0 +1,701 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fmt, sync::Arc, time::Duration};
use futures::Future;
use grpcio::{CallOption, Environment};
use kvproto::{errorpb, kvrpcpb, tikvpb_grpc::TikvClient};
use protobuf;
use crate::{
rpc::{
client::{RawContext, TxnContext},
security::SecurityManager,
tikv::context::{request_context, RequestContext},
},
transaction::{Mutation, TxnInfo},
Error, Key, KvPair, Result, Value,
};
/// Extracts a Region-level error (NotLeader, StaleEpoch, ...) from a
/// response, taking ownership of the embedded protobuf error.
trait HasRegionError {
    fn region_error(&mut self) -> Option<Error>;
}

/// Extracts a request-level error from a response, taking ownership of it.
trait HasError {
    fn error(&mut self) -> Option<Error>;
}
impl From<errorpb::Error> for Error {
    /// Maps each recognised `errorpb` variant onto the corresponding crate
    /// `Error`; anything unrecognised falls through to `InternalError` with
    /// the raw message text.
    fn from(mut e: errorpb::Error) -> Error {
        let message = e.take_message();
        if e.has_not_leader() {
            let e = e.get_not_leader();
            Error::NotLeader(
                e.get_region_id(),
                Some(format!("{}. Leader: {:?}", message, e.get_leader())),
            )
        } else if e.has_region_not_found() {
            Error::RegionNotFound(e.get_region_not_found().get_region_id(), Some(message))
        } else if e.has_key_not_in_region() {
            let mut e = e.take_key_not_in_region();
            Error::KeyNotInRegion(
                e.take_key(),
                e.get_region_id(),
                e.take_start_key(),
                e.take_end_key(),
            )
        } else if e.has_stale_epoch() {
            Error::StaleEpoch(Some(format!(
                "{}. New epoch: {:?}",
                message,
                e.get_stale_epoch().get_new_regions()
            )))
        } else if e.has_server_is_busy() {
            let mut e = e.take_server_is_busy();
            Error::ServerIsBusy(e.take_reason(), e.get_backoff_ms())
        } else if e.has_stale_command() {
            Error::StaleCommand(message)
        } else if e.has_store_not_match() {
            let e = e.get_store_not_match();
            Error::StoreNotMatch(e.get_request_store_id(), e.get_actual_store_id(), message)
        } else if e.has_raft_entry_too_large() {
            let e = e.get_raft_entry_too_large();
            Error::RaftEntryTooLarge(e.get_region_id(), e.get_entry_size(), message)
        } else {
            Error::InternalError(message)
        }
    }
}
/// Implements `HasRegionError` for a response type exposing the standard
/// `has_region_error`/`take_region_error` protobuf accessors.
macro_rules! has_region_error {
    ($type:ty) => {
        impl HasRegionError for $type {
            fn region_error(&mut self) -> Option<Error> {
                if self.has_region_error() {
                    Some(self.take_region_error().into())
                } else {
                    None
                }
            }
        }
    };
}
// Every response type that carries a Region-level error field.
has_region_error!(kvrpcpb::GetResponse);
has_region_error!(kvrpcpb::ScanResponse);
has_region_error!(kvrpcpb::PrewriteResponse);
has_region_error!(kvrpcpb::CommitResponse);
has_region_error!(kvrpcpb::ImportResponse);
has_region_error!(kvrpcpb::BatchRollbackResponse);
has_region_error!(kvrpcpb::CleanupResponse);
has_region_error!(kvrpcpb::BatchGetResponse);
has_region_error!(kvrpcpb::ScanLockResponse);
has_region_error!(kvrpcpb::ResolveLockResponse);
has_region_error!(kvrpcpb::GCResponse);
has_region_error!(kvrpcpb::RawGetResponse);
has_region_error!(kvrpcpb::RawBatchGetResponse);
has_region_error!(kvrpcpb::RawPutResponse);
has_region_error!(kvrpcpb::RawBatchPutResponse);
has_region_error!(kvrpcpb::RawDeleteResponse);
has_region_error!(kvrpcpb::RawBatchDeleteResponse);
has_region_error!(kvrpcpb::DeleteRangeResponse);
has_region_error!(kvrpcpb::RawDeleteRangeResponse);
has_region_error!(kvrpcpb::RawScanResponse);
has_region_error!(kvrpcpb::RawBatchScanResponse);
impl From<kvrpcpb::KeyError> for Error {
    /// `KeyError` is not destructured here; its whole debug representation
    /// becomes the error text.
    fn from(e: kvrpcpb::KeyError) -> Error {
        Error::KeyError(format!("{:?}", e))
    }
}
/// Implements `HasError` for responses whose error field is a
/// `kvrpcpb::KeyError` (exposed via `has_error`/`take_error`).
macro_rules! has_key_error {
    ($type:ty) => {
        impl HasError for $type {
            fn error(&mut self) -> Option<Error> {
                if self.has_error() {
                    Some(self.take_error().into())
                } else {
                    None
                }
            }
        }
    };
}
// Responses whose per-request error is a structured `KeyError`.
has_key_error!(kvrpcpb::GetResponse);
has_key_error!(kvrpcpb::CommitResponse);
has_key_error!(kvrpcpb::BatchRollbackResponse);
has_key_error!(kvrpcpb::CleanupResponse);
has_key_error!(kvrpcpb::ScanLockResponse);
has_key_error!(kvrpcpb::ResolveLockResponse);
has_key_error!(kvrpcpb::GCResponse);
/// Implements `HasError` for responses whose `error` field is a plain
/// string; an empty string means "no error".
macro_rules! has_str_error {
    ($type:ty) => {
        impl HasError for $type {
            fn error(&mut self) -> Option<Error> {
                if self.get_error().is_empty() {
                    None
                } else {
                    Some(Error::KVError(self.take_error()))
                }
            }
        }
    };
}
// Responses whose per-request error is a plain string.
has_str_error!(kvrpcpb::RawGetResponse);
has_str_error!(kvrpcpb::RawPutResponse);
has_str_error!(kvrpcpb::RawBatchPutResponse);
has_str_error!(kvrpcpb::RawDeleteResponse);
has_str_error!(kvrpcpb::RawBatchDeleteResponse);
has_str_error!(kvrpcpb::RawDeleteRangeResponse);
has_str_error!(kvrpcpb::ImportResponse);
has_str_error!(kvrpcpb::DeleteRangeResponse);
/// Implements a no-op `HasError` for responses that carry no per-request
/// error field (they can only fail at the Region level).
macro_rules! has_no_error {
    ($type:ty) => {
        impl HasError for $type {
            fn error(&mut self) -> Option<Error> {
                None
            }
        }
    };
}
// Responses with no per-request error field at all.
has_no_error!(kvrpcpb::ScanResponse);
has_no_error!(kvrpcpb::PrewriteResponse);
has_no_error!(kvrpcpb::BatchGetResponse);
has_no_error!(kvrpcpb::RawBatchGetResponse);
has_no_error!(kvrpcpb::RawScanResponse);
has_no_error!(kvrpcpb::RawBatchScanResponse);
/// Builds a raw-KV request of the given type from a `RawContext`, filling
/// in the Region context and, when one was selected, the column family.
macro_rules! raw_request {
    ($context:expr, $type:ty) => {{
        let mut req = <$type>::new();
        let (region, cf) = $context.into_inner();
        req.set_context(region.into());
        if let Some(cf) = cf {
            req.set_cf(cf.into_inner());
        }
        req
    }};
}
/// Builds a transactional-KV request of the given type from a `TxnContext`,
/// filling in the Region context.
macro_rules! txn_request {
    ($context:expr, $type:ty) => {{
        let mut req = <$type>::new();
        req.set_context($context.into_inner().into());
        req
    }};
}
impl From<Mutation> for kvrpcpb::Mutation {
    /// Lowers a client-side `Mutation` into its protobuf form, setting the
    /// op code plus whichever of key/value the variant carries.
    fn from(mutation: Mutation) -> kvrpcpb::Mutation {
        let mut pb = kvrpcpb::Mutation::new();
        match mutation {
            Mutation::Put(k, v) => {
                pb.set_op(kvrpcpb::Op::Put);
                pb.set_key(k.into_inner());
                pb.set_value(v.into_inner());
            }
            Mutation::Del(k) => {
                pb.set_op(kvrpcpb::Op::Del);
                pb.set_key(k.into_inner());
            }
            Mutation::Lock(k) => {
                pb.set_op(kvrpcpb::Op::Lock);
                pb.set_key(k.into_inner());
            }
            Mutation::Rollback(k) => {
                pb.set_op(kvrpcpb::Op::Rollback);
                pb.set_key(k.into_inner());
            }
        };
        pb
    }
}
impl From<TxnInfo> for kvrpcpb::TxnInfo {
    /// Lowers a client-side `TxnInfo` (txn id + status) into protobuf form.
    fn from(txn_info: TxnInfo) -> kvrpcpb::TxnInfo {
        let mut pb = kvrpcpb::TxnInfo::new();
        pb.set_txn(txn_info.txn);
        pb.set_status(txn_info.status);
        pb
    }
}
/// A gRPC client bound to a single TiKV node.
pub struct KvClient {
    // Shared stub; cloned into each in-flight request future.
    client: Arc<TikvClient>,
    // Per-request timeout applied via `CallOption`.
    timeout: Duration,
    // Remote address, kept for `Debug` output.
    address: String,
}
impl KvClient {
    /// Opens a connection to the TiKV node at `addr` (TLS-secured when
    /// `security_mgr` holds credentials) and wraps the resulting stub.
    pub fn connect(
        env: Arc<Environment>,
        addr: &str,
        security_mgr: &Arc<SecurityManager>,
        timeout: Duration,
    ) -> Result<KvClient> {
        let client = Arc::new(security_mgr.connect(env, addr, TikvClient::new)?);
        Ok(KvClient {
            client,
            timeout,
            address: addr.to_owned(),
        })
    }

    /// Transactional point get of `key` at snapshot `version`.
    pub fn kv_get(
        &self,
        context: TxnContext,
        version: u64,
        key: Key,
    ) -> impl Future<Item = kvrpcpb::GetResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::GetRequest);
        req.set_key(key.into_inner());
        req.set_version(version);

        self.execute(request_context(
            "kv_get",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_get_async_opt(&req, opt),
        ))
    }

    /// Transactional scan of up to `limit` pairs from `start_key` to
    /// `end_key` at `version`; `key_only` omits values from the result.
    pub fn kv_scan(
        &self,
        context: TxnContext,
        version: u64,
        start_key: Key,
        end_key: Key,
        limit: u32,
        key_only: bool,
    ) -> impl Future<Item = kvrpcpb::ScanResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::ScanRequest);
        req.set_start_key(start_key.into_inner());
        req.set_end_key(end_key.into_inner());
        req.set_version(version);
        req.set_limit(limit);
        req.set_key_only(key_only);

        self.execute(request_context(
            "kv_scan",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_scan_async_opt(&req, opt),
        ))
    }

    /// Prewrites `mutations` under `primary_lock` at `start_version`
    /// (first phase of the two-phase commit protocol).
    pub fn kv_prewrite(
        &self,
        context: TxnContext,
        mutations: impl Iterator<Item = Mutation>,
        primary_lock: Key,
        start_version: u64,
        lock_ttl: u64,
        skip_constraint_check: bool,
    ) -> impl Future<Item = kvrpcpb::PrewriteResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::PrewriteRequest);
        req.set_mutations(mutations.map(Into::into).collect());
        req.set_primary_lock(primary_lock.into_inner());
        req.set_start_version(start_version);
        req.set_lock_ttl(lock_ttl);
        req.set_skip_constraint_check(skip_constraint_check);

        self.execute(request_context(
            "kv_prewrite",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_prewrite_async_opt(&req, opt),
        ))
    }

    /// Commits `keys` prewritten at `start_version` with `commit_version`
    /// (second phase of the two-phase commit protocol).
    pub fn kv_commit(
        &self,
        context: TxnContext,
        keys: impl Iterator<Item = Key>,
        start_version: u64,
        commit_version: u64,
    ) -> impl Future<Item = kvrpcpb::CommitResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::CommitRequest);
        req.set_keys(keys.map(|x| x.into_inner()).collect());
        req.set_start_version(start_version);
        req.set_commit_version(commit_version);

        self.execute(request_context(
            "kv_commit",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_commit_async_opt(&req, opt),
        ))
    }

    /// Bulk-imports `mutations` committed at `commit_version`.
    pub fn kv_import(
        &self,
        mutations: impl Iterator<Item = Mutation>,
        commit_version: u64,
    ) -> impl Future<Item = kvrpcpb::ImportResponse, Error = Error> {
        let mut req = kvrpcpb::ImportRequest::new();
        req.set_mutations(mutations.map(Into::into).collect());
        req.set_commit_version(commit_version);

        self.execute(request_context(
            "kv_import",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_import_async_opt(&req, opt),
        ))
    }

    /// Cleans up the lock on `key` left by the transaction started at
    /// `start_version`.
    pub fn kv_cleanup(
        &self,
        context: TxnContext,
        key: Key,
        start_version: u64,
    ) -> impl Future<Item = kvrpcpb::CleanupResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::CleanupRequest);
        req.set_key(key.into_inner());
        req.set_start_version(start_version);

        self.execute(request_context(
            "kv_cleanup",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_cleanup_async_opt(&req, opt),
        ))
    }

    /// Transactional batch get of `keys` at snapshot `version`.
    pub fn kv_batch_get(
        &self,
        context: TxnContext,
        keys: impl Iterator<Item = Key>,
        version: u64,
    ) -> impl Future<Item = kvrpcpb::BatchGetResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::BatchGetRequest);
        req.set_keys(keys.map(|x| x.into_inner()).collect());
        req.set_version(version);

        self.execute(request_context(
            "kv_batch_get",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_batch_get_async_opt(&req, opt),
        ))
    }

    /// Rolls back `keys` prewritten at `start_version`.
    pub fn kv_batch_rollback(
        &self,
        context: TxnContext,
        keys: impl Iterator<Item = Key>,
        start_version: u64,
    ) -> impl Future<Item = kvrpcpb::BatchRollbackResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::BatchRollbackRequest);
        req.set_keys(keys.map(|x| x.into_inner()).collect());
        req.set_start_version(start_version);

        self.execute(request_context(
            "kv_batch_rollback",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_batch_rollback_async_opt(&req, opt),
        ))
    }

    /// Scans up to `limit` locks with version <= `max_version`, starting
    /// from `start_key`.
    pub fn kv_scan_lock(
        &self,
        context: TxnContext,
        start_key: Key,
        max_version: u64,
        limit: u32,
    ) -> impl Future<Item = kvrpcpb::ScanLockResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::ScanLockRequest);
        req.set_start_key(start_key.into_inner());
        req.set_max_version(max_version);
        req.set_limit(limit);

        self.execute(request_context(
            "kv_scan_lock",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_scan_lock_async_opt(&req, opt),
        ))
    }

    /// Resolves locks for the given transactions.
    pub fn kv_resolve_lock(
        &self,
        context: TxnContext,
        txn_infos: impl Iterator<Item = TxnInfo>,
        start_version: u64,
        commit_version: u64,
    ) -> impl Future<Item = kvrpcpb::ResolveLockResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::ResolveLockRequest);
        req.set_start_version(start_version);
        req.set_commit_version(commit_version);
        req.set_txn_infos(txn_infos.map(Into::into).collect());

        self.execute(request_context(
            "kv_resolve_lock",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_resolve_lock_async_opt(&req, opt),
        ))
    }

    /// Requests garbage collection up to `safe_point`.
    pub fn kv_gc(
        &self,
        context: TxnContext,
        safe_point: u64,
    ) -> impl Future<Item = kvrpcpb::GCResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::GCRequest);
        req.set_safe_point(safe_point);

        self.execute(request_context(
            "kv_gc",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_gc_async_opt(&req, opt),
        ))
    }

    /// Deletes all transactional data in `[start_key, end_key)`.
    pub fn kv_delete_range(
        &self,
        context: TxnContext,
        start_key: Key,
        end_key: Key,
    ) -> impl Future<Item = kvrpcpb::DeleteRangeResponse, Error = Error> {
        let mut req = txn_request!(context, kvrpcpb::DeleteRangeRequest);
        req.set_start_key(start_key.into_inner());
        req.set_end_key(end_key.into_inner());

        self.execute(request_context(
            "kv_delete_range",
            move |cli: Arc<TikvClient>, opt: _| cli.kv_delete_range_async_opt(&req, opt),
        ))
    }

    /// Raw point get of `key`, resolving to the stored value.
    pub fn raw_get(
        &self,
        context: RawContext,
        key: Key,
    ) -> impl Future<Item = Value, Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawGetRequest);
        req.set_key(key.into_inner());

        self.execute(request_context(
            "raw_get",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_get_async_opt(&req, opt),
        ))
        .map(|mut resp| resp.take_value().into())
    }

    /// Raw batch get of `keys`, resolving to the found pairs.
    pub fn raw_batch_get(
        &self,
        context: RawContext,
        keys: impl Iterator<Item = Key>,
    ) -> impl Future<Item = Vec<KvPair>, Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawBatchGetRequest);
        req.set_keys(keys.map(|x| x.into_inner()).collect());

        self.execute(request_context(
            "raw_batch_get",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_batch_get_async_opt(&req, opt),
        ))
        .map(|mut resp| Self::convert_from_grpc_pairs(resp.take_pairs()))
    }

    /// Raw put of `value` at `key`.
    pub fn raw_put(
        &self,
        context: RawContext,
        key: Key,
        value: Value,
    ) -> impl Future<Item = (), Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawPutRequest);
        req.set_key(key.into_inner());
        req.set_value(value.into_inner());

        self.execute(request_context(
            "raw_put",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_put_async_opt(&req, opt),
        ))
        .map(|_| ())
    }

    /// Raw batch put of `pairs`.
    pub fn raw_batch_put(
        &self,
        context: RawContext,
        pairs: Vec<KvPair>,
    ) -> impl Future<Item = (), Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawBatchPutRequest);
        req.set_pairs(Self::convert_to_grpc_pairs(pairs));

        self.execute(request_context(
            "raw_batch_put",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_batch_put_async_opt(&req, opt),
        ))
        .map(|_| ())
    }

    /// Raw delete of `key`.
    pub fn raw_delete(
        &self,
        context: RawContext,
        key: Key,
    ) -> impl Future<Item = (), Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawDeleteRequest);
        req.set_key(key.into_inner());

        self.execute(request_context(
            "raw_delete",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_delete_async_opt(&req, opt),
        ))
        .map(|_| ())
    }

    /// Raw batch delete of `keys`.
    pub fn raw_batch_delete(
        &self,
        context: RawContext,
        keys: Vec<Key>,
    ) -> impl Future<Item = (), Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawBatchDeleteRequest);
        req.set_keys(keys.into_iter().map(|x| x.into_inner()).collect());

        self.execute(request_context(
            "raw_batch_delete",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_batch_delete_async_opt(&req, opt),
        ))
        .map(|_| ())
    }

    /// Raw scan of up to `limit` pairs; `None` bounds are unbounded.
    pub fn raw_scan(
        &self,
        context: RawContext,
        start_key: Option<Key>,
        end_key: Option<Key>,
        limit: u32,
        key_only: bool,
    ) -> impl Future<Item = Vec<KvPair>, Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawScanRequest);
        // A `None` bound just leaves the proto field empty (unbounded).
        // Previously this `unwrap()`ped the `Option` and panicked on `None`.
        if let Some(k) = start_key {
            req.set_start_key(k.into_inner());
        }
        if let Some(k) = end_key {
            req.set_end_key(k.into_inner());
        }
        req.set_limit(limit);
        req.set_key_only(key_only);

        self.execute(request_context(
            "raw_scan",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_scan_async_opt(&req, opt),
        ))
        .map(|mut resp| Self::convert_from_grpc_pairs(resp.take_kvs()))
    }

    /// Raw scan over multiple `ranges`, returning up to `each_limit` pairs
    /// per range.
    pub fn raw_batch_scan(
        &self,
        context: RawContext,
        ranges: impl Iterator<Item = (Option<Key>, Option<Key>)>,
        each_limit: u32,
        key_only: bool,
    ) -> impl Future<Item = Vec<KvPair>, Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawBatchScanRequest);
        req.set_ranges(Self::convert_to_grpc_ranges(ranges));
        req.set_each_limit(each_limit);
        req.set_key_only(key_only);

        self.execute(request_context(
            "raw_batch_scan",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_batch_scan_async_opt(&req, opt),
        ))
        .map(|mut resp| Self::convert_from_grpc_pairs(resp.take_kvs()))
    }

    /// Raw delete of all keys in `[start_key, end_key)`.
    pub fn raw_delete_range(
        &self,
        context: RawContext,
        start_key: Key,
        end_key: Key,
    ) -> impl Future<Item = (), Error = Error> {
        let mut req = raw_request!(context, kvrpcpb::RawDeleteRangeRequest);
        req.set_start_key(start_key.into_inner());
        req.set_end_key(end_key.into_inner());

        self.execute(request_context(
            "raw_delete_range",
            move |cli: Arc<TikvClient>, opt: _| cli.raw_delete_range_async_opt(&req, opt),
        ))
        .map(|_| ())
    }

    /// Runs `executor` against the shared stub with this client's timeout,
    /// converting gRPC, Region-level, and request-level errors into crate
    /// errors, and recording the outcome in the request metrics context.
    fn execute<Executor, Resp, RpcFuture>(
        &self,
        mut context: RequestContext<Executor>,
    ) -> impl Future<Item = Resp, Error = Error>
    where
        Executor: FnOnce(Arc<TikvClient>, CallOption) -> ::grpcio::Result<RpcFuture>,
        RpcFuture: Future<Item = Resp, Error = ::grpcio::Error>,
        Resp: HasRegionError + HasError + Sized + Clone,
    {
        let executor = context.executor();
        executor(
            Arc::clone(&self.client),
            CallOption::default().timeout(self.timeout),
        )
        // FIXME(review): this unwrap panics if the call cannot even be
        // started (e.g. the channel is already shut down) instead of
        // surfacing Error::Grpc — confirm and handle gracefully.
        .unwrap()
        .then(|r| match r {
            Err(e) => Err(Error::Grpc(e)),
            Ok(mut r) => {
                if let Some(e) = r.region_error() {
                    Err(e)
                } else if let Some(e) = r.error() {
                    Err(e)
                } else {
                    Ok(r)
                }
            }
        })
        .then(move |r| context.done(r))
    }

    /// Converts a client `KvPair` into its protobuf form.
    #[inline]
    fn convert_to_grpc_pair(pair: KvPair) -> kvrpcpb::KvPair {
        let mut result = kvrpcpb::KvPair::new();
        let (key, value) = pair.into_inner();
        result.set_key(key.into_inner());
        result.set_value(value.into_inner());
        result
    }

    /// Converts client `KvPair`s into a protobuf repeated field.
    #[inline]
    fn convert_to_grpc_pairs(pairs: Vec<KvPair>) -> protobuf::RepeatedField<kvrpcpb::KvPair> {
        pairs.into_iter().map(Self::convert_to_grpc_pair).collect()
    }

    /// Converts a protobuf `KvPair` into the client representation.
    #[inline]
    fn convert_from_grpc_pair(mut pair: kvrpcpb::KvPair) -> KvPair {
        KvPair::new(Key::from(pair.take_key()), Value::from(pair.take_value()))
    }

    /// Converts protobuf `KvPair`s into client pairs.
    #[inline]
    fn convert_from_grpc_pairs(pairs: protobuf::RepeatedField<kvrpcpb::KvPair>) -> Vec<KvPair> {
        pairs
            .into_vec()
            .into_iter()
            .map(Self::convert_from_grpc_pair)
            .collect()
    }

    /// Converts an optional key range into a protobuf `KeyRange`; `None`
    /// bounds leave the corresponding field empty (unbounded).
    #[inline]
    fn convert_to_grpc_range(range: (Option<Key>, Option<Key>)) -> kvrpcpb::KeyRange {
        let (start, end) = range;
        let mut range = kvrpcpb::KeyRange::new();
        // Previously these were `.map(...).unwrap()`, which panicked on
        // `None`; an absent bound is now simply left unset.
        if let Some(k) = start {
            range.set_start_key(k.into_inner());
        }
        if let Some(k) = end {
            range.set_end_key(k.into_inner());
        }
        range
    }

    /// Converts optional key ranges into a protobuf repeated field.
    #[inline]
    fn convert_to_grpc_ranges(
        ranges: impl Iterator<Item = (Option<Key>, Option<Key>)>,
    ) -> protobuf::RepeatedField<kvrpcpb::KeyRange> {
        ranges.map(Self::convert_to_grpc_range).collect()
    }
}
impl fmt::Debug for KvClient {
    // Hand-written because the `client` field (a gRPC stub) is not Debug;
    // only the address and timeout are shown.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("KvClient")
            .field("address", &self.address)
            .field("timeout", &self.timeout)
            .finish()
    }
}

58
src/rpc/tikv/context.rs Normal file
View File

@ -0,0 +1,58 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use lazy_static::*;
use prometheus::*;
pub use crate::rpc::context::RequestContext;
/// Wraps `executor` in a `RequestContext` wired to the TiKV-side Prometheus
/// metrics (request/failure counters and duration histograms, labelled by
/// `cmd`).
pub fn request_context<Executor>(
    cmd: &'static str,
    executor: Executor,
) -> RequestContext<Executor> {
    RequestContext::new(
        cmd,
        &TIKV_REQUEST_DURATION_HISTOGRAM_VEC,
        &TIKV_REQUEST_COUNTER_VEC,
        &TIKV_FAILED_REQUEST_DURATION_HISTOGRAM_VEC,
        &TIKV_FAILED_REQUEST_COUNTER_VEC,
        executor,
    )
}
// Process-wide Prometheus metrics for TiKV requests, registered once on
// first use; each vector is labelled by request type ("kv_get", ...).
lazy_static! {
    static ref TIKV_REQUEST_DURATION_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
        "tikv_request_duration_seconds",
        "Bucketed histogram of TiKV requests duration",
        &["type"]
    )
    .unwrap();

    static ref TIKV_REQUEST_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
        "tikv_request_total",
        "Total number of requests sent to TiKV",
        &["type"]
    )
    .unwrap();

    static ref TIKV_FAILED_REQUEST_DURATION_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
        "tikv_failed_request_duration_seconds",
        "Bucketed histogram of failed TiKV requests duration",
        &["type"]
    )
    .unwrap();

    static ref TIKV_FAILED_REQUEST_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
        "tikv_failed_request_total",
        "Total number of failed requests sent to TiKV",
        &["type"]
    )
    .unwrap();
}

17
src/rpc/tikv/mod.rs Normal file
View File

@ -0,0 +1,17 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod client;
pub mod context;
pub use self::client::KvClient;

144
src/rpc/util.rs Normal file
View File

@ -0,0 +1,144 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
sync::{mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard},
thread,
time::Duration,
};
use lazy_static::*;
use tokio_timer::{self, timer::Handle};
/// Creates an `Error::InternalError` whose message is prefixed with the
/// current file name and line number; accepts either a single expression or
/// a format string plus arguments.
macro_rules! internal_err {
    ($e:expr) => ({
        $crate::Error::InternalError(format!("[{}:{}]: {}", file!(), line!(), $e))
    });
    ($f:tt, $($arg:expr),+) => ({
        internal_err!(format!($f, $($arg),+))
    });
}
/// Makes a thread name with an additional tag inherited from the current
/// thread: inside a thread named "a::b", `thd_name!("timer")` yields
/// "timer::b"; on an untagged thread it is just "timer".
macro_rules! thd_name {
    ($name:expr) => {{
        $crate::rpc::util::get_tag_from_thread_name()
            .map(|tag| format!("{}::{}", $name, tag))
            .unwrap_or_else(|| $name.to_owned())
    }};
}
/// A handy shortcut replacing the `RwLock` `write()/read().unwrap()`
/// pattern with the short methods `wl` and `rl`.
///
/// Like the `unwrap()` it replaces, these panic if the lock is poisoned.
pub trait HandyRwLock<T> {
    /// Acquires the write lock, panicking on poison.
    fn wl(&self) -> RwLockWriteGuard<T>;
    /// Acquires the read lock, panicking on poison.
    fn rl(&self) -> RwLockReadGuard<T>;
}

impl<T> HandyRwLock<T> for RwLock<T> {
    fn wl(&self) -> RwLockWriteGuard<T> {
        self.write().unwrap()
    }

    fn rl(&self) -> RwLockReadGuard<T> {
        self.read().unwrap()
    }
}
/// Returns the tag of the current thread's name: the last `::`-separated
/// segment after the first one. Yields `None` when the thread is unnamed
/// or its name contains no `::` separator.
pub fn get_tag_from_thread_name() -> Option<String> {
    let current = thread::current();
    let name = current.name()?;
    let tag = name.split("::").skip(1).last()?;
    Some(tag.into())
}
/// Converts a `Duration` to fractional seconds.
///
/// For extremely large durations the `as f64` cast loses precision, but
/// such values are not expected here.
#[inline]
pub fn duration_to_sec(d: Duration) -> f64 {
    let whole = d.as_secs() as f64;
    let fractional = f64::from(d.subsec_nanos()) / 1_000_000_000.0;
    whole + fractional
}
// Process-wide timer handle backed by a dedicated, detached "timer" thread
// that is started on first access.
lazy_static! {
    pub static ref GLOBAL_TIMER_HANDLE: Handle = start_global_timer();
}

/// Spawns the global timer thread and returns a handle to it.
///
/// The spawned thread sends its handle back over a channel, then loops in
/// `timer.turn(None)` forever, driving all registered delays.
fn start_global_timer() -> Handle {
    let (tx, rx) = mpsc::channel();
    thread::Builder::new()
        .name(thd_name!("timer"))
        .spawn(move || {
            let mut timer = tokio_timer::Timer::default();
            // Hand the handle back before entering the drive loop.
            tx.send(timer.handle()).unwrap();
            loop {
                timer.turn(None).unwrap();
            }
        })
        .unwrap();
    rx.recv().unwrap()
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::Future;
    use std::*;

    #[test]
    fn test_rwlock_deadlock() {
        // If the test runs over 60s, then there is a deadlock.
        let mu = RwLock::new(Some(1));
        {
            // The read guard passed to foo() is a temporary dropped at the
            // end of the statement, so taking the write lock next is fine.
            let _clone = foo(&mu.rl());
            let mut data = mu.wl();
            assert!(data.is_some());
            *data = None;
        }

        {
            match foo(&mu.rl()) {
                Some(_) | None => {
                    // Here the temporary read guard in the match scrutinee
                    // lives for the whole match, so try_write() must fail
                    // (contended, not deadlocked).
                    let res = mu.try_write();
                    assert!(res.is_err());
                }
            }
        }

        #[cfg_attr(feature = "cargo-clippy", allow(clippy::clone_on_copy))]
        fn foo(a: &Option<usize>) -> Option<usize> {
            a.clone()
        }
    }

    #[test]
    fn test_internal_error() {
        let file_name = file!();
        let line_number = line!();
        // internal_err! must stay on the line right after the line!()
        // capture for the `+ 1` below to hold.
        let e = internal_err!("{}", "hi");
        assert_eq!(
            format!("{}", e),
            format!("[{}:{}]: hi", file_name, line_number + 1)
        );
    }

    #[test]
    fn test_global_timer() {
        // A 100ms delay scheduled on the global timer must not fire early.
        let handle = super::GLOBAL_TIMER_HANDLE.clone();
        let delay =
            handle.delay(::std::time::Instant::now() + ::std::time::Duration::from_millis(100));
        let timer = ::std::time::Instant::now();
        delay.wait().unwrap();
        assert!(timer.elapsed() >= ::std::time::Duration::from_millis(100));
    }
}

View File

@ -11,16 +11,148 @@
// See the License for the specific language governing permissions and
// limitations under the License.
/*! Transactional related functionality.
Using the [`transaction::Client`](struct.Client.html) you can utilize TiKV's transactional interface.
This interface offers SQL-like transactions on top of the raw interface.
**Warning:** It is not advisable to use both the raw and the transactional functionality in the same keyspace.
*/
use crate::{Config, Error, Key, KvPair, Value};
use futures::{Future, Poll, Stream};
use std::ops::RangeBounds;
/// The TiKV transactional [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster.
pub struct Client;

impl Client {
    /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves.
    ///
    /// ```rust,no_run
    /// use tikv_client::{Config, transaction::Client};
    /// use futures::Future;
    /// let connect = Client::new(&Config::default());
    /// let client = connect.wait();
    /// ```
    #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))]
    pub fn new(config: &Config) -> Connect {
        Connect::new(config.clone())
    }

    /// Create a new [`Transaction`](struct.Transaction.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp).
    ///
    /// Using the transaction you can issue commands like [`get`](struct.Transaction.html#method.get) or [`set`](struct.Transaction.html#method.set).
    ///
    /// ```rust,no_run
    /// use tikv_client::{Config, transaction::Client};
    /// use futures::Future;
    /// let connect = Client::new(&Config::default());
    /// let client = connect.wait().unwrap();
    /// let transaction = client.begin();
    /// // ... Issue some commands.
    /// let commit = transaction.commit();
    /// let result: () = commit.wait().unwrap();
    /// ```
    pub fn begin(&self) -> Transaction {
        unimplemented!()
    }

    /// Create a new [`Transaction`](struct.Transaction.html) at the provided timestamp.
    ///
    /// ```rust,no_run
    /// use tikv_client::{Config, transaction::Client};
    /// use futures::Future;
    /// let connect = Client::new(&Config::default());
    /// let client = connect.wait().unwrap();
    /// let timestamp = client.current_timestamp();
    /// let transaction = client.begin_with_timestamp(timestamp);
    /// // ... Issue some commands.
    /// let commit = transaction.commit();
    /// let result: () = commit.wait().unwrap();
    /// ```
    pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction {
        unimplemented!()
    }

    /// Get a [`Snapshot`](struct.Snapshot.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp).
    ///
    /// ```rust,no_run
    /// use tikv_client::{Config, transaction::Client};
    /// use futures::Future;
    /// let connect = Client::new(&Config::default());
    /// let client = connect.wait().unwrap();
    /// let snapshot = client.snapshot();
    /// // ... Issue some commands.
    /// ```
    pub fn snapshot(&self) -> Snapshot {
        unimplemented!()
    }

    /// Retrieve the current [`Timestamp`](struct.Timestamp.html).
    ///
    /// ```rust,no_run
    /// use tikv_client::{Config, transaction::Client};
    /// use futures::Future;
    /// let connect = Client::new(&Config::default());
    /// let client = connect.wait().unwrap();
    /// let timestamp = client.current_timestamp();
    /// ```
    pub fn current_timestamp(&self) -> Timestamp {
        unimplemented!()
    }
}
/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster.
///
/// Once resolved it will result in a connected [`Client`](struct.Client.html).
///
/// ```rust,no_run
/// use tikv_client::{Config, transaction::{Client, Connect}};
/// use futures::Future;
///
/// let connect: Connect = Client::new(&Config::default());
/// let client: Client = connect.wait().unwrap();
/// ```
pub struct Connect {
    // Configuration to use once the connection is actually established.
    config: Config,
}

impl Connect {
    /// Wrap a `Config` into an unresolved connection future.
    fn new(config: Config) -> Self {
        Connect { config }
    }
}
/// A single staged mutation within a transaction.
///
/// NOTE(review): variant semantics inferred from names — confirm against the
/// server protocol.
pub enum Mutation {
    /// Write a value at a key.
    Put(Key, Value),
    /// Delete a key.
    Del(Key),
    /// Lock a key.
    Lock(Key),
    /// Roll back a key.
    Rollback(Key),
}
/// Identifying information about a transaction.
pub struct TxnInfo {
    /// The transaction identifier.
    pub txn: u64,
    /// The transaction's status code (semantics defined by the server —
    /// TODO confirm).
    pub status: u64,
}
impl Future for Connect {
    type Item = Client;
    type Error = Error;

    // NOTE(review): connection establishment is not implemented yet; the
    // config is only bound here so the field is read.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let _config = &self.config;
        unimplemented!()
    }
}
/// A logical timestamp produced by PD.
#[derive(Copy, Clone)]
pub struct Timestamp(u64);

impl From<u64> for Timestamp {
    /// Wrap a raw `u64` timestamp value produced by PD.
    ///
    /// Implementing `From` (rather than `Into`) also provides
    /// `u64::into::<Timestamp>()` via the blanket impl.
    fn from(v: u64) -> Self {
        Timestamp(v)
    }
}
@ -38,6 +170,246 @@ impl Timestamp {
}
}
/// An undo-able set of actions on the dataset.
///
/// Using a transaction you can prepare a set of actions (such as `get`, or `set`) on data at a
/// particular timestamp obtained from the placement driver.
///
/// Once a transaction is committed, a new commit timestamp is obtained from the placement driver.
pub struct Transaction {
    // The snapshot all read requests are issued against.
    snapshot: Snapshot,
}

impl Transaction {
    /// Create a new transaction operating on the given snapshot.
    ///
    /// ```rust,no_run
    /// use tikv_client::{Config, transaction::Client};
    /// use futures::Future;
    /// let connect = Client::new(&Config::default());
    /// let client = connect.wait().unwrap();
    /// let txn = client.begin();
    /// ```
    pub fn new(snapshot: Snapshot) -> Self {
        Self { snapshot }
    }

    /// Commit the actions of the transaction.
    ///
    /// Once committed, it is no longer possible to `rollback` the actions in the transaction.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Config, transaction::Client};
    /// # use futures::Future;
    /// # let connect = Client::new(&Config::default());
    /// # let connected_client = connect.wait().unwrap();
    /// let txn = connected_client.begin();
    /// // ... Do some actions.
    /// let req = txn.commit();
    /// let result: () = req.wait().unwrap();
    /// ```
    pub fn commit(self) -> Commit {
        Commit::new(self)
    }

    /// Rollback the actions of the transaction.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Config, transaction::Client};
    /// # use futures::Future;
    /// # let connect = Client::new(&Config::default());
    /// # let connected_client = connect.wait().unwrap();
    /// let txn = connected_client.begin();
    /// // ... Do some actions.
    /// let req = txn.rollback();
    /// let result: () = req.wait().unwrap();
    /// ```
    pub fn rollback(self) -> Rollback {
        Rollback::new(self)
    }

    /// Lock the given keys.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Config, transaction::Client};
    /// # use futures::Future;
    /// # let connect = Client::new(&Config::default());
    /// # let connected_client = connect.wait().unwrap();
    /// let mut txn = connected_client.begin();
    /// // ... Do some actions.
    /// let req = txn.lock_keys(vec!["TiKV", "Rust"]);
    /// let result: () = req.wait().unwrap();
    /// ```
    pub fn lock_keys(&mut self, keys: impl IntoIterator<Item = impl Into<Key>>) -> LockKeys {
        LockKeys::new(keys.into_iter().map(|v| v.into()).collect())
    }

    /// Returns whether the transaction is read-only.
    ///
    /// NOTE(review): currently unimplemented.
    pub fn is_readonly(&self) -> bool {
        unimplemented!()
    }

    /// Returns the timestamp which the transaction started at.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Config, transaction::{Client, Timestamp}};
    /// # use futures::Future;
    /// # let connect = Client::new(&Config::default());
    /// # let connected_client = connect.wait().unwrap();
    /// let txn = connected_client.begin();
    /// // ... Do some actions.
    /// let ts: Timestamp = txn.start_ts();
    /// ```
    pub fn start_ts(&self) -> Timestamp {
        unimplemented!()
    }

    /// Get the `Snapshot` the transaction is operating on.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Config, transaction::{Client, Snapshot}};
    /// # use futures::Future;
    /// # let connect = Client::new(&Config::default());
    /// # let connected_client = connect.wait().unwrap();
    /// let txn = connected_client.begin();
    /// // ... Do some actions.
    /// let snap: Snapshot = txn.snapshot();
    /// ```
    pub fn snapshot(&self) -> Snapshot {
        unimplemented!()
    }

    /// Set the isolation level of the transaction.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Config, transaction::{Client, IsolationLevel}};
    /// # use futures::Future;
    /// # let connect = Client::new(&Config::default());
    /// # let connected_client = connect.wait().unwrap();
    /// let mut txn = connected_client.begin();
    /// txn.set_isolation_level(IsolationLevel::SnapshotIsolation);
    /// ```
    pub fn set_isolation_level(&mut self, _level: IsolationLevel) {
        unimplemented!()
    }

    /// Create a new [`Get`](struct.Get.html) request.
    ///
    /// Once resolved this request will result in the fetching of the value associated with the
    /// given key.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Value, Config, transaction::Client};
    /// # use futures::Future;
    /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"]));
    /// # let connected_client = connecting_client.wait().unwrap();
    /// let mut txn = connected_client.begin();
    /// let key = "TiKV";
    /// let req = txn.get(key);
    /// let result: Value = req.wait().unwrap();
    /// // Finish the transaction...
    /// txn.commit().wait().unwrap();
    /// ```
    pub fn get(&self, key: impl Into<Key>) -> Get {
        self.snapshot.get(key.into())
    }

    /// Create a new [`BatchGet`](struct.BatchGet.html) request.
    ///
    /// Once resolved this request will result in the fetching of the values associated with the
    /// given keys.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{KvPair, Config, transaction::Client};
    /// # use futures::Future;
    /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"]));
    /// # let connected_client = connecting_client.wait().unwrap();
    /// let mut txn = connected_client.begin();
    /// let keys = vec!["TiKV", "TiDB"];
    /// let req = txn.batch_get(keys);
    /// let result: Vec<KvPair> = req.wait().unwrap();
    /// // Finish the transaction...
    /// txn.commit().wait().unwrap();
    /// ```
    pub fn batch_get(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchGet {
        self.snapshot.batch_get(keys)
    }

    /// Scan the given range of keys through the underlying snapshot.
    pub fn scan(&self, range: impl RangeBounds<Key>) -> Scanner {
        self.snapshot.scan(range)
    }

    /// Scan the given range of keys in reverse through the underlying snapshot.
    pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> Scanner {
        self.snapshot.scan_reverse(range)
    }

    /// Create a new [`Set`](struct.Set.html) request.
    ///
    /// Once resolved this request will result in the setting of the value associated with the given key.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Key, Value, Config, transaction::Client};
    /// # use futures::Future;
    /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"]));
    /// # let connected_client = connecting_client.wait().unwrap();
    /// let mut txn = connected_client.begin();
    /// let key = "TiKV";
    /// let val = "TiKV";
    /// let req = txn.set(key, val);
    /// let result: () = req.wait().unwrap();
    /// // Finish the transaction...
    /// txn.commit().wait().unwrap();
    /// ```
    pub fn set(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Set {
        Set::new(key.into(), value.into())
    }

    /// Create a new [`Delete`](struct.Delete.html) request.
    ///
    /// Once resolved this request will result in the deletion of the given key.
    ///
    /// ```rust,no_run
    /// # use tikv_client::{Key, Config, transaction::Client};
    /// # use futures::Future;
    /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"]));
    /// # let connected_client = connecting_client.wait().unwrap();
    /// let mut txn = connected_client.begin();
    /// let key = "TiKV";
    /// let req = txn.delete(key);
    /// let result: () = req.wait().unwrap();
    /// // Finish the transaction...
    /// txn.commit().wait().unwrap();
    /// ```
    pub fn delete(&mut self, key: impl Into<Key>) -> Delete {
        Delete::new(key.into())
    }
}
/// A snapshot of dataset at a particular point in time.
pub struct Snapshot;

impl Snapshot {
    /// Create a [`Get`](struct.Get.html) request for the given key.
    pub fn get(&self, key: impl Into<Key>) -> Get {
        Get::new(key.into())
    }

    /// Create a [`BatchGet`](struct.BatchGet.html) request for the given keys.
    pub fn batch_get(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> BatchGet {
        BatchGet::new(keys.into_iter().map(|v| v.into()).collect())
    }

    /// Scan the given range of keys.
    ///
    /// NOTE(review): currently unimplemented.
    pub fn scan(&self, range: impl RangeBounds<Key>) -> Scanner {
        // Consume `range` so it is not flagged as unused while the body is a stub.
        drop(range);
        unimplemented!()
    }

    /// Scan the given range of keys in reverse.
    ///
    /// NOTE(review): currently unimplemented.
    pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> Scanner {
        drop(range);
        unimplemented!()
    }
}
/// An unresolved [`Transaction::scan`](struct.Transaction.html#method.scan) request.
///
/// Once resolved this request will result in a scanner over the given keys.
// NOTE(review): unit placeholder; the `Stream` impl below drives the actual scanning.
pub struct Scanner;
impl Stream for Scanner {
@ -49,12 +421,41 @@ impl Stream for Scanner {
}
}
/// The isolation level guarantees provided by the transaction.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum IsolationLevel {
/// Consistent reads and conflict free writes.
///
/// Snapshot isolation guarantees:
/// * All reads will see the last committed value of the data at the snapshot timestamp.
/// * The transaction will only successfully commit if no updates to the data have created a
/// conflict with concurrent updates made sine the snapshot.
///
/// Using this level means:
/// * Lost updates don't occur.
/// * Dirty reads don't occur.
/// * Non-repeatable reads don't occur.
/// * Phantom reads don't occur.
SnapshotIsolation,
/// Reads may not be consistent, but writes are conflict free.
///
/// Read committed guarantees:
/// * All reads are committed at the moment it is read.
/// not repeatable.
/// * Write locks are only released at the end of the transaction.
///
/// Using this level means:
/// * Lost updates don't occur.
/// * Dirty reads don't occur.
/// * Non-repeatable reads may occur.
/// * Phantom reads may occur.
ReadCommitted,
}
/// An unresolved [`Transaction::get`](struct.Transaction.html#method.get) request.
///
/// Once resolved this request will result in the fetching of the value associated with the given
/// key.
pub struct Get {
    /// The key whose value will be fetched.
    key: Key,
}
@ -75,6 +476,10 @@ impl Future for Get {
}
}
/// An unresolved [`Transaction::batch_get`](struct.Transaction.html#method.batch_get) request.
///
/// Once resolved this request will result in the fetching of the values associated with the given
/// keys.
pub struct BatchGet {
    /// The keys whose values will be fetched.
    keys: Vec<Key>,
}
@ -86,7 +491,7 @@ impl BatchGet {
}
impl Future for BatchGet {
type Item = Value;
type Item = Vec<KvPair>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -95,6 +500,9 @@ impl Future for BatchGet {
}
}
/// An unresolved [`Transaction::commit`](struct.Transaction.html#method.commit) request.
///
/// Once resolved this request will result in the committing of the transaction.
pub struct Commit {
    /// The transaction being committed; ownership prevents further use of it.
    txn: Transaction,
}
@ -115,6 +523,9 @@ impl Future for Commit {
}
}
/// An unresolved [`Transaction::rollback`](struct.Transaction.html#method.rollback) request.
///
/// Once resolved this request will result in the rolling back of the transaction.
pub struct Rollback {
    /// The transaction being rolled back; ownership prevents further use of it.
    txn: Transaction,
}
@ -135,6 +546,9 @@ impl Future for Rollback {
}
}
/// An unresolved [`Transaction::lock_keys`](struct.Transaction.html#method.lock_keys) request.
///
/// Once resolved this request will result in the locking of the given keys.
pub struct LockKeys {
    /// The keys to lock.
    keys: Vec<Key>,
}
@ -155,6 +569,10 @@ impl Future for LockKeys {
}
}
/// An unresolved [`Transaction::set`](struct.Transaction.html#method.set) request.
///
/// Once resolved this request will result in the setting of the value associated with the given
/// key.
pub struct Set {
key: Key,
value: Value,
@ -177,6 +595,9 @@ impl Future for Set {
}
}
/// An unresolved [`Transaction::delete`](struct.Transaction.html#method.delete) request.
///
/// Once resolved this request will result in the deletion of the given key.
pub struct Delete {
    /// The key to delete.
    key: Key,
}
@ -196,128 +617,3 @@ impl Future for Delete {
unimplemented!()
}
}
pub struct Transaction {
snapshot: Snapshot,
}
impl Transaction {
pub fn commit(self) -> Commit {
Commit::new(self)
}
pub fn rollback(self) -> Rollback {
Rollback::new(self)
}
pub fn lock_keys(&mut self, keys: impl AsRef<[Key]>) -> LockKeys {
LockKeys::new(keys.as_ref().to_vec().clone())
}
pub fn is_readonly(&self) -> bool {
unimplemented!()
}
pub fn start_ts(&self) -> Timestamp {
unimplemented!()
}
pub fn snapshot(&self) -> Snapshot {
unimplemented!()
}
pub fn set_isolation_level(&mut self, _level: IsolationLevel) {
unimplemented!()
}
pub fn get(&self, key: impl AsRef<Key>) -> Get {
self.snapshot.get(key)
}
pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet {
self.snapshot.batch_get(keys)
}
pub fn scan(&self, range: impl RangeBounds<Key>) -> Scanner {
self.snapshot.scan(range)
}
pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> Scanner {
self.snapshot.scan_reverse(range)
}
pub fn set(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Set {
Set::new(key.into(), value.into())
}
pub fn delete(&mut self, key: impl AsRef<Key>) -> Delete {
Delete::new(key.as_ref().clone())
}
}
/// A snapshot of the dataset at a point in time.
///
/// NOTE(review): removed-side duplicate of the documented `Snapshot` defined
/// earlier in this diff; interface preserved as-is.
pub struct Snapshot;

impl Snapshot {
    /// Create a `Get` request for the given key.
    pub fn get(&self, key: impl AsRef<Key>) -> Get {
        Get::new(key.as_ref().clone())
    }

    /// Create a `BatchGet` request for the given keys.
    pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet {
        // `to_vec` already yields an owned Vec; the former `.clone()` on top
        // of it duplicated the allocation for no benefit.
        BatchGet::new(keys.as_ref().to_vec())
    }

    /// Scan the given range. Currently unimplemented.
    pub fn scan(&self, range: impl RangeBounds<Key>) -> Scanner {
        // Consume `range` so it is not flagged as unused while the body is a stub.
        drop(range);
        unimplemented!()
    }

    /// Scan the given range in reverse. Currently unimplemented.
    pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> Scanner {
        drop(range);
        unimplemented!()
    }
}
/// NOTE(review): removed-side duplicate of the documented `Connect` defined
/// earlier in this diff; retained verbatim.
pub struct Connect {
    // Configuration to use once the connection is actually established.
    config: Config,
}

impl Connect {
    /// Wrap a `Config` into an unresolved connection future.
    fn new(config: Config) -> Self {
        Connect { config }
    }
}
impl Future for Connect {
    type Item = Client;
    type Error = Error;

    // NOTE(review): connection establishment is not implemented; the config
    // is only bound here so the field is read.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let _config = &self.config;
        unimplemented!()
    }
}
/// NOTE(review): removed-side duplicate of the documented `Client` defined
/// earlier in this diff; retained verbatim.
pub struct Client {}

impl Client {
    #![allow(clippy::new_ret_no_self)]
    // `new` intentionally returns the unresolved `Connect` future, not `Self`
    // — hence the clippy allow above.
    pub fn new(config: &Config) -> Connect {
        Connect::new(config.clone())
    }

    /// Begin a transaction. Currently unimplemented.
    pub fn begin(&self) -> Transaction {
        unimplemented!()
    }

    /// Begin a transaction at the given timestamp. Currently unimplemented.
    pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction {
        unimplemented!()
    }

    /// Take a snapshot. Currently unimplemented.
    pub fn snapshot(&self) -> Snapshot {
        unimplemented!()
    }

    /// Fetch the current timestamp. Currently unimplemented.
    pub fn current_timestamp(&self) -> Timestamp {
        unimplemented!()
    }
}

167
tests/raw.rs Normal file
View File

@ -0,0 +1,167 @@
// Copyright 2018 The TiKV Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::future::Future;
const NUM_TEST_KEYS: u32 = 100;
use tikv_client::{raw::Client, Config, Key, KvPair, Value};
fn generate_key(id: i32) -> Key {
format!("testkey_{}", id).into_bytes().into()
}
fn generate_value(id: i32) -> Value {
format!("testvalue_{}", id).into_bytes().into()
}
/// Remove every key the tests may have written.
///
/// NOTE(review): the range end is the key for `NUM_TEST_KEYS - 1`; if
/// `delete_range` treats the end bound as exclusive, that last key survives —
/// confirm against the client's range semantics.
fn wipe_all(client: &Client) {
    let test_key_start = generate_key(0);
    let test_key_end = generate_key(NUM_TEST_KEYS as i32 - 1);
    client
        .delete_range(test_key_start..test_key_end)
        .wait()
        .expect("Could not delete test keys");
}
/// Connect to the cluster via the PD endpoint expected at `127.0.0.1:2379`
/// and wipe the test key range so each test starts from a clean keyspace.
fn connect() -> Client {
    let client = Client::new(&Config::new(vec!["127.0.0.1:2379"]))
        .wait()
        .expect("Could not connect to tikv");
    wipe_all(&client);
    client
}
/// Assert that a scan over the whole test key range returns no pairs.
fn test_empty(client: &Client) {
    let test_key_start = generate_key(0);
    let test_key_end = generate_key(NUM_TEST_KEYS as i32 - 1);
    assert!(client
        .scan(test_key_start..test_key_end, NUM_TEST_KEYS)
        .wait()
        .expect("Could not scan")
        .is_empty());
}
/// Assert the client's view of the keyspace: every pair in `existing_pairs`
/// is retrievable via `get`, `batch_get`, and `scan`, and none of
/// `not_existing_keys` resolves to a value.
fn test_existence(client: &Client, existing_pairs: &[KvPair], not_existing_keys: Vec<Key>) {
    let test_key_start = generate_key(0);
    let test_key_end = generate_key(NUM_TEST_KEYS as i32 - 1);
    // Point lookups for keys that must exist. `.cloned()` replaces the
    // former `.map(Clone::clone)`, and the owned `value` no longer needs a
    // redundant extra clone in the assertion.
    for pair in existing_pairs.iter().cloned() {
        let (key, value) = pair.into_inner();
        assert_eq!(client.get(key).wait().expect("Could not get value"), value);
    }
    // Point lookups for keys that must not exist; `get` is expected to error.
    // Borrow + per-key clone avoids the former clone of the whole Vec, which
    // is still consumed by `batch_get` below.
    for key in not_existing_keys.iter().cloned() {
        let r = client.get(key).wait();
        assert!(r.is_err());
    }
    let mut existing_keys = Vec::with_capacity(existing_pairs.len());
    let mut existing_key_only_pairs = Vec::with_capacity(existing_pairs.len());
    for pair in existing_pairs.iter() {
        let key = pair.key().clone();
        existing_keys.push(key.clone());
        existing_key_only_pairs.push(KvPair::new(key, Value::default()));
    }
    // A batch over both present and absent keys returns only the present pairs.
    let mut all_keys = existing_keys.clone();
    all_keys.extend_from_slice(&not_existing_keys);
    assert_eq!(
        client
            .batch_get(all_keys)
            .wait()
            .expect("Could not get value in batch"),
        existing_pairs,
    );
    // A batch over only absent keys returns nothing.
    assert_eq!(
        client
            .batch_get(not_existing_keys)
            .wait()
            .expect("Could not get value in batch"),
        Vec::new(),
    );
    // A scan over the whole test range returns exactly the existing pairs.
    assert_eq!(
        client
            .scan(test_key_start.clone()..test_key_end.clone(), NUM_TEST_KEYS)
            .wait()
            .expect("Could not scan"),
        existing_pairs,
    );
    // `key_only` scans return the same keys with empty values.
    assert_eq!(
        client
            .scan(test_key_start.clone()..test_key_end.clone(), NUM_TEST_KEYS)
            .key_only()
            .wait()
            .expect("Could not scan"),
        existing_key_only_pairs,
    );
}
/// End-to-end exercise of the raw API against a live cluster: single and
/// batch put/get/delete, verified via `get`, `batch_get`, and `scan` after
/// every mutation.
#[test]
fn basic_raw_test() {
    let client = connect();
    // The keyspace starts empty after `connect` wipes it.
    test_empty(&client);
    // A single put becomes visible; other keys remain absent.
    assert!(client
        .put(generate_key(0), generate_value(0))
        .wait()
        .is_ok());
    test_existence(
        &client,
        &[KvPair::new(generate_key(0), generate_value(0))],
        vec![generate_key(1), generate_key(2)],
    );
    // Deleting the key empties the keyspace again.
    let empty_pairs = Vec::new();
    assert!(client.delete(generate_key(0)).wait().is_ok());
    test_existence(
        &client,
        &empty_pairs,
        vec![generate_key(0), generate_key(1), generate_key(2)],
    );
    // Batch-put ten pairs; all become visible, keys beyond the batch do not.
    let pairs: Vec<KvPair> = (0..10)
        .map(|i| KvPair::new(generate_key(i), generate_value(i)))
        .collect();
    assert!(client.batch_put(pairs.clone()).wait().is_ok());
    test_existence(
        &client,
        &pairs,
        vec![generate_key(10), generate_key(11), generate_key(12)],
    );
    // Batch-delete the last two pairs; only the first eight remain.
    let keys: Vec<Key> = vec![generate_key(8), generate_key(9)];
    assert!(client.batch_delete(keys).wait().is_ok());
    let mut pairs = pairs;
    pairs.truncate(8);
    test_existence(
        &client,
        &pairs,
        vec![generate_key(8), generate_key(9), generate_key(10)],
    );
    // Wiping removes everything that was left.
    wipe_all(&client);
    test_existence(
        &client,
        &empty_pairs,
        pairs.into_iter().map(|x| x.into_inner().0).collect(),
    );
}