Fix snappy/framed-snappy encoding/decoding (#12911)

#### Description
This is an alternative PR to #12825.

I'm taking all the commits from that PR and adding the feature gate on
the client side, as requested by reviews. The server behavior of peeking
into the initial bytes to identify the encoding is kept :)

If you've reviewed the previous PR, you can just review the latest
commit!

<!-- Issue number if applicable -->
#### Link to tracking issue
Fixes #10584

---------

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
Co-authored-by: Michael Graff <mgraff@cardinalhq.io>
Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
Co-authored-by: Pablo Baeyens <pbaeyens31+github@gmail.com>
Co-authored-by: Jonathan <perebaj@gmail.com>
This commit is contained in:
Arthur Silva Sens 2025-05-05 13:18:42 -03:00 committed by GitHub
parent 829157cef7
commit c85ebbec77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 268 additions and 36 deletions

View File

@ -0,0 +1,36 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp and configcompression
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Fix handling of `snappy` content-encoding in a backwards-compatible way"
# One or more tracking issues or pull requests related to the change
issues: [10584, 12825]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The collector used the Snappy compression type of "framed" to handle the HTTP
content-encoding "snappy". However, this encoding is typically used to indicate
the "block" compression variant of "snappy". This change allows the collector to:
- When receiving a request with encoding 'snappy', the server endpoints will peek
at the first bytes of the payload to determine if it is "framed" or "block" snappy,
and will decompress accordingly. This is a backwards-compatible change.
If the feature-gate "confighttp.framedSnappy" is enabled, you'll see new behavior for both client and server:
- Client compression type "snappy" will now compress to the "block" variant of snappy
instead of "framed". Client compression type "x-snappy-framed" will now compress to the "framed" variant of snappy.
- Servers will accept both "snappy" and "x-snappy-framed" as valid content-encodings.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]

View File

@ -24,6 +24,7 @@ const (
TypeZlib Type = "zlib"
TypeDeflate Type = "deflate"
TypeSnappy Type = "snappy"
TypeSnappyFramed Type = "x-snappy-framed"
TypeZstd Type = "zstd"
TypeLz4 Type = "lz4"
typeNone Type = "none"
@ -43,6 +44,7 @@ func (ct *Type) UnmarshalText(in []byte) error {
typ == TypeZlib ||
typ == TypeDeflate ||
typ == TypeSnappy ||
typ == TypeSnappyFramed ||
typ == TypeZstd ||
typ == TypeLz4 ||
typ == typeNone ||

View File

@ -42,6 +42,12 @@ func TestUnmarshalText(t *testing.T) {
shouldError: false,
isCompressed: true,
},
{
name: "ValidSnappyFramed",
compressionName: []byte("x-snappy-framed"),
shouldError: false,
isCompressed: true,
},
{
name: "ValidZstd",
compressionName: []byte("zstd"),
@ -128,6 +134,17 @@ func TestValidateParams(t *testing.T) {
compressionLevel: 1,
shouldError: true,
},
{
name: "ValidSnappyFramed",
compressionName: []byte("x-snappy-framed"),
shouldError: false,
},
{
name: "InvalidSnappyFramed",
compressionName: []byte("x-snappy-framed"),
compressionLevel: 1,
shouldError: true,
},
{
name: "ValidZstd",
compressionName: []byte("zstd"),

View File

@ -48,6 +48,8 @@ README](../configtls/README.md).
- SpeedBestCompression: `11`
- `snappy`
No compression levels supported yet
- `x-snappy-framed` (When feature gate `confighttp.framedSnappy` is enabled)
No compression levels supported yet
- [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport)
- [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
- [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
@ -105,6 +107,7 @@ will not be enabled.
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
- `max_request_body_size`: configures the maximum allowed body size in bytes for a single request. Default: `20971520` (20MiB)
- `compression_algorithms`: configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"]
- `x-snappy-framed` can be used if feature gate `confighttp.snappyFramed` is enabled.
- [`tls`](../configtls/README.md)
- [`auth`](../configauth/README.md)
- `request_params`: a list of query parameter names to add to the auth context, along with the HTTP headers

View File

@ -6,9 +6,11 @@
package confighttp // import "go.opentelemetry.io/collector/config/confighttp"
import (
"bufio"
"bytes"
"compress/gzip"
"compress/zlib"
"errors"
"fmt"
"io"
"net/http"
@ -18,6 +20,15 @@ import (
"github.com/pierrec/lz4/v4"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/featuregate"
)
var enableFramedSnappy = featuregate.GlobalRegistry().MustRegister(
"confighttp.framedSnappy",
featuregate.StageAlpha,
featuregate.WithRegisterFromVersion("v0.125.0"),
featuregate.WithRegisterDescription("Content encoding 'snappy' will compress/decompress block snappy format while 'x-snappy-framed' will compress/decompress framed snappy format."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/issues/10584"),
)
type compressRoundTripper struct {
@ -60,14 +71,7 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro
}
return zr, nil
},
//nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
// Lazy Reading content to improve memory efficiency
return &compressReadCloser{
Reader: snappy.NewReader(body),
orig: body,
}, nil
},
"snappy": snappyHandler,
//nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature
"lz4": func(body io.ReadCloser) (io.ReadCloser, error) {
return &compressReadCloser{
@ -75,6 +79,55 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro
orig: body,
}, nil
},
//nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature
"x-snappy-framed": func(body io.ReadCloser) (io.ReadCloser, error) {
return &compressReadCloser{
Reader: snappy.NewReader(body),
orig: body,
}, nil
},
}
// snappyFramingHeader is always the first 10 bytes of a snappy framed stream.
var snappyFramingHeader = []byte{
0xff, 0x06, 0x00, 0x00,
0x73, 0x4e, 0x61, 0x50, 0x70, 0x59, // "sNaPpY"
}
// snappyHandler returns an io.ReadCloser that auto-detects the snappy format.
// This is necessary because the collector previously used "content-encoding: snappy"
// but decompressed and compressed the payloads using the snappy framing format.
// However, "content-encoding: snappy" is uses the block format, and "x-snappy-framed"
// is the framing format. This handler is a (hopefully temporary) hack to
// make this work in a backwards-compatible way.
//
// See https://github.com/google/snappy/blob/6af9287fbdb913f0794d0148c6aa43b58e63c8e3/framing_format.txt#L27-L36
// for more details on the framing format.
func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) {
br := bufio.NewReader(body)
peekBytes, err := br.Peek(len(snappyFramingHeader))
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}
isFramed := len(peekBytes) >= len(snappyFramingHeader) && bytes.Equal(peekBytes[:len(snappyFramingHeader)], snappyFramingHeader)
if isFramed {
return &compressReadCloser{
Reader: snappy.NewReader(br),
orig: body,
}, nil
}
compressed, err := io.ReadAll(br)
if err != nil {
return nil, err
}
decoded, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
return io.NopCloser(bytes.NewReader(decoded)), nil
}
func newCompressionParams(level configcompression.Level) configcompression.CompressionParams {
@ -143,6 +196,9 @@ func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w
enabled := map[string]func(body io.ReadCloser) (io.ReadCloser, error){}
for _, dec := range enableDecoders {
if dec == "x-frame-snappy" && !enableFramedSnappy.IsEnabled() {
continue
}
enabled[dec] = availableDecoders[dec]
if dec == "deflate" {

View File

@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/featuregate"
)
func TestHTTPClientCompression(t *testing.T) {
@ -32,6 +33,7 @@ func TestHTTPClientCompression(t *testing.T) {
compressedGzipBody := compressGzip(t, testBody)
compressedZlibBody := compressZlib(t, testBody)
compressedDeflateBody := compressZlib(t, testBody)
compressedSnappyFramedBody := compressSnappyFramed(t, testBody)
compressedSnappyBody := compressSnappy(t, testBody)
compressedZstdBody := compressZstd(t, testBody)
compressedLz4Body := compressLz4(t, testBody)
@ -39,11 +41,12 @@ func TestHTTPClientCompression(t *testing.T) {
const invalidGzipLevel configcompression.Level = 100
tests := []struct {
name string
encoding configcompression.Type
level configcompression.Level
reqBody []byte
shouldError bool
name string
encoding configcompression.Type
level configcompression.Level
framedSnappyEnabled bool
reqBody []byte
shouldError bool
}{
{
name: "ValidEmpty",
@ -99,10 +102,11 @@ func TestHTTPClientCompression(t *testing.T) {
shouldError: false,
},
{
name: "ValidSnappy",
encoding: configcompression.TypeSnappy,
reqBody: compressedSnappyBody.Bytes(),
shouldError: false,
name: "ValidSnappy",
encoding: configcompression.TypeSnappy,
framedSnappyEnabled: true,
reqBody: compressedSnappyBody.Bytes(),
shouldError: false,
},
{
name: "InvalidSnappy",
@ -111,6 +115,20 @@ func TestHTTPClientCompression(t *testing.T) {
reqBody: compressedSnappyBody.Bytes(),
shouldError: true,
},
{
name: "ValidSnappyFramed",
encoding: configcompression.TypeSnappyFramed,
framedSnappyEnabled: true,
reqBody: compressedSnappyFramedBody.Bytes(),
shouldError: false,
},
{
name: "InvalidSnappyFramed",
encoding: configcompression.TypeSnappyFramed,
level: gzip.DefaultCompression,
reqBody: compressedSnappyFramedBody.Bytes(),
shouldError: true,
},
{
name: "ValidZstd",
encoding: configcompression.TypeZstd,
@ -134,6 +152,8 @@ func TestHTTPClientCompression(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_ = featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tt.framedSnappyEnabled)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
assert.NoError(t, err, "failed to read request body: %v", err)
@ -193,7 +213,7 @@ func TestHTTPCustomDecompression(t *testing.T) {
return io.NopCloser(strings.NewReader("decompressed body")), nil
},
}
srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, decoders))
srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms(), decoders))
t.Cleanup(srv.Close)
@ -214,11 +234,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
testBody := []byte("uncompressed_text")
noDecoders := map[string]func(io.ReadCloser) (io.ReadCloser, error){}
tests := []struct {
name string
encoding string
reqBody *bytes.Buffer
respCode int
respBody string
name string
encoding string
reqBody *bytes.Buffer
respCode int
respBody string
framedSnappyEnabled bool
}{
{
name: "NoCompression",
@ -250,12 +271,28 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: compressZstd(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidSnappyFramed",
encoding: "x-snappy-framed",
framedSnappyEnabled: true,
reqBody: compressSnappyFramed(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidSnappy",
encoding: "snappy",
reqBody: compressSnappy(t, testBody),
respCode: http.StatusOK,
},
{
// Should work even without the framed snappy feature gate enabled,
// since during decompression we're peeking the compression header
// and identifying which snappy encoding was used.
name: "ValidSnappyFramedAsSnappy",
encoding: "snappy",
reqBody: compressSnappyFramed(t, testBody),
respCode: http.StatusOK,
},
{
name: "ValidLz4",
encoding: "lz4",
@ -290,12 +327,20 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
respCode: http.StatusBadRequest,
respBody: "invalid input: magic number mismatch",
},
{
name: "InvalidSnappyFramed",
encoding: "x-snappy-framed",
framedSnappyEnabled: true,
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusBadRequest,
respBody: "snappy: corrupt input",
},
{
name: "InvalidSnappy",
encoding: "snappy",
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusBadRequest,
respBody: "snappy: corrupt input",
respBody: "snappy: corrupt input\n",
},
{
name: "UnsupportedCompression",
@ -307,6 +352,8 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_ = featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tt.framedSnappyEnabled)
srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
@ -318,7 +365,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
assert.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, testBody, string(body))
w.WriteHeader(http.StatusOK)
}), defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, noDecoders))
}), defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms(), noDecoders))
t.Cleanup(srv.Close)
req, err := http.NewRequest(http.MethodGet, srv.URL, tt.reqBody)
@ -415,7 +462,7 @@ func TestOverrideCompressionList(t *testing.T) {
}), defaultMaxRequestBodySize, defaultErrorHandler, configuredDecoders, nil))
t.Cleanup(srv.Close)
req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappy(t, []byte("123decompressed body")))
req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappyFramed(t, []byte("123decompressed body")))
require.NoError(t, err, "failed to create request to test handler")
req.Header.Set("Content-Encoding", "snappy")
@ -435,9 +482,10 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
name string
encoding string
compress func(tb testing.TB, payload []byte) *bytes.Buffer
name string
encoding string
compress func(tb testing.TB, payload []byte) *bytes.Buffer
framedSnappyEnabled bool
}{
// None encoding is ignored since it does not
// enforce the max body size if content encoding header is not set
@ -456,6 +504,12 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) {
encoding: "zlib",
compress: compressZlib,
},
{
name: "x-snappy-framed",
encoding: "x-snappy-framed",
compress: compressSnappyFramed,
framedSnappyEnabled: true,
},
{
name: "snappy",
encoding: "snappy",
@ -468,7 +522,8 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// t.Parallel() // TODO: Re-enable parallel tests once feature gate is removed. We can't parallelize since registry is shared.
_ = featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tc.framedSnappyEnabled)
h := httpContentDecompressor(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -479,7 +534,7 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) {
}),
1024,
defaultErrorHandler,
defaultCompressionAlgorithms,
defaultCompressionAlgorithms(),
availableDecoders,
)
@ -517,7 +572,7 @@ func compressZlib(tb testing.TB, body []byte) *bytes.Buffer {
return &buf
}
func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer {
func compressSnappyFramed(tb testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
sw := snappy.NewBufferedWriter(&buf)
_, err := sw.Write(body)
@ -526,6 +581,14 @@ func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer {
return &buf
}
func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
compressed := snappy.Encode(nil, body)
_, err := buf.Write(compressed)
require.NoError(tb, err)
return &buf
}
func compressZstd(tb testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
compression := zstd.SpeedFastest

View File

@ -66,10 +66,26 @@ func newWriteCloserResetFunc(compressionType configcompression.Type, compression
w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level))
return w
}, nil
case configcompression.TypeSnappy:
case configcompression.TypeSnappyFramed:
if !enableFramedSnappy.IsEnabled() {
return nil, errors.New("x-snappy-framed is not enabled")
}
return func() writeCloserReset {
return snappy.NewBufferedWriter(nil)
}, nil
case configcompression.TypeSnappy:
if !enableFramedSnappy.IsEnabled() {
// If framed snappy feature gate is not enabled, we keep the current behavior
// where the 'Content-Encoding: snappy' is compressed as the framed snappy format.
return func() writeCloserReset {
return snappy.NewBufferedWriter(nil)
}, nil
}
return func() writeCloserReset {
// If framed snappy feature gate is enabled, we use the correct behavior
// where the 'Content-Encoding: snappy' is compressed as the block snappy format.
return &rawSnappyWriter{}
}, nil
case configcompression.TypeZstd:
level := zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(int(compressionParams.Level)))
return func() writeCloserReset {
@ -111,3 +127,37 @@ func (p *compressor) compress(buf *bytes.Buffer, body io.ReadCloser) error {
return writer.Close()
}
// rawSnappyWriter buffers all writes and, on Close,
// compresses the data as a raw snappy block (non-framed)
// and writes the compressed bytes to the underlying writer.
type rawSnappyWriter struct {
buffer bytes.Buffer
w io.Writer
closed bool
}
// Write buffers the data.
func (w *rawSnappyWriter) Write(p []byte) (int, error) {
return w.buffer.Write(p)
}
// Close compresses the buffered data in one shot using snappy.Encode,
// writes the compressed block to the underlying writer, and marks the writer as closed.
func (w *rawSnappyWriter) Close() error {
if w.closed {
return nil
}
w.closed = true
// Compress the buffered uncompressed bytes.
compressed := snappy.Encode(nil, w.buffer.Bytes())
_, err := w.w.Write(compressed)
return err
}
// Reset sets a new underlying writer, resets the buffer and the closed flag.
func (w *rawSnappyWriter) Reset(newWriter io.Writer) {
w.buffer.Reset()
w.w = newWriter
w.closed = false
}

View File

@ -38,7 +38,12 @@ const (
defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB
)
var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"}
func defaultCompressionAlgorithms() []string {
if enableFramedSnappy.IsEnabled() {
return []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4", "x-snappy-framed"}
}
return []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"}
}
// ClientConfig defines settings for creating an HTTP client.
type ClientConfig struct {
@ -450,7 +455,7 @@ func (hss *ServerConfig) ToServer(ctx context.Context, host component.Host, sett
}
if hss.CompressionAlgorithms == nil {
hss.CompressionAlgorithms = defaultCompressionAlgorithms
hss.CompressionAlgorithms = defaultCompressionAlgorithms()
}
// Apply middlewares in reverse order so they execute in

View File

@ -21,6 +21,7 @@ require (
go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest v0.125.0
go.opentelemetry.io/collector/extension/extensionmiddleware v0.125.0
go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest v0.125.0
go.opentelemetry.io/collector/featuregate v1.31.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0
go.opentelemetry.io/otel v1.35.0
go.uber.org/goleak v1.3.0
@ -41,7 +42,6 @@ require (
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/featuregate v1.31.0 // indirect
go.opentelemetry.io/collector/internal/telemetry v0.125.0 // indirect
go.opentelemetry.io/collector/pdata v1.31.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect