Add opentelemetry tracing for Redis (#7598)

This pulls in the redisotel "extra" from the go-redis driver and
instruments our Redis connections with it.
This commit is contained in:
Matthew McPherrin 2024-08-08 08:12:58 -07:00 committed by GitHub
parent c9132baa37
commit 80351a94e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 10663 additions and 55 deletions

4
go.mod
View File

@ -24,7 +24,8 @@ require (
github.com/nxadm/tail v1.4.11 github.com/nxadm/tail v1.4.11
github.com/prometheus/client_golang v1.15.1 github.com/prometheus/client_golang v1.15.1
github.com/prometheus/client_model v0.4.0 github.com/prometheus/client_model v0.4.0
github.com/redis/go-redis/v9 v9.4.0 github.com/redis/go-redis/extra/redisotel/v9 v9.5.3
github.com/redis/go-redis/v9 v9.5.3
github.com/titanous/rocacheck v0.0.0-20171023193734-afe73141d399 github.com/titanous/rocacheck v0.0.0-20171023193734-afe73141d399
github.com/weppos/publicsuffix-go v0.30.3-0.20240510084413-5f1d03393b3d github.com/weppos/publicsuffix-go v0.30.3-0.20240510084413-5f1d03393b3d
github.com/zmap/zcrypto v0.0.0-20231219022726-a1f61fb1661c github.com/zmap/zcrypto v0.0.0-20231219022726-a1f61fb1661c
@ -78,6 +79,7 @@ require (
github.com/poy/onpar v1.1.2 // indirect github.com/poy/onpar v1.1.2 // indirect
github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect go.opentelemetry.io/proto/otlp v1.2.0 // indirect

8
go.sum
View File

@ -214,8 +214,12 @@ github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7z
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk= github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ=
github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 h1:kuvuJL/+MZIEdvtb/kTBRiRgYaOmx1l+lYJyVdrRUOs=
github.com/redis/go-redis/extra/redisotel/v9 v9.5.3/go.mod h1:7f/FMrf5RRRVHXgfk7CzSVzXHiWeuOQUu2bsVqWoa+g=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=

View File

@ -3,11 +3,13 @@ package redis
import ( import (
"fmt" "fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/extra/redisotel/v9"
"github.com/redis/go-redis/v9"
"github.com/letsencrypt/boulder/cmd" "github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/config" "github.com/letsencrypt/boulder/config"
blog "github.com/letsencrypt/boulder/log" blog "github.com/letsencrypt/boulder/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
) )
// Config contains the configuration needed to act as a Redis client. // Config contains the configuration needed to act as a Redis client.
@ -163,6 +165,11 @@ func NewRingFromConfig(c Config, stats prometheus.Registerer, log blog.Logger) (
lookup.start() lookup.start()
} }
err = redisotel.InstrumentTracing(inner)
if err != nil {
return nil, err
}
return &Ring{ return &Ring{
Ring: inner, Ring: inner,
lookup: lookup, lookup: lookup,

View File

@ -0,0 +1,25 @@
Copyright (c) 2013 The github.com/redis/go-redis Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,149 @@
package rediscmd
import (
"encoding/hex"
"fmt"
"strconv"
"strings"
"time"
"github.com/redis/go-redis/v9"
)
func CmdString(cmd redis.Cmder) string {
b := make([]byte, 0, 32)
b = AppendCmd(b, cmd)
return String(b)
}
func CmdsString(cmds []redis.Cmder) (string, string) {
const numCmdLimit = 100
const numNameLimit = 10
seen := make(map[string]struct{}, numNameLimit)
unqNames := make([]string, 0, numNameLimit)
b := make([]byte, 0, 32*len(cmds))
for i, cmd := range cmds {
if i > numCmdLimit {
break
}
if i > 0 {
b = append(b, '\n')
}
b = AppendCmd(b, cmd)
if len(unqNames) >= numNameLimit {
continue
}
name := cmd.FullName()
if _, ok := seen[name]; !ok {
seen[name] = struct{}{}
unqNames = append(unqNames, name)
}
}
summary := strings.Join(unqNames, " ")
return summary, String(b)
}
func AppendCmd(b []byte, cmd redis.Cmder) []byte {
const numArgLimit = 32
for i, arg := range cmd.Args() {
if i > numArgLimit {
break
}
if i > 0 {
b = append(b, ' ')
}
b = appendArg(b, arg)
}
if err := cmd.Err(); err != nil {
b = append(b, ": "...)
b = append(b, err.Error()...)
}
return b
}
func appendArg(b []byte, v interface{}) []byte {
const argLenLimit = 64
switch v := v.(type) {
case nil:
return append(b, "<nil>"...)
case string:
if len(v) > argLenLimit {
v = v[:argLenLimit]
}
return appendUTF8String(b, Bytes(v))
case []byte:
if len(v) > argLenLimit {
v = v[:argLenLimit]
}
return appendUTF8String(b, v)
case int:
return strconv.AppendInt(b, int64(v), 10)
case int8:
return strconv.AppendInt(b, int64(v), 10)
case int16:
return strconv.AppendInt(b, int64(v), 10)
case int32:
return strconv.AppendInt(b, int64(v), 10)
case int64:
return strconv.AppendInt(b, v, 10)
case uint:
return strconv.AppendUint(b, uint64(v), 10)
case uint8:
return strconv.AppendUint(b, uint64(v), 10)
case uint16:
return strconv.AppendUint(b, uint64(v), 10)
case uint32:
return strconv.AppendUint(b, uint64(v), 10)
case uint64:
return strconv.AppendUint(b, v, 10)
case float32:
return strconv.AppendFloat(b, float64(v), 'f', -1, 64)
case float64:
return strconv.AppendFloat(b, v, 'f', -1, 64)
case bool:
if v {
return append(b, "true"...)
}
return append(b, "false"...)
case time.Time:
return v.AppendFormat(b, time.RFC3339Nano)
default:
return append(b, fmt.Sprint(v)...)
}
}
func appendUTF8String(dst []byte, src []byte) []byte {
if isSimple(src) {
dst = append(dst, src...)
return dst
}
s := len(dst)
dst = append(dst, make([]byte, hex.EncodedLen(len(src)))...)
hex.Encode(dst[s:], src)
return dst
}
func isSimple(b []byte) bool {
for _, c := range b {
if !isSimpleByte(c) {
return false
}
}
return true
}
func isSimpleByte(c byte) bool {
return c >= 0x21 && c <= 0x7e
}

View File

@ -0,0 +1,12 @@
//go:build appengine
// +build appengine
package rediscmd
func String(b []byte) string {
return string(b)
}
func Bytes(s string) []byte {
return []byte(s)
}

View File

@ -0,0 +1,21 @@
//go:build !appengine
// +build !appengine
package rediscmd
import "unsafe"
// String converts byte slice to string.
func String(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
// Bytes converts string to byte slice.
func Bytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}

View File

@ -0,0 +1,25 @@
Copyright (c) 2013 The github.com/redis/go-redis Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,34 @@
# OpenTelemetry instrumentation for go-redis
## Installation
```bash
go get github.com/redis/go-redis/extra/redisotel/v9
```
## Usage
Tracing is enabled by adding a hook:
```go
import (
"github.com/redis/go-redis/v9"
"github.com/redis/go-redis/extra/redisotel/v9"
)
rdb := rdb.NewClient(&rdb.Options{...})
// Enable tracing instrumentation.
if err := redisotel.InstrumentTracing(rdb); err != nil {
panic(err)
}
// Enable metrics instrumentation.
if err := redisotel.InstrumentMetrics(rdb); err != nil {
panic(err)
}
```
See [example](../../example/otel) and
[Monitoring Go Redis Performance and Errors](https://redis.uptrace.dev/guide/go-redis-monitoring.html)
for details.

View File

@ -0,0 +1,138 @@
package redisotel
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel/trace"
)
type config struct {
// Common options.
dbSystem string
attrs []attribute.KeyValue
// Tracing options.
tp trace.TracerProvider
tracer trace.Tracer
dbStmtEnabled bool
// Metrics options.
mp metric.MeterProvider
meter metric.Meter
poolName string
}
type baseOption interface {
apply(conf *config)
}
type Option interface {
baseOption
tracing()
metrics()
}
type option func(conf *config)
func (fn option) apply(conf *config) {
fn(conf)
}
func (fn option) tracing() {}
func (fn option) metrics() {}
func newConfig(opts ...baseOption) *config {
conf := &config{
dbSystem: "redis",
attrs: []attribute.KeyValue{},
tp: otel.GetTracerProvider(),
mp: otel.GetMeterProvider(),
dbStmtEnabled: true,
}
for _, opt := range opts {
opt.apply(conf)
}
conf.attrs = append(conf.attrs, semconv.DBSystemKey.String(conf.dbSystem))
return conf
}
func WithDBSystem(dbSystem string) Option {
return option(func(conf *config) {
conf.dbSystem = dbSystem
})
}
// WithAttributes specifies additional attributes to be added to the span.
func WithAttributes(attrs ...attribute.KeyValue) Option {
return option(func(conf *config) {
conf.attrs = append(conf.attrs, attrs...)
})
}
//------------------------------------------------------------------------------
type TracingOption interface {
baseOption
tracing()
}
type tracingOption func(conf *config)
var _ TracingOption = (*tracingOption)(nil)
func (fn tracingOption) apply(conf *config) {
fn(conf)
}
func (fn tracingOption) tracing() {}
// WithTracerProvider specifies a tracer provider to use for creating a tracer.
// If none is specified, the global provider is used.
func WithTracerProvider(provider trace.TracerProvider) TracingOption {
return tracingOption(func(conf *config) {
conf.tp = provider
})
}
// WithDBStatement tells the tracing hook not to log raw redis commands.
func WithDBStatement(on bool) TracingOption {
return tracingOption(func(conf *config) {
conf.dbStmtEnabled = on
})
}
//------------------------------------------------------------------------------
type MetricsOption interface {
baseOption
metrics()
}
type metricsOption func(conf *config)
var _ MetricsOption = (*metricsOption)(nil)
func (fn metricsOption) apply(conf *config) {
fn(conf)
}
func (fn metricsOption) metrics() {}
// WithMeterProvider configures a metric.Meter used to create instruments.
func WithMeterProvider(mp metric.MeterProvider) MetricsOption {
return metricsOption(func(conf *config) {
conf.mp = mp
})
}

View File

@ -0,0 +1,256 @@
package redisotel
import (
"context"
"fmt"
"net"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"github.com/redis/go-redis/v9"
)
// InstrumentMetrics starts reporting OpenTelemetry Metrics.
//
// Based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/database-metrics.md
func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
baseOpts := make([]baseOption, len(opts))
for i, opt := range opts {
baseOpts[i] = opt
}
conf := newConfig(baseOpts...)
if conf.meter == nil {
conf.meter = conf.mp.Meter(
instrumName,
metric.WithInstrumentationVersion("semver:"+redis.Version()),
)
}
switch rdb := rdb.(type) {
case *redis.Client:
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
if err := reportPoolStats(rdb, conf); err != nil {
return err
}
if err := addMetricsHook(rdb, conf); err != nil {
return err
}
return nil
case *redis.ClusterClient:
rdb.OnNewNode(func(rdb *redis.Client) {
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
if err := reportPoolStats(rdb, conf); err != nil {
otel.Handle(err)
}
if err := addMetricsHook(rdb, conf); err != nil {
otel.Handle(err)
}
})
return nil
case *redis.Ring:
rdb.OnNewNode(func(rdb *redis.Client) {
if conf.poolName == "" {
opt := rdb.Options()
conf.poolName = opt.Addr
}
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
if err := reportPoolStats(rdb, conf); err != nil {
otel.Handle(err)
}
if err := addMetricsHook(rdb, conf); err != nil {
otel.Handle(err)
}
})
return nil
default:
return fmt.Errorf("redisotel: %T not supported", rdb)
}
}
func reportPoolStats(rdb *redis.Client, conf *config) error {
labels := conf.attrs
idleAttrs := append(labels, attribute.String("state", "idle"))
usedAttrs := append(labels, attribute.String("state", "used"))
idleMax, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.idle.max",
metric.WithDescription("The maximum number of idle open connections allowed"),
)
if err != nil {
return err
}
idleMin, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.idle.min",
metric.WithDescription("The minimum number of idle open connections allowed"),
)
if err != nil {
return err
}
connsMax, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.max",
metric.WithDescription("The maximum number of open connections allowed"),
)
if err != nil {
return err
}
usage, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.usage",
metric.WithDescription("The number of connections that are currently in state described by the state attribute"),
)
if err != nil {
return err
}
timeouts, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.timeouts",
metric.WithDescription("The number of connection timeouts that have occurred trying to obtain a connection from the pool"),
)
if err != nil {
return err
}
redisConf := rdb.Options()
_, err = conf.meter.RegisterCallback(
func(ctx context.Context, o metric.Observer) error {
stats := rdb.PoolStats()
o.ObserveInt64(idleMax, int64(redisConf.MaxIdleConns), metric.WithAttributes(labels...))
o.ObserveInt64(idleMin, int64(redisConf.MinIdleConns), metric.WithAttributes(labels...))
o.ObserveInt64(connsMax, int64(redisConf.PoolSize), metric.WithAttributes(labels...))
o.ObserveInt64(usage, int64(stats.IdleConns), metric.WithAttributes(idleAttrs...))
o.ObserveInt64(usage, int64(stats.TotalConns-stats.IdleConns), metric.WithAttributes(usedAttrs...))
o.ObserveInt64(timeouts, int64(stats.Timeouts), metric.WithAttributes(labels...))
return nil
},
idleMax,
idleMin,
connsMax,
usage,
timeouts,
)
return err
}
func addMetricsHook(rdb *redis.Client, conf *config) error {
createTime, err := conf.meter.Float64Histogram(
"db.client.connections.create_time",
metric.WithDescription("The time it took to create a new connection."),
metric.WithUnit("ms"),
)
if err != nil {
return err
}
useTime, err := conf.meter.Float64Histogram(
"db.client.connections.use_time",
metric.WithDescription("The time between borrowing a connection and returning it to the pool."),
metric.WithUnit("ms"),
)
if err != nil {
return err
}
rdb.AddHook(&metricsHook{
createTime: createTime,
useTime: useTime,
attrs: conf.attrs,
})
return nil
}
type metricsHook struct {
createTime metric.Float64Histogram
useTime metric.Float64Histogram
attrs []attribute.KeyValue
}
var _ redis.Hook = (*metricsHook)(nil)
func (mh *metricsHook) DialHook(hook redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
start := time.Now()
conn, err := hook(ctx, network, addr)
dur := time.Since(start)
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+1)
attrs = append(attrs, mh.attrs...)
attrs = append(attrs, statusAttr(err))
mh.createTime.Record(ctx, milliseconds(dur), metric.WithAttributes(attrs...))
return conn, err
}
}
func (mh *metricsHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
start := time.Now()
err := hook(ctx, cmd)
dur := time.Since(start)
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+2)
attrs = append(attrs, mh.attrs...)
attrs = append(attrs, attribute.String("type", "command"))
attrs = append(attrs, statusAttr(err))
mh.useTime.Record(ctx, milliseconds(dur), metric.WithAttributes(attrs...))
return err
}
}
func (mh *metricsHook) ProcessPipelineHook(
hook redis.ProcessPipelineHook,
) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
start := time.Now()
err := hook(ctx, cmds)
dur := time.Since(start)
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+2)
attrs = append(attrs, mh.attrs...)
attrs = append(attrs, attribute.String("type", "pipeline"))
attrs = append(attrs, statusAttr(err))
mh.useTime.Record(ctx, milliseconds(dur), metric.WithAttributes(attrs...))
return err
}
}
func milliseconds(d time.Duration) float64 {
return float64(d) / float64(time.Millisecond)
}
func statusAttr(err error) attribute.KeyValue {
if err != nil {
return attribute.String("status", "error")
}
return attribute.String("status", "ok")
}

View File

@ -0,0 +1,232 @@
package redisotel
import (
"context"
"fmt"
"net"
"runtime"
"strconv"
"strings"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel/trace"
"github.com/redis/go-redis/extra/rediscmd/v9"
"github.com/redis/go-redis/v9"
)
const (
instrumName = "github.com/redis/go-redis/extra/redisotel"
)
func InstrumentTracing(rdb redis.UniversalClient, opts ...TracingOption) error {
switch rdb := rdb.(type) {
case *redis.Client:
opt := rdb.Options()
connString := formatDBConnString(opt.Network, opt.Addr)
opts = addServerAttributes(opts, opt.Addr)
rdb.AddHook(newTracingHook(connString, opts...))
return nil
case *redis.ClusterClient:
rdb.AddHook(newTracingHook("", opts...))
rdb.OnNewNode(func(rdb *redis.Client) {
opt := rdb.Options()
opts = addServerAttributes(opts, opt.Addr)
connString := formatDBConnString(opt.Network, opt.Addr)
rdb.AddHook(newTracingHook(connString, opts...))
})
return nil
case *redis.Ring:
rdb.AddHook(newTracingHook("", opts...))
rdb.OnNewNode(func(rdb *redis.Client) {
opt := rdb.Options()
opts = addServerAttributes(opts, opt.Addr)
connString := formatDBConnString(opt.Network, opt.Addr)
rdb.AddHook(newTracingHook(connString, opts...))
})
return nil
default:
return fmt.Errorf("redisotel: %T not supported", rdb)
}
}
type tracingHook struct {
conf *config
spanOpts []trace.SpanStartOption
}
var _ redis.Hook = (*tracingHook)(nil)
func newTracingHook(connString string, opts ...TracingOption) *tracingHook {
baseOpts := make([]baseOption, len(opts))
for i, opt := range opts {
baseOpts[i] = opt
}
conf := newConfig(baseOpts...)
if conf.tracer == nil {
conf.tracer = conf.tp.Tracer(
instrumName,
trace.WithInstrumentationVersion("semver:"+redis.Version()),
)
}
if connString != "" {
conf.attrs = append(conf.attrs, semconv.DBConnectionString(connString))
}
return &tracingHook{
conf: conf,
spanOpts: []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(conf.attrs...),
},
}
}
func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
ctx, span := th.conf.tracer.Start(ctx, "redis.dial", th.spanOpts...)
defer span.End()
conn, err := hook(ctx, network, addr)
if err != nil {
recordError(span, err)
return nil, err
}
return conn, nil
}
}
func (th *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
fn, file, line := funcFileLine("github.com/redis/go-redis")
attrs := make([]attribute.KeyValue, 0, 8)
attrs = append(attrs,
semconv.CodeFunction(fn),
semconv.CodeFilepath(file),
semconv.CodeLineNumber(line),
)
if th.conf.dbStmtEnabled {
cmdString := rediscmd.CmdString(cmd)
attrs = append(attrs, semconv.DBStatement(cmdString))
}
opts := th.spanOpts
opts = append(opts, trace.WithAttributes(attrs...))
ctx, span := th.conf.tracer.Start(ctx, cmd.FullName(), opts...)
defer span.End()
if err := hook(ctx, cmd); err != nil {
recordError(span, err)
return err
}
return nil
}
}
func (th *tracingHook) ProcessPipelineHook(
hook redis.ProcessPipelineHook,
) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
fn, file, line := funcFileLine("github.com/redis/go-redis")
attrs := make([]attribute.KeyValue, 0, 8)
attrs = append(attrs,
semconv.CodeFunction(fn),
semconv.CodeFilepath(file),
semconv.CodeLineNumber(line),
attribute.Int("db.redis.num_cmd", len(cmds)),
)
summary, cmdsString := rediscmd.CmdsString(cmds)
if th.conf.dbStmtEnabled {
attrs = append(attrs, semconv.DBStatement(cmdsString))
}
opts := th.spanOpts
opts = append(opts, trace.WithAttributes(attrs...))
ctx, span := th.conf.tracer.Start(ctx, "redis.pipeline "+summary, opts...)
defer span.End()
if err := hook(ctx, cmds); err != nil {
recordError(span, err)
return err
}
return nil
}
}
func recordError(span trace.Span, err error) {
if err != redis.Nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
}
func formatDBConnString(network, addr string) string {
if network == "tcp" {
network = "redis"
}
return fmt.Sprintf("%s://%s", network, addr)
}
func funcFileLine(pkg string) (string, string, int) {
const depth = 16
var pcs [depth]uintptr
n := runtime.Callers(3, pcs[:])
ff := runtime.CallersFrames(pcs[:n])
var fn, file string
var line int
for {
f, ok := ff.Next()
if !ok {
break
}
fn, file, line = f.Function, f.File, f.Line
if !strings.Contains(fn, pkg) {
break
}
}
if ind := strings.LastIndexByte(fn, '/'); ind != -1 {
fn = fn[ind+1:]
}
return fn, file, line
}
// Database span attributes semantic conventions recommended server address and port
// https://opentelemetry.io/docs/specs/semconv/database/database-spans/#connection-level-attributes
func addServerAttributes(opts []TracingOption, addr string) []TracingOption {
host, portString, err := net.SplitHostPort(addr)
if err != nil {
return opts
}
opts = append(opts, WithAttributes(
semconv.ServerAddress(host),
))
// Parse the port string to an integer
port, err := strconv.Atoi(portString)
if err != nil {
return opts
}
opts = append(opts, WithAttributes(
semconv.ServerPort(port),
))
return opts
}

View File

@ -1,3 +1,12 @@
## Unreleased
### Changed
* `go-redis` won't skip span creation if the parent spans is not recording. ([#2980](https://github.com/redis/go-redis/issues/2980))
Users can use the OpenTelemetry sampler to control the sampling behavior.
For instance, you can use the `ParentBased(NeverSample())` sampler from `go.opentelemetry.io/otel/sdk/trace` to keep
a similar behavior (drop orphan spans) of `go-redis` as before.
## [9.0.5](https://github.com/redis/go-redis/compare/v9.0.4...v9.0.5) (2023-05-29) ## [9.0.5](https://github.com/redis/go-redis/compare/v9.0.4...v9.0.5) (2023-05-29)

View File

@ -1,7 +1,12 @@
GO_MOD_DIRS := $(shell find . -type f -name 'go.mod' -exec dirname {} \; | sort) GO_MOD_DIRS := $(shell find . -type f -name 'go.mod' -exec dirname {} \; | sort)
test: testdeps test: testdeps
$(eval GO_VERSION := $(shell go version | cut -d " " -f 3 | cut -d. -f2))
set -e; for dir in $(GO_MOD_DIRS); do \ set -e; for dir in $(GO_MOD_DIRS); do \
if echo "$${dir}" | grep -q "./example" && [ "$(GO_VERSION)" = "19" ]; then \
echo "Skipping go test in $${dir} due to Go version 1.19 and dir contains ./example"; \
continue; \
fi; \
echo "go test in $${dir}"; \ echo "go test in $${dir}"; \
(cd "$${dir}" && \ (cd "$${dir}" && \
go mod tidy -compat=1.18 && \ go mod tidy -compat=1.18 && \

View File

@ -51,8 +51,8 @@ key value NoSQL database that uses RocksDB as storage engine and is compatible w
## Features ## Features
- Redis 3 commands except QUIT, MONITOR, and SYNC. - Redis commands except QUIT and SYNC.
- Automatic connection pooling with - Automatic connection pooling.
- [Pub/Sub](https://redis.uptrace.dev/guide/go-redis-pubsub.html). - [Pub/Sub](https://redis.uptrace.dev/guide/go-redis-pubsub.html).
- [Pipelines and transactions](https://redis.uptrace.dev/guide/go-redis-pipelines.html). - [Pipelines and transactions](https://redis.uptrace.dev/guide/go-redis-pipelines.html).
- [Scripting](https://redis.uptrace.dev/guide/lua-scripting.html). - [Scripting](https://redis.uptrace.dev/guide/lua-scripting.html).
@ -143,9 +143,6 @@ to this specification.
```go ```go
import ( import (
"context"
"fmt"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
@ -161,6 +158,30 @@ func ExampleClient() *redis.Client {
``` ```
### Advanced Configuration
go-redis supports extending the client identification phase to allow projects to send their own custom client identification.
#### Default Client Identification
By default, go-redis automatically sends the client library name and version during the connection process. This feature is available in redis-server as of version 7.2. As a result, the command is "fire and forget", meaning it should fail silently, in the case that the redis server does not support this feature.
#### Disabling Identity Verification
When connection identity verification is not required or needs to be explicitly disabled, a `DisableIndentity` configuration option exists. In V10 of this library, `DisableIndentity` will become `DisableIdentity` in order to fix the associated typo.
To disable verification, set the `DisableIndentity` option to `true` in the Redis client options:
```go
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
DisableIndentity: true, // Disable set-info on connect
})
```
## Contributing ## Contributing
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library! Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!

View File

@ -2,6 +2,7 @@ package redis
import ( import (
"context" "context"
"errors"
) )
type BitMapCmdable interface { type BitMapCmdable interface {
@ -15,6 +16,7 @@ type BitMapCmdable interface {
BitPos(ctx context.Context, key string, bit int64, pos ...int64) *IntCmd BitPos(ctx context.Context, key string, bit int64, pos ...int64) *IntCmd
BitPosSpan(ctx context.Context, key string, bit int8, start, end int64, span string) *IntCmd BitPosSpan(ctx context.Context, key string, bit int8, start, end int64, span string) *IntCmd
BitField(ctx context.Context, key string, values ...interface{}) *IntSliceCmd BitField(ctx context.Context, key string, values ...interface{}) *IntSliceCmd
BitFieldRO(ctx context.Context, key string, values ...interface{}) *IntSliceCmd
} }
func (c cmdable) GetBit(ctx context.Context, key string, offset int64) *IntCmd { func (c cmdable) GetBit(ctx context.Context, key string, offset int64) *IntCmd {
@ -37,16 +39,26 @@ func (c cmdable) SetBit(ctx context.Context, key string, offset int64, value int
type BitCount struct { type BitCount struct {
Start, End int64 Start, End int64
Unit string // BYTE(default) | BIT
} }
const BitCountIndexByte string = "BYTE"
const BitCountIndexBit string = "BIT"
func (c cmdable) BitCount(ctx context.Context, key string, bitCount *BitCount) *IntCmd { func (c cmdable) BitCount(ctx context.Context, key string, bitCount *BitCount) *IntCmd {
args := []interface{}{"bitcount", key} args := make([]any, 2, 5)
args[0] = "bitcount"
args[1] = key
if bitCount != nil { if bitCount != nil {
args = append( args = append(args, bitCount.Start, bitCount.End)
args, if bitCount.Unit != "" {
bitCount.Start, if bitCount.Unit != BitCountIndexByte && bitCount.Unit != BitCountIndexBit {
bitCount.End, cmd := NewIntCmd(ctx)
) cmd.SetErr(errors.New("redis: invalid bitcount index"))
return cmd
}
args = append(args, bitCount.Unit)
}
} }
cmd := NewIntCmd(ctx, args...) cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd) _ = c(ctx, cmd)

View File

@ -5310,6 +5310,16 @@ type LibraryInfo struct {
LibVer *string LibVer *string
} }
// WithLibraryName returns a valid LibraryInfo with library name only.
func WithLibraryName(libName string) LibraryInfo {
return LibraryInfo{LibName: &libName}
}
// WithLibraryVersion returns a valid LibraryInfo with library version only.
func WithLibraryVersion(libVer string) LibraryInfo {
return LibraryInfo{LibVer: &libVer}
}
// ------------------------------------------- // -------------------------------------------
type InfoCmd struct { type InfoCmd struct {
@ -5444,9 +5454,12 @@ func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc)
for { for {
cmd.mu.Lock() cmd.mu.Lock()
st := cmd.status st := cmd.status
pk, _ := rd.Peek(1)
cmd.mu.Unlock() cmd.mu.Unlock()
if pk, _ := rd.Peek(1); len(pk) != 0 && st == monitorStatusStart { if len(pk) != 0 && st == monitorStatusStart {
cmd.mu.Lock()
line, err := rd.ReadString() line, err := rd.ReadString()
cmd.mu.Unlock()
if err != nil { if err != nil {
return err return err
} }

View File

@ -309,7 +309,7 @@ func (c statefulCmdable) ClientSetInfo(ctx context.Context, info LibraryInfo) *S
var cmd *StatusCmd var cmd *StatusCmd
if info.LibName != nil { if info.LibName != nil {
libName := fmt.Sprintf("go-redis(%s,%s)", *info.LibName, runtime.Version()) libName := fmt.Sprintf("go-redis(%s,%s)", *info.LibName, internal.ReplaceSpaces(runtime.Version()))
cmd = NewStatusCmd(ctx, "client", "setinfo", "LIB-NAME", libName) cmd = NewStatusCmd(ctx, "client", "setinfo", "LIB-NAME", libName)
} else { } else {
cmd = NewStatusCmd(ctx, "client", "setinfo", "LIB-VER", *info.LibVer) cmd = NewStatusCmd(ctx, "client", "setinfo", "LIB-VER", *info.LibVer)

View File

@ -7,6 +7,7 @@ import (
"net" "net"
"strings" "strings"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
) )
@ -129,7 +130,9 @@ func isMovedError(err error) (moved bool, ask bool, addr string) {
if ind == -1 { if ind == -1 {
return false, false, "" return false, false, ""
} }
addr = s[ind+1:] addr = s[ind+1:]
addr = internal.GetAddr(addr)
return return
} }

View File

@ -19,6 +19,7 @@ type GenericCmdable interface {
Keys(ctx context.Context, pattern string) *StringSliceCmd Keys(ctx context.Context, pattern string) *StringSliceCmd
Migrate(ctx context.Context, host, port, key string, db int, timeout time.Duration) *StatusCmd Migrate(ctx context.Context, host, port, key string, db int, timeout time.Duration) *StatusCmd
Move(ctx context.Context, key string, db int) *BoolCmd Move(ctx context.Context, key string, db int) *BoolCmd
ObjectFreq(ctx context.Context, key string) *IntCmd
ObjectRefCount(ctx context.Context, key string) *IntCmd ObjectRefCount(ctx context.Context, key string) *IntCmd
ObjectEncoding(ctx context.Context, key string) *StringCmd ObjectEncoding(ctx context.Context, key string) *StringCmd
ObjectIdleTime(ctx context.Context, key string) *DurationCmd ObjectIdleTime(ctx context.Context, key string) *DurationCmd
@ -159,6 +160,12 @@ func (c cmdable) Move(ctx context.Context, key string, db int) *BoolCmd {
return cmd return cmd
} }
func (c cmdable) ObjectFreq(ctx context.Context, key string) *IntCmd {
cmd := NewIntCmd(ctx, "object", "freq", key)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ObjectRefCount(ctx context.Context, key string) *IntCmd { func (c cmdable) ObjectRefCount(ctx context.Context, key string) *IntCmd {
cmd := NewIntCmd(ctx, "object", "refcount", key) cmd := NewIntCmd(ctx, "object", "refcount", key)
_ = c(ctx, cmd) _ = c(ctx, cmd)

View File

@ -168,9 +168,12 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
return nil, ErrClosed return nil, ErrClosed
} }
p.connsMu.Lock()
if p.cfg.MaxActiveConns > 0 && p.poolSize >= p.cfg.MaxActiveConns { if p.cfg.MaxActiveConns > 0 && p.poolSize >= p.cfg.MaxActiveConns {
p.connsMu.Unlock()
return nil, ErrPoolExhausted return nil, ErrPoolExhausted
} }
p.connsMu.Unlock()
cn, err := p.dialConn(ctx, pooled) cn, err := p.dialConn(ctx, pooled)
if err != nil { if err != nil {
@ -180,6 +183,11 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
p.connsMu.Lock() p.connsMu.Lock()
defer p.connsMu.Unlock() defer p.connsMu.Unlock()
if p.cfg.MaxActiveConns > 0 && p.poolSize >= p.cfg.MaxActiveConns {
_ = cn.Close()
return nil, ErrPoolExhausted
}
p.conns = append(p.conns, cn) p.conns = append(p.conns, cn)
if pooled { if pooled {
// If pool is full remove the cn on next Put. // If pool is full remove the cn on next Put.

View File

@ -2,6 +2,8 @@ package internal
import ( import (
"context" "context"
"net"
"strings"
"time" "time"
"github.com/redis/go-redis/v9/internal/util" "github.com/redis/go-redis/v9/internal/util"
@ -44,3 +46,38 @@ func isLower(s string) bool {
} }
return true return true
} }
func ReplaceSpaces(s string) string {
// Pre-allocate a builder with the same length as s to minimize allocations.
// This is a basic optimization; adjust the initial size based on your use case.
var builder strings.Builder
builder.Grow(len(s))
for _, char := range s {
if char == ' ' {
// Replace space with a hyphen.
builder.WriteRune('-')
} else {
// Copy the character as-is.
builder.WriteRune(char)
}
}
return builder.String()
}
func GetAddr(addr string) string {
ind := strings.LastIndexByte(addr, ':')
if ind == -1 {
return ""
}
if strings.IndexByte(addr, '.') != -1 {
return addr
}
if addr[0] == '[' {
return addr
}
return net.JoinHostPort(addr[:ind], addr[ind+1:])
}

View File

@ -61,6 +61,12 @@ type Options struct {
// before reconnecting. It should return the current username and password. // before reconnecting. It should return the current username and password.
CredentialsProvider func() (username string, password string) CredentialsProvider func() (username string, password string)
// CredentialsProviderContext is an enhanced parameter of CredentialsProvider,
// done to maintain API compatibility. In the future,
// there might be a merge between CredentialsProviderContext and CredentialsProvider.
// There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider.
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
// Database to be selected after connecting to the server. // Database to be selected after connecting to the server.
DB int DB int
@ -235,7 +241,7 @@ func NewDialer(opt *Options) func(context.Context, string, string) (net.Conn, er
} }
} }
// ParseURL parses an URL into Options that can be used to connect to Redis. // ParseURL parses a URL into Options that can be used to connect to Redis.
// Scheme is required. // Scheme is required.
// There are two connection types: by tcp socket and by unix socket. // There are two connection types: by tcp socket and by unix socket.
// Tcp connection: // Tcp connection:
@ -250,12 +256,12 @@ func NewDialer(opt *Options) func(context.Context, string, string) (net.Conn, er
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries // - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration) // - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration(); // - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds // additionally a plain integer as value (i.e. without unit) is interpreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default // - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter // value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times // - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other // - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these // URL attributes (scheme, host, userinfo, resp.), query parameters using these
// names will be treated as unknown parameters // names will be treated as unknown parameters
// - unknown parameter names will result in an error // - unknown parameter names will result in an error
// //

View File

@ -65,6 +65,8 @@ type ClusterOptions struct {
Protocol int Protocol int
Username string Username string
Password string Password string
CredentialsProvider func() (username string, password string)
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
MaxRetries int MaxRetries int
MinRetryBackoff time.Duration MinRetryBackoff time.Duration
@ -156,12 +158,12 @@ func (opt *ClusterOptions) init() {
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries // - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration) // - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration(); // - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds // additionally a plain integer as value (i.e. without unit) is interpreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default // - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter // value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times // - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other // - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these // URL attributes (scheme, host, userinfo, resp.), query parameters using these
// names will be treated as unknown parameters // names will be treated as unknown parameters
// - unknown parameter names will result in an error // - unknown parameter names will result in an error
// //
@ -274,6 +276,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
Protocol: opt.Protocol, Protocol: opt.Protocol,
Username: opt.Username, Username: opt.Username,
Password: opt.Password, Password: opt.Password,
CredentialsProvider: opt.CredentialsProvider,
CredentialsProviderContext: opt.CredentialsProviderContext,
MaxRetries: opt.MaxRetries, MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff, MinRetryBackoff: opt.MinRetryBackoff,
@ -1293,6 +1297,7 @@ func (c *ClusterClient) processPipelineNode(
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { _ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
cn, err := node.Client.getConn(ctx) cn, err := node.Client.getConn(ctx)
if err != nil { if err != nil {
node.MarkAsFailing()
_ = c.mapCmdsByNode(ctx, failedCmds, cmds) _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return err
@ -1314,6 +1319,9 @@ func (c *ClusterClient) processPipelineNodeConn(
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) return writeCmds(wr, cmds)
}); err != nil { }); err != nil {
if isBadConn(err, false, node.Client.getAddr()) {
node.MarkAsFailing()
}
if shouldRetry(err, true) { if shouldRetry(err, true) {
_ = c.mapCmdsByNode(ctx, failedCmds, cmds) _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
} }
@ -1345,7 +1353,7 @@ func (c *ClusterClient) pipelineReadCmds(
continue continue
} }
if c.opt.ReadOnly { if c.opt.ReadOnly && isBadConn(err, false, node.Client.getAddr()) {
node.MarkAsFailing() node.MarkAsFailing()
} }

View File

@ -1,8 +0,0 @@
{
"name": "redis",
"version": "9.4.0",
"main": "index.js",
"repository": "git@github.com:redis/go-redis.git",
"author": "Vladimir Mihailenco <vladimir.webdev@gmail.com>",
"license": "BSD-2-clause"
}

View File

@ -491,7 +491,7 @@ func (c *PubSub) getContext() context.Context {
// Receive* APIs can not be used after channel is created. // Receive* APIs can not be used after channel is created.
// //
// go-redis periodically sends ping messages to test connection health // go-redis periodically sends ping messages to test connection health
// and re-subscribes if ping can not not received for 1 minute. // and re-subscribes if ping can not received for 1 minute.
func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message { func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
c.chOnce.Do(func() { c.chOnce.Do(func() {
c.msgCh = newChannel(c, opts...) c.msgCh = newChannel(c, opts...)

View File

@ -283,8 +283,13 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
} }
cn.Inited = true cn.Inited = true
var err error
username, password := c.opt.Username, c.opt.Password username, password := c.opt.Username, c.opt.Password
if c.opt.CredentialsProvider != nil { if c.opt.CredentialsProviderContext != nil {
if username, password, err = c.opt.CredentialsProviderContext(ctx); err != nil {
return err
}
} else if c.opt.CredentialsProvider != nil {
username, password = c.opt.CredentialsProvider() username, password = c.opt.CredentialsProvider()
} }
@ -300,7 +305,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
// for redis-server versions that do not support the HELLO command, // for redis-server versions that do not support the HELLO command,
// RESP2 will continue to be used. // RESP2 will continue to be used.
if err := conn.Hello(ctx, protocol, username, password, "").Err(); err == nil { if err = conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
auth = true auth = true
} else if !isRedisError(err) { } else if !isRedisError(err) {
// When the server responds with the RESP protocol and the result is not a normal // When the server responds with the RESP protocol and the result is not a normal
@ -312,18 +317,8 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
// difficult to rely on error strings to determine all results. // difficult to rely on error strings to determine all results.
return err return err
} }
if !c.opt.DisableIndentity {
libName := "" _, err = conn.Pipelined(ctx, func(pipe Pipeliner) error {
libVer := Version()
if c.opt.IdentitySuffix != "" {
libName = c.opt.IdentitySuffix
}
libInfo := LibraryInfo{LibName: &libName}
conn.ClientSetInfo(ctx, libInfo)
libInfo = LibraryInfo{LibVer: &libVer}
conn.ClientSetInfo(ctx, libInfo)
}
_, err := conn.Pipelined(ctx, func(pipe Pipeliner) error {
if !auth && password != "" { if !auth && password != "" {
if username != "" { if username != "" {
pipe.AuthACL(ctx, username, password) pipe.AuthACL(ctx, username, password)
@ -350,6 +345,18 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
return err return err
} }
if !c.opt.DisableIndentity {
libName := ""
libVer := Version()
if c.opt.IdentitySuffix != "" {
libName = c.opt.IdentitySuffix
}
p := conn.Pipeline()
p.ClientSetInfo(ctx, WithLibraryName(libName))
p.ClientSetInfo(ctx, WithLibraryVersion(libVer))
_, _ = p.Exec(ctx)
}
if c.opt.OnConnect != nil { if c.opt.OnConnect != nil {
return c.opt.OnConnect(ctx, conn) return c.opt.OnConnect(ctx, conn)
} }

View File

@ -153,6 +153,9 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
ConnMaxLifetime: opt.ConnMaxLifetime, ConnMaxLifetime: opt.ConnMaxLifetime,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
} }
} }
@ -190,6 +193,9 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
ConnMaxLifetime: opt.ConnMaxLifetime, ConnMaxLifetime: opt.ConnMaxLifetime,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
} }
} }

View File

@ -178,36 +178,42 @@ func (c cmdable) XReadStreams(ctx context.Context, streams ...string) *XStreamSl
func (c cmdable) XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd { func (c cmdable) XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start) cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd { func (c cmdable) XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start, "mkstream") cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start, "mkstream")
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd { func (c cmdable) XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "setid", stream, group, start) cmd := NewStatusCmd(ctx, "xgroup", "setid", stream, group, start)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCmd { func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "destroy", stream, group) cmd := NewIntCmd(ctx, "xgroup", "destroy", stream, group)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd { func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer) cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd { func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer) cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }

View File

@ -2,5 +2,5 @@ package redis
// Version is the current release version. // Version is the current release version.
func Version() string { func Version() string {
return "9.4.0" return "9.5.3"
} }

View File

@ -0,0 +1,3 @@
# Semconv v1.24.0
[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/semconv/v1.24.0)](https://pkg.go.dev/go.opentelemetry.io/otel/semconv/v1.24.0)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package semconv implements OpenTelemetry semantic conventions.
//
// OpenTelemetry semantic conventions are agreed standardized naming
// patterns for OpenTelemetry things. This package represents the v1.24.0
// version of the OpenTelemetry semantic conventions.
package semconv // import "go.opentelemetry.io/otel/semconv/v1.24.0"

View File

@ -0,0 +1,200 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Code generated from semantic convention specification. DO NOT EDIT.
package semconv // import "go.opentelemetry.io/otel/semconv/v1.24.0"
import "go.opentelemetry.io/otel/attribute"
// This event represents an occurrence of a lifecycle transition on the iOS
// platform.
const (
// IosStateKey is the attribute Key conforming to the "ios.state" semantic
// conventions. It represents the this attribute represents the state the
// application has transitioned into at the occurrence of the event.
//
// Type: Enum
// RequirementLevel: Required
// Stability: experimental
// Note: The iOS lifecycle states are defined in the [UIApplicationDelegate
// documentation](https://developer.apple.com/documentation/uikit/uiapplicationdelegate#1656902),
// and from which the `OS terminology` column values are derived.
IosStateKey = attribute.Key("ios.state")
)
var (
// The app has become `active`. Associated with UIKit notification `applicationDidBecomeActive`
IosStateActive = IosStateKey.String("active")
// The app is now `inactive`. Associated with UIKit notification `applicationWillResignActive`
IosStateInactive = IosStateKey.String("inactive")
// The app is now in the background. This value is associated with UIKit notification `applicationDidEnterBackground`
IosStateBackground = IosStateKey.String("background")
// The app is now in the foreground. This value is associated with UIKit notification `applicationWillEnterForeground`
IosStateForeground = IosStateKey.String("foreground")
// The app is about to terminate. Associated with UIKit notification `applicationWillTerminate`
IosStateTerminate = IosStateKey.String("terminate")
)
// This event represents an occurrence of a lifecycle transition on the Android
// platform.
const (
// AndroidStateKey is the attribute Key conforming to the "android.state"
// semantic conventions. It represents the this attribute represents the
// state the application has transitioned into at the occurrence of the
// event.
//
// Type: Enum
// RequirementLevel: Required
// Stability: experimental
// Note: The Android lifecycle states are defined in [Activity lifecycle
// callbacks](https://developer.android.com/guide/components/activities/activity-lifecycle#lc),
// and from which the `OS identifiers` are derived.
AndroidStateKey = attribute.Key("android.state")
)
var (
// Any time before Activity.onResume() or, if the app has no Activity, Context.startService() has been called in the app for the first time
AndroidStateCreated = AndroidStateKey.String("created")
// Any time after Activity.onPause() or, if the app has no Activity, Context.stopService() has been called when the app was in the foreground state
AndroidStateBackground = AndroidStateKey.String("background")
// Any time after Activity.onResume() or, if the app has no Activity, Context.startService() has been called when the app was in either the created or background states
AndroidStateForeground = AndroidStateKey.String("foreground")
)
// This semantic convention defines the attributes used to represent a feature
// flag evaluation as an event.
const (
// FeatureFlagKeyKey is the attribute Key conforming to the
// "feature_flag.key" semantic conventions. It represents the unique
// identifier of the feature flag.
//
// Type: string
// RequirementLevel: Required
// Stability: experimental
// Examples: 'logo-color'
FeatureFlagKeyKey = attribute.Key("feature_flag.key")
// FeatureFlagProviderNameKey is the attribute Key conforming to the
// "feature_flag.provider_name" semantic conventions. It represents the
// name of the service provider that performs the flag evaluation.
//
// Type: string
// RequirementLevel: Recommended
// Stability: experimental
// Examples: 'Flag Manager'
FeatureFlagProviderNameKey = attribute.Key("feature_flag.provider_name")
// FeatureFlagVariantKey is the attribute Key conforming to the
// "feature_flag.variant" semantic conventions. It represents the sHOULD be
// a semantic identifier for a value. If one is unavailable, a stringified
// version of the value can be used.
//
// Type: string
// RequirementLevel: Recommended
// Stability: experimental
// Examples: 'red', 'true', 'on'
// Note: A semantic identifier, commonly referred to as a variant, provides
// a means
// for referring to a value without including the value itself. This can
// provide additional context for understanding the meaning behind a value.
// For example, the variant `red` maybe be used for the value `#c05543`.
//
// A stringified version of the value can be used in situations where a
// semantic identifier is unavailable. String representation of the value
// should be determined by the implementer.
FeatureFlagVariantKey = attribute.Key("feature_flag.variant")
)
// FeatureFlagKey returns an attribute KeyValue conforming to the
// "feature_flag.key" semantic conventions. It represents the unique identifier
// of the feature flag.
func FeatureFlagKey(val string) attribute.KeyValue {
return FeatureFlagKeyKey.String(val)
}
// FeatureFlagProviderName returns an attribute KeyValue conforming to the
// "feature_flag.provider_name" semantic conventions. It represents the name of
// the service provider that performs the flag evaluation.
func FeatureFlagProviderName(val string) attribute.KeyValue {
return FeatureFlagProviderNameKey.String(val)
}
// FeatureFlagVariant returns an attribute KeyValue conforming to the
// "feature_flag.variant" semantic conventions. It represents the sHOULD be a
// semantic identifier for a value. If one is unavailable, a stringified
// version of the value can be used.
func FeatureFlagVariant(val string) attribute.KeyValue {
return FeatureFlagVariantKey.String(val)
}
// RPC received/sent message.
const (
// MessageCompressedSizeKey is the attribute Key conforming to the
// "message.compressed_size" semantic conventions. It represents the
// compressed size of the message in bytes.
//
// Type: int
// RequirementLevel: Optional
// Stability: experimental
MessageCompressedSizeKey = attribute.Key("message.compressed_size")
// MessageIDKey is the attribute Key conforming to the "message.id"
// semantic conventions. It represents the mUST be calculated as two
// different counters starting from `1` one for sent messages and one for
// received message.
//
// Type: int
// RequirementLevel: Optional
// Stability: experimental
// Note: This way we guarantee that the values will be consistent between
// different implementations.
MessageIDKey = attribute.Key("message.id")
// MessageTypeKey is the attribute Key conforming to the "message.type"
// semantic conventions. It represents the whether this is a received or
// sent message.
//
// Type: Enum
// RequirementLevel: Optional
// Stability: experimental
MessageTypeKey = attribute.Key("message.type")
// MessageUncompressedSizeKey is the attribute Key conforming to the
// "message.uncompressed_size" semantic conventions. It represents the
// uncompressed size of the message in bytes.
//
// Type: int
// RequirementLevel: Optional
// Stability: experimental
MessageUncompressedSizeKey = attribute.Key("message.uncompressed_size")
)
var (
// sent
MessageTypeSent = MessageTypeKey.String("SENT")
// received
MessageTypeReceived = MessageTypeKey.String("RECEIVED")
)
// MessageCompressedSize returns an attribute KeyValue conforming to the
// "message.compressed_size" semantic conventions. It represents the compressed
// size of the message in bytes.
func MessageCompressedSize(val int) attribute.KeyValue {
return MessageCompressedSizeKey.Int(val)
}
// MessageID returns an attribute KeyValue conforming to the "message.id"
// semantic conventions. It represents the mUST be calculated as two different
// counters starting from `1` one for sent messages and one for received
// message.
func MessageID(val int) attribute.KeyValue {
return MessageIDKey.Int(val)
}
// MessageUncompressedSize returns an attribute KeyValue conforming to the
// "message.uncompressed_size" semantic conventions. It represents the
// uncompressed size of the message in bytes.
func MessageUncompressedSize(val int) attribute.KeyValue {
return MessageUncompressedSizeKey.Int(val)
}

View File

@ -0,0 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package semconv // import "go.opentelemetry.io/otel/semconv/v1.24.0"
const (
// ExceptionEventName is the name of the Span event representing an exception.
ExceptionEventName = "exception"
)

1071
vendor/go.opentelemetry.io/otel/semconv/v1.24.0/metric.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package semconv // import "go.opentelemetry.io/otel/semconv/v1.24.0"
// SchemaURL is the schema URL that matches the version of the semantic conventions
// that this package defines. Semconv packages starting from v1.4.0 must declare
// non-empty schema URL in the form https://opentelemetry.io/schemas/<version>
const SchemaURL = "https://opentelemetry.io/schemas/1.24.0"

1323
vendor/go.opentelemetry.io/otel/semconv/v1.24.0/trace.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

9
vendor/modules.txt vendored
View File

@ -249,7 +249,13 @@ github.com/prometheus/common/model
github.com/prometheus/procfs github.com/prometheus/procfs
github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/fs
github.com/prometheus/procfs/internal/util github.com/prometheus/procfs/internal/util
# github.com/redis/go-redis/v9 v9.4.0 # github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3
## explicit; go 1.19
github.com/redis/go-redis/extra/rediscmd/v9
# github.com/redis/go-redis/extra/redisotel/v9 v9.5.3
## explicit; go 1.19
github.com/redis/go-redis/extra/redisotel/v9
# github.com/redis/go-redis/v9 v9.5.3
## explicit; go 1.18 ## explicit; go 1.18
github.com/redis/go-redis/v9 github.com/redis/go-redis/v9
github.com/redis/go-redis/v9/internal github.com/redis/go-redis/v9/internal
@ -313,6 +319,7 @@ go.opentelemetry.io/otel/internal/global
go.opentelemetry.io/otel/propagation go.opentelemetry.io/otel/propagation
go.opentelemetry.io/otel/semconv/v1.17.0 go.opentelemetry.io/otel/semconv/v1.17.0
go.opentelemetry.io/otel/semconv/v1.20.0 go.opentelemetry.io/otel/semconv/v1.20.0
go.opentelemetry.io/otel/semconv/v1.24.0
go.opentelemetry.io/otel/semconv/v1.25.0 go.opentelemetry.io/otel/semconv/v1.25.0
# go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 # go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0
## explicit; go 1.21 ## explicit; go 1.21