diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 63b6260..3ed0e76 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,9 @@ jobs: check: name: check runs-on: ubuntu-latest + strategy: + matrix: + env: [ 'RUST_PROTOBUF=0', 'RUST_PROTOBUF=1' ] steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 @@ -17,6 +20,7 @@ jobs: profile: minimal toolchain: nightly override: true + - run: rustup component add rustfmt clippy - name: Install Protoc uses: arduino/setup-protoc@v1 with: @@ -24,55 +28,19 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache uses: Swatinem/rust-cache@v1.4.0 - - uses: actions-rs/cargo@v1 with: - command: check - args: --all-targets --all-features - - fmt: - name: rustfmt - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: nightly - override: true - - run: rustup component add rustfmt - - uses: actions-rs/cargo@v1 - with: - command: fmt - args: --all -- --check - - clippy: - name: clippy - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly - components: clippy - override: true - - name: Install Protoc - uses: arduino/setup-protoc@v1 - with: - version: '3.x' - repo-token: ${{ secrets.GITHUB_TOKEN }} - - name: Rust Cache - uses: Swatinem/rust-cache@v1.4.0 - - uses: actions-rs/clippy-check@v1 - with: - token: ${{ secrets.GITHUB_TOKEN }} - args: --all-targets --all-features -- -D clippy::all - name: Clippy Output + key: ${{ matrix.env }} + - name: make check + run: ${{ matrix.env }} make check unit-test: name: unit test env: CARGO_INCREMENTAL: 0 runs-on: ubuntu-latest + strategy: + matrix: + env: [ 'RUST_PROTOBUF=0', 'RUST_PROTOBUF=1' ] steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 @@ -87,14 +55,19 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache uses: Swatinem/rust-cache@v1.4.0 + with: + key: ${{ matrix.env }} - name: unit test - run: make unit-test + run: ${{ matrix.env }} make unit-test integration-test: name: integration test env: CARGO_INCREMENTAL: 0 runs-on: ubuntu-latest + strategy: + matrix: + env: [ 'RUST_PROTOBUF=0', 'RUST_PROTOBUF=1' ] steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 @@ -109,6 +82,8 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Rust Cache uses: Swatinem/rust-cache@v1.4.0 + with: + key: ${{ matrix.env }} - name: install tiup run: curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh - name: start tiup playground @@ -121,4 +96,4 @@ jobs: sleep 1 done - name: integration test - run: MULTI_REGION=1 make integration-test + run: ${{ matrix.env }} MULTI_REGION=1 make integration-test diff --git a/Cargo.toml b/Cargo.toml index 77f9721..c1bcf79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,26 @@ description = "The Rust language implementation of TiKV client." edition = "2018" [features] -default = [] +default = [ "prost-codec", "prometheus/process" ] # Enable integration tests with a running TiKV and PD instance. # Use $PD_ADDRS, comma separated, to set the addresses the tests use. integration-tests = [] +protobuf-codec = [ + "grpcio/protobuf-codec", + "tikv-client-proto/protobuf-codec", + "tikv-client-common/protobuf-codec", + "tikv-client-pd/protobuf-codec", + "tikv-client-store/protobuf-codec", + "mock-tikv/protobuf-codec", +] +prost-codec = [ + "grpcio/prost-codec", + "tikv-client-proto/prost-codec", + "tikv-client-common/prost-codec", + "tikv-client-pd/prost-codec", + "tikv-client-store/prost-codec", + "mock-tikv/prost-codec", +] [lib] name = "tikv_client" @@ -24,10 +40,10 @@ either = "1.6" fail = "0.4" futures = { version = "0.3", features = ["async-await", "thread-pool"] } futures-timer = "3.0" -grpcio = { version = "0.10", features = [ "prost-codec", "openssl-vendored" ], default-features = false } +grpcio = { version = "0.10", features = [ "openssl-vendored" ], default-features = false } lazy_static = "1" log = "0.4" -prometheus = { version = "0.12", features = [ "push", "process" ], default-features = false } +prometheus = { version = "0.13", features = [ "push" ], default-features = false } rand = "0.8" regex = "1" semver = "1.0" @@ -70,3 +86,8 @@ members = [ name = "failpoint_tests" path = "tests/failpoint_tests.rs" required-features = ["fail/failpoints"] + +[patch.crates-io] +raft-proto = { git = "https://github.com/tikv/raft-rs", rev="95c532612ee6a83591fce9a8b51d6afe87b58835"} +protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", rev="82b49fea7e696fd647b5aca0a6c6ec944eab3189" } +protobuf = { git = "https://github.com/pingcap/rust-protobuf", rev="82b49fea7e696fd647b5aca0a6c6ec944eab3189" } diff --git a/Makefile b/Makefile index 883bb0d..da470eb 100644 --- a/Makefile +++ b/Makefile @@ -2,21 +2,35 @@ export RUSTFLAGS=-Dwarnings .PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all +ENABLE_FEATURES ?= +PD_ADDRS ?= "127.0.0.1:2379" +MULTI_REGION ?= 1 + +# Use Rust-protobuf instead of Prost to encode and decode protocol buffers. +ifeq ($(RUST_PROTOBUF),1) +ENABLE_FEATURES += protobuf-codec +else +ENABLE_FEATURES += prost-codec +endif + +ALL_FEATURES := ${ENABLE_FEATURES} integration-tests + +INTEGRATION_TEST_ARGS := --no-default-features --features "${ENABLE_FEATURES} integration-tests" + default: check check: - cargo check --all --all-targets --all-features + cargo check --all --all-targets --no-default-features --features "${ALL_FEATURES}" cargo fmt -- --check - cargo clippy --all-targets --all-features -- -D clippy::all + cargo clippy --all-targets --no-default-features --features "${ALL_FEATURES}" -- -D clippy::all unit-test: - cargo test --all + cargo test --all --no-default-features --features "${ENABLE_FEATURES}" integration-test: -# MULTI_REGION shall be set manually if needed - PD_ADDRS="127.0.0.1:2379" cargo test txn_ --all --features integration-tests -- --nocapture - PD_ADDRS="127.0.0.1:2379" cargo test raw_ --all --features integration-tests -- --nocapture - PD_ADDRS="127.0.0.1:2379" cargo test misc_ --all --features integration-tests -- --nocapture + cargo test txn_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture + cargo test raw_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture + cargo test misc_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture test: unit-test integration-test @@ -36,3 +50,7 @@ tiup: tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config $(shell pwd)/config/tikv.toml --pd.config $(shell pwd)/config/pd.toml & all: check doc test + +clean: + cargo clean + rm -rf target diff --git a/mock-tikv/Cargo.toml b/mock-tikv/Cargo.toml index 96cf103..d503e68 100644 --- a/mock-tikv/Cargo.toml +++ b/mock-tikv/Cargo.toml @@ -3,9 +3,13 @@ name = "mock-tikv" version = "0.0.0" edition = "2018" +[features] +protobuf-codec = ["grpcio/protobuf-codec"] +prost-codec = ["grpcio/prost-codec"] + [dependencies] derive-new = "0.5" futures = "0.3" -grpcio = { version = "0.10", features = [ "prost-codec" ], default-features = false } +grpcio = { version = "0.10", default-features = false } log = "0.4" tikv-client-proto = { path = "../tikv-client-proto"} diff --git a/mock-tikv/src/lib.rs b/mock-tikv/src/lib.rs index 8eee772..49aae1a 100644 --- a/mock-tikv/src/lib.rs +++ b/mock-tikv/src/lib.rs @@ -1,4 +1,8 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +// To support both prost & rust-protobuf. +#![cfg_attr(feature = "prost-codec", allow(clippy::useless_conversion))] + mod pd; mod server; mod store; diff --git a/mock-tikv/src/pd.rs b/mock-tikv/src/pd.rs index 3778e24..115d217 100644 --- a/mock-tikv/src/pd.rs +++ b/mock-tikv/src/pd.rs @@ -20,7 +20,7 @@ impl MockPd { tikv_client_proto::metapb::Region { start_key: vec![], end_key: vec![], - peers: vec![Self::leader()], + peers: vec![Self::leader()].into(), ..Default::default() } } @@ -58,12 +58,12 @@ impl Pd for MockPd { ) { let member = Member { name: "mock tikv".to_owned(), - client_urls: vec![format!("localhost:{MOCK_PD_PORT}")], + client_urls: vec![format!("localhost:{MOCK_PD_PORT}")].into(), ..Default::default() }; let resp = GetMembersResponse { - members: vec![member.clone()], - leader: Some(member), + members: vec![member.clone()].into(), + leader: Some(member).into(), ..Default::default() }; spawn_unary_success!(ctx, req, resp, sink); @@ -120,7 +120,7 @@ impl Pd for MockPd { sink: ::grpcio::UnarySink, ) { let resp = GetStoreResponse { - store: Some(Self::store()), + store: Some(Self::store()).into(), ..Default::default() }; spawn_unary_success!(ctx, req, resp, sink); @@ -169,8 +169,8 @@ impl Pd for MockPd { sink: ::grpcio::UnarySink, ) { let resp = GetRegionResponse { - region: Some(Self::region()), - leader: Some(Self::leader()), + region: Some(Self::region()).into(), + leader: Some(Self::leader()).into(), ..Default::default() }; spawn_unary_success!(ctx, req, resp, sink); @@ -192,8 +192,8 @@ impl Pd for MockPd { sink: ::grpcio::UnarySink, ) { let resp = GetRegionResponse { - region: Some(Self::region()), - leader: Some(Self::leader()), + region: Some(Self::region()).into(), + leader: Some(Self::leader()).into(), ..Default::default() }; spawn_unary_success!(ctx, req, resp, sink); diff --git a/mock-tikv/src/server.rs b/mock-tikv/src/server.rs index b595f2f..acaf66a 100644 --- a/mock-tikv/src/server.rs +++ b/mock-tikv/src/server.rs @@ -1,5 +1,7 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +#![allow(clippy::useless_conversion)] // To support both prost & rust-protobuf. + use crate::{spawn_unary_success, KvStore}; use derive_new::new; use futures::{FutureExt, TryFutureExt}; @@ -192,7 +194,7 @@ impl Tikv for MockTikv { sink: grpcio::UnarySink, ) { let mut resp = tikv_client_proto::kvrpcpb::RawBatchGetResponse::default(); - resp.set_pairs(self.inner.raw_batch_get(req.take_keys())); + resp.set_pairs(self.inner.raw_batch_get(req.take_keys().into()).into()); spawn_unary_success!(ctx, req, resp, sink); } @@ -214,7 +216,7 @@ impl Tikv for MockTikv { mut req: tikv_client_proto::kvrpcpb::RawBatchPutRequest, sink: grpcio::UnarySink, ) { - let pairs = req.take_pairs(); + let pairs = req.take_pairs().into(); self.inner.raw_batch_put(pairs); let resp = RawBatchPutResponse::default(); spawn_unary_success!(ctx, req, resp, sink); @@ -238,7 +240,7 @@ impl Tikv for MockTikv { mut req: tikv_client_proto::kvrpcpb::RawBatchDeleteRequest, sink: grpcio::UnarySink, ) { - let keys = req.take_keys(); + let keys = req.take_keys().into(); self.inner.raw_batch_delete(keys); let resp = RawBatchDeleteResponse::default(); spawn_unary_success!(ctx, req, resp, sink); diff --git a/src/lib.rs b/src/lib.rs index e94785b..d40624c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,6 +90,9 @@ //! # })} //! ``` +// To support both prost & rust-protobuf. +#![cfg_attr(feature = "prost-codec", allow(clippy::useless_conversion))] + #[macro_use] pub mod request; #[macro_use] diff --git a/src/pd/client.rs b/src/pd/client.rs index a0403c9..87cf7cd 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -150,14 +150,8 @@ pub trait PdClient: Send + Sync + 'static { let region_end = region.end_key(); let mut grouped = vec![]; if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) { - grouped.push(kvrpcpb::KeyRange { - start_key: start_key.into(), - end_key: region_end.clone().into(), - }); - ranges.push(kvrpcpb::KeyRange { - start_key: region_end.into(), - end_key: end_key.into(), - }); + grouped.push(make_key_range(start_key.into(), region_end.clone().into())); + ranges.push(make_key_range(region_end.into(), end_key.into())); return Ok(Some((Some(ranges), (region, grouped)))); } grouped.push(range); @@ -170,14 +164,9 @@ pub trait PdClient: Send + Sync + 'static { break; } if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) { - grouped.push(kvrpcpb::KeyRange { - start_key: start_key.into(), - end_key: region_end.clone().into(), - }); - ranges.push(kvrpcpb::KeyRange { - start_key: region_end.into(), - end_key: end_key.into(), - }); + grouped + .push(make_key_range(start_key.into(), region_end.clone().into())); + ranges.push(make_key_range(region_end.into(), end_key.into())); return Ok(Some((Some(ranges), (region, grouped)))); } grouped.push(range); @@ -348,6 +337,13 @@ impl PdRpcClient { } } +fn make_key_range(start_key: Vec, end_key: Vec) -> kvrpcpb::KeyRange { + let mut key_range = kvrpcpb::KeyRange::default(); + key_range.set_start_key(start_key); + key_range.set_end_key(end_key); + key_range +} + #[cfg(test)] pub mod test { use super::*; @@ -430,18 +426,9 @@ pub mod test { let k3 = vec![11, 4]; let k4 = vec![16, 4]; let k_split = vec![10]; - let range1 = kvrpcpb::KeyRange { - start_key: k1.clone(), - end_key: k2.clone(), - }; - let range2 = kvrpcpb::KeyRange { - start_key: k1.clone(), - end_key: k3.clone(), - }; - let range3 = kvrpcpb::KeyRange { - start_key: k2.clone(), - end_key: k4.clone(), - }; + let range1 = make_key_range(k1.clone(), k2.clone()); + let range2 = make_key_range(k1.clone(), k3.clone()); + let range3 = make_key_range(k2.clone(), k4.clone()); let ranges = vec![range1, range2, range3]; let mut stream = executor::block_on_stream(client.group_ranges_by_region(ranges)); @@ -454,40 +441,16 @@ pub mod test { assert_eq!( ranges1.1, vec![ - kvrpcpb::KeyRange { - start_key: k1.clone(), - end_key: k2.clone() - }, - kvrpcpb::KeyRange { - start_key: k1, - end_key: k_split.clone() - } + make_key_range(k1.clone(), k2.clone()), + make_key_range(k1, k_split.clone()), ] ); assert_eq!(ranges2.0.id(), 2); - assert_eq!( - ranges2.1, - vec![kvrpcpb::KeyRange { - start_key: k_split.clone(), - end_key: k3 - }] - ); + assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3)]); assert_eq!(ranges3.0.id(), 1); - assert_eq!( - ranges3.1, - vec![kvrpcpb::KeyRange { - start_key: k2, - end_key: k_split.clone() - }] - ); + assert_eq!(ranges3.1, vec![make_key_range(k2, k_split.clone())]); assert_eq!(ranges4.0.id(), 2); - assert_eq!( - ranges4.1, - vec![kvrpcpb::KeyRange { - start_key: k_split, - end_key: k4 - }] - ); + assert_eq!(ranges4.1, vec![make_key_range(k_split, k4)]); assert!(stream.next().is_none()); } } diff --git a/src/pd/retry.rs b/src/pd/retry.rs index f6524ca..6607a4b 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -193,11 +193,11 @@ impl fmt::Debug for RetryClient { } fn region_from_response( - resp: pdpb::GetRegionResponse, + mut resp: pdpb::GetRegionResponse, err: impl FnOnce() -> Error, ) -> Result { - let region = resp.region.ok_or_else(err)?; - Ok(RegionWithLeader::new(region, resp.leader)) + let region = resp.region.take().ok_or_else(err)?; + Ok(RegionWithLeader::new(region, resp.leader.take())) } // A node-like thing that can be connected to. diff --git a/src/raw/client.rs b/src/raw/client.rs index b9ac740..efbb0fc 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -744,11 +744,19 @@ impl Client { } fn assert_non_atomic(&self) -> Result<()> { - (!self.atomic).then_some(()).ok_or(Error::UnsupportedMode) + if !self.atomic { + Ok(()) + } else { + Err(Error::UnsupportedMode) + } } fn assert_atomic(&self) -> Result<()> { - self.atomic.then_some(()).ok_or(Error::UnsupportedMode) + if self.atomic { + Ok(()) + } else { + Err(Error::UnsupportedMode) + } } } diff --git a/src/raw/requests.rs b/src/raw/requests.rs index bd678aa..400bf58 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -61,7 +61,7 @@ pub fn new_raw_batch_get_request( cf: Option, ) -> kvrpcpb::RawBatchGetRequest { let mut req = kvrpcpb::RawBatchGetRequest::default(); - req.set_keys(keys); + req.set_keys(keys.into()); req.maybe_set_cf(cf); req @@ -117,7 +117,7 @@ pub fn new_raw_batch_put_request( atomic: bool, ) -> kvrpcpb::RawBatchPutRequest { let mut req = kvrpcpb::RawBatchPutRequest::default(); - req.set_pairs(pairs); + req.set_pairs(pairs.into()); req.maybe_set_cf(cf); req.set_for_cas(atomic); @@ -145,7 +145,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { self.set_context(store.region_with_leader.context()?); - self.set_pairs(shard); + self.set_pairs(shard.into()); Ok(()) } } @@ -180,7 +180,7 @@ pub fn new_raw_batch_delete_request( cf: Option, ) -> kvrpcpb::RawBatchDeleteRequest { let mut req = kvrpcpb::RawBatchDeleteRequest::default(); - req.set_keys(keys); + req.set_keys(keys.into()); req.maybe_set_cf(cf); req @@ -252,7 +252,7 @@ pub fn new_raw_batch_scan_request( cf: Option, ) -> kvrpcpb::RawBatchScanRequest { let mut req = kvrpcpb::RawBatchScanRequest::default(); - req.set_ranges(ranges); + req.set_ranges(ranges.into()); req.set_each_limit(each_limit); req.set_key_only(key_only); req.maybe_set_cf(cf); @@ -271,12 +271,12 @@ impl Shardable for kvrpcpb::RawBatchScanRequest { &self, pd_client: &Arc, ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - store_stream_for_ranges(self.ranges.clone(), pd_client.clone()) + store_stream_for_ranges(self.ranges.clone().into(), pd_client.clone()) } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { self.set_context(store.region_with_leader.context()?); - self.set_ranges(shard); + self.set_ranges(shard.into()); Ok(()) } } @@ -346,7 +346,7 @@ pub fn new_raw_coprocessor_request( let mut inner = kvrpcpb::RawCoprocessorRequest::default(); inner.set_copr_name(copr_name); inner.set_copr_version_req(copr_version_req); - inner.set_ranges(ranges); + inner.set_ranges(ranges.into()); RawCoprocessorRequest { inner, data_builder, @@ -389,12 +389,12 @@ impl Shardable for RawCoprocessorRequest { &self, pd_client: &Arc, ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { - store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone()) + store_stream_for_ranges(self.inner.ranges.clone().into(), pd_client.clone()) } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { self.inner.set_context(store.region_with_leader.context()?); - self.inner.set_ranges(shard.clone()); + self.inner.set_ranges(shard.clone().into()); self.inner.set_data((self.data_builder)( store.region_with_leader.region.clone(), shard, diff --git a/src/request/mod.rs b/src/request/mod.rs index af98b0f..9510f26 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -181,9 +181,8 @@ mod test { let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| { Ok(Box::new(kvrpcpb::CommitResponse { - region_error: None, - error: Some(kvrpcpb::KeyError::default()), - commit_version: 0, + error: Some(kvrpcpb::KeyError::default()).into(), + ..Default::default() }) as Box) }, ))); diff --git a/src/timestamp.rs b/src/timestamp.rs index 5bd21a2..ed781c3 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -35,8 +35,8 @@ impl TimestampExt for Timestamp { Self { physical: version >> PHYSICAL_SHIFT_BITS, logical: version & LOGICAL_MASK, - // Now we only support global transactions - suffix_bits: 0, + // Now we only support global transactions: suffix_bits: 0, + ..Default::default() } } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 03512fe..e5949cd 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -313,10 +313,10 @@ impl LockResolver { let mut txn_info_vec = Vec::with_capacity(txn_infos.len()); for (txn_id, commit_ts) in txn_infos.into_iter() { txn_ids.push(txn_id); - txn_info_vec.push(TxnInfo { - txn: txn_id, - status: commit_ts, - }); + let mut txn_info = TxnInfo::default(); + txn_info.set_txn(txn_id); + txn_info.set_status(commit_ts); + txn_info_vec.push(txn_info); } let cleaned_region = self .batch_resolve_locks(pd_client.clone(), store.clone(), txn_info_vec) @@ -435,7 +435,7 @@ mod tests { |_: &dyn Any| { fail::fail_point!("region-error", |_| { let resp = kvrpcpb::ResolveLockResponse { - region_error: Some(errorpb::Error::default()), + region_error: Some(errorpb::Error::default()).into(), ..Default::default() }; Ok(Box::new(resp) as Box) diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index e09576f..2ed9004 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -97,7 +97,7 @@ impl Process for DefaultProcessor { pub fn new_batch_get_request(keys: Vec>, timestamp: u64) -> kvrpcpb::BatchGetRequest { let mut req = kvrpcpb::BatchGetRequest::default(); - req.set_keys(keys); + req.set_keys(keys.into()); req.set_version(timestamp); req } @@ -172,7 +172,7 @@ pub fn new_resolve_lock_request( pub fn new_batch_resolve_lock_request(txn_infos: Vec) -> kvrpcpb::ResolveLockRequest { let mut req = kvrpcpb::ResolveLockRequest::default(); - req.set_txn_infos(txn_infos); + req.set_txn_infos(txn_infos.into()); req } @@ -219,7 +219,7 @@ pub fn new_prewrite_request( lock_ttl: u64, ) -> kvrpcpb::PrewriteRequest { let mut req = kvrpcpb::PrewriteRequest::default(); - req.set_mutations(mutations); + req.set_mutations(mutations.into()); req.set_primary_lock(primary_lock); req.set_start_version(start_version); req.set_lock_ttl(lock_ttl); @@ -264,7 +264,7 @@ impl Shardable for kvrpcpb::PrewriteRequest { // Only need to set secondary keys if we're sending the primary key. if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) { - self.set_secondaries(vec![]); + self.set_secondaries(vec![].into()); } // Only if there is only one request to send @@ -272,7 +272,7 @@ impl Shardable for kvrpcpb::PrewriteRequest { self.set_try_one_pc(false); } - self.set_mutations(shard); + self.set_mutations(shard.into()); Ok(()) } } @@ -283,7 +283,7 @@ pub fn new_commit_request( commit_version: u64, ) -> kvrpcpb::CommitRequest { let mut req = kvrpcpb::CommitRequest::default(); - req.set_keys(keys); + req.set_keys(keys.into()); req.set_start_version(start_version); req.set_commit_version(commit_version); @@ -301,7 +301,7 @@ pub fn new_batch_rollback_request( start_version: u64, ) -> kvrpcpb::BatchRollbackRequest { let mut req = kvrpcpb::BatchRollbackRequest::default(); - req.set_keys(keys); + req.set_keys(keys.into()); req.set_start_version(start_version); req @@ -319,7 +319,7 @@ pub fn new_pessimistic_rollback_request( for_update_ts: u64, ) -> kvrpcpb::PessimisticRollbackRequest { let mut req = kvrpcpb::PessimisticRollbackRequest::default(); - req.set_keys(keys); + req.set_keys(keys.into()); req.set_start_version(start_version); req.set_for_update_ts(for_update_ts); @@ -341,7 +341,7 @@ pub fn new_pessimistic_lock_request( need_value: bool, ) -> kvrpcpb::PessimisticLockRequest { let mut req = kvrpcpb::PessimisticLockRequest::default(); - req.set_mutations(mutations); + req.set_mutations(mutations.into()); req.set_primary_lock(primary_lock); req.set_start_version(start_version); req.set_lock_ttl(lock_ttl); @@ -375,7 +375,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { self.set_context(store.region_with_leader.context()?); - self.set_mutations(shard); + self.set_mutations(shard.into()); Ok(()) } } @@ -413,7 +413,7 @@ impl Merge> = resp.take_values().into(); let values_len = values.len(); let not_founds = resp.take_not_founds(); let kvpairs = mutations @@ -619,10 +619,10 @@ pub struct TransactionStatus { } impl From for TransactionStatus { - fn from(resp: kvrpcpb::CheckTxnStatusResponse) -> TransactionStatus { + fn from(mut resp: kvrpcpb::CheckTxnStatusResponse) -> TransactionStatus { TransactionStatus { action: resp.get_action(), - kind: (resp.commit_version, resp.lock_ttl, resp.lock_info).into(), + kind: (resp.commit_version, resp.lock_ttl, resp.lock_info.take()).into(), is_expired: false, } } @@ -685,7 +685,7 @@ pub fn new_check_secondary_locks_request( start_version: u64, ) -> kvrpcpb::CheckSecondaryLocksRequest { let mut req = kvrpcpb::CheckSecondaryLocksRequest::default(); - req.set_keys(keys); + req.set_keys(keys.into()); req.set_start_version(start_version); req } @@ -707,7 +707,7 @@ impl Merge for Collect { }; for resp in input { let resp = resp?; - for lock in resp.locks { + for lock in resp.locks.into_iter() { if !lock.use_async_commit { out.fallback_2pc = true; return Ok(out); @@ -751,7 +751,7 @@ impl HasLocks for kvrpcpb::CleanupResponse {} impl HasLocks for kvrpcpb::ScanLockResponse { fn take_locks(&mut self) -> Vec { - self.take_locks() + self.take_locks().into() } } @@ -783,6 +783,7 @@ impl HasLocks for kvrpcpb::PrewriteResponse { } #[cfg(test)] +#[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))] mod tests { use crate::{ request::{plan::Merge, CollectWithShard, ResponseWithShard}, @@ -799,11 +800,11 @@ mod tests { let resp1 = ResponseWithShard( kvrpcpb::PessimisticLockResponse { - values: vec![value1.to_vec()], + values: vec![value1.to_vec()].into(), ..Default::default() }, vec![kvrpcpb::Mutation { - op: kvrpcpb::Op::PessimisticLock as i32, + op: kvrpcpb::Op::PessimisticLock.into(), key: key1.to_vec(), ..Default::default() }], @@ -811,11 +812,11 @@ mod tests { let resp_empty_value = ResponseWithShard( kvrpcpb::PessimisticLockResponse { - values: vec![value_empty.to_vec()], + values: vec![value_empty.to_vec()].into(), ..Default::default() }, vec![kvrpcpb::Mutation { - op: kvrpcpb::Op::PessimisticLock as i32, + op: kvrpcpb::Op::PessimisticLock.into(), key: key2.to_vec(), ..Default::default() }], @@ -823,18 +824,18 @@ mod tests { let resp_not_found = ResponseWithShard( kvrpcpb::PessimisticLockResponse { - values: vec![value_empty.to_vec(), value4.to_vec()], + values: vec![value_empty.to_vec(), value4.to_vec()].into(), not_founds: vec![true, false], ..Default::default() }, vec![ kvrpcpb::Mutation { - op: kvrpcpb::Op::PessimisticLock as i32, + op: kvrpcpb::Op::PessimisticLock.into(), key: key3.to_vec(), ..Default::default() }, kvrpcpb::Mutation { - op: kvrpcpb::Op::PessimisticLock as i32, + op: kvrpcpb::Op::PessimisticLock.into(), key: key4.to_vec(), ..Default::default() }, diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 6209ce4..171d908 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -596,9 +596,7 @@ async fn txn_pessimistic_rollback() -> Result<()> { #[serial] async fn txn_pessimistic_delete() -> Result<()> { init().await?; - let client = - TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default(), None) - .await?; + let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?; // The transaction will lock the keys and must release the locks on commit, // even when values are not written to the DB. diff --git a/tikv-client-common/Cargo.toml b/tikv-client-common/Cargo.toml index 3262fc9..a8ff4d5 100644 --- a/tikv-client-common/Cargo.toml +++ b/tikv-client-common/Cargo.toml @@ -7,10 +7,14 @@ authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "Common components of the TiKV Rust client" +[features] +protobuf-codec = ["grpcio/protobuf-codec"] +prost-codec = ["grpcio/prost-codec"] + [dependencies] thiserror = "1" futures = { version = "0.3", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.10", features = [ "prost-codec" ], default-features = false } +grpcio = { version = "0.10", default-features = false } lazy_static = "1" log = "0.4" regex = "1" diff --git a/tikv-client-pd/Cargo.toml b/tikv-client-pd/Cargo.toml index fb6fdaa..dc0b3e3 100644 --- a/tikv-client-pd/Cargo.toml +++ b/tikv-client-pd/Cargo.toml @@ -7,10 +7,14 @@ authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "Low level PD components for the TiKV Rust client" +[features] +protobuf-codec = ["grpcio/protobuf-codec"] +prost-codec = ["grpcio/prost-codec"] + [dependencies] async-trait = "0.1" futures = { version = "0.3", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.10", features = [ "prost-codec" ], default-features = false } +grpcio = { version = "0.10", default-features = false } log = "0.4" tikv-client-common = { version = "0.1.0", path = "../tikv-client-common" } tikv-client-proto = { version = "0.1.0", path = "../tikv-client-proto" } diff --git a/tikv-client-pd/src/timestamp.rs b/tikv-client-pd/src/timestamp.rs index 4e5562f..9296864 100644 --- a/tikv-client-pd/src/timestamp.rs +++ b/tikv-client-pd/src/timestamp.rs @@ -152,16 +152,11 @@ impl<'a> Stream for TsoRequestStream<'a> { } if !requests.is_empty() { - let req = TsoRequest { - header: Some(RequestHeader { - cluster_id: self.cluster_id, - // TODO - sender_id: 0, - }), - count: requests.len() as u32, - // TODO - dc_location: String::new(), - }; + let mut req = TsoRequest::default(); + req.mut_header().set_cluster_id(self.cluster_id); + req.mut_header().set_sender_id(0); + req.set_count(requests.len() as u32); + req.set_dc_location(String::new()); let request_group = RequestGroup { tso_request: req.clone(), @@ -206,11 +201,10 @@ fn allocate_timestamps( for request in requests { offset -= 1; - let ts = Timestamp { - physical: tail_ts.physical, - logical: tail_ts.logical - offset as i64, - suffix_bits: tail_ts.get_suffix_bits(), - }; + let mut ts = Timestamp::default(); + ts.set_physical(tail_ts.physical); + ts.set_logical(tail_ts.logical - offset as i64); + ts.set_suffix_bits(tail_ts.get_suffix_bits()); let _ = request.send(ts); } } else { diff --git a/tikv-client-proto/Cargo.toml b/tikv-client-proto/Cargo.toml index 802e125..9c631bb 100644 --- a/tikv-client-proto/Cargo.toml +++ b/tikv-client-proto/Cargo.toml @@ -8,13 +8,18 @@ repository = "https://github.com/tikv/client-rust" description = "Protobuf specs for the TiKV Rust client" build = "build.rs" +[features] +protobuf-codec = ["protobuf-build/grpcio-protobuf-codec", "raft-proto/protobuf-codec", "grpcio/protobuf-codec"] +prost-codec = ["prost", "prost-derive", "protobuf-build/grpcio-prost-codec", "grpcio/prost-codec", "raft-proto/prost-codec"] + [build-dependencies] -protobuf-build = { version = "0.13", default-features = false, features = ["grpcio-prost-codec"] } +protobuf-build = { version = "0.13", default-features = false } [dependencies] -protobuf = "2.8" -prost = { version = "0.9" } -prost-derive = { version = "0.9" } +protobuf = "=2.8.0" +prost = { version = "0.9", optional = true } +prost-derive = { version = "0.9", optional = true } futures = "0.3" -grpcio = { version = "0.10", features = [ "prost-codec" ], default-features = false } +grpcio = { version = "0.10", default-features = false } lazy_static = { version = "1" } +raft-proto = { version = "0.7.0", default-features = false } diff --git a/tikv-client-proto/build.rs b/tikv-client-proto/build.rs index 99e0e9b..808e1ca 100644 --- a/tikv-client-proto/build.rs +++ b/tikv-client-proto/build.rs @@ -4,7 +4,7 @@ use protobuf_build::Builder; fn main() { Builder::new() - .search_dir_for_protos("./proto") - .includes(&["./include", "./proto"]) + .search_dir_for_protos("proto") + .append_to_black_list("eraftpb") .generate() } diff --git a/tikv-client-proto/src/lib.rs b/tikv-client-proto/src/lib.rs index 0a9a84a..7587d78 100644 --- a/tikv-client-proto/src/lib.rs +++ b/tikv-client-proto/src/lib.rs @@ -1,10 +1,11 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. -use protos::*; -pub use protos::{coprocessor, errorpb, kvrpcpb, metapb, mpp, pdpb, raft_serverpb, tikvpb}; +pub use protos::*; #[allow(dead_code)] #[allow(clippy::all)] mod protos { include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); + + use raft_proto::eraftpb; } diff --git a/tikv-client-store/Cargo.toml b/tikv-client-store/Cargo.toml index efdc18c..5dc7db6 100644 --- a/tikv-client-store/Cargo.toml +++ b/tikv-client-store/Cargo.toml @@ -7,11 +7,15 @@ authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "Low level TiKV node components of the TiKV Rust client" +[features] +protobuf-codec = ["grpcio/protobuf-codec"] +prost-codec = ["grpcio/prost-codec"] + [dependencies] async-trait = "0.1" derive-new = "0.5" futures = { version = "0.3", features = ["compat", "async-await", "thread-pool"] } -grpcio = { version = "0.10", features = [ "prost-codec" ], default-features = false } +grpcio = { version = "0.10", default-features = false } log = "0.4" tikv-client-common = { version = "0.1.0", path = "../tikv-client-common" } tikv-client-proto = { version = "0.1.0", path = "../tikv-client-proto" } diff --git a/tikv-client-store/src/errors.rs b/tikv-client-store/src/errors.rs index c1e915a..f40afd0 100644 --- a/tikv-client-store/src/errors.rs +++ b/tikv-client-store/src/errors.rs @@ -223,17 +223,12 @@ mod test { use tikv_client_proto::kvrpcpb; #[test] fn result_haslocks() { - let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse { - region_error: None, - error: None, - commit_version: 0, - }); + let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse::default()); assert!(resp.key_errors().is_none()); let mut resp: Result<_, Error> = Ok(kvrpcpb::CommitResponse { - region_error: None, - error: Some(kvrpcpb::KeyError::default()), - commit_version: 0, + error: Some(kvrpcpb::KeyError::default()).into(), + ..Default::default() }); assert!(resp.key_errors().is_some());