Compare commits


No commits in common. "main" and "release/v2.290.0" have entirely different histories.

249 changed files with 3980 additions and 3046 deletions

View File

@ -3,7 +3,7 @@
"build": {
"dockerfile": "Dockerfile",
"args": {
"DEV_VERSION": "v47",
"DEV_VERSION": "v45",
"http_proxy": "${localEnv:http_proxy}",
"https_proxy": "${localEnv:https_proxy}"
}
@ -23,15 +23,7 @@
"zxh404.vscode-proto3"
],
"settings": {
"files.insertFinalNewline": true,
"[git-commit]": {
"editor.rulers": [
72,
80
],
"editor.wordWrap": "wordWrapColumn",
"editor.wordWrapColumn": 80
}
"files.insertFinalNewline": true
}
}
},

View File

@ -1,156 +0,0 @@
# Linkerd2 Proxy Copilot Instructions
## Code Generation
- Code MUST pass `cargo fmt`.
- Code MUST pass `cargo clippy --all-targets --all-features -- -D warnings`.
- Markdown MUST pass `markdownlint-cli2`.
- Prefer `?` for error propagation.
- Avoid `unwrap()` and `expect()` outside tests.
- Use `tracing` crate macros (`tracing::info!`, etc.) for structured logging.
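As a minimal sketch of the error-handling rules above (the `parse_port` helper is hypothetical, not from the proxy codebase): failures propagate with `?` rather than `unwrap()`, so the caller decides how to handle them.

```rust
use std::num::ParseIntError;

// Propagate the parse failure with `?` instead of panicking; in proxy
// code the caller would typically log it with a `tracing` macro.
fn parse_port(s: &str) -> Result<u16, ParseIntError> {
    let port: u16 = s.trim().parse()?;
    Ok(port)
}

fn main() {
    assert_eq!(parse_port(" 8080 "), Ok(8080));
    assert!(parse_port("not-a-port").is_err());
}
```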
### Comments
Comments should explain **why**, not **what**. Focus on high-level rationale and
design intent at the function or block level, rather than line-by-line
descriptions.
- Use comments to capture:
  - System-facing or interface-level concerns
  - Key invariants, preconditions, and postconditions
  - Design decisions and trade-offs
  - Cross-references to architecture or design documentation
- Avoid:
  - Line-by-line commentary explaining obvious code
  - Restating what the code already clearly expresses
- For public APIs:
  - Use `///` doc comments to describe the contract, behavior, parameters, and
    usage examples
- For internal rationale:
  - Use `//` comments sparingly to note non-obvious reasoning or edge-case
    handling
- Be neutral and factual.
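For instance, a public helper might pair a `///` contract description with a sparing `//` note on the non-obvious bound. The `clamp_timeout` function below is purely illustrative, as is the design note it references:

```rust
/// Clamps a timeout, in seconds, to the supported range.
///
/// Values below 1 are rounded up and values above 60 are capped, so
/// callers always receive a usable timeout.
pub fn clamp_timeout(secs: u64) -> u64 {
    // The lower bound tolerates transient network jitter; see the
    // (hypothetical) connection-management design notes for trade-offs.
    secs.clamp(1, 60)
}

fn main() {
    assert_eq!(clamp_timeout(0), 1);
    assert_eq!(clamp_timeout(30), 30);
    assert_eq!(clamp_timeout(600), 60);
}
```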
### Rust File Organization
For Rust source files, enforce this layout:
1. **Non-public imports**
   - Declare all `use` statements for private/internal crates first.
   - Group imports to avoid duplicates and do **not** add blank lines between
     `use` statements.
2. **Module declarations**
   - List all `mod` declarations.
3. **Re-exports**
   - Follow with `pub use` statements.
4. **Type definitions**
   - Define `struct`, `enum`, `type`, and `trait` declarations.
   - Sort by visibility: `pub` first, then `pub(crate)`, then private.
   - Public types should be documented with `///` comments.
5. **Impl blocks**
   - Implement methods in the same order as the types above.
   - Precede each type's `impl` block with a header comment: `// === <TypeName> ===`
6. **Tests**
   - End with a `tests` module guarded by `#[cfg(test)]`.
   - If the in-file test module exceeds 100 lines, move it to
     `tests/<filename>.rs` as a child integration-test module.
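Put together, a source file following this layout might look like the following self-contained sketch (all names are illustrative; real files would import internal crates rather than `std` alone):

```rust
// 1. Non-public imports: grouped, no blank lines between `use` statements.
use std::collections::HashMap;
use std::fmt;

// 2. Module declarations (inline here so the sketch compiles on its own).
mod greeting {
    pub const DEFAULT: &str = "hello";
}

// 3. Re-exports.
pub use greeting::DEFAULT;

// 4. Type definitions: `pub` first, then `pub(crate)`, then private.

/// A registry of named counters.
pub struct Registry {
    entries: HashMap<String, u32>,
}

// 5. Impl blocks, in the same order as the types above.

// === Registry ===

impl Registry {
    pub fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Increments the named counter and returns its new value.
    pub fn incr(&mut self, name: &str) -> u32 {
        let n = self.entries.entry(name.to_string()).or_insert(0);
        *n += 1;
        *n
    }
}

impl fmt::Debug for Registry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Registry").field("len", &self.entries.len()).finish()
    }
}

fn main() {
    let mut counts = Registry::new();
    counts.incr("requests");
    println!("{counts:?}");
}

// 6. Tests last, guarded by `#[cfg(test)]`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn incr_counts_from_one() {
        let mut r = Registry::new();
        assert_eq!(r.incr("x"), 1);
        assert_eq!(r.incr("x"), 2);
    }
}
```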
## Test Generation
- Async tests MUST use `tokio::test`.
- Synchronous tests use `#[test]`.
- Include at least one failing edge-case test per public function.
- Use `tracing::info!` for logging in tests, usually in place of comments.
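Under these rules, a test module might look like the sketch below. The `split_host_port` helper is hypothetical; the synchronous `#[test]` form is shown to keep the example dependency-free, whereas an async test would use `#[tokio::test]` and log with `tracing::info!`.

```rust
/// Splits a "host:port" string; a hypothetical helper used only to
/// illustrate the testing conventions.
pub fn split_host_port(s: &str) -> Option<(&str, u16)> {
    let (host, port) = s.rsplit_once(':')?;
    if host.is_empty() {
        return None;
    }
    Some((host, port.parse().ok()?))
}

fn main() {
    assert_eq!(split_host_port("example.com:80"), Some(("example.com", 80)));
}

#[cfg(test)]
mod tests {
    use super::*;

    // Synchronous tests use `#[test]`; async tests would use
    // `#[tokio::test]` instead.
    #[test]
    fn parses_host_and_port() {
        assert_eq!(split_host_port("example.com:80"), Some(("example.com", 80)));
    }

    // At least one failing edge case per public function.
    #[test]
    fn rejects_missing_host() {
        assert_eq!(split_host_port(":80"), None);
    }
}
```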
## Code Review
### Rust
- Point out any `unsafe` blocks and justify their safety.
- Flag functions >50 LOC for refactor suggestions.
- Highlight missing docs on public items.
### Markdown
- Use `markdownlint-cli2` to check for linting errors.
- Lines SHOULD be wrapped at 80 characters.
- Fenced code blocks MUST include a language identifier.
### Copilot Instructions
- Start each instruction with an imperative, present-tense verb.
- Keep each instruction under 120 characters.
- Provide one directive per instruction; avoid combining multiple ideas.
- Use "MUST" and "SHOULD" sparingly to emphasize critical rules.
- Avoid semicolons and complex punctuation within bullets.
- Do not reference external links, documents, or specific coding standards.
## Commit Messages
Commits follow the Conventional Commits specification:
### Subject
Subjects are in the form: `<type>[optional scope]: <description>`
- **Type**: feat, fix, docs, refactor, test, chore, ci, build, perf, revert
(others by agreement)
- **Scope**: optional, lowercase; may include `/` to denote submodules (e.g.
`http/detect`)
- **Description**: imperative mood, present tense, no trailing period
- MUST be less than 72 characters
- Omit needless words!
### Body
Non-trivial commits SHOULD include a body summarizing the change.
- Explain *why* the change was needed.
- Describe *what* was done at a high level.
- Use present-tense narration.
- Use complete sentences, paragraphs, and punctuation.
- Preceded by a blank line.
- Wrapped at 80 characters.
- Omit needless words!
### Breaking changes
If the change introduces a backwards-incompatible change, it MUST be marked as
such.
- Indicated by `!` after the type/scope (e.g. `feat(inbound)!: …`)
- Optionally including a `BREAKING CHANGE:` section in the footer explaining the
change in behavior.
### Examples
```text
feat(auth): add JWT refresh endpoint

There is currently no way to refresh a JWT token.
This exposes a new `/refresh` route that returns a refreshed token.
```
```text
feat(api)!: remove deprecated v1 routes

The `/v1/*` endpoints have been deprecated for a long time and are no
longer called by clients.
This change removes the `/v1/*` endpoints and all associated code,
including integration tests and documentation.

BREAKING CHANGE: The previously-deprecated `/v1/*` endpoints were removed.
```
## Pull Requests
- The subject line MUST be in the conventional commit format.
- Autogenerate a PR body summarizing the problem, solution, and verification steps.
- List breaking changes under a separate **Breaking Changes** heading.

View File

@ -21,7 +21,6 @@ updates:
groups:
boring:
patterns:
- "tokio-boring"
- "boring*"
futures:
patterns:
@ -44,9 +43,6 @@ updates:
- "tokio-rustls"
- "rustls*"
- "ring"
symbolic:
patterns:
- "symbolic-*"
tracing:
patterns:
- "tracing*"

View File

@ -22,13 +22,13 @@ permissions:
jobs:
build:
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
container: ghcr.io/linkerd/dev:v47-rust
runs-on: ubuntu-24.04
container: ghcr.io/linkerd/dev:v45-rust
timeout-minutes: 20
continue-on-error: true
steps:
- run: rustup toolchain install --profile=minimal beta
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- run: git config --global --add safe.directory "$PWD" # actions/runner#2033
- run: just toolchain=beta fetch
- run: just toolchain=beta build

View File

@ -21,11 +21,11 @@ env:
jobs:
meta:
timeout-minutes: 5
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- id: changed
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c
uses: tj-actions/changed-files@b74df86ccb65173a8e33ba5492ac1a2ca6b216fd
with:
files: |
.codecov.yml
@ -40,19 +40,19 @@ jobs:
codecov:
needs: meta
if: (github.event_name == 'push' && github.ref == 'refs/heads/main') || needs.meta.outputs.any_changed == 'true'
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
timeout-minutes: 30
container:
image: docker://ghcr.io/linkerd/dev:v47-rust
image: docker://ghcr.io/linkerd/dev:v45-rust
options: --security-opt seccomp=unconfined # 🤷
env:
CXX: "/usr/bin/clang++-19"
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: Swatinem/rust-cache@98c8021b550208e191a6a3145459bfc9fb29c4c0
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6
- run: cargo tarpaulin --locked --workspace --exclude=linkerd2-proxy --exclude=linkerd-transport-header --exclude=opencensus-proto --exclude=spire-proto --no-run
- run: cargo tarpaulin --locked --workspace --exclude=linkerd2-proxy --exclude=linkerd-transport-header --exclude=opencensus-proto --exclude=spire-proto --skip-clean --ignore-tests --no-fail-fast --out=Xml
# Some tests are especially flaky in coverage tests. That's fine. We
# only really care to measure how much of our codebase is covered.
continue-on-error: true
- uses: codecov/codecov-action@18283e04ce6e62d37312384ff67231eb8fd56d24
- uses: codecov/codecov-action@0565863a31f2c772f9f0395002a31e3f06189574

View File

@ -26,13 +26,13 @@ permissions:
jobs:
list-changed:
timeout-minutes: 3
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
container: docker://rust:1.88.0
runs-on: ubuntu-24.04
container: docker://rust:1.83.0
steps:
- run: apt update && apt install -y jo
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- run: git config --global --add safe.directory "$PWD" # actions/runner#2033
- uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c
- uses: tj-actions/changed-files@b74df86ccb65173a8e33ba5492ac1a2ca6b216fd
id: changed-files
- name: list changed crates
id: list-changed
@ -47,15 +47,15 @@ jobs:
build:
needs: [list-changed]
timeout-minutes: 40
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
container: docker://rust:1.88.0
runs-on: ubuntu-24.04
container: docker://rust:1.83.0
strategy:
matrix:
dir: ${{ fromJson(needs.list-changed.outputs.dirs) }}
steps:
- run: rustup toolchain add nightly
- run: cargo install cargo-fuzz
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- run: git config --global --add safe.directory "$PWD" # actions/runner#2033
- working-directory: ${{matrix.dir}}
run: cargo +nightly fetch

View File

@ -12,9 +12,9 @@ on:
jobs:
markdownlint:
timeout-minutes: 5
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: DavidAnson/markdownlint-cli2-action@992badcdf24e3b8eb7e87ff9287fe931bcb00c6e
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- uses: DavidAnson/markdownlint-cli2-action@05f32210e84442804257b2a6f20b273450ec8265
with:
globs: "**/*.md"

View File

@ -22,13 +22,13 @@ permissions:
jobs:
build:
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
container: ghcr.io/linkerd/dev:v47-rust
runs-on: ubuntu-24.04
container: ghcr.io/linkerd/dev:v45-rust
timeout-minutes: 20
continue-on-error: true
steps:
- run: rustup toolchain install --profile=minimal nightly
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- run: git config --global --add safe.directory "$PWD" # actions/runner#2033
- run: just toolchain=nightly fetch
- run: just toolchain=nightly profile=release build

View File

@ -14,24 +14,24 @@ concurrency:
jobs:
meta:
timeout-minutes: 5
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- id: build
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c
uses: tj-actions/changed-files@b74df86ccb65173a8e33ba5492ac1a2ca6b216fd
with:
files: |
.github/workflows/pr.yml
justfile
Dockerfile
- id: actions
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c
uses: tj-actions/changed-files@b74df86ccb65173a8e33ba5492ac1a2ca6b216fd
with:
files: |
.github/workflows/**
.devcontainer/*
- id: cargo
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c
uses: tj-actions/changed-files@b74df86ccb65173a8e33ba5492ac1a2ca6b216fd
with:
files_ignore: "Cargo.toml"
files: |
@ -40,7 +40,7 @@ jobs:
if: steps.cargo.outputs.any_changed == 'true'
run: ./.github/list-crates.sh ${{ steps.cargo.outputs.all_changed_files }}
- id: rust
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c
uses: tj-actions/changed-files@b74df86ccb65173a8e33ba5492ac1a2ca6b216fd
with:
files: |
**/*.rs
@ -57,7 +57,7 @@ jobs:
info:
timeout-minutes: 3
needs: meta
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- name: Info
run: |
@ -74,25 +74,25 @@ jobs:
actions:
needs: meta
if: needs.meta.outputs.actions_changed == 'true'
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- uses: linkerd/dev/actions/setup-tools@v47
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: linkerd/dev/actions/setup-tools@v45
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- run: just action-lint
- run: just action-dev-check
rust:
needs: meta
if: needs.meta.outputs.cargo_changed == 'true' || needs.meta.outputs.rust_changed == 'true'
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
container: ghcr.io/linkerd/dev:v47-rust
runs-on: ubuntu-24.04
container: ghcr.io/linkerd/dev:v45-rust
permissions:
contents: read
timeout-minutes: 20
steps:
- run: git config --global --add safe.directory "$PWD" # actions/runner#2033
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: Swatinem/rust-cache@98c8021b550208e191a6a3145459bfc9fb29c4c0
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6
- run: just fetch
- run: cargo deny --all-features check bans licenses sources
- run: just check-fmt
@ -107,15 +107,15 @@ jobs:
needs: meta
if: needs.meta.outputs.cargo_changed == 'true'
timeout-minutes: 20
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
container: ghcr.io/linkerd/dev:v47-rust
runs-on: ubuntu-24.04
container: ghcr.io/linkerd/dev:v45-rust
strategy:
matrix:
crate: ${{ fromJson(needs.meta.outputs.cargo_crates) }}
steps:
- run: git config --global --add safe.directory "$PWD" # actions/runner#2033
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: Swatinem/rust-cache@98c8021b550208e191a6a3145459bfc9fb29c4c0
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6
- run: just fetch
- run: just check-crate ${{ matrix.crate }}
@ -123,11 +123,11 @@ jobs:
needs: meta
if: needs.meta.outputs.cargo_changed == 'true' || needs.meta.outputs.rust_changed == 'true'
timeout-minutes: 20
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
env:
WAIT_TIMEOUT: 2m
steps:
- uses: linkerd/dev/actions/setup-tools@v47
- uses: linkerd/dev/actions/setup-tools@v45
- name: scurl https://run.linkerd.io/install-edge | sh
run: |
scurl https://run.linkerd.io/install-edge | sh
@ -136,7 +136,7 @@ jobs:
tag=$(linkerd version --client --short)
echo "linkerd $tag"
echo "LINKERD_TAG=$tag" >> "$GITHUB_ENV"
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- run: just docker
- run: just k3d-create
- run: just k3d-load-linkerd
@ -149,7 +149,7 @@ jobs:
timeout-minutes: 3
needs: [meta, actions, rust, rust-crates, linkerd-install]
if: always()
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
permissions:
contents: write
@ -168,7 +168,7 @@ jobs:
if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled')
run: exit 1
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
if: needs.meta.outputs.is_dependabot == 'true' && needs.meta.outputs.any_changed == 'true'
- name: "Merge dependabot changes"
if: needs.meta.outputs.is_dependabot == 'true' && needs.meta.outputs.any_changed == 'true'

View File

@ -13,7 +13,7 @@ concurrency:
jobs:
last-release:
if: github.repository == 'linkerd/linkerd2-proxy' # Don't run this in forks.
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
timeout-minutes: 5
env:
GH_REPO: ${{ github.repository }}
@ -41,10 +41,10 @@ jobs:
last-commit:
needs: last-release
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
timeout-minutes: 5
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- name: Check if the most recent commit is after the last release
id: recency
env:
@ -62,7 +62,7 @@ jobs:
trigger-release:
needs: [last-release, last-commit]
if: needs.last-release.outputs.recent == 'false' && needs.last-commit.outputs.after-release == 'true'
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
timeout-minutes: 5
permissions:
actions: write

View File

@ -46,7 +46,6 @@ on:
default: true
env:
CARGO: "cargo auditable"
CARGO_INCREMENTAL: 0
CARGO_NET_RETRY: 10
RUSTFLAGS: "-D warnings -A deprecated --cfg tokio_unstable"
@ -59,25 +58,9 @@ concurrency:
jobs:
meta:
timeout-minutes: 5
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
if: github.event_name == 'pull_request'
- id: workflow
if: github.event_name == 'pull_request'
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c
with:
files: |
.github/workflows/release.yml
- id: build
if: github.event_name == 'pull_request'
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c
with:
files: |
justfile
Cargo.toml
- id: version
- id: meta
env:
VERSION: ${{ inputs.version }}
shell: bash
@ -85,45 +68,47 @@ jobs:
set -euo pipefail
shopt -s extglob
if [[ "$GITHUB_EVENT_NAME" == pull_request ]]; then
echo version="0.0.0-test.${GITHUB_SHA:0:7}" >> "$GITHUB_OUTPUT"
echo version="0.0.0-test.${GITHUB_SHA:0:7}"
echo archs='["amd64"]'
echo oses='["linux"]'
exit 0
fi
fi >> "$GITHUB_OUTPUT"
if ! [[ "$VERSION" =~ ^v[0-9]+\.[0-9]+\.[0-9]+(-[0-9A-Za-z-]+)?(\+[0-9A-Za-z-]+)?$ ]]; then
echo "Invalid version: $VERSION" >&2
exit 1
fi
echo version="${VERSION#v}" >> "$GITHUB_OUTPUT"
- id: platform
shell: bash
env:
WORKFLOW_CHANGED: ${{ steps.workflow.outputs.any_changed }}
run: |
if [[ "$GITHUB_EVENT_NAME" == pull_request && "$WORKFLOW_CHANGED" != 'true' ]]; then
( echo archs='["amd64"]'
echo oses='["linux"]' ) >> "$GITHUB_OUTPUT"
exit 0
fi
( echo archs='["amd64", "arm64"]'
( echo version="${VERSION#v}"
echo archs='["amd64", "arm64", "arm"]'
echo oses='["linux", "windows"]'
) >> "$GITHUB_OUTPUT"
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
if: github.event_name == 'pull_request'
- id: changed
if: github.event_name == 'pull_request'
uses: tj-actions/changed-files@b74df86ccb65173a8e33ba5492ac1a2ca6b216fd
with:
files: |
.github/workflows/release.yml
justfile
Cargo.toml
outputs:
archs: ${{ steps.platform.outputs.archs }}
oses: ${{ steps.platform.outputs.oses }}
version: ${{ steps.version.outputs.version }}
package: ${{ github.event_name == 'workflow_dispatch' || steps.build.outputs.any_changed == 'true' || steps.workflow.outputs.any_changed == 'true' }}
archs: ${{ steps.meta.outputs.archs }}
oses: ${{ steps.meta.outputs.oses }}
version: ${{ steps.meta.outputs.version }}
package: ${{ github.event_name == 'workflow_dispatch' || steps.changed.outputs.any_changed == 'true' }}
profile: ${{ inputs.profile || 'release' }}
publish: ${{ inputs.publish }}
ref: ${{ inputs.ref || github.sha }}
tag: "${{ inputs.tag-prefix || 'release/' }}v${{ steps.version.outputs.version }}"
tag: "${{ inputs.tag-prefix || 'release/' }}v${{ steps.meta.outputs.version }}"
prerelease: ${{ inputs.prerelease }}
draft: ${{ inputs.draft }}
latest: ${{ inputs.latest }}
info:
needs: meta
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
timeout-minutes: 3
steps:
- name: Inputs
@ -149,13 +134,15 @@ jobs:
exclude:
- os: windows
arch: arm64
- os: windows
arch: arm
# If we're not actually building on a release tag, don't short-circuit on
# errors. This helps us know whether a failure is platform-specific.
continue-on-error: ${{ needs.meta.outputs.publish != 'true' }}
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
timeout-minutes: 40
container: docker://ghcr.io/linkerd/dev:v47-rust-musl
container: docker://ghcr.io/linkerd/dev:v45-rust-musl
env:
LINKERD2_PROXY_VENDOR: ${{ github.repository_owner }}
LINKERD2_PROXY_VERSION: ${{ needs.meta.outputs.version }}
@ -163,19 +150,15 @@ jobs:
# TODO: add to dev image
- name: Install MinGW
if: matrix.os == 'windows'
run: apt-get update && apt-get install -y mingw-w64
- name: Install cross compilation toolchain
if: matrix.arch == 'arm64'
run: apt-get update && apt-get install -y binutils-aarch64-linux-gnu
run: apt-get update && apt-get install mingw-w64 -y
- name: Configure git
run: git config --global --add safe.directory "$PWD" # actions/runner#2033
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
with:
ref: ${{ needs.meta.outputs.ref }}
- uses: Swatinem/rust-cache@98c8021b550208e191a6a3145459bfc9fb29c4c0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6
with:
key: ${{ matrix.os }}-${{ matrix.arch }}
key: ${{ matrix.arch }}
- run: just fetch
- run: just arch=${{ matrix.arch }} libc=${{ matrix.libc }} os=${{ matrix.os }} rustup
- run: just arch=${{ matrix.arch }} libc=${{ matrix.libc }} os=${{ matrix.os }} profile=${{ needs.meta.outputs.profile }} build
@ -187,7 +170,7 @@ jobs:
publish:
needs: [meta, package]
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
timeout-minutes: 5
permissions:
actions: write
@ -204,13 +187,13 @@ jobs:
git config --global user.name "$GITHUB_USERNAME"
git config --global user.email "$GITHUB_USERNAME"@users.noreply.github.com
# Tag the release.
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
with:
token: ${{ secrets.LINKERD2_PROXY_GITHUB_TOKEN || github.token }}
ref: ${{ needs.meta.outputs.ref }}
- run: git tag -a -m "$VERSION" "$TAG"
# Fetch the artifacts.
- uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093
- uses: actions/download-artifact@95815c38cf2ff2164869cbab79da8d1f422bc89e
with:
path: artifacts
- run: du -h artifacts/**/*
@ -218,7 +201,7 @@ jobs:
- if: needs.meta.outputs.publish == 'true'
run: git push origin "$TAG"
- if: needs.meta.outputs.publish == 'true'
uses: softprops/action-gh-release@72f2c25fcb47643c292f7107632f7a47c1df5cd8
uses: softprops/action-gh-release@c95fe1489396fe8a9eb87c0abf8aa5b2ef267fda
with:
name: ${{ env.VERSION }}
tag_name: ${{ env.TAG }}
@ -242,7 +225,7 @@ jobs:
needs: publish
if: always()
timeout-minutes: 3
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- name: Results
run: |

View File

@ -13,8 +13,8 @@ on:
jobs:
sh-lint:
timeout-minutes: 5
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- uses: linkerd/dev/actions/setup-tools@v47
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: linkerd/dev/actions/setup-tools@v45
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- run: just sh-lint

View File

@ -13,10 +13,10 @@ permissions:
jobs:
devcontainer:
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
container: ghcr.io/linkerd/dev:v47-rust
runs-on: ubuntu-24.04
container: ghcr.io/linkerd/dev:v45-rust
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- run: git config --global --add safe.directory "$PWD" # actions/runner#2033
- run: |
VERSION_REGEX='channel = "([0-9]+\.[0-9]+\.[0-9]+)"'
@ -35,10 +35,10 @@ jobs:
workflows:
runs-on: ${{ vars.LINKERD2_PROXY_RUNNER || 'ubuntu-24.04' }}
runs-on: ubuntu-24.04
steps:
- uses: linkerd/dev/actions/setup-tools@v47
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- uses: linkerd/dev/actions/setup-tools@v45
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- shell: bash
run: |
VERSION_REGEX='channel = "([0-9]+\.[0-9]+\.[0-9]+)"'

Cargo.lock (1230 lines changed)

File diff suppressed because it is too large

View File

@ -42,6 +42,8 @@ members = [
"linkerd/idle-cache",
"linkerd/io",
"linkerd/meshtls",
"linkerd/meshtls/boring",
"linkerd/meshtls/rustls",
"linkerd/meshtls/verifier",
"linkerd/metrics",
"linkerd/mock/http-body",
@ -69,7 +71,6 @@ members = [
"linkerd/reconnect",
"linkerd/retry",
"linkerd/router",
"linkerd/rustls",
"linkerd/service-profiles",
"linkerd/signal",
"linkerd/stack",
@ -114,14 +115,14 @@ prost = { version = "0.13" }
prost-build = { version = "0.13", default-features = false }
prost-types = { version = "0.13" }
tokio-rustls = { version = "0.26", default-features = false, features = [
"ring",
"logging",
] }
tonic = { version = "0.13", default-features = false }
tonic-build = { version = "0.13", default-features = false }
tonic = { version = "0.12", default-features = false }
tonic-build = { version = "0.12", default-features = false }
tower = { version = "0.5", default-features = false }
tower-service = { version = "0.3" }
tower-test = { version = "0.4" }
tracing = { version = "0.1" }
[workspace.dependencies.http-body-util]
version = "0.1.3"
@ -134,4 +135,4 @@ default-features = false
features = ["tokio", "tracing"]
[workspace.dependencies.linkerd2-proxy-api]
version = "0.17.0"
version = "0.16.0"

View File

@ -3,7 +3,7 @@
# This is intended **DEVELOPMENT ONLY**, i.e. so that proxy developers can
# easily test the proxy in the context of the larger `linkerd2` project.
ARG RUST_IMAGE=ghcr.io/linkerd/dev:v47-rust
ARG RUST_IMAGE=ghcr.io/linkerd/dev:v45-rust
# Use an arbitrary ~recent edge release image to get the proxy
# identity-initializing and linkerd-await wrappers.
@ -14,16 +14,11 @@ FROM $LINKERD2_IMAGE as linkerd2
FROM --platform=$BUILDPLATFORM $RUST_IMAGE as fetch
ARG PROXY_FEATURES=""
ARG TARGETARCH="amd64"
RUN apt-get update && \
apt-get install -y time && \
if [[ "$PROXY_FEATURES" =~ .*meshtls-boring.* ]] ; then \
apt-get install -y golang ; \
fi && \
case "$TARGETARCH" in \
amd64) true ;; \
arm64) apt-get install --no-install-recommends -y binutils-aarch64-linux-gnu ;; \
esac && \
rm -rf /var/lib/apt/lists/*
ENV CARGO_NET_RETRY=10
@ -38,6 +33,7 @@ RUN --mount=type=cache,id=cargo,target=/usr/local/cargo/registry \
FROM fetch as build
ENV CARGO_INCREMENTAL=0
ENV RUSTFLAGS="-D warnings -A deprecated --cfg tokio_unstable"
ARG TARGETARCH="amd64"
ARG PROFILE="release"
ARG LINKERD2_PROXY_VERSION=""
ARG LINKERD2_PROXY_VENDOR=""

View File

@ -86,9 +86,8 @@ minutes to review our [code of conduct][coc].
We test our code by way of fuzzing and this is described in [FUZZING.md](/docs/FUZZING.md).
A third party security audit focused on fuzzing Linkerd2-proxy was performed by
Ada Logics in 2021. The
[full report](/docs/reports/linkerd2-proxy-fuzzing-report.pdf) can be found in
the `docs/reports/` directory.
Ada Logics in 2021. The full report is available
[here](/docs/reports/linkerd2-proxy-fuzzing-report.pdf).
## License

View File

@ -2,6 +2,7 @@
targets = [
{ triple = "x86_64-unknown-linux-gnu" },
{ triple = "aarch64-unknown-linux-gnu" },
{ triple = "armv7-unknown-linux-gnu" },
]
[advisories]
@ -25,12 +26,17 @@ confidence-threshold = 0.8
exceptions = [
{ allow = [
"ISC",
"MIT",
"OpenSSL",
], name = "aws-lc-sys", version = "*" },
{ allow = [
"ISC",
"OpenSSL",
], name = "aws-lc-fips-sys", version = "*" },
], name = "ring", version = "*" },
]
[[licenses.clarify]]
name = "ring"
version = "*"
expression = "MIT AND ISC AND OpenSSL"
license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 },
]
[bans]
@ -42,8 +48,6 @@ deny = [
{ name = "rustls", wrappers = ["tokio-rustls"] },
# rustls-webpki should be used instead.
{ name = "webpki" },
# aws-lc-rs should be used instead.
{ name = "ring" }
]
skip = [
# `linkerd-trace-context`, `rustls-pemfile` and `tonic` depend on `base64`
@ -62,10 +66,6 @@ skip-tree = [
{ name = "rustix", version = "0.38" },
# `pprof` uses a number of old dependencies. for now, we skip its subtree.
{ name = "pprof" },
# aws-lc-rs uses a slightly outdated version of bindgen
{ name = "bindgen", version = "0.69.5" },
# socket v0.6 is still propagating through the ecosystem
{ name = "socket2", version = "0.5" },
]
[sources]

View File

@ -12,12 +12,9 @@ engine.
We place the fuzz tests into folders within the individual crates that the fuzz
tests target. For example, we have a fuzz test that targets the crate
`/linkerd/addr` and the code in `/linkerd/addr/src` and thus the fuzz test that
targets this crate is put in `/linkerd/addr/fuzz`.
The folder structure for each of the fuzz tests is automatically generated by
`cargo fuzz init`. See cargo fuzz's
[`README.md`](https://github.com/rust-fuzz/cargo-fuzz#cargo-fuzz-init) for more
information.
targets this crate is put in `/linkerd/addr/fuzz`. The folder setup we use for
each of the fuzz tests is automatically generated by `cargo fuzz init`
(described [here](https://github.com/rust-fuzz/cargo-fuzz#cargo-fuzz-init)).
### Fuzz targets
@ -99,5 +96,6 @@ unit-test-like fuzzers, but are essentially just more substantial in nature. The
idea behind these fuzzers is to test end-to-end concepts more so than individual
components of the proxy.
The [inbound fuzzer](/linkerd/app/inbound/fuzz/fuzz_targets/fuzz_target_1.rs)
is an example of this.
The inbound fuzzer
[here](/linkerd/app/inbound/fuzz/fuzz_targets/fuzz_target_1.rs) is an example of
this.

View File

@ -18,10 +18,6 @@ features := ""
export LINKERD2_PROXY_VERSION := env_var_or_default("LINKERD2_PROXY_VERSION", "0.0.0-dev" + `git rev-parse --short HEAD`)
export LINKERD2_PROXY_VENDOR := env_var_or_default("LINKERD2_PROXY_VENDOR", `whoami` + "@" + `hostname`)
# TODO: these variables will be included in dev v48
export AWS_LC_SYS_CFLAGS_aarch64_unknown_linux_gnu := env_var_or_default("AWS_LC_SYS_CFLAGS_aarch64_unknown_linux_gnu", "-fuse-ld=/usr/aarch64-linux-gnu/bin/ld")
export AWS_LC_SYS_CFLAGS_aarch64_unknown_linux_musl := env_var_or_default("AWS_LC_SYS_CFLAGS_aarch64_unknown_linux_musl", "-fuse-ld=/usr/aarch64-linux-gnu/bin/ld")
# The version name to use for packages.
package_version := "v" + LINKERD2_PROXY_VERSION
@ -30,7 +26,7 @@ docker-repo := "localhost/linkerd/proxy"
docker-tag := `git rev-parse --abbrev-ref HEAD | sed 's|/|.|g'` + "." + `git rev-parse --short HEAD`
docker-image := docker-repo + ":" + docker-tag
# The architecture name to use for packages. Either 'amd64' or 'arm64'.
# The architecture name to use for packages. Either 'amd64', 'arm64', or 'arm'.
arch := "amd64"
# The OS name to use for packages. Either 'linux' or 'windows'.
os := "linux"
@ -43,6 +39,8 @@ _target := if os + '-' + arch == "linux-amd64" {
"x86_64-unknown-linux-" + libc
} else if os + '-' + arch == "linux-arm64" {
"aarch64-unknown-linux-" + libc
} else if os + '-' + arch == "linux-arm" {
"armv7-unknown-linux-" + libc + "eabihf"
} else if os + '-' + arch == "windows-amd64" {
"x86_64-pc-windows-" + libc
} else {
@ -141,7 +139,7 @@ _strip:
_package_bin := _package_dir / "bin" / "linkerd2-proxy"
# XXX aarch64-musl builds do not enable PIE, so we use target-specific
# XXX {aarch64,arm}-musl builds do not enable PIE, so we use target-specific
# files to document those differences.
_expected_checksec := '.checksec' / arch + '-' + libc + '.json'

View File

@ -13,7 +13,7 @@ cargo-fuzz = true
libfuzzer-sys = "0.4"
linkerd-addr = { path = ".." }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
tracing = { workspace = true }
tracing = "0.1"
# Prevent this from interfering with workspaces
[workspace]

View File

@ -100,11 +100,15 @@ impl Addr {
// them ourselves.
format!("[{}]", a.ip())
};
http::uri::Authority::from_str(&ip)
.unwrap_or_else(|err| panic!("SocketAddr ({a}) must be valid authority: {err}"))
http::uri::Authority::from_str(&ip).unwrap_or_else(|err| {
panic!("SocketAddr ({}) must be valid authority: {}", a, err)
})
}
Addr::Socket(a) => {
http::uri::Authority::from_str(&a.to_string()).unwrap_or_else(|err| {
panic!("SocketAddr ({}) must be valid authority: {}", a, err)
})
}
Addr::Socket(a) => http::uri::Authority::from_str(&a.to_string())
.unwrap_or_else(|err| panic!("SocketAddr ({a}) must be valid authority: {err}")),
}
}
@ -261,14 +265,14 @@ mod tests {
];
for (host, expected_result) in cases {
let a = Addr::from_str(host).unwrap();
assert_eq!(a.is_loopback(), *expected_result, "{host:?}")
assert_eq!(a.is_loopback(), *expected_result, "{:?}", host)
}
}
fn test_to_http_authority(cases: &[&str]) {
let width = cases.iter().map(|s| s.len()).max().unwrap_or(0);
for host in cases {
print!("trying {host:width$} ... ");
print!("trying {:1$} ... ", host, width);
Addr::from_str(host).unwrap().to_http_authority();
println!("ok");
}
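Many hunks in this comparison differ only in `format!`/`panic!`/`print!` style: the `main` side uses inline format arguments (stabilized in Rust 1.58, and suggested by Clippy's `uninlined_format_args` lint), while the release branch keeps positional `{}` placeholders. The two forms produce identical output; a minimal sketch with hypothetical values:

```rust
fn main() {
    let host = "example.com";
    let err = "invalid authority";

    // Positional placeholders, as on the release branch:
    let positional = format!("SocketAddr ({}) must be valid authority: {}", host, err);
    // Inline captured identifiers, as on `main`:
    let inline = format!("SocketAddr ({host}) must be valid authority: {err}");

    assert_eq!(positional, inline);
    println!("{inline}");
}
```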

View File

@ -36,4 +36,4 @@ tokio = { version = "1", features = ["rt"] }
tokio-stream = { version = "0.1", features = ["time", "sync"] }
tonic = { workspace = true, default-features = false, features = ["prost"] }
tower = { workspace = true }
tracing = { workspace = true }
tracing = "0.1"

View File

@ -22,12 +22,12 @@ http-body = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true, features = ["http1", "http2"] }
futures = { version = "0.3", default-features = false }
pprof = { version = "0.15", optional = true, features = ["prost-codec"] }
pprof = { version = "0.14", optional = true, features = ["prost-codec"] }
serde = "1"
serde_json = "1"
thiserror = "2"
tokio = { version = "1", features = ["macros", "sync", "parking_lot"] }
tracing = { workspace = true }
tracing = "0.1"
linkerd-app-core = { path = "../core" }
linkerd-app-inbound = { path = "../inbound" }

View File

@ -13,7 +13,7 @@
use futures::future::{self, TryFutureExt};
use http::StatusCode;
use linkerd_app_core::{
metrics::{self as metrics, legacy::FmtMetrics},
metrics::{self as metrics, FmtMetrics},
proxy::http::{Body, BoxBody, ClientHandle, Request, Response},
trace, Error, Result,
};
@ -32,7 +32,7 @@ pub use self::readiness::{Latch, Readiness};
#[derive(Clone)]
pub struct Admin<M> {
metrics: metrics::legacy::Serve<M>,
metrics: metrics::Serve<M>,
tracing: trace::Handle,
ready: Readiness,
shutdown_tx: mpsc::UnboundedSender<()>,
@ -52,7 +52,7 @@ impl<M> Admin<M> {
tracing: trace::Handle,
) -> Self {
Self {
metrics: metrics::legacy::Serve::new(metrics),
metrics: metrics::Serve::new(metrics),
ready,
shutdown_tx,
enable_shutdown,

View File

@ -27,7 +27,7 @@ where
.into_body()
.collect()
.await
.map_err(io::Error::other)?
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.aggregate();
match level.set_from(body.chunk()) {
Ok(_) => mk_rsp(StatusCode::NO_CONTENT, BoxBody::empty()),

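Another recurring change: `main` replaces `io::Error::new(io::ErrorKind::Other, e)` with the shorter `io::Error::other(e)` constructor (stabilized in Rust 1.74). The two are equivalent in kind and message; a sketch:

```rust
use std::io;

fn main() {
    let verbose = io::Error::new(io::ErrorKind::Other, "server is not listening");
    let concise = io::Error::other("server is not listening");

    // Same kind and same message either way.
    assert_eq!(verbose.kind(), concise.kind());
    assert_eq!(verbose.to_string(), concise.to_string());
}
```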
View File

@ -2,7 +2,7 @@ use linkerd_app_core::{
classify,
config::ServerConfig,
drain, errors, identity,
metrics::{self, legacy::FmtMetrics},
metrics::{self, FmtMetrics},
proxy::http,
serve,
svc::{self, ExtractParam, InsertParam, Param},
@ -214,7 +214,7 @@ impl Config {
impl Param<transport::labels::Key> for Tcp {
fn param(&self) -> transport::labels::Key {
transport::labels::Key::inbound_server(
self.tls.as_ref().map(|t| t.labels()),
self.tls.clone(),
self.addr.into(),
self.policy.server_label(),
)
@ -272,7 +272,7 @@ impl Param<metrics::ServerLabel> for Http {
impl Param<metrics::EndpointLabels> for Permitted {
fn param(&self) -> metrics::EndpointLabels {
metrics::InboundEndpointLabels {
tls: self.http.tcp.tls.as_ref().map(|t| t.labels()),
tls: self.http.tcp.tls.clone(),
authority: None,
target_addr: self.http.tcp.addr.into(),
policy: self.permit.labels.clone(),

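In these hunks, the `main` side feeds metric keys a `tls::Conditional*TlsLabels` projection (via `as_ref().map(|t| t.labels())`) where the release branch clones the full TLS state. The projection drops connection details that don't belong in metric labels, such as the negotiated ALPN protocol. A sketch of the idea with simplified stand-in types (not the real `linkerd_tls` definitions):

```rust
#[derive(Clone, Debug, PartialEq)]
struct ServerTls {
    client_id: Option<String>,
    negotiated_protocol: Option<String>,
}

#[derive(Clone, Debug, PartialEq)]
struct ServerTlsLabels {
    client_id: Option<String>,
}

impl ServerTls {
    // Keep only the fields that participate in metric labels; state like
    // the negotiated protocol would otherwise inflate label cardinality.
    fn labels(&self) -> ServerTlsLabels {
        ServerTlsLabels {
            client_id: self.client_id.clone(),
        }
    }
}

fn main() {
    let tls = Some(ServerTls {
        client_id: Some("foo.id.example.com".into()),
        negotiated_protocol: Some("h2".into()),
    });
    let labels = tls.as_ref().map(|t| t.labels());
    assert_eq!(
        labels,
        Some(ServerTlsLabels {
            client_id: Some("foo.id.example.com".into())
        })
    );
}
```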
View File

@ -13,23 +13,31 @@ independently of the inbound and outbound proxy logic.
"""
[dependencies]
bytes = { workspace = true }
drain = { workspace = true, features = ["retain"] }
http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true, features = ["http1", "http2"] }
hyper-util = { workspace = true }
futures = { version = "0.3", default-features = false }
ipnet = "2.11"
prometheus-client = { workspace = true }
regex = "1"
serde_json = "1"
thiserror = "2"
tokio = { version = "1", features = ["macros", "sync", "parking_lot"] }
tokio-stream = { version = "0.1", features = ["time"] }
tonic = { workspace = true, default-features = false, features = ["prost"] }
tracing = { workspace = true }
tracing = "0.1"
parking_lot = "0.12"
pin-project = "1"
linkerd-addr = { path = "../../addr" }
linkerd-conditional = { path = "../../conditional" }
linkerd-dns = { path = "../../dns" }
linkerd-duplex = { path = "../../duplex" }
linkerd-errno = { path = "../../errno" }
linkerd-error = { path = "../../error" }
linkerd-error-respond = { path = "../../error-respond" }
linkerd-exp-backoff = { path = "../../exp-backoff" }
@ -56,7 +64,6 @@ linkerd-proxy-tcp = { path = "../../proxy/tcp" }
linkerd-proxy-transport = { path = "../../proxy/transport" }
linkerd-reconnect = { path = "../../reconnect" }
linkerd-router = { path = "../../router" }
linkerd-rustls = { path = "../../rustls" }
linkerd-service-profiles = { path = "../../service-profiles" }
linkerd-stack = { path = "../../stack" }
linkerd-stack-metrics = { path = "../../stack/metrics" }
@ -76,6 +83,5 @@ features = ["make", "spawn-ready", "timeout", "util", "limit"]
semver = "1"
[dev-dependencies]
bytes = { workspace = true }
http-body-util = { workspace = true }
linkerd-mock-http-body = { path = "../../mock/http-body" }
quickcheck = { version = "1", default-features = false }

View File

@ -4,11 +4,11 @@ fn set_env(name: &str, cmd: &mut Command) {
let value = match cmd.output() {
Ok(output) => String::from_utf8(output.stdout).unwrap(),
Err(err) => {
println!("cargo:warning={err}");
println!("cargo:warning={}", err);
"".to_string()
}
};
println!("cargo:rustc-env={name}={value}");
println!("cargo:rustc-env={}={}", name, value);
}
fn version() -> String {

View File

@ -1,4 +1,5 @@
use crate::profiles;
pub use classify::gate;
use linkerd_error::Error;
use linkerd_proxy_client_policy as client_policy;
use linkerd_proxy_http::{classify, HasH2Reason, ResponseTimeoutError};
@ -213,7 +214,7 @@ fn h2_error(err: &Error) -> String {
if let Some(reason) = err.h2_reason() {
// This should output the error code in the same format as the spec,
// for example: PROTOCOL_ERROR
format!("h2({reason:?})")
format!("h2({:?})", reason)
} else {
trace!("classifying found non-h2 error: {:?}", err);
String::from("unclassified")

View File

@ -101,7 +101,7 @@ impl Config {
identity: identity::NewClient,
) -> svc::ArcNewService<
(),
svc::BoxCloneSyncService<http::Request<tonic::body::Body>, http::Response<RspBody>>,
svc::BoxCloneSyncService<http::Request<tonic::body::BoxBody>, http::Response<RspBody>>,
> {
let addr = self.addr;
tracing::trace!(%addr, "Building");

View File

@ -25,7 +25,6 @@ pub mod metrics;
pub mod proxy;
pub mod serve;
pub mod svc;
pub mod tls_info;
pub mod transport;
pub use self::build_info::{BuildInfo, BUILD_INFO};

View File

@ -54,7 +54,7 @@ pub struct Proxy {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ControlLabels {
addr: Addr,
server_id: tls::ConditionalClientTlsLabels,
server_id: tls::ConditionalClientTls,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -65,7 +65,7 @@ pub enum EndpointLabels {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct InboundEndpointLabels {
pub tls: tls::ConditionalServerTlsLabels,
pub tls: tls::ConditionalServerTls,
pub authority: Option<http::uri::Authority>,
pub target_addr: SocketAddr,
pub policy: RouteAuthzLabels,
@ -98,7 +98,7 @@ pub struct RouteAuthzLabels {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct OutboundEndpointLabels {
pub server_id: tls::ConditionalClientTlsLabels,
pub server_id: tls::ConditionalClientTls,
pub authority: Option<http::uri::Authority>,
pub labels: Option<String>,
pub zone_locality: OutboundZoneLocality,
@ -155,10 +155,10 @@ where
I: Iterator<Item = (&'i String, &'i String)>,
{
let (k0, v0) = labels_iter.next()?;
let mut out = format!("{prefix}_{k0}=\"{v0}\"");
let mut out = format!("{}_{}=\"{}\"", prefix, k0, v0);
for (k, v) in labels_iter {
write!(out, ",{prefix}_{k}=\"{v}\"").expect("label concat must succeed");
write!(out, ",{}_{}=\"{}\"", prefix, k, v).expect("label concat must succeed");
}
Some(out)
}
@ -166,7 +166,7 @@ where
// === impl Metrics ===
impl Metrics {
pub fn new(retain_idle: Duration) -> (Self, impl legacy::FmtMetrics + Clone + Send + 'static) {
pub fn new(retain_idle: Duration) -> (Self, impl FmtMetrics + Clone + Send + 'static) {
let (control, control_report) = {
let m = http_metrics::Requests::<ControlLabels, Class>::default();
let r = m.clone().into_report(retain_idle).with_prefix("control");
@ -223,7 +223,6 @@ impl Metrics {
opentelemetry,
};
use legacy::FmtMetrics as _;
let report = endpoint_report
.and_report(profile_route_report)
.and_report(retry_report)
@ -244,17 +243,15 @@ impl svc::Param<ControlLabels> for control::ControlAddr {
fn param(&self) -> ControlLabels {
ControlLabels {
addr: self.addr.clone(),
server_id: self.identity.as_ref().map(tls::ClientTls::labels),
server_id: self.identity.clone(),
}
}
}
impl legacy::FmtLabels for ControlLabels {
impl FmtLabels for ControlLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { addr, server_id } = self;
write!(f, "addr=\"{addr}\",")?;
TlsConnect::from(server_id).fmt_labels(f)?;
write!(f, "addr=\"{}\",", self.addr)?;
TlsConnect::from(&self.server_id).fmt_labels(f)?;
Ok(())
}
@ -282,19 +279,13 @@ impl ProfileRouteLabels {
}
}
impl legacy::FmtLabels for ProfileRouteLabels {
impl FmtLabels for ProfileRouteLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
direction,
addr,
labels,
} = self;
self.direction.fmt_labels(f)?;
write!(f, ",dst=\"{}\"", self.addr)?;
direction.fmt_labels(f)?;
write!(f, ",dst=\"{addr}\"")?;
if let Some(labels) = labels.as_ref() {
write!(f, ",{labels}")?;
if let Some(labels) = self.labels.as_ref() {
write!(f, ",{}", labels)?;
}
Ok(())
@ -315,7 +306,7 @@ impl From<OutboundEndpointLabels> for EndpointLabels {
}
}
impl legacy::FmtLabels for EndpointLabels {
impl FmtLabels for EndpointLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Inbound(i) => (Direction::In, i).fmt_labels(f),
@ -324,36 +315,32 @@ impl legacy::FmtLabels for EndpointLabels {
}
}
impl legacy::FmtLabels for InboundEndpointLabels {
impl FmtLabels for InboundEndpointLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
tls,
authority,
target_addr,
policy,
} = self;
if let Some(a) = authority.as_ref() {
if let Some(a) = self.authority.as_ref() {
Authority(a).fmt_labels(f)?;
write!(f, ",")?;
}
((TargetAddr(*target_addr), TlsAccept::from(tls)), policy).fmt_labels(f)?;
(
(TargetAddr(self.target_addr), TlsAccept::from(&self.tls)),
&self.policy,
)
.fmt_labels(f)?;
Ok(())
}
}
impl legacy::FmtLabels for ServerLabel {
impl FmtLabels for ServerLabel {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self(meta, port) = self;
write!(
f,
"srv_group=\"{}\",srv_kind=\"{}\",srv_name=\"{}\",srv_port=\"{}\"",
meta.group(),
meta.kind(),
meta.name(),
port
self.0.group(),
self.0.kind(),
self.0.name(),
self.1
)
}
}
@ -375,47 +362,41 @@ impl prom::EncodeLabelSetMut for ServerLabel {
}
}
impl legacy::FmtLabels for ServerAuthzLabels {
impl FmtLabels for ServerAuthzLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { server, authz } = self;
server.fmt_labels(f)?;
self.server.fmt_labels(f)?;
write!(
f,
",authz_group=\"{}\",authz_kind=\"{}\",authz_name=\"{}\"",
authz.group(),
authz.kind(),
authz.name()
self.authz.group(),
self.authz.kind(),
self.authz.name()
)
}
}
impl legacy::FmtLabels for RouteLabels {
impl FmtLabels for RouteLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { server, route } = self;
server.fmt_labels(f)?;
self.server.fmt_labels(f)?;
write!(
f,
",route_group=\"{}\",route_kind=\"{}\",route_name=\"{}\"",
route.group(),
route.kind(),
route.name(),
self.route.group(),
self.route.kind(),
self.route.name(),
)
}
}
impl legacy::FmtLabels for RouteAuthzLabels {
impl FmtLabels for RouteAuthzLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { route, authz } = self;
route.fmt_labels(f)?;
self.route.fmt_labels(f)?;
write!(
f,
",authz_group=\"{}\",authz_kind=\"{}\",authz_name=\"{}\"",
authz.group(),
authz.kind(),
authz.name(),
self.authz.group(),
self.authz.kind(),
self.authz.name(),
)
}
}
@ -426,28 +407,19 @@ impl svc::Param<OutboundZoneLocality> for OutboundEndpointLabels {
}
}
impl legacy::FmtLabels for OutboundEndpointLabels {
impl FmtLabels for OutboundEndpointLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
server_id,
authority,
labels,
// TODO(kate): this label is not currently emitted.
zone_locality: _,
target_addr,
} = self;
if let Some(a) = authority.as_ref() {
if let Some(a) = self.authority.as_ref() {
Authority(a).fmt_labels(f)?;
write!(f, ",")?;
}
let ta = TargetAddr(*target_addr);
let tls = TlsConnect::from(server_id);
let ta = TargetAddr(self.target_addr);
let tls = TlsConnect::from(&self.server_id);
(ta, tls).fmt_labels(f)?;
if let Some(labels) = labels.as_ref() {
write!(f, ",{labels}")?;
if let Some(labels) = self.labels.as_ref() {
write!(f, ",{}", labels)?;
}
Ok(())
@ -463,20 +435,19 @@ impl fmt::Display for Direction {
}
}
impl legacy::FmtLabels for Direction {
impl FmtLabels for Direction {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "direction=\"{self}\"")
write!(f, "direction=\"{}\"", self)
}
}
impl legacy::FmtLabels for Authority<'_> {
impl FmtLabels for Authority<'_> {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self(authority) = self;
write!(f, "authority=\"{authority}\"")
write!(f, "authority=\"{}\"", self.0)
}
}
impl legacy::FmtLabels for Class {
impl FmtLabels for Class {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let class = |ok: bool| if ok { "success" } else { "failure" };
@ -498,7 +469,8 @@ impl legacy::FmtLabels for Class {
Class::Error(msg) => write!(
f,
"classification=\"failure\",grpc_status=\"\",error=\"{msg}\""
"classification=\"failure\",grpc_status=\"\",error=\"{}\"",
msg
),
}
}
@ -524,15 +496,9 @@ impl StackLabels {
}
}
impl legacy::FmtLabels for StackLabels {
impl FmtLabels for StackLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
direction,
protocol,
name,
} = self;
direction.fmt_labels(f)?;
write!(f, ",protocol=\"{protocol}\",name=\"{name}\"")
self.direction.fmt_labels(f)?;
write!(f, ",protocol=\"{}\",name=\"{}\"", self.protocol, self.name)
}
}

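The `main` side of these `FmtLabels` impls also destructures `self` exhaustively (`let Self { .. } = self`) instead of reaching through `self.field`, so adding a field to a label struct later is a compile error until its formatting is addressed. A sketch of the pattern with a simplified stand-in type:

```rust
use std::fmt;

struct StackLabels {
    direction: &'static str,
    protocol: &'static str,
    name: &'static str,
}

impl StackLabels {
    fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: a new field on StackLabels will not
        // compile here until it is formatted or explicitly ignored.
        let Self {
            direction,
            protocol,
            name,
        } = self;
        write!(f, "direction=\"{direction}\",protocol=\"{protocol}\",name=\"{name}\"")
    }
}

// Small adapter so the labels can be rendered with `to_string`.
struct DisplayLabels<'a>(&'a StackLabels);

impl fmt::Display for DisplayLabels<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt_labels(f)
    }
}

fn main() {
    let labels = StackLabels {
        direction: "inbound",
        protocol: "http",
        name: "logical",
    };
    assert_eq!(
        DisplayLabels(&labels).to_string(),
        "direction=\"inbound\",protocol=\"http\",name=\"logical\""
    );
}
```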
View File

@ -1,70 +0,0 @@
use linkerd_metrics::prom;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue, LabelValueEncoder};
use std::{
fmt::{Error, Write},
sync::{Arc, OnceLock},
};
static TLS_INFO: OnceLock<Arc<TlsInfo>> = OnceLock::new();
#[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct TlsInfo {
tls_suites: MetricValueList,
tls_kx_groups: MetricValueList,
tls_rand: String,
tls_key_provider: String,
tls_fips: bool,
}
#[derive(Clone, Debug, Default, Hash, PartialEq, Eq)]
struct MetricValueList {
values: Vec<&'static str>,
}
impl FromIterator<&'static str> for MetricValueList {
fn from_iter<T: IntoIterator<Item = &'static str>>(iter: T) -> Self {
MetricValueList {
values: iter.into_iter().collect(),
}
}
}
impl EncodeLabelValue for MetricValueList {
fn encode(&self, encoder: &mut LabelValueEncoder<'_>) -> Result<(), Error> {
for value in &self.values {
value.encode(encoder)?;
encoder.write_char(',')?;
}
Ok(())
}
}
pub fn metric() -> prom::Family<TlsInfo, prom::ConstGauge> {
let fam = prom::Family::<TlsInfo, prom::ConstGauge>::new_with_constructor(|| {
prom::ConstGauge::new(1)
});
let tls_info = TLS_INFO.get_or_init(|| {
let provider = linkerd_rustls::get_default_provider();
let tls_suites = provider
.cipher_suites
.iter()
.flat_map(|cipher_suite| cipher_suite.suite().as_str())
.collect::<MetricValueList>();
let tls_kx_groups = provider
.kx_groups
.iter()
.flat_map(|suite| suite.name().as_str())
.collect::<MetricValueList>();
Arc::new(TlsInfo {
tls_suites,
tls_kx_groups,
tls_rand: format!("{:?}", provider.secure_random),
tls_key_provider: format!("{:?}", provider.key_provider),
tls_fips: provider.fips(),
})
});
let _ = fam.get_or_create(tls_info);
fam
}

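The `tls_info.rs` module shown above (present only on `main`) computes its `TlsInfo` label set once and caches it in a `static OnceLock`, so repeated calls to `metric()` reuse the same `Arc`. The caching pattern, reduced to a sketch with hypothetical contents:

```rust
use std::sync::{Arc, OnceLock};

static INFO: OnceLock<Arc<String>> = OnceLock::new();

fn info() -> Arc<String> {
    // The initializer runs at most once, even across threads; later
    // calls just clone the cached Arc.
    INFO.get_or_init(|| Arc::new("computed once".to_string()))
        .clone()
}

fn main() {
    let a = info();
    let b = info();
    // Both calls hand back the same allocation.
    assert!(Arc::ptr_eq(&a, &b));
    assert_eq!(a.as_str(), "computed once");
}
```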
View File

@ -1,7 +1,7 @@
use crate::metrics::ServerLabel as PolicyServerLabel;
pub use crate::metrics::{Direction, OutboundEndpointLabels};
use linkerd_conditional::Conditional;
use linkerd_metrics::legacy::FmtLabels;
use linkerd_metrics::FmtLabels;
use linkerd_tls as tls;
use std::{fmt, net::SocketAddr};
@ -20,16 +20,16 @@ pub enum Key {
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct ServerLabels {
direction: Direction,
tls: tls::ConditionalServerTlsLabels,
tls: tls::ConditionalServerTls,
target_addr: SocketAddr,
policy: Option<PolicyServerLabel>,
}
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct TlsAccept<'t>(pub &'t tls::ConditionalServerTlsLabels);
pub struct TlsAccept<'t>(pub &'t tls::ConditionalServerTls);
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub(crate) struct TlsConnect<'t>(pub &'t tls::ConditionalClientTlsLabels);
pub(crate) struct TlsConnect<'t>(&'t tls::ConditionalClientTls);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct TargetAddr(pub SocketAddr);
@ -38,7 +38,7 @@ pub struct TargetAddr(pub SocketAddr);
impl Key {
pub fn inbound_server(
tls: tls::ConditionalServerTlsLabels,
tls: tls::ConditionalServerTls,
target_addr: SocketAddr,
server: PolicyServerLabel,
) -> Self {
@ -62,7 +62,7 @@ impl FmtLabels for Key {
}
Self::InboundClient => {
const NO_TLS: tls::client::ConditionalClientTlsLabels =
const NO_TLS: tls::client::ConditionalClientTls =
Conditional::None(tls::NoClientTls::Loopback);
Direction::In.fmt_labels(f)?;
@ -75,7 +75,7 @@ impl FmtLabels for Key {
impl ServerLabels {
fn inbound(
tls: tls::ConditionalServerTlsLabels,
tls: tls::ConditionalServerTls,
target_addr: SocketAddr,
policy: PolicyServerLabel,
) -> Self {
@ -90,7 +90,7 @@ impl ServerLabels {
fn outbound(target_addr: SocketAddr) -> Self {
ServerLabels {
direction: Direction::Out,
tls: tls::ConditionalServerTlsLabels::None(tls::NoServerTls::Loopback),
tls: tls::ConditionalServerTls::None(tls::NoServerTls::Loopback),
target_addr,
policy: None,
}
@ -99,17 +99,14 @@ impl ServerLabels {
impl FmtLabels for ServerLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
direction,
tls,
target_addr,
policy,
} = self;
direction.fmt_labels(f)?;
self.direction.fmt_labels(f)?;
f.write_str(",peer=\"src\",")?;
((TargetAddr(*target_addr), TlsAccept(tls)), policy.as_ref()).fmt_labels(f)?;
(
(TargetAddr(self.target_addr), TlsAccept(&self.tls)),
self.policy.as_ref(),
)
.fmt_labels(f)?;
Ok(())
}
@ -117,28 +114,27 @@ impl FmtLabels for ServerLabels {
// === impl TlsAccept ===
impl<'t> From<&'t tls::ConditionalServerTlsLabels> for TlsAccept<'t> {
fn from(c: &'t tls::ConditionalServerTlsLabels) -> Self {
impl<'t> From<&'t tls::ConditionalServerTls> for TlsAccept<'t> {
fn from(c: &'t tls::ConditionalServerTls) -> Self {
TlsAccept(c)
}
}
impl FmtLabels for TlsAccept<'_> {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self(tls) = self;
match tls {
match self.0 {
Conditional::None(tls::NoServerTls::Disabled) => {
write!(f, "tls=\"disabled\"")
}
Conditional::None(why) => {
write!(f, "tls=\"no_identity\",no_tls_reason=\"{why}\"")
write!(f, "tls=\"no_identity\",no_tls_reason=\"{}\"", why)
}
Conditional::Some(tls::ServerTlsLabels::Established { client_id }) => match client_id {
Some(id) => write!(f, "tls=\"true\",client_id=\"{id}\""),
Conditional::Some(tls::ServerTls::Established { client_id, .. }) => match client_id {
Some(id) => write!(f, "tls=\"true\",client_id=\"{}\"", id),
None => write!(f, "tls=\"true\",client_id=\"\""),
},
Conditional::Some(tls::ServerTlsLabels::Passthru { sni }) => {
write!(f, "tls=\"opaque\",sni=\"{sni}\"")
Conditional::Some(tls::ServerTls::Passthru { sni }) => {
write!(f, "tls=\"opaque\",sni=\"{}\"", sni)
}
}
}
@ -146,25 +142,23 @@ impl FmtLabels for TlsAccept<'_> {
// === impl TlsConnect ===
impl<'t> From<&'t tls::ConditionalClientTlsLabels> for TlsConnect<'t> {
fn from(s: &'t tls::ConditionalClientTlsLabels) -> Self {
impl<'t> From<&'t tls::ConditionalClientTls> for TlsConnect<'t> {
fn from(s: &'t tls::ConditionalClientTls) -> Self {
TlsConnect(s)
}
}
impl FmtLabels for TlsConnect<'_> {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self(tls) = self;
match tls {
match self.0 {
Conditional::None(tls::NoClientTls::Disabled) => {
write!(f, "tls=\"disabled\"")
}
Conditional::None(why) => {
write!(f, "tls=\"no_identity\",no_tls_reason=\"{why}\"")
write!(f, "tls=\"no_identity\",no_tls_reason=\"{}\"", why)
}
Conditional::Some(tls::ClientTlsLabels { server_id }) => {
write!(f, "tls=\"true\",server_id=\"{server_id}\"")
Conditional::Some(tls::ClientTls { server_id, .. }) => {
write!(f, "tls=\"true\",server_id=\"{}\"", server_id)
}
}
}
@ -174,13 +168,12 @@ impl FmtLabels for TlsConnect<'_> {
impl FmtLabels for TargetAddr {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self(target_addr) = self;
write!(
f,
"target_addr=\"{}\",target_ip=\"{}\",target_port=\"{}\"",
target_addr,
target_addr.ip(),
target_addr.port()
self.0,
self.0.ip(),
self.0.port()
)
}
}
@ -201,8 +194,9 @@ mod tests {
use std::sync::Arc;
let labels = ServerLabels::inbound(
tls::ConditionalServerTlsLabels::Some(tls::ServerTlsLabels::Established {
tls::ConditionalServerTls::Some(tls::ServerTls::Established {
client_id: Some("foo.id.example.com".parse().unwrap()),
negotiated_protocol: None,
}),
([192, 0, 2, 4], 40000).into(),
PolicyServerLabel(

View File

@ -18,7 +18,7 @@ thiserror = "2"
tokio = { version = "1", features = ["sync"] }
tonic = { workspace = true, default-features = false }
tower = { workspace = true, default-features = false }
tracing = { workspace = true }
tracing = "0.1"
[dev-dependencies]
linkerd-app-inbound = { path = "../inbound", features = ["test-util"] }

View File

@ -90,7 +90,7 @@ impl Gateway {
detect_timeout,
queue,
addr,
meta.into(),
meta,
),
None => {
tracing::debug!(

View File

@ -153,7 +153,7 @@ fn mk_routes(profile: &profiles::Profile) -> Option<outbound::http::Routes> {
if let Some((addr, metadata)) = profile.endpoint.clone() {
return Some(outbound::http::Routes::Endpoint(
Remote(ServerAddr(addr)),
metadata.into(),
metadata,
));
}

View File

@ -13,7 +13,8 @@ Configures and runs the inbound proxy
test-util = [
"linkerd-app-test",
"linkerd-idle-cache/test-util",
"linkerd-meshtls/test-util",
"linkerd-meshtls/rustls",
"linkerd-meshtls-rustls/test-util",
]
[dependencies]
@ -24,7 +25,8 @@ linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-http-access-log = { path = "../../http/access-log" }
linkerd-idle-cache = { path = "../../idle-cache" }
linkerd-meshtls = { path = "../../meshtls", optional = true, default-features = false }
linkerd-meshtls = { path = "../../meshtls", optional = true }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
@ -36,7 +38,7 @@ thiserror = "2"
tokio = { version = "1", features = ["sync"] }
tonic = { workspace = true, default-features = false }
tower = { workspace = true, features = ["util"] }
tracing = { workspace = true }
tracing = "0.1"
[dependencies.linkerd-proxy-server-policy]
path = "../../proxy/server-policy"
@ -47,7 +49,7 @@ hyper = { workspace = true, features = ["http1", "http2"] }
linkerd-app-test = { path = "../test" }
arbitrary = { version = "1", features = ["derive"] }
libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] }
linkerd-meshtls = { path = "../../meshtls", features = [
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
"test-util",
] }
@ -60,7 +62,8 @@ linkerd-http-metrics = { path = "../../http/metrics", features = ["test-util"] }
linkerd-http-box = { path = "../../http/box" }
linkerd-idle-cache = { path = "../../idle-cache", features = ["test-util"] }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-meshtls = { path = "../../meshtls", features = [
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
"test-util",
] }
linkerd-proxy-server-policy = { path = "../../proxy/server-policy", features = [

View File

@ -18,12 +18,13 @@ linkerd-app-core = { path = "../../core" }
linkerd-app-inbound = { path = ".." }
linkerd-app-test = { path = "../../test" }
linkerd-idle-cache = { path = "../../../idle-cache", features = ["test-util"] }
linkerd-meshtls = { path = "../../../meshtls", features = [
linkerd-meshtls = { path = "../../../meshtls", features = ["rustls"] }
linkerd-meshtls-rustls = { path = "../../../meshtls/rustls", features = [
"test-util",
] }
linkerd-tracing = { path = "../../../tracing", features = ["ansi"] }
tokio = { version = "1", features = ["full"] }
tracing = { workspace = true }
tracing = "0.1"
# Prevent this from interfering with workspaces
[workspace]

View File

@ -325,7 +325,7 @@ impl svc::Param<Remote<ServerAddr>> for Forward {
impl svc::Param<transport::labels::Key> for Forward {
fn param(&self) -> transport::labels::Key {
transport::labels::Key::inbound_server(
self.tls.as_ref().map(|t| t.labels()),
self.tls.clone(),
self.orig_dst_addr.into(),
self.permit.labels.server.clone(),
)
@ -429,7 +429,7 @@ impl svc::Param<ServerLabel> for Http {
impl svc::Param<transport::labels::Key> for Http {
fn param(&self) -> transport::labels::Key {
transport::labels::Key::inbound_server(
self.tls.status.as_ref().map(|t| t.labels()),
self.tls.status.clone(),
self.tls.orig_dst_addr.into(),
self.tls.policy.server_label(),
)

View File

@ -117,7 +117,7 @@ impl<N> Inbound<N> {
let identity = rt
.identity
.server()
.spawn_with_alpn(vec![transport_header::PROTOCOL.into()])
.with_alpn(vec![transport_header::PROTOCOL.into()])
.expect("TLS credential store must be held");
inner
@ -311,8 +311,9 @@ impl Param<Remote<ServerAddr>> for AuthorizedLocalTcp {
impl Param<transport::labels::Key> for AuthorizedLocalTcp {
fn param(&self) -> transport::labels::Key {
transport::labels::Key::inbound_server(
tls::ConditionalServerTlsLabels::Some(tls::ServerTlsLabels::Established {
tls::ConditionalServerTls::Some(tls::ServerTls::Established {
client_id: Some(self.client_id.clone()),
negotiated_protocol: None,
}),
self.addr.into(),
self.permit.labels.server.clone(),
@ -343,8 +344,9 @@ impl Param<Remote<ClientAddr>> for LocalHttp {
impl Param<transport::labels::Key> for LocalHttp {
fn param(&self) -> transport::labels::Key {
transport::labels::Key::inbound_server(
tls::ConditionalServerTlsLabels::Some(tls::ServerTlsLabels::Established {
tls::ConditionalServerTls::Some(tls::ServerTls::Established {
client_id: Some(self.client.client_id.clone()),
negotiated_protocol: None,
}),
self.addr.into(),
self.policy.server_label(),
@ -433,14 +435,6 @@ impl Param<tls::ConditionalServerTls> for GatewayTransportHeader {
}
}
impl Param<tls::ConditionalServerTlsLabels> for GatewayTransportHeader {
fn param(&self) -> tls::ConditionalServerTlsLabels {
tls::ConditionalServerTlsLabels::Some(tls::ServerTlsLabels::Established {
client_id: Some(self.client.client_id.clone()),
})
}
}
impl Param<tls::ClientId> for GatewayTransportHeader {
fn param(&self) -> tls::ClientId {
self.client.client_id.clone()

View File

@ -395,7 +395,7 @@ fn endpoint_labels(
) -> impl svc::ExtractParam<metrics::EndpointLabels, Logical> + Clone {
move |t: &Logical| -> metrics::EndpointLabels {
metrics::InboundEndpointLabels {
tls: t.tls.as_ref().map(|t| t.labels()),
tls: t.tls.clone(),
authority: unsafe_authority_labels
.then(|| t.logical.as_ref().map(|d| d.as_http_authority()))
.flatten(),

View File

@ -664,7 +664,7 @@ async fn grpc_response_class() {
let response_total = metrics
.get_response_total(
&metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels {
tls: Target::meshed_h2().1.map(|t| t.labels()),
tls: Target::meshed_h2().1,
authority: None,
target_addr: "127.0.0.1:80".parse().unwrap(),
policy: metrics::RouteAuthzLabels {
@ -762,7 +762,7 @@ async fn test_unsafe_authority_labels(
let response_total = metrics
.get_response_total(
&metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels {
tls: Target::meshed_http1().1.as_ref().map(|t| t.labels()),
tls: Target::meshed_http1().1,
authority: expected_authority,
target_addr: "127.0.0.1:80".parse().unwrap(),
policy: metrics::RouteAuthzLabels {
@ -861,7 +861,12 @@ fn grpc_status_server(
#[tracing::instrument]
fn connect_error() -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |_| Err(io::Error::other("server is not listening"))
move |_| {
Err(io::Error::new(
io::ErrorKind::Other,
"server is not listening",
))
}
}
#[tracing::instrument]

View File

@ -113,6 +113,10 @@ impl<S> Inbound<S> {
&self.runtime.identity
}
pub fn proxy_metrics(&self) -> &metrics::Proxy {
&self.runtime.metrics.proxy
}
/// A helper for gateways to instrument policy checks.
pub fn authorize_http<N>(
&self,

View File

@ -13,7 +13,7 @@ pub(crate) mod error;
pub use linkerd_app_core::metrics::*;
/// Holds LEGACY inbound proxy metrics.
/// Holds outbound proxy metrics.
#[derive(Clone, Debug)]
pub struct InboundMetrics {
pub http_authz: authz::HttpAuthzMetrics,
@ -50,7 +50,7 @@ impl InboundMetrics {
}
}
impl legacy::FmtMetrics for InboundMetrics {
impl FmtMetrics for InboundMetrics {
fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.http_authz.fmt_metrics(f)?;
self.http_errors.fmt_metrics(f)?;

View File

@ -1,9 +1,8 @@
use crate::policy::{AllowPolicy, HttpRoutePermit, Meta, ServerPermit};
use linkerd_app_core::{
metrics::{
legacy::{Counter, FmtLabels, FmtMetrics},
metrics, RouteAuthzLabels, RouteLabels, ServerAuthzLabels, ServerLabel, TargetAddr,
TlsAccept,
metrics, Counter, FmtLabels, FmtMetrics, RouteAuthzLabels, RouteLabels, ServerAuthzLabels,
ServerLabel, TargetAddr, TlsAccept,
},
tls,
transport::OrigDstAddr,
@ -68,7 +67,7 @@ pub struct HTTPLocalRateLimitLabels {
#[derive(Debug, Hash, PartialEq, Eq)]
struct Key<L> {
target: TargetAddr,
tls: tls::ConditionalServerTlsLabels,
tls: tls::ConditionalServerTls,
labels: L,
}
@ -81,7 +80,7 @@ type HttpLocalRateLimitKey = Key<HTTPLocalRateLimitLabels>;
// === impl HttpAuthzMetrics ===
impl HttpAuthzMetrics {
pub fn allow(&self, permit: &HttpRoutePermit, tls: tls::ConditionalServerTlsLabels) {
pub fn allow(&self, permit: &HttpRoutePermit, tls: tls::ConditionalServerTls) {
self.0
.allow
.lock()
@ -94,7 +93,7 @@ impl HttpAuthzMetrics {
&self,
labels: ServerLabel,
dst: OrigDstAddr,
tls: tls::ConditionalServerTlsLabels,
tls: tls::ConditionalServerTls,
) {
self.0
.route_not_found
@ -104,12 +103,7 @@ impl HttpAuthzMetrics {
.incr();
}
pub fn deny(
&self,
labels: RouteLabels,
dst: OrigDstAddr,
tls: tls::ConditionalServerTlsLabels,
) {
pub fn deny(&self, labels: RouteLabels, dst: OrigDstAddr, tls: tls::ConditionalServerTls) {
self.0
.deny
.lock()
@ -122,7 +116,7 @@ impl HttpAuthzMetrics {
&self,
labels: HTTPLocalRateLimitLabels,
dst: OrigDstAddr,
tls: tls::ConditionalServerTlsLabels,
tls: tls::ConditionalServerTls,
) {
self.0
.http_local_rate_limit
@ -193,7 +187,7 @@ impl FmtMetrics for HttpAuthzMetrics {
// === impl TcpAuthzMetrics ===
impl TcpAuthzMetrics {
pub fn allow(&self, permit: &ServerPermit, tls: tls::ConditionalServerTlsLabels) {
pub fn allow(&self, permit: &ServerPermit, tls: tls::ConditionalServerTls) {
self.0
.allow
.lock()
@ -202,7 +196,7 @@ impl TcpAuthzMetrics {
.incr();
}
pub fn deny(&self, policy: &AllowPolicy, tls: tls::ConditionalServerTlsLabels) {
pub fn deny(&self, policy: &AllowPolicy, tls: tls::ConditionalServerTls) {
self.0
.deny
.lock()
@ -211,7 +205,7 @@ impl TcpAuthzMetrics {
.incr();
}
pub fn terminate(&self, policy: &AllowPolicy, tls: tls::ConditionalServerTlsLabels) {
pub fn terminate(&self, policy: &AllowPolicy, tls: tls::ConditionalServerTls) {
self.0
.terminate
.lock()
@@ -252,24 +246,18 @@ impl FmtMetrics for TcpAuthzMetrics {
impl FmtLabels for HTTPLocalRateLimitLabels {
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
server,
rate_limit,
scope,
} = self;
server.fmt_labels(f)?;
if let Some(rl) = rate_limit {
self.server.fmt_labels(f)?;
if let Some(rl) = &self.rate_limit {
write!(
f,
",ratelimit_group=\"{}\",ratelimit_kind=\"{}\",ratelimit_name=\"{}\",ratelimit_scope=\"{}\"",
rl.group(),
rl.kind(),
rl.name(),
scope,
self.scope,
)
} else {
write!(f, ",ratelimit_scope=\"{scope}\"")
write!(f, ",ratelimit_scope=\"{}\"", self.scope)
}
}
}
@@ -277,7 +265,7 @@ impl FmtLabels for HTTPLocalRateLimitLabels {
// === impl Key ===
impl<L> Key<L> {
fn new(labels: L, dst: OrigDstAddr, tls: tls::ConditionalServerTlsLabels) -> Self {
fn new(labels: L, dst: OrigDstAddr, tls: tls::ConditionalServerTls) -> Self {
Self {
tls,
target: TargetAddr(dst.into()),
@@ -288,30 +276,24 @@ impl<L> Key<L> {
impl<L: FmtLabels> FmtLabels for Key<L> {
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
target,
tls,
labels,
} = self;
(target, (labels, TlsAccept(tls))).fmt_labels(f)
(self.target, (&self.labels, TlsAccept(&self.tls))).fmt_labels(f)
}
}
impl ServerKey {
fn from_policy(policy: &AllowPolicy, tls: tls::ConditionalServerTlsLabels) -> Self {
fn from_policy(policy: &AllowPolicy, tls: tls::ConditionalServerTls) -> Self {
Self::new(policy.server_label(), policy.dst_addr(), tls)
}
}
impl RouteAuthzKey {
fn from_permit(permit: &HttpRoutePermit, tls: tls::ConditionalServerTlsLabels) -> Self {
fn from_permit(permit: &HttpRoutePermit, tls: tls::ConditionalServerTls) -> Self {
Self::new(permit.labels.clone(), permit.dst, tls)
}
}
impl ServerAuthzKey {
fn from_permit(permit: &ServerPermit, tls: tls::ConditionalServerTlsLabels) -> Self {
fn from_permit(permit: &ServerPermit, tls: tls::ConditionalServerTls) -> Self {
Self::new(permit.labels.clone(), permit.dst, tls)
}
}
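
The hunks above also move `fmt_labels` implementations from field access (`self.server`, `self.scope`) to destructuring (`let Self { … } = self`). The pattern can be sketched in isolation; the type and field names here (`RateLimitLabels`, `server`, `scope`) are illustrative, not the crate's actual types:

```rust
use std::fmt;

/// Illustrative label set; destructuring in `fmt` means adding a field
/// without rendering it becomes a compile error ("unused variable" lint
/// aside, the binding list must name every field).
struct RateLimitLabels {
    server: String,
    scope: String,
}

impl fmt::Display for RateLimitLabels {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { server, scope } = self;
        write!(f, "srv_name=\"{server}\",ratelimit_scope=\"{scope}\"")
    }
}

fn main() {
    let labels = RateLimitLabels {
        server: "web".to_string(),
        scope: "identity".to_string(),
    };
    assert_eq!(
        labels.to_string(),
        "srv_name=\"web\",ratelimit_scope=\"identity\""
    );
}
```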

View File

@@ -8,7 +8,7 @@ use crate::{
};
use linkerd_app_core::{
errors::{FailFastError, LoadShedError},
metrics::legacy::FmtLabels,
metrics::FmtLabels,
tls,
};
use std::fmt;

View File

@@ -1,9 +1,6 @@
use super::ErrorKind;
use linkerd_app_core::{
metrics::{
legacy::{Counter, FmtMetrics},
metrics, ServerLabel,
},
metrics::{metrics, Counter, FmtMetrics, ServerLabel},
svc::{self, stack::NewMonitor},
transport::{labels::TargetAddr, OrigDstAddr},
Error,

View File

@@ -1,9 +1,6 @@
use super::ErrorKind;
use linkerd_app_core::{
metrics::{
legacy::{Counter, FmtMetrics},
metrics,
},
metrics::{metrics, Counter, FmtMetrics},
svc::{self, stack::NewMonitor},
transport::{labels::TargetAddr, OrigDstAddr},
Error,

View File

@@ -33,7 +33,7 @@ static INVALID_POLICY: once_cell::sync::OnceCell<ServerPolicy> = once_cell::sync
impl<S> Api<S>
where
S: tonic::client::GrpcService<tonic::body::Body, Error = Error> + Clone,
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error> + Clone,
S::ResponseBody: http::Body<Data = tonic::codegen::Bytes, Error = Error> + Send + 'static,
{
pub(super) fn new(
@@ -57,7 +57,7 @@ where
impl<S> Service<u16> for Api<S>
where
S: tonic::client::GrpcService<tonic::body::Body, Error = Error>,
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
S: Clone + Send + Sync + 'static,
S::ResponseBody: http::Body<Data = tonic::codegen::Bytes, Error = Error> + Send + 'static,
S::Future: Send + 'static,

View File

@@ -40,7 +40,7 @@ impl Config {
limits: ReceiveLimits,
) -> impl GetPolicy + Clone + Send + Sync + 'static
where
C: tonic::client::GrpcService<tonic::body::Body, Error = Error>,
C: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
C: Clone + Unpin + Send + Sync + 'static,
C::ResponseBody: http::Body<Data = tonic::codegen::Bytes, Error = Error>,
C::ResponseBody: Send + 'static,

View File

@@ -248,11 +248,8 @@ impl<T, N> HttpPolicyService<T, N> {
);
}
}
self.metrics.deny(
labels,
self.connection.dst,
self.connection.tls.as_ref().map(|t| t.labels()),
);
self.metrics
.deny(labels, self.connection.dst, self.connection.tls.clone());
return Err(HttpRouteUnauthorized(()).into());
}
};
@@ -282,19 +279,14 @@ impl<T, N> HttpPolicyService<T, N> {
}
};
self.metrics
.allow(&permit, self.connection.tls.as_ref().map(|t| t.labels()));
self.metrics.allow(&permit, self.connection.tls.clone());
Ok((permit, r#match, route))
}
fn mk_route_not_found(&self) -> Error {
let labels = self.policy.server_label();
self.metrics.route_not_found(
labels,
self.connection.dst,
self.connection.tls.as_ref().map(|t| t.labels()),
);
self.metrics
.route_not_found(labels, self.connection.dst, self.connection.tls.clone());
HttpRouteNotFound(()).into()
}
@@ -314,7 +306,7 @@ impl<T, N> HttpPolicyService<T, N> {
self.metrics.ratelimit(
self.policy.ratelimit_label(&err),
self.connection.dst,
self.connection.tls.as_ref().map(|t| t.labels()),
self.connection.tls.clone(),
);
err.into()
})

View File

@@ -74,7 +74,7 @@ impl<S> Store<S> {
opaque_ports: RangeInclusiveSet<u16>,
) -> Self
where
S: tonic::client::GrpcService<tonic::body::Body, Error = Error>,
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
S: Clone + Send + Sync + 'static,
S::Future: Send,
S::ResponseBody: http::Body<Data = tonic::codegen::Bytes, Error = Error> + Send + 'static,
@@ -138,7 +138,7 @@ impl<S> Store<S> {
impl<S> GetPolicy for Store<S>
where
S: tonic::client::GrpcService<tonic::body::Body, Error = Error>,
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
S: Clone + Send + Sync + 'static,
S::Future: Send,
S::ResponseBody: http::Body<Data = tonic::codegen::Bytes, Error = Error> + Send + 'static,

View File

@@ -77,8 +77,7 @@ where
// This new service requires a ClientAddr, so it must necessarily be built for each
// connection. So we can just increment the counter here since the service can only
// be used at most once.
self.metrics
.allow(&permit, tls.as_ref().map(|t| t.labels()));
self.metrics.allow(&permit, tls.clone());
let inner = self.inner.new_service((permit, target));
TcpPolicy::Authorized(Authorized {
@@ -98,7 +97,7 @@ where
?tls, %client,
"Connection denied"
);
self.metrics.deny(&policy, tls.as_ref().map(|t| t.labels()));
self.metrics.deny(&policy, tls);
TcpPolicy::Unauthorized(deny)
}
}
@@ -168,7 +167,7 @@ where
%client,
"Connection terminated due to policy change",
);
metrics.terminate(&policy, tls.as_ref().map(|t| t.labels()));
metrics.terminate(&policy, tls);
return Err(denied.into());
}
}

View File

@@ -263,7 +263,7 @@ fn orig_dst_addr() -> OrigDstAddr {
OrigDstAddr(([192, 0, 2, 2], 1000).into())
}
impl tonic::client::GrpcService<tonic::body::Body> for MockSvc {
impl tonic::client::GrpcService<tonic::body::BoxBody> for MockSvc {
type ResponseBody = linkerd_app_core::control::RspBody;
type Error = Error;
type Future = futures::future::Pending<Result<http::Response<Self::ResponseBody>, Self::Error>>;
@@ -275,7 +275,7 @@ impl tonic::client::GrpcService<tonic::body::Body> for MockSvc {
unreachable!()
}
fn call(&mut self, _req: http::Request<tonic::body::Body>) -> Self::Future {
fn call(&mut self, _req: http::Request<tonic::body::BoxBody>) -> Self::Future {
unreachable!()
}
}

View File

@@ -27,7 +27,7 @@ impl Inbound<()> {
limits: ReceiveLimits,
) -> impl policy::GetPolicy + Clone + Send + Sync + 'static
where
C: tonic::client::GrpcService<tonic::body::Body, Error = Error>,
C: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
C: Clone + Unpin + Send + Sync + 'static,
C::ResponseBody: http::Body<Data = tonic::codegen::Bytes, Error = Error>,
C::ResponseBody: Send + 'static,

View File

@@ -3,7 +3,9 @@ pub use futures::prelude::*;
use linkerd_app_core::{
config,
dns::Suffix,
drain, exp_backoff, identity, metrics,
drain, exp_backoff,
identity::rustls,
metrics,
proxy::{
http::{h1, h2},
tap,
@@ -96,7 +98,7 @@ pub fn runtime() -> (ProxyRuntime, drain::Signal) {
let (tap, _) = tap::new();
let (metrics, _) = metrics::Metrics::new(std::time::Duration::from_secs(10));
let runtime = ProxyRuntime {
identity: identity::creds::default_for_test().1,
identity: rustls::creds::default_for_test().1.into(),
metrics: metrics.proxy,
tap,
span_sink: None,

View File

@@ -23,50 +23,38 @@ h2 = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true, features = [
"http1",
"http2",
"client",
"server",
] }
hyper-util = { workspace = true, features = ["service"] }
ipnet = "2"
linkerd-app = { path = "..", features = ["allow-loopback"] }
linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test" }
linkerd-meshtls = { path = "../../meshtls", features = ["test-util"] }
linkerd-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd-rustls = { path = "../../rustls" }
linkerd2-proxy-api = { workspace = true, features = [
"destination",
"arbitrary",
] }
linkerd-app-test = { path = "../test" }
linkerd-tracing = { path = "../../tracing" }
maplit = "1"
parking_lot = "0.12"
regex = "1"
rustls-pemfile = "2.2"
socket2 = "0.6"
socket2 = "0.5"
tokio = { version = "1", features = ["io-util", "net", "rt", "macros"] }
tokio-rustls = { workspace = true }
tokio-stream = { version = "0.1", features = ["sync"] }
tonic = { workspace = true, features = ["transport", "router"], default-features = false }
tokio-rustls = { workspace = true }
rustls-pemfile = "2.2"
tower = { workspace = true, default-features = false }
tracing = { workspace = true }
[dependencies.hyper]
workspace = true
features = [
"client",
"http1",
"http2",
"server",
]
[dependencies.linkerd2-proxy-api]
workspace = true
features = [
"arbitrary",
"destination",
]
[dependencies.tracing-subscriber]
version = "0.3"
default-features = false
features = [
tonic = { workspace = true, features = ["transport"], default-features = false }
tracing = "0.1"
tracing-subscriber = { version = "0.3", default-features = false, features = [
"fmt",
"std",
]
] }
[dev-dependencies]
flate2 = { version = "1", default-features = false, features = [
@@ -74,5 +62,8 @@ flate2 = { version = "1", default-features = false, features = [
] }
# Log streaming isn't enabled by default globally, but we want to test it.
linkerd-app-admin = { path = "../admin", features = ["log-streaming"] }
# No code from this crate is actually used; only necessary to enable the Rustls
# implementation.
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
serde_json = "1"

View File

@@ -262,7 +262,10 @@ impl pb::destination_server::Destination for Controller {
}
tracing::warn!(?dst, ?updates, "request does not match");
let msg = format!("expected get call for {dst:?} but got get call for {req:?}");
let msg = format!(
"expected get call for {:?} but got get call for {:?}",
dst, req
);
calls.push_front(Dst::Call(dst, updates));
return Err(grpc::Status::new(grpc::Code::Unavailable, msg));
}

View File

@@ -8,8 +8,7 @@ use std::{
};
use linkerd2_proxy_api::identity as pb;
use linkerd_rustls::get_default_provider;
use tokio_rustls::rustls::{self, server::WebPkiClientVerifier};
use tokio_rustls::rustls::{self, pki_types::CertificateDer, server::WebPkiClientVerifier};
use tonic as grpc;
pub struct Identity {
@@ -35,6 +34,10 @@ type Certify = Box<
> + Send,
>;
static TLS_VERSIONS: &[&rustls::SupportedProtocolVersion] = &[&rustls::version::TLS13];
static TLS_SUPPORTED_CIPHERSUITES: &[rustls::SupportedCipherSuite] =
&[rustls::crypto::ring::cipher_suite::TLS13_CHACHA20_POLY1305_SHA256];
struct Certificates {
pub leaf: Vec<u8>,
pub intermediates: Vec<Vec<u8>>,
@@ -51,13 +54,13 @@ impl Certificates {
let leaf = certs
.next()
.expect("no leaf cert in pemfile")
.map_err(|_| io::Error::other("rustls error reading certs"))?
.map_err(|_| io::Error::new(io::ErrorKind::Other, "rustls error reading certs"))?
.as_ref()
.to_vec();
let intermediates = certs
.map(|cert| cert.map(|cert| cert.as_ref().to_vec()))
.collect::<Result<Vec<_>, _>>()
.map_err(|_| io::Error::other("rustls error reading certs"))?;
.map_err(|_| io::Error::new(io::ErrorKind::Other, "rustls error reading certs"))?;
Ok(Certificates {
leaf,
@@ -101,16 +104,19 @@ impl Identity {
use std::io::Cursor;
let mut roots = rustls::RootCertStore::empty();
let trust_anchors = rustls_pemfile::certs(&mut Cursor::new(trust_anchors))
.map(|bytes| bytes.map(CertificateDer::from))
.collect::<Result<Vec<_>, _>>()
.expect("error parsing pemfile");
let (added, skipped) = roots.add_parsable_certificates(trust_anchors);
assert_ne!(added, 0, "trust anchors must include at least one cert");
assert_eq!(skipped, 0, "no certs in pemfile should be invalid");
let provider = get_default_provider();
let mut provider = rustls::crypto::ring::default_provider();
provider.cipher_suites = TLS_SUPPORTED_CIPHERSUITES.to_vec();
let provider = Arc::new(provider);
let client_config = rustls::ClientConfig::builder_with_provider(provider.clone())
.with_safe_default_protocol_versions()
.with_protocol_versions(TLS_VERSIONS)
.expect("client config must be valid")
.with_root_certificates(roots.clone())
.with_no_client_auth();
@@ -122,7 +128,7 @@
.expect("server verifier must be valid");
let server_config = rustls::ServerConfig::builder_with_provider(provider)
.with_safe_default_protocol_versions()
.with_protocol_versions(TLS_VERSIONS)
.expect("server config must be valid")
.with_client_cert_verifier(client_cert_verifier)
.with_single_cert(certs.chain(), key)
@@ -213,7 +219,7 @@ impl Controller {
let f = f.take().expect("called twice?");
let fut = f(req)
.map_ok(grpc::Response::new)
.map_err(|e| grpc::Status::new(grpc::Code::Internal, format!("{e}")));
.map_err(|e| grpc::Status::new(grpc::Code::Internal, format!("{}", e)));
Box::pin(fut)
});
self.expect_calls.lock().push_back(func);
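
Several hunks in this file swap `io::Error::new(io::ErrorKind::Other, …)` for the newer `io::Error::other(…)` shorthand. The two constructions are equivalent; `io::Error::other` was stabilized in Rust 1.74. A standalone check:

```rust
use std::io;

fn main() {
    // Long form, as in the release branch.
    let old_style = io::Error::new(io::ErrorKind::Other, "rustls error reading certs");
    // Shorthand, as on main.
    let new_style = io::Error::other("rustls error reading certs");

    // Same error kind and same displayed message.
    assert_eq!(old_style.kind(), new_style.kind());
    assert_eq!(old_style.to_string(), new_style.to_string());
}
```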

View File

@@ -3,7 +3,6 @@
#![warn(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]
#![recursion_limit = "256"]
#![allow(clippy::result_large_err)]
mod test_env;
@@ -248,7 +247,7 @@ impl fmt::Display for HumanDuration {
let secs = self.0.as_secs();
let subsec_ms = self.0.subsec_nanos() as f64 / 1_000_000f64;
if secs == 0 {
write!(fmt, "{subsec_ms}ms")
write!(fmt, "{}ms", subsec_ms)
} else {
write!(fmt, "{}s", secs as f64 + subsec_ms)
}
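
A recurring change across these files is the move between positional format arguments (`"{}ms", subsec_ms`) and inline captured identifiers (`"{subsec_ms}ms"`), which were stabilized in Rust 1.58. The two forms produce identical output; a minimal illustration (variable names here are arbitrary):

```rust
fn main() {
    let secs = 3u64;
    // Positional argument style, as in the release branch.
    let positional = format!("{}s", secs);
    // Inline captured-identifier style (Rust 1.58+), as on main.
    let inline = format!("{secs}s");
    assert_eq!(positional, inline); // both are "3s"
}
```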

View File

@@ -302,7 +302,7 @@ impl Controller {
}
pub async fn run(self) -> controller::Listening {
let routes = grpc::service::Routes::default()
let svc = grpc::transport::Server::builder()
.add_service(
inbound_server_policies_server::InboundServerPoliciesServer::new(Server(Arc::new(
self.inbound,
@@ -310,9 +310,9 @@
)
.add_service(outbound_policies_server::OutboundPoliciesServer::new(
Server(Arc::new(self.outbound)),
));
controller::run(RoutesSvc(routes), "support policy controller", None).await
))
.into_service();
controller::run(RoutesSvc(svc), "support policy controller", None).await
}
}
@@ -525,9 +525,7 @@ impl Service<Request<hyper::body::Incoming>> for RoutesSvc {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let Self(routes) = self;
<grpc::service::Routes as Service<Request<UnsyncBoxBody<Bytes, grpc::Status>>>>::poll_ready(
routes, cx,
)
routes.poll_ready(cx)
}
fn call(&mut self, req: Request<hyper::body::Incoming>) -> Self::Future {

View File

@@ -108,7 +108,7 @@ impl fmt::Debug for MockOrigDst {
match self {
Self::Addr(addr) => f
.debug_tuple("MockOrigDst::Addr")
.field(&format_args!("{addr}"))
.field(&format_args!("{}", addr))
.finish(),
Self::Direct => f.debug_tuple("MockOrigDst::Direct").finish(),
Self::None => f.debug_tuple("MockOrigDst::None").finish(),
@@ -416,9 +416,9 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
use std::fmt::Write;
let mut ports = inbound_default_ports.iter();
if let Some(port) = ports.next() {
let mut var = format!("{port}");
let mut var = format!("{}", port);
for port in ports {
write!(&mut var, ",{port}").expect("writing to String should never fail");
write!(&mut var, ",{}", port).expect("writing to String should never fail");
}
info!("{}={:?}", app::env::ENV_INBOUND_PORTS, var);
env.put(app::env::ENV_INBOUND_PORTS, var);

View File

@@ -137,28 +137,28 @@ impl TapEventExt for pb::TapEvent {
fn request_init_authority(&self) -> &str {
match self.event() {
pb::tap_event::http::Event::RequestInit(ev) => &ev.authority,
e => panic!("not RequestInit event: {e:?}"),
e => panic!("not RequestInit event: {:?}", e),
}
}
fn request_init_path(&self) -> &str {
match self.event() {
pb::tap_event::http::Event::RequestInit(ev) => &ev.path,
e => panic!("not RequestInit event: {e:?}"),
e => panic!("not RequestInit event: {:?}", e),
}
}
fn response_init_status(&self) -> u16 {
match self.event() {
pb::tap_event::http::Event::ResponseInit(ev) => ev.http_status as u16,
e => panic!("not ResponseInit event: {e:?}"),
e => panic!("not ResponseInit event: {:?}", e),
}
}
fn response_end_bytes(&self) -> u64 {
match self.event() {
pb::tap_event::http::Event::ResponseEnd(ev) => ev.response_bytes,
e => panic!("not ResponseEnd event: {e:?}"),
e => panic!("not ResponseEnd event: {:?}", e),
}
}
@@ -170,7 +170,7 @@ impl TapEventExt for pb::TapEvent {
}) => code,
_ => panic!("not Eos GrpcStatusCode: {:?}", ev.eos),
},
ev => panic!("not ResponseEnd event: {ev:?}"),
ev => panic!("not ResponseEnd event: {:?}", ev),
}
}
}

View File

@@ -381,7 +381,7 @@ mod cross_version {
}
fn default_dst_name(port: u16) -> String {
format!("{HOST}:{port}")
format!("{}:{}", HOST, port)
}
fn send_default_dst(

View File

@@ -63,7 +63,7 @@ async fn wait_for_profile_stage(client: &client::Client, metrics: &client::Clien
for _ in 0i32..10 {
assert_eq!(client.get("/load-profile").await, "");
let m = metrics.get("/metrics").await;
let stage_metric = format!("rt_load_profile=\"{stage}\"");
let stage_metric = format!("rt_load_profile=\"{}\"", stage);
if m.contains(stage_metric.as_str()) {
break;
}

View File

@@ -88,12 +88,12 @@ impl TestBuilder {
let port = srv.addr.port();
let ctrl = controller::new();
let dst_tx = ctrl.destination_tx(format!("{host}:{port}"));
let dst_tx = ctrl.destination_tx(format!("{}:{}", host, port));
dst_tx.send_addr(srv.addr);
let ctrl = controller::new();
let dst_tx = ctrl.destination_tx(format!("{host}:{port}"));
let dst_tx = ctrl.destination_tx(format!("{}:{}", host, port));
dst_tx.send_addr(srv.addr);
let profile_tx = ctrl.profile_tx(srv.addr.to_string());

View File

@@ -1318,9 +1318,9 @@ async fn metrics_compression() {
body.copy_to_bytes(body.remaining()),
));
let mut scrape = String::new();
decoder
.read_to_string(&mut scrape)
.unwrap_or_else(|_| panic!("decode gzip (requested Accept-Encoding: {encoding})"));
decoder.read_to_string(&mut scrape).unwrap_or_else(|_| {
panic!("decode gzip (requested Accept-Encoding: {})", encoding)
});
scrape
}
};

View File

@@ -26,7 +26,7 @@ async fn is_valid_json() {
assert!(!json.is_empty());
for obj in json {
println!("{obj}\n");
println!("{}\n", obj);
}
}
@@ -53,7 +53,7 @@ async fn query_is_valid_json() {
assert!(!json.is_empty());
for obj in json {
println!("{obj}\n");
println!("{}\n", obj);
}
}
@@ -74,9 +74,12 @@ async fn valid_get_does_not_error() {
let json = logs.await.unwrap();
for obj in json {
println!("{obj}\n");
println!("{}\n", obj);
if obj.get("error").is_some() {
panic!("expected the log stream to contain no error responses!\njson = {obj}");
panic!(
"expected the log stream to contain no error responses!\njson = {}",
obj
);
}
}
}
@@ -98,9 +101,12 @@ async fn valid_query_does_not_error() {
let json = logs.await.unwrap();
for obj in json {
println!("{obj}\n");
println!("{}\n", obj);
if obj.get("error").is_some() {
panic!("expected the log stream to contain no error responses!\njson = {obj}");
panic!(
"expected the log stream to contain no error responses!\njson = {}",
obj
);
}
}
}
@@ -136,7 +142,9 @@ async fn multi_filter() {
level.and_then(|value| value.as_str()),
Some("DEBUG") | Some("INFO") | Some("WARN") | Some("ERROR")
),
"level must be DEBUG, INFO, WARN, or ERROR\n level: {level:?}\n json: {obj:#?}"
"level must be DEBUG, INFO, WARN, or ERROR\n level: {:?}\n json: {:#?}",
level,
obj
);
}
@@ -167,7 +175,7 @@ async fn get_log_stream(
let req = client
.request_body(
client
.request_builder(&format!("{PATH}?{filter}"))
.request_builder(&format!("{}?{}", PATH, filter))
.method(http::Method::GET)
.body(http_body_util::Full::new(Bytes::from(filter)))
.unwrap(),
@@ -223,7 +231,7 @@ where
}
}
Err(e) => {
println!("body failed: {e}");
println!("body failed: {}", e);
break;
}
};

View File

@@ -80,7 +80,10 @@ impl Test {
.await
};
env.put(app::env::ENV_INBOUND_DETECT_TIMEOUT, format!("{TIMEOUT:?}"));
env.put(
app::env::ENV_INBOUND_DETECT_TIMEOUT,
format!("{:?}", TIMEOUT),
);
(self.set_env)(&mut env);
@@ -124,6 +127,26 @@ async fn inbound_timeout() {
.await;
}
/// Tests that the detect metric is labeled and incremented on I/O error.
#[tokio::test]
async fn inbound_io_err() {
let _trace = trace_init();
let (proxy, metrics) = Test::default().run().await;
let client = crate::tcp::client(proxy.inbound);
let tcp_client = client.connect().await;
tcp_client.write(TcpFixture::HELLO_MSG).await;
drop(tcp_client);
metric(&proxy)
.label("error", "i/o")
.value(1u64)
.assert_in(&metrics)
.await;
}
/// Tests that the detect metric is not incremented when TLS is successfully
/// detected.
#[tokio::test]
@@ -169,6 +192,44 @@ async fn inbound_success() {
metric.assert_in(&metrics).await;
}
/// Tests both of the above cases together.
#[tokio::test]
async fn inbound_multi() {
let _trace = trace_init();
let (proxy, metrics) = Test::default().run().await;
let client = crate::tcp::client(proxy.inbound);
let metric = metric(&proxy);
let timeout_metric = metric.clone().label("error", "tls detection timeout");
let io_metric = metric.label("error", "i/o");
let tcp_client = client.connect().await;
tokio::time::sleep(TIMEOUT + Duration::from_millis(15)) // just in case
.await;
timeout_metric.clone().value(1u64).assert_in(&metrics).await;
drop(tcp_client);
let tcp_client = client.connect().await;
tcp_client.write(TcpFixture::HELLO_MSG).await;
drop(tcp_client);
io_metric.clone().value(1u64).assert_in(&metrics).await;
timeout_metric.clone().value(1u64).assert_in(&metrics).await;
let tcp_client = client.connect().await;
tokio::time::sleep(TIMEOUT + Duration::from_millis(15)) // just in case
.await;
io_metric.clone().value(1u64).assert_in(&metrics).await;
timeout_metric.clone().value(2u64).assert_in(&metrics).await;
drop(tcp_client);
}
/// Tests that TLS detect failure metrics are collected for the direct stack.
#[tokio::test]
async fn inbound_direct_multi() {

View File

@@ -13,7 +13,7 @@ Configures and runs the outbound proxy
default = []
allow-loopback = []
test-subscriber = []
test-util = ["linkerd-app-test", "linkerd-meshtls/test-util", "dep:http-body"]
test-util = ["linkerd-app-test", "linkerd-meshtls-rustls/test-util", "dep:http-body"]
prometheus-client-rust-242 = [] # TODO
@@ -32,7 +32,7 @@ thiserror = "2"
tokio = { version = "1", features = ["sync"] }
tonic = { workspace = true, default-features = false }
tower = { workspace = true, features = ["util"] }
tracing = { workspace = true }
tracing = "0.1"
linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
@@ -42,7 +42,7 @@ linkerd-http-prom = { path = "../../http/prom" }
linkerd-http-retry = { path = "../../http/retry" }
linkerd-http-route = { path = "../../http/route" }
linkerd-identity = { path = "../../identity" }
linkerd-meshtls = { path = "../../meshtls", optional = true, default-features = false }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
linkerd-opaq-route = { path = "../../opaq-route" }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy", features = [
"proto",
@@ -67,10 +67,10 @@ linkerd-app-test = { path = "../test", features = ["client-policy"] }
linkerd-http-box = { path = "../../http/box" }
linkerd-http-prom = { path = "../../http/prom", features = ["test-util"] }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-meshtls = { path = "../../meshtls", features = [
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
"test-util",
] }
linkerd-mock-http-body = { path = "../../mock/http-body" }
linkerd-stack = { path = "../../stack", features = ["test-util"] }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }

View File

@@ -134,13 +134,7 @@ impl<N> Outbound<N> {
.unwrap_or_else(|| (orig_dst, Default::default()));
// TODO(ver) We should be able to figure out resource coordinates for
// the endpoint?
synthesize_forward_policy(
&META,
detect_timeout,
queue,
addr,
meta.into(),
)
synthesize_forward_policy(&META, detect_timeout, queue, addr, meta)
},
);
return Ok((Some(profile), policy));
@@ -195,7 +189,7 @@ pub fn synthesize_forward_policy(
timeout: Duration,
queue: policy::Queue,
addr: SocketAddr,
metadata: Arc<policy::EndpointMetadata>,
metadata: policy::EndpointMetadata,
) -> ClientPolicy {
policy_for_backend(
meta,

View File

@@ -32,7 +32,7 @@ pub use self::balance::BalancerMetrics;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Dispatch {
Balance(NameAddr, EwmaConfig),
Forward(Remote<ServerAddr>, Arc<Metadata>),
Forward(Remote<ServerAddr>, Metadata),
/// A backend dispatcher that explicitly fails all requests.
Fail {
message: Arc<str>,
@@ -49,7 +49,7 @@ pub struct DispatcherFailed(Arc<str>);
pub struct Endpoint<T> {
addr: Remote<ServerAddr>,
is_local: bool,
metadata: Arc<Metadata>,
metadata: Metadata,
parent: T,
queue: QueueConfig,
close_server_connection_on_remote_proxy_error: bool,
@@ -279,13 +279,6 @@ impl<T> svc::Param<tls::ConditionalClientTls> for Endpoint<T> {
}
}
impl<T> svc::Param<tls::ConditionalClientTlsLabels> for Endpoint<T> {
fn param(&self) -> tls::ConditionalClientTlsLabels {
let tls: tls::ConditionalClientTls = self.param();
tls.as_ref().map(tls::ClientTls::labels)
}
}
impl<T> svc::Param<http::Variant> for Endpoint<T>
where
T: svc::Param<http::Variant>,

View File

@@ -121,7 +121,7 @@ where
let http2 = http2.override_from(metadata.http2_client_params());
Endpoint {
addr: Remote(ServerAddr(addr)),
metadata: metadata.into(),
metadata,
is_local,
parent: target.parent,
queue: http_queue,

View File

@@ -289,12 +289,6 @@ impl svc::Param<tls::ConditionalClientTls> for Endpoint {
}
}
impl svc::Param<tls::ConditionalClientTlsLabels> for Endpoint {
fn param(&self) -> tls::ConditionalClientTlsLabels {
tls::ConditionalClientTlsLabels::None(tls::NoClientTls::Disabled)
}
}
impl svc::Param<Option<tcp::tagged_transport::PortOverride>> for Endpoint {
fn param(&self) -> Option<tcp::tagged_transport::PortOverride> {
None

View File

@@ -8,7 +8,7 @@ use linkerd_app_core::{
transport::addrs::*,
Addr, Error, Infallible, NameAddr, CANONICAL_DST_HEADER,
};
use std::{fmt::Debug, hash::Hash, sync::Arc};
use std::{fmt::Debug, hash::Hash};
use tokio::sync::watch;
pub mod policy;
@@ -32,7 +32,7 @@ pub enum Routes {
/// Fallback endpoint forwarding.
// TODO(ver) Remove this variant when policy routes are fully wired up.
Endpoint(Remote<ServerAddr>, Arc<Metadata>),
Endpoint(Remote<ServerAddr>, Metadata),
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -64,7 +64,7 @@ enum RouterParams<T: Clone + Debug + Eq + Hash> {
Profile(profile::Params<T>),
// TODO(ver) Remove this variant when policy routes are fully wired up.
Endpoint(Remote<ServerAddr>, Arc<Metadata>, T),
Endpoint(Remote<ServerAddr>, Metadata, T),
}
// Only applies to requests with profiles.

View File

@@ -4,9 +4,6 @@ use super::{
test_util::*,
LabelGrpcRouteRsp, LabelHttpRouteRsp, RequestMetrics,
};
use bytes::{Buf, Bytes};
use http_body::Body;
use http_body_util::BodyExt;
use linkerd_app_core::{
dns,
svc::{
@@ -17,10 +14,6 @@ use linkerd_app_core::{
};
use linkerd_http_prom::body_data::request::RequestBodyFamilies;
use linkerd_proxy_client_policy as policy;
use std::task::Poll;
static GRPC_STATUS: http::HeaderName = http::HeaderName::from_static("grpc-status");
static GRPC_STATUS_OK: http::HeaderValue = http::HeaderValue::from_static("0");
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn http_request_statuses() {
@@ -527,160 +520,6 @@ async fn http_route_request_body_frames() {
tracing::info!("passed");
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn http_response_body_drop_on_eos() {
use linkerd_app_core::svc::{Service, ServiceExt};
const EXPORT_HOSTNAME_LABELS: bool = false;
let _trace = linkerd_tracing::test::trace_init();
let super::HttpRouteMetrics {
requests,
body_data,
..
} = super::HttpRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_http_route_metrics(
&requests,
&body_data,
&parent_ref,
&route_ref,
EXPORT_HOSTNAME_LABELS,
);
// Define a request and a response.
let req = http::Request::default();
let rsp = http::Response::builder()
.status(200)
.body(BoxBody::from_static("contents"))
.unwrap();
// Two counters for 200 responses that do/don't have an error.
let ok = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::HttpRsp {
status: Some(http::StatusCode::OK),
error: None,
},
));
let err = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::HttpRsp {
status: Some(http::StatusCode::OK),
error: Some(labels::Error::Unknown),
},
));
debug_assert_eq!(ok.get(), 0);
debug_assert_eq!(err.get(), 0);
// Send the request, and obtain the response.
let mut body = {
handle.allow(1);
svc.ready().await.expect("ready");
let mut call = svc.call(req);
let (_req, tx) = tokio::select! {
_ = (&mut call) => unreachable!(),
res = handle.next_request() => res.unwrap(),
};
assert_eq!(ok.get(), 0);
tx.send_response(rsp);
call.await.unwrap().into_body()
};
// The counters are not incremented yet.
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 0);
// Poll a frame out of the body.
let data = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.ok()
.expect("yields data");
assert_eq!(data.chunk(), "contents".as_bytes());
assert_eq!(data.remaining(), "contents".len());
// Show that the body reports itself as being complete.
debug_assert!(body.is_end_stream());
assert_eq!(ok.get(), 1);
assert_eq!(err.get(), 0);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn http_response_body_drop_early() {
use linkerd_app_core::svc::{Service, ServiceExt};
const EXPORT_HOSTNAME_LABELS: bool = false;
let _trace = linkerd_tracing::test::trace_init();
let super::HttpRouteMetrics {
requests,
body_data,
..
} = super::HttpRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_http_route_metrics(
&requests,
&body_data,
&parent_ref,
&route_ref,
EXPORT_HOSTNAME_LABELS,
);
// Define a request and a response.
let req = http::Request::default();
let rsp = http::Response::builder()
.status(200)
.body(BoxBody::from_static("contents"))
.unwrap();
// Two counters for 200 responses that do/don't have an error.
let ok = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::HttpRsp {
status: Some(http::StatusCode::OK),
error: None,
},
));
let err = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::HttpRsp {
status: Some(http::StatusCode::OK),
error: Some(labels::Error::Unknown),
},
));
debug_assert_eq!(ok.get(), 0);
debug_assert_eq!(err.get(), 0);
// Send the request, and obtain the response.
let body = {
handle.allow(1);
svc.ready().await.expect("ready");
let mut call = svc.call(req);
let (_req, tx) = tokio::select! {
_ = (&mut call) => unreachable!(),
res = handle.next_request() => res.unwrap(),
};
assert_eq!(ok.get(), 0);
tx.send_response(rsp);
call.await.unwrap().into_body()
};
// The counters are not incremented yet.
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 0);
// The body reports an error if it was not completed.
drop(body);
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 1);
}
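Editor's note: the body-drop tests above pin down a metrics contract — completing a body increments the OK counter, dropping it before end-of-stream increments the error counter. A minimal std-only sketch of that drop-guard pattern (the `BodyGuard` type and counters here are illustrative, not the proxy's):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

/// Illustrative guard: records an OK response when the body reaches
/// end-of-stream, and an error when it is dropped early.
struct BodyGuard {
    ok: Arc<AtomicU64>,
    err: Arc<AtomicU64>,
    complete: bool,
}

impl BodyGuard {
    fn mark_end_of_stream(&mut self) {
        self.complete = true;
        self.ok.fetch_add(1, Ordering::Relaxed);
    }
}

impl Drop for BodyGuard {
    fn drop(&mut self) {
        // An incomplete body counts as an error, mirroring the tests above.
        if !self.complete {
            self.err.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Runs one body to completion (or not) and returns (ok, err) counts.
fn run(complete: bool) -> (u64, u64) {
    let ok = Arc::new(AtomicU64::new(0));
    let err = Arc::new(AtomicU64::new(0));
    let mut body = BodyGuard {
        ok: ok.clone(),
        err: err.clone(),
        complete: false,
    };
    if complete {
        body.mark_end_of_stream();
    }
    drop(body);
    (ok.load(Ordering::Relaxed), err.load(Ordering::Relaxed))
}

fn main() {
    assert_eq!(run(true), (1, 0)); // completed body increments the OK counter
    assert_eq!(run(false), (0, 1)); // early drop increments the error counter
}
```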
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_request_statuses_ok() {
const EXPORT_HOSTNAME_LABELS: bool = true;
@ -884,210 +723,6 @@ async fn grpc_request_statuses_error_body() {
.await;
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_response_body_drop_on_eos() {
use linkerd_app_core::svc::{Service, ServiceExt};
const EXPORT_HOSTNAME_LABELS: bool = false;
let _trace = linkerd_tracing::test::trace_init();
let super::GrpcRouteMetrics {
requests,
body_data,
..
} = super::GrpcRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_grpc_route_metrics(
&requests,
&body_data,
&parent_ref,
&route_ref,
EXPORT_HOSTNAME_LABELS,
);
// Define a request and a response.
let req = http::Request::default();
let rsp = http::Response::builder()
.status(200)
.body({
let data = Poll::Ready(Some(Ok(Bytes::from_static(b"contents"))));
let trailers = {
let mut trailers = http::HeaderMap::with_capacity(1);
trailers.insert(GRPC_STATUS.clone(), GRPC_STATUS_OK.clone());
Poll::Ready(Some(Ok(trailers)))
};
let body = linkerd_mock_http_body::MockBody::default()
.then_yield_data(data)
.then_yield_trailer(trailers);
BoxBody::new(body)
})
.unwrap();
// Two counters for 200 responses that do/don't have an error.
let ok = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::GrpcRsp {
status: Some(tonic::Code::Ok),
error: None,
},
));
let err = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::GrpcRsp {
status: Some(tonic::Code::Ok),
error: Some(labels::Error::Unknown),
},
));
debug_assert_eq!(ok.get(), 0);
debug_assert_eq!(err.get(), 0);
// Send the request, and obtain the response.
let mut body = {
handle.allow(1);
svc.ready().await.expect("ready");
let mut call = svc.call(req);
let (_req, tx) = tokio::select! {
_ = (&mut call) => unreachable!(),
res = handle.next_request() => res.unwrap(),
};
assert_eq!(ok.get(), 0);
tx.send_response(rsp);
call.await.unwrap().into_body()
};
// The counters are not incremented yet.
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 0);
// Poll a frame out of the body.
let data = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.ok()
.expect("yields data");
assert_eq!(data.chunk(), "contents".as_bytes());
assert_eq!(data.remaining(), "contents".len());
// Poll the trailers out of the body.
let trls = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_trailers()
.ok()
.expect("yields trailers");
assert_eq!(trls.get(&GRPC_STATUS).unwrap(), GRPC_STATUS_OK);
// Show that the body reports itself as being complete.
debug_assert!(body.is_end_stream());
assert_eq!(ok.get(), 1);
assert_eq!(err.get(), 0);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_response_body_drop_early() {
use linkerd_app_core::svc::{Service, ServiceExt};
const EXPORT_HOSTNAME_LABELS: bool = false;
let _trace = linkerd_tracing::test::trace_init();
let super::GrpcRouteMetrics {
requests,
body_data,
..
} = super::GrpcRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_grpc_route_metrics(
&requests,
&body_data,
&parent_ref,
&route_ref,
EXPORT_HOSTNAME_LABELS,
);
// Define a request and a response.
let req = http::Request::default();
let rsp = http::Response::builder()
.status(200)
.body({
let data = Poll::Ready(Some(Ok(Bytes::from_static(b"contents"))));
let trailers = {
let mut trailers = http::HeaderMap::with_capacity(1);
trailers.insert(GRPC_STATUS.clone(), GRPC_STATUS_OK.clone());
Poll::Ready(Some(Ok(trailers)))
};
let body = linkerd_mock_http_body::MockBody::default()
.then_yield_data(data)
.then_yield_trailer(trailers);
BoxBody::new(body)
})
.unwrap();
// Two counters for 200 responses that do/don't have an error.
let ok = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::GrpcRsp {
status: Some(tonic::Code::Ok),
error: None,
},
));
let err = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::GrpcRsp {
status: None,
error: Some(labels::Error::Unknown),
},
));
debug_assert_eq!(ok.get(), 0);
debug_assert_eq!(err.get(), 0);
// Send the request, and obtain the response.
let mut body = {
handle.allow(1);
svc.ready().await.expect("ready");
let mut call = svc.call(req);
let (_req, tx) = tokio::select! {
_ = (&mut call) => unreachable!(),
res = handle.next_request() => res.unwrap(),
};
assert_eq!(ok.get(), 0);
tx.send_response(rsp);
call.await.unwrap().into_body()
};
// The counters are not incremented yet.
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 0);
// Poll a frame out of the body.
let data = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.ok()
.expect("yields data");
assert_eq!(data.chunk(), "contents".as_bytes());
assert_eq!(data.remaining(), "contents".len());
        // After the data frame, the body is not yet complete and the counters are unchanged.
debug_assert!(!body.is_end_stream());
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 0);
// Then, drop the body without polling the trailers.
drop(body);
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 1);
}
// === Utils ===
const MOCK_GRPC_REQ_URI: &str = "http://host/svc/method";


@ -54,7 +54,7 @@ impl retry::Policy for RetryPolicy {
if let Some(codes) = self.retryable_grpc_statuses.as_ref() {
let grpc_status = Self::grpc_status(rsp);
let retryable = grpc_status.is_some_and(|c| codes.contains(c));
let retryable = grpc_status.map_or(false, |c| codes.contains(c));
tracing::debug!(retryable, grpc.status = ?grpc_status);
if retryable {
return true;

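Editor's note on the retry-policy hunk above: `Option::is_some_and` (stable since Rust 1.70) and `map_or(false, …)` are behaviorally identical here; the newer form is the one recent clippy lints prefer. A quick std-only check, with a plain `u32` code set standing in for the policy's `retryable_grpc_statuses`:

```rust
// Both forms answer "is the gRPC status present and in the retryable set?".
fn retryable_map_or(status: Option<u32>, codes: &[u32]) -> bool {
    status.map_or(false, |c| codes.contains(&c))
}

fn retryable_is_some_and(status: Option<u32>, codes: &[u32]) -> bool {
    status.is_some_and(|c| codes.contains(&c))
}

fn main() {
    let codes = [14u32]; // e.g. gRPC UNAVAILABLE, as an assumed retryable code
    for status in [None, Some(14), Some(0)] {
        assert_eq!(
            retryable_map_or(status, &codes),
            retryable_is_some_and(status, &codes)
        );
    }
    assert!(retryable_is_some_and(Some(14), &codes));
    assert!(!retryable_is_some_and(None, &codes));
}
```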

@ -214,7 +214,7 @@ impl Outbound<()> {
detect_timeout,
queue,
addr,
meta.into(),
meta,
);
}


@ -146,7 +146,7 @@ impl Outbound<()> {
export_hostname_labels: bool,
) -> impl policy::GetPolicy
where
C: tonic::client::GrpcService<tonic::body::Body, Error = Error>,
C: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
C: Clone + Unpin + Send + Sync + 'static,
C::ResponseBody: proxy::http::Body<Data = tonic::codegen::Bytes, Error = Error>,
C::ResponseBody: Send + 'static,


@ -130,7 +130,7 @@ impl OutboundMetrics {
}
}
impl legacy::FmtMetrics for OutboundMetrics {
impl FmtMetrics for OutboundMetrics {
fn fmt_metrics(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.http_errors.fmt_metrics(f)?;
self.tcp_errors.fmt_metrics(f)?;
@ -243,7 +243,7 @@ impl EncodeLabelSet for RouteRef {
// === impl ConcreteLabels ===
impl legacy::FmtLabels for ConcreteLabels {
impl FmtLabels for ConcreteLabels {
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let ConcreteLabels(parent, backend) = self;


@ -5,7 +5,7 @@ pub(crate) use self::{http::Http, tcp::Tcp};
use crate::http::IdentityRequired;
use linkerd_app_core::{
errors::{FailFastError, LoadShedError},
metrics::legacy::FmtLabels,
metrics::FmtLabels,
proxy::http::ResponseTimeoutError,
};
use std::fmt;


@ -1,9 +1,6 @@
use super::ErrorKind;
use linkerd_app_core::{
metrics::{
legacy::{Counter, FmtMetrics},
metrics,
},
metrics::{metrics, Counter, FmtMetrics},
svc, Error,
};
use parking_lot::RwLock;


@ -1,9 +1,6 @@
use super::ErrorKind;
use linkerd_app_core::{
metrics::{
legacy::{Counter, FmtMetrics},
metrics,
},
metrics::{metrics, Counter, FmtMetrics},
svc,
transport::{labels::TargetAddr, OrigDstAddr},
Error,


@ -32,7 +32,7 @@ use tracing::info_span;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Dispatch {
Balance(NameAddr, balance::EwmaConfig),
Forward(Remote<ServerAddr>, Arc<Metadata>),
Forward(Remote<ServerAddr>, Metadata),
/// A backend dispatcher that explicitly fails all requests.
Fail {
message: Arc<str>,
@ -57,7 +57,7 @@ pub struct ConcreteError {
pub struct Endpoint<T> {
addr: Remote<ServerAddr>,
is_local: bool,
metadata: Arc<Metadata>,
metadata: Metadata,
parent: T,
}
@ -196,7 +196,7 @@ impl<C> Outbound<C> {
let is_local = inbound_ips.contains(&addr.ip());
Endpoint {
addr: Remote(ServerAddr(addr)),
metadata: metadata.into(),
metadata,
is_local,
parent: target.parent,
}
@ -419,10 +419,3 @@ impl<T> svc::Param<tls::ConditionalClientTls> for Endpoint<T> {
))
}
}
impl<T> svc::Param<tls::ConditionalClientTlsLabels> for Endpoint<T> {
fn param(&self) -> tls::ConditionalClientTlsLabels {
let tls: tls::ConditionalClientTls = self.param();
tls.as_ref().map(tls::ClientTls::labels)
}
}


@ -160,7 +160,9 @@ async fn balances() {
}
assert!(
seen0 && seen1,
"Both endpoints must be used; ep0={seen0} ep1={seen1}"
"Both endpoints must be used; ep0={} ep1={}",
seen0,
seen1
);
// When we remove the ep0, all traffic goes to ep1:
@ -188,7 +190,8 @@ async fn balances() {
task.abort();
assert!(
errors::is_caused_by::<FailFastError>(&*err),
"unexpected error: {err}"
"unexpected error: {}",
err
);
assert!(resolved.only_configured(), "Resolution must be reused");
}


@ -33,7 +33,7 @@ static INVALID_POLICY: once_cell::sync::OnceCell<ClientPolicy> = once_cell::sync
impl<S> Api<S>
where
S: tonic::client::GrpcService<tonic::body::Body, Error = Error> + Clone,
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error> + Clone,
S::ResponseBody: http::Body<Data = tonic::codegen::Bytes, Error = Error> + Send + 'static,
{
pub(crate) fn new(
@ -59,7 +59,7 @@ where
impl<S> Service<Addr> for Api<S>
where
S: tonic::client::GrpcService<tonic::body::Body, Error = Error>,
S: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
S: Clone + Send + Sync + 'static,
S::ResponseBody: http::Body<Data = tonic::codegen::Bytes, Error = Error> + Send + 'static,
S::Future: Send + 'static,


@ -8,8 +8,8 @@ use std::task::{Context, Poll};
#[derive(Clone, Debug)]
pub struct Connect {
addr: Remote<ServerAddr>,
tls: tls::ConditionalClientTls,
pub addr: Remote<ServerAddr>,
pub tls: tls::ConditionalClientTls,
}
/// Prevents outbound connections on the loopback interface, unless the
@ -77,12 +77,6 @@ where
// === impl Connect ===
impl Connect {
pub fn new(addr: Remote<ServerAddr>, tls: tls::ConditionalClientTls) -> Self {
Self { addr, tls }
}
}
impl svc::Param<Remote<ServerAddr>> for Connect {
fn param(&self) -> Remote<ServerAddr> {
self.addr
@ -94,14 +88,3 @@ impl svc::Param<tls::ConditionalClientTls> for Connect {
self.tls.clone()
}
}
#[cfg(test)]
impl Connect {
pub fn addr(&self) -> &Remote<ServerAddr> {
&self.addr
}
pub fn tls(&self) -> &tls::ConditionalClientTls {
&self.tls
}
}


@ -67,7 +67,10 @@ where
let tls: tls::ConditionalClientTls = ep.param();
if let tls::ConditionalClientTls::None(reason) = tls {
trace!(%reason, "Not attempting opaque transport");
let target = Connect::new(ep.param(), tls);
let target = Connect {
addr: ep.param(),
tls,
};
return Box::pin(self.inner.connect(target).err_into::<Error>());
}
@ -106,10 +109,10 @@ where
let protocol: Option<SessionProtocol> = ep.param();
let connect = self.inner.connect(Connect::new(
Remote(ServerAddr((addr.ip(), connect_port).into())),
let connect = self.inner.connect(Connect {
addr: Remote(ServerAddr((addr.ip(), connect_port).into())),
tls,
));
});
Box::pin(async move {
let (mut io, meta) = connect.await.map_err(Into::into)?;
@ -200,9 +203,9 @@ mod test {
) -> impl Fn(Connect) -> futures::future::Ready<Result<(tokio_test::io::Mock, ConnectMeta), io::Error>>
{
move |ep| {
let Remote(ServerAddr(sa)) = ep.addr();
let Remote(ServerAddr(sa)) = ep.addr;
assert_eq!(sa.port(), 4143);
assert!(ep.tls().is_some());
assert!(ep.tls.is_some());
let buf = header.encode_prefaced_buf().expect("Must encode");
let io = tokio_test::io::Builder::new()
.write(&buf[..])
@ -222,9 +225,9 @@ mod test {
let svc = TaggedTransport {
inner: service_fn(|ep: Connect| {
let Remote(ServerAddr(sa)) = ep.addr();
let Remote(ServerAddr(sa)) = ep.addr;
assert_eq!(sa.port(), 4321);
assert!(ep.tls().is_none());
assert!(ep.tls.is_none());
let io = tokio_test::io::Builder::new().write(b"hello").build();
let meta = tls::ConnectMeta {
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),


@ -60,7 +60,7 @@ pub(crate) fn runtime() -> (ProxyRuntime, drain::Signal) {
let (tap, _) = tap::new();
let (metrics, _) = metrics::Metrics::new(std::time::Duration::from_secs(10));
let runtime = ProxyRuntime {
identity: linkerd_meshtls::creds::default_for_test().1,
identity: linkerd_meshtls_rustls::creds::default_for_test().1.into(),
metrics: metrics.proxy,
tap,
span_sink: None,


@ -31,7 +31,7 @@ use tracing::info_span;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Dispatch {
Balance(NameAddr, balance::EwmaConfig),
Forward(Remote<ServerAddr>, Arc<Metadata>),
Forward(Remote<ServerAddr>, Metadata),
/// A backend dispatcher that explicitly fails all requests.
Fail {
message: Arc<str>,
@ -56,7 +56,7 @@ pub struct ConcreteError {
pub struct Endpoint<T> {
addr: Remote<ServerAddr>,
is_local: bool,
metadata: Arc<Metadata>,
metadata: Metadata,
parent: T,
}
@ -174,7 +174,7 @@ impl<C> Outbound<C> {
let is_local = inbound_ips.contains(&addr.ip());
Endpoint {
addr: Remote(ServerAddr(addr)),
metadata: metadata.into(),
metadata,
is_local,
parent: target.parent,
}
@ -385,10 +385,3 @@ impl<T> svc::Param<tls::ConditionalClientTls> for Endpoint<T> {
))
}
}
impl<T> svc::Param<tls::ConditionalClientTlsLabels> for Endpoint<T> {
fn param(&self) -> tls::ConditionalClientTlsLabels {
let tls: tls::ConditionalClientTls = self.param();
tls.as_ref().map(tls::ClientTls::labels)
}
}


@ -11,18 +11,13 @@ use linkerd_proxy_client_policy::{self as client_policy, tls::sni};
use parking_lot::Mutex;
use std::{
collections::HashMap,
marker::PhantomData,
net::SocketAddr,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::watch;
use tokio_rustls::rustls::{
internal::msgs::codec::{Codec, Reader},
pki_types::DnsName,
InvalidMessage,
};
use tokio_rustls::rustls::pki_types::DnsName;
mod basic;
@ -147,7 +142,7 @@ fn default_backend(addr: SocketAddr) -> client_policy::Backend {
capacity: 100,
failfast_timeout: Duration::from_secs(10),
},
dispatcher: BackendDispatcher::Forward(addr, EndpointMetadata::default().into()),
dispatcher: BackendDispatcher::Forward(addr, EndpointMetadata::default()),
}
}
@ -175,57 +170,44 @@ fn sni_route(backend: client_policy::Backend, sni: sni::MatchSni) -> client_poli
// generates a sample ClientHello TLS message for testing
fn generate_client_hello(sni: &str) -> Vec<u8> {
use tokio_rustls::rustls::{
internal::msgs::{base::Payload, codec::Codec, message::PlainMessage},
ContentType, ProtocolVersion,
internal::msgs::{
base::Payload,
codec::{Codec, Reader},
enums::Compression,
handshake::{
ClientExtension, ClientHelloPayload, HandshakeMessagePayload, HandshakePayload,
Random, ServerName, SessionId,
},
message::{MessagePayload, PlainMessage},
},
CipherSuite, ContentType, HandshakeType, ProtocolVersion,
};
let sni = DnsName::try_from(sni.to_string()).unwrap();
let sni = trim_hostname_trailing_dot_for_sni(&sni);
// rustls has internal-only types that can encode a ClientHello, but they are mostly
// inaccessible and an unstable part of the public API anyway. Manually encode one here for
// testing only instead.
let mut server_name_bytes = vec![];
0u8.encode(&mut server_name_bytes); // encode the type first
(sni.as_ref().len() as u16).encode(&mut server_name_bytes); // then the length as u16
server_name_bytes.extend_from_slice(sni.as_ref().as_bytes()); // then the server name itself
let mut hs_payload_bytes = vec![];
1u8.encode(&mut hs_payload_bytes); // client hello ID
let server_name =
ServerName::read(&mut Reader::init(&server_name_bytes)).expect("Server name is valid");
let client_hello_body = {
let mut payload = LengthPayload::<U24>::empty();
payload.buf.extend_from_slice(&[0x03, 0x03]); // client version, TLSv1.2
payload.buf.extend_from_slice(&[0u8; 32]); // random
0u8.encode(&mut payload.buf); // session ID
LengthPayload::<u16>::from_slice(&[0x00, 0x00] /* TLS_NULL_WITH_NULL_NULL */)
.encode(&mut payload.buf);
LengthPayload::<u8>::from_slice(&[0x00] /* no compression */).encode(&mut payload.buf);
let extensions = {
let mut payload = LengthPayload::<u16>::empty();
0u16.encode(&mut payload.buf); // server name extension ID
let server_name_extension = {
let mut payload = LengthPayload::<u16>::empty();
let server_name = {
let mut payload = LengthPayload::<u16>::empty();
0u8.encode(&mut payload.buf); // DNS hostname ID
LengthPayload::<u16>::from_slice(sni.as_ref().as_bytes())
.encode(&mut payload.buf);
payload
let hs_payload = HandshakeMessagePayload {
typ: HandshakeType::ClientHello,
payload: HandshakePayload::ClientHello(ClientHelloPayload {
client_version: ProtocolVersion::TLSv1_2,
random: Random::from([0; 32]),
session_id: SessionId::read(&mut Reader::init(&[0])).unwrap(),
cipher_suites: vec![CipherSuite::TLS_NULL_WITH_NULL_NULL],
compression_methods: vec![Compression::Null],
extensions: vec![ClientExtension::ServerName(vec![server_name])],
}),
};
server_name.encode(&mut payload.buf);
payload
};
server_name_extension.encode(&mut payload.buf);
payload
};
extensions.encode(&mut payload.buf);
payload
};
client_hello_body.encode(&mut hs_payload_bytes);
let mut hs_payload_bytes = Vec::default();
MessagePayload::handshake(hs_payload).encode(&mut hs_payload_bytes);
let message = PlainMessage {
typ: ContentType::Handshake,
@ -236,65 +218,6 @@ fn generate_client_hello(sni: &str) -> Vec<u8> {
message.into_unencrypted_opaque().encode()
}
#[derive(Debug)]
struct LengthPayload<T> {
buf: Vec<u8>,
_boo: PhantomData<fn() -> T>,
}
impl<T> LengthPayload<T> {
fn empty() -> Self {
Self {
buf: vec![],
_boo: PhantomData,
}
}
fn from_slice(s: &[u8]) -> Self {
Self {
buf: s.to_vec(),
_boo: PhantomData,
}
}
}
impl Codec<'_> for LengthPayload<u8> {
fn encode(&self, bytes: &mut Vec<u8>) {
(self.buf.len() as u8).encode(bytes);
bytes.extend_from_slice(&self.buf);
}
fn read(_: &mut Reader<'_>) -> std::result::Result<Self, InvalidMessage> {
unimplemented!()
}
}
impl Codec<'_> for LengthPayload<u16> {
fn encode(&self, bytes: &mut Vec<u8>) {
(self.buf.len() as u16).encode(bytes);
bytes.extend_from_slice(&self.buf);
}
fn read(_: &mut Reader<'_>) -> std::result::Result<Self, InvalidMessage> {
unimplemented!()
}
}
#[derive(Debug)]
struct U24;
impl Codec<'_> for LengthPayload<U24> {
fn encode(&self, bytes: &mut Vec<u8>) {
let len = self.buf.len() as u32;
bytes.extend_from_slice(&len.to_be_bytes()[1..]);
bytes.extend_from_slice(&self.buf);
}
fn read(_: &mut Reader<'_>) -> std::result::Result<Self, InvalidMessage> {
unimplemented!()
}
}
fn trim_hostname_trailing_dot_for_sni(dns_name: &DnsName<'_>) -> DnsName<'static> {
let dns_name_str = dns_name.as_ref();

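Editor's note: the removed `LengthPayload` helper above hand-encodes TLS-style length-prefixed vectors — a big-endian length (u8, u16, or 24-bit) followed by the payload bytes. A self-contained sketch of the u16 and u24 cases (function names are illustrative, not from the source):

```rust
// Illustrative helpers, not the source's `LengthPayload` type.
fn encode_u16_prefixed(payload: &[u8], out: &mut Vec<u8>) {
    out.extend_from_slice(&(payload.len() as u16).to_be_bytes());
    out.extend_from_slice(payload);
}

// TLS handshake bodies use a 24-bit length: drop the high byte of a u32.
fn encode_u24_prefixed(payload: &[u8], out: &mut Vec<u8>) {
    let len = payload.len() as u32;
    out.extend_from_slice(&len.to_be_bytes()[1..]);
    out.extend_from_slice(payload);
}

fn main() {
    let mut buf = Vec::new();
    encode_u16_prefixed(b"abc", &mut buf);
    assert_eq!(buf, [0x00, 0x03, b'a', b'b', b'c']);

    let mut buf = Vec::new();
    encode_u24_prefixed(b"abc", &mut buf);
    assert_eq!(buf, [0x00, 0x00, 0x03, b'a', b'b', b'c']);
}
```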

@ -43,7 +43,7 @@ impl Config {
) -> Result<
Dst<
impl svc::Service<
http::Request<tonic::body::Body>,
http::Request<tonic::body::BoxBody>,
Response = http::Response<control::RspBody>,
Error = Error,
Future = impl Send,


@ -83,8 +83,8 @@ pub enum ParseError {
),
#[error("not a valid port range")]
NotAPortRange,
#[error("{0}")]
AddrError(#[source] addr::Error),
#[error(transparent)]
AddrError(addr::Error),
#[error("only two addresses are supported")]
TooManyAddrs,
#[error("not a valid identity name")]
@ -233,12 +233,8 @@ pub const ENV_IDENTITY_IDENTITY_SERVER_ID: &str = "LINKERD2_PROXY_IDENTITY_SERVE
pub const ENV_IDENTITY_IDENTITY_SERVER_NAME: &str = "LINKERD2_PROXY_IDENTITY_SERVER_NAME";
// If this config is set, then the proxy will be configured to use Spire as its
// identity provider. On Unix systems this must be a path to a UDS; on Windows,
// a named pipe path.
pub const ENV_IDENTITY_SPIRE_WORKLOAD_API_ADDRESS: &str =
"LINKERD2_PROXY_IDENTITY_SPIRE_WORKLOAD_API_ADDRESS";
// provider
pub const ENV_IDENTITY_SPIRE_SOCKET: &str = "LINKERD2_PROXY_IDENTITY_SPIRE_SOCKET";
pub const IDENTITY_SPIRE_BASE: &str = "LINKERD2_PROXY_IDENTITY_SPIRE";
const DEFAULT_SPIRE_BACKOFF: ExponentialBackoff =
ExponentialBackoff::new_unchecked(Duration::from_millis(100), Duration::from_secs(1), 0.1);
@ -294,7 +290,7 @@ const DEFAULT_INBOUND_HTTP_FAILFAST_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_INBOUND_DETECT_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_INBOUND_CONNECT_TIMEOUT: Duration = Duration::from_millis(300);
const DEFAULT_INBOUND_CONNECT_BACKOFF: ExponentialBackoff =
ExponentialBackoff::new_unchecked(Duration::from_millis(100), Duration::from_secs(10), 0.1);
ExponentialBackoff::new_unchecked(Duration::from_millis(100), Duration::from_millis(500), 0.1);
const DEFAULT_OUTBOUND_TCP_QUEUE_CAPACITY: usize = 10_000;
const DEFAULT_OUTBOUND_TCP_FAILFAST_TIMEOUT: Duration = Duration::from_secs(3);
@ -303,7 +299,7 @@ const DEFAULT_OUTBOUND_HTTP_FAILFAST_TIMEOUT: Duration = Duration::from_secs(3);
const DEFAULT_OUTBOUND_DETECT_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_OUTBOUND_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_OUTBOUND_CONNECT_BACKOFF: ExponentialBackoff =
ExponentialBackoff::new_unchecked(Duration::from_millis(100), Duration::from_secs(60), 0.1);
ExponentialBackoff::new_unchecked(Duration::from_millis(100), Duration::from_millis(500), 0.1);
const DEFAULT_CONTROL_QUEUE_CAPACITY: usize = 100;
const DEFAULT_CONTROL_FAILFAST_TIMEOUT: Duration = Duration::from_secs(10);
@ -343,12 +339,12 @@ const DEFAULT_INBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT: Duration = Duration::f
// TODO(ver) This should be configurable at the load balancer level.
const DEFAULT_OUTBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(3);
// By default, we limit the number of connections that may be opened per-host.
// We pick a high number (10k) that shouldn't interfere with most workloads, but
// will prevent issues with our outbound HTTP client from exhausting the file
// descriptors available to the process.
const DEFAULT_INBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: usize = 10_000;
const DEFAULT_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: usize = 10_000;
// By default, we don't limit the number of connections a connection pool may
// use, as doing so can severely impact CPU utilization for applications with
// many concurrent requests. It's generally preferable to use the MAX_IDLE_AGE
// limitations to quickly drop idle connections.
const DEFAULT_INBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: usize = usize::MAX;
const DEFAULT_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: usize = usize::MAX;
// These settings limit the number of requests that have not received responses,
// including those buffered in the proxy and dispatched to the destination
@ -913,13 +909,8 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let identity = {
let tls = tls?;
match parse_deprecated(
strings,
ENV_IDENTITY_SPIRE_WORKLOAD_API_ADDRESS,
ENV_IDENTITY_SPIRE_SOCKET,
|s| Ok(s.to_string()),
)? {
Some(workload_api_addr) => match &tls.id {
match strings.get(ENV_IDENTITY_SPIRE_SOCKET)? {
Some(socket) => match &tls.id {
// TODO: perform stricter SPIFFE ID validation following:
// https://github.com/spiffe/spiffe/blob/27b59b81ba8c56885ac5d4be73b35b9b3305fd7a/standards/SPIFFE-ID.md
identity::Id::Uri(uri)
@ -928,7 +919,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
identity::Config::Spire {
tls,
client: spire::Config {
workload_api_addr: std::sync::Arc::new(workload_api_addr),
socket_addr: std::sync::Arc::new(socket),
backoff: parse_backoff(
strings,
IDENTITY_SPIRE_BASE,
@ -1117,11 +1108,11 @@ pub fn parse_backoff<S: Strings>(
base: &str,
default: ExponentialBackoff,
) -> Result<ExponentialBackoff, EnvError> {
let min_env = format!("LINKERD2_PROXY_{base}_EXP_BACKOFF_MIN");
let min_env = format!("LINKERD2_PROXY_{}_EXP_BACKOFF_MIN", base);
let min = parse(strings, &min_env, parse_duration);
let max_env = format!("LINKERD2_PROXY_{base}_EXP_BACKOFF_MAX");
let max_env = format!("LINKERD2_PROXY_{}_EXP_BACKOFF_MAX", base);
let max = parse(strings, &max_env, parse_duration);
let jitter_env = format!("LINKERD2_PROXY_{base}_EXP_BACKOFF_JITTER");
let jitter_env = format!("LINKERD2_PROXY_{}_EXP_BACKOFF_JITTER", base);
let jitter = parse(strings, &jitter_env, parse_number::<f64>);
match (min?, max?, jitter?) {
@ -1256,7 +1247,7 @@ pub fn parse_linkerd_identity_config<S: Strings>(
Ok((control, certify))
}
(addr, end_entity_dir, token, _minr, _maxr) => {
let s = format!("{ENV_IDENTITY_SVC_BASE}_ADDR and {ENV_IDENTITY_SVC_BASE}_NAME");
let s = format!("{0}_ADDR and {0}_NAME", ENV_IDENTITY_SVC_BASE);
let svc_env: &str = s.as_str();
for (unset, name) in &[
(addr.is_none(), svc_env),

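Editor's note: the `parse_backoff` hunk above swaps positional `format!` arguments for Rust 2021 inline captures; the two spellings render identical strings. A small sketch (the `backoff_env_key` helper and `CONTROL` base are examples, not from the source):

```rust
// Illustrative helper; `base` values come from the caller.
fn backoff_env_key(base: &str, field: &str) -> String {
    format!("LINKERD2_PROXY_{base}_EXP_BACKOFF_{field}")
}

fn main() {
    // Inline captures and positional arguments produce the same string.
    assert_eq!(
        backoff_env_key("CONTROL", "MIN"),
        format!("LINKERD2_PROXY_{}_EXP_BACKOFF_{}", "CONTROL", "MIN")
    );
    assert_eq!(
        backoff_env_key("CONTROL", "JITTER"),
        "LINKERD2_PROXY_CONTROL_EXP_BACKOFF_JITTER"
    );
}
```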

@ -172,11 +172,11 @@ mod tests {
fn test_unit<F: Fn(u64) -> Duration>(unit: &str, to_duration: F) {
for v in &[0, 1, 23, 456_789] {
let d = to_duration(*v);
let text = format!("{v}{unit}");
assert_eq!(parse_duration(&text), Ok(d), "text=\"{text}\"");
let text = format!("{}{}", v, unit);
assert_eq!(parse_duration(&text), Ok(d), "text=\"{}\"", text);
let text = format!(" {v}{unit}\t");
assert_eq!(parse_duration(&text), Ok(d), "text=\"{text}\"");
let text = format!(" {}{}\t", v, unit);
assert_eq!(parse_duration(&text), Ok(d), "text=\"{}\"", text);
}
}
@ -245,7 +245,7 @@ mod tests {
fn p(s: &str) -> Result<Vec<String>, ParseError> {
let mut sfxs = parse_dns_suffixes(s)?
.into_iter()
.map(|s| format!("{s}"))
.map(|s| format!("{}", s))
.collect::<Vec<_>>();
sfxs.sort();
Ok(sfxs)


@ -4,8 +4,7 @@ pub use linkerd_app_core::identity::{client, Id};
use linkerd_app_core::{
control, dns,
identity::{
client::linkerd::Certify, creds, watch as watch_identity, CertMetrics, Credentials,
DerX509, WithCertMetrics,
client::linkerd::Certify, creds, CertMetrics, Credentials, DerX509, Mode, WithCertMetrics,
},
metrics::{prom, ControlHttp as ClientMetrics},
Result,
@ -110,7 +109,7 @@ impl Config {
}
}
Self::Spire { client, tls } => {
let addr = client.workload_api_addr.clone();
let addr = client.socket_addr.clone();
let spire = spire::client::Spire::new(tls.id.clone());
let (store, receiver, ready) = watch(tls, metrics.cert)?;
@ -138,7 +137,8 @@ fn watch(
watch::Receiver<bool>,
)> {
let (tx, ready) = watch::channel(false);
let (store, receiver) = watch_identity(tls.id, tls.server_name, &tls.trust_anchors_pem)?;
let (store, receiver) =
Mode::default().watch(tls.id, tls.server_name, &tls.trust_anchors_pem)?;
let cred = WithCertMetrics::new(metrics, NotifyReady { store, tx });
Ok((cred, receiver, ready))
}


@ -19,10 +19,9 @@ use linkerd_app_core::{
config::ServerConfig,
control::{ControlAddr, Metrics as ControlMetrics},
dns, drain,
metrics::{legacy::FmtMetrics, prom},
metrics::{prom, FmtMetrics},
serve,
svc::Param,
tls_info,
transport::{addrs::*, listen::Bind},
Error, ProxyRuntime,
};
@ -84,13 +83,13 @@ pub struct App {
tap: tap::Tap,
}
// === impl Config ===
impl Config {
pub fn try_from_env() -> Result<Self, env::EnvError> {
env::Env.try_config()
}
}
impl Config {
/// Build an application.
///
/// It is currently required that this be run on a Tokio runtime, since some
@ -252,6 +251,9 @@ impl Config {
export_hostname_labels,
);
let dst_addr = dst.addr.clone();
// registry.sub_registry_with_prefix("gateway"),
let gateway = gateway::Gateway::new(gateway, inbound.clone(), outbound.clone()).stack(
dst.resolve.clone(),
dst.profiles.clone(),
@ -302,7 +304,6 @@ impl Config {
error!(%error, "Failed to register process metrics");
}
registry.register("proxy_build_info", "Proxy build info", BUILD_INFO.metric());
registry.register("rustls_info", "Proxy TLS info", tls_info::metric());
let admin = {
let identity = identity.receiver().server();
@ -329,7 +330,7 @@ impl Config {
Ok(App {
admin,
dst: dst.addr,
dst: dst_addr,
drain: drain_tx,
identity,
inbound_addr,
@ -357,8 +358,6 @@ impl Config {
}
}
// === impl App ===
impl App {
pub fn admin_addr(&self) -> Local<ServerAddr> {
self.admin.listen_addr
@ -397,7 +396,7 @@ impl App {
pub fn tracing_addr(&self) -> Option<&ControlAddr> {
match self.trace_collector {
trace_collector::TraceCollector::Disabled => None,
trace_collector::TraceCollector::Disabled { .. } => None,
crate::trace_collector::TraceCollector::Enabled(ref oc) => Some(&oc.addr),
}
}


@ -46,7 +46,7 @@ impl Config {
) -> Result<
Policy<
impl svc::Service<
http::Request<tonic::body::Body>,
http::Request<tonic::body::BoxBody>,
Response = http::Response<control::RspBody>,
Error = Error,
Future = impl Send,


@ -4,26 +4,40 @@ use tokio::sync::watch;
pub use linkerd_app_core::identity::client::spire as client;
#[cfg(target_os = "linux")]
const UNIX_PREFIX: &str = "unix:";
#[cfg(target_os = "linux")]
const TONIC_DEFAULT_URI: &str = "http://[::]:50051";
#[derive(Clone, Debug)]
pub struct Config {
pub workload_api_addr: Arc<String>,
pub socket_addr: Arc<String>,
pub backoff: ExponentialBackoff,
}
// Connects to SPIRE workload API via Unix Domain Socket
pub struct Client {
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
config: Config,
}
// === impl Client ===
#[cfg(target_os = "linux")]
impl From<Config> for Client {
fn from(config: Config) -> Self {
Self { config }
}
}
#[cfg(not(target_os = "linux"))]
impl From<Config> for Client {
fn from(_: Config) -> Self {
panic!("Spire is supported on Linux only")
}
}
#[cfg(target_os = "linux")]
impl tower::Service<()> for Client {
type Response = tonic::Response<watch::Receiver<client::SvidUpdate>>;
type Error = Error;
@ -37,43 +51,25 @@ impl tower::Service<()> for Client {
}
fn call(&mut self, _req: ()) -> Self::Future {
let addr = self.config.workload_api_addr.clone();
let socket = self.config.socket_addr.clone();
let backoff = self.config.backoff;
Box::pin(async move {
use tokio::net::UnixStream;
use tonic::transport::{Endpoint, Uri};
// Strip the 'unix:' prefix for tonic compatibility.
let stripped_path = socket
.strip_prefix(UNIX_PREFIX)
.unwrap_or(socket.as_str())
.to_string();
// We ignore this URI because UDS connections do not use it; if your
// connector does use the URI, it will be provided as the request to the
// `MakeConnection`.
let chan = Endpoint::try_from(TONIC_DEFAULT_URI)?
.connect_with_connector(tower::util::service_fn(move |_: Uri| {
#[cfg(unix)]
{
use futures::TryFutureExt;
// The 'unix:' scheme must be stripped from socket paths.
let path = addr.strip_prefix("unix:").unwrap_or(addr.as_str());
tokio::net::UnixStream::connect(path.to_string())
.map_ok(hyper_util::rt::TokioIo::new)
}
#[cfg(windows)]
{
use tokio::net::windows::named_pipe;
let named_pipe_path = addr.clone();
let client = named_pipe::ClientOptions::new()
.open(named_pipe_path.as_str())
.map(hyper_util::rt::TokioIo::new);
futures::future::ready(client)
}
#[cfg(not(any(unix, windows)))]
{
compile_error!("Spire is supported only on Windows and Unix systems.");
futures::future::pending()
}
UnixStream::connect(stripped_path.clone()).map_ok(hyper_util::rt::TokioIo::new)
}))
.await?;
@ -84,3 +80,21 @@ impl tower::Service<()> for Client {
})
}
}
#[cfg(not(target_os = "linux"))]
impl tower::Service<()> for Client {
type Response = tonic::Response<watch::Receiver<client::SvidUpdate>>;
type Error = Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
unimplemented!("Spire is supported on Linux only")
}
fn call(&mut self, _req: ()) -> Self::Future {
unimplemented!("Spire is supported on Linux only")
}
}
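Editor's note: both sides of the SPIRE hunk strip an optional `unix:` scheme before dialing, since tonic's UDS connector wants a bare filesystem path; `strip_prefix(…).unwrap_or(…)` leaves schemeless paths untouched. A minimal sketch (the helper name is illustrative):

```rust
/// Illustrative helper: accepts `unix:/path.sock` or a bare `/path.sock`.
fn socket_path(addr: &str) -> &str {
    // `strip_prefix` returns None when the scheme is absent, so `unwrap_or`
    // falls back to the original string.
    addr.strip_prefix("unix:").unwrap_or(addr)
}

fn main() {
    assert_eq!(socket_path("unix:/run/spire/agent.sock"), "/run/spire/agent.sock");
    assert_eq!(socket_path("/run/spire/agent.sock"), "/run/spire/agent.sock");
}
```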

Some files were not shown because too many files have changed in this diff Show More