diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc.go b/sdk/metric/aggregator/minmaxsumcount/mmsc.go index 57a5cc885..d1f174fd5 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc.go @@ -16,26 +16,25 @@ package minmaxsumcount // import "go.opentelemetry.io/otel/sdk/metric/aggregator import ( "context" + "sync" "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" - "go.opentelemetry.io/otel/sdk/internal" ) type ( - // Aggregator aggregates measure events, keeping only the max, + // Aggregator aggregates measure events, keeping only the min, max, // sum, and count. Aggregator struct { - // states has to be aligned for 64-bit atomic operations. - states [2]state - lock internal.StateLocker - kind core.NumberKind + lock sync.Mutex + current state + checkpoint state + kind core.NumberKind } state struct { - // all fields have to be aligned for 64-bit atomic operations. count core.Number sum core.Number min core.Number @@ -47,27 +46,18 @@ var _ export.Aggregator = &Aggregator{} var _ aggregator.MinMaxSumCount = &Aggregator{} // New returns a new measure aggregator for computing min, max, sum, and -// count. It does not compute quantile information other than Max. +// count. It does not compute quantile information other than Min and Max. // -// This aggregator uses the StateLocker pattern to guarantee -// the count, sum, min and max are consistent within a checkpoint +// This type uses a mutex for Update() and Checkpoint() concurrency. func New(desc *metric.Descriptor) *Aggregator { kind := desc.NumberKind() return &Aggregator{ kind: kind, - states: [2]state{ - { - count: core.NewUint64Number(0), - sum: kind.Zero(), - min: kind.Maximum(), - max: kind.Minimum(), - }, - { - count: core.NewUint64Number(0), - sum: kind.Zero(), - min: kind.Maximum(), - max: kind.Minimum(), - }, + current: state{ + count: core.NewUint64Number(0), + sum: kind.Zero(), + min: kind.Maximum(), + max: kind.Minimum(), }, } } @@ -76,14 +66,14 @@ func New(desc *metric.Descriptor) *Aggregator { func (c *Aggregator) Sum() (core.Number, error) { c.lock.Lock() defer c.lock.Unlock() - return c.checkpoint().sum, nil + return c.checkpoint.sum, nil } // Count returns the number of values in the checkpoint. func (c *Aggregator) Count() (int64, error) { c.lock.Lock() defer c.lock.Unlock() - return c.checkpoint().count.CoerceToInt64(core.Uint64NumberKind), nil + return c.checkpoint.count.CoerceToInt64(core.Uint64NumberKind), nil } // Min returns the minimum value in the checkpoint. @@ -92,10 +82,10 @@ func (c *Aggregator) Count() (int64, error) { func (c *Aggregator) Min() (core.Number, error) { c.lock.Lock() defer c.lock.Unlock() - if c.checkpoint().count.IsZero(core.Uint64NumberKind) { + if c.checkpoint.count.IsZero(core.Uint64NumberKind) { return c.kind.Zero(), aggregator.ErrNoData } - return c.checkpoint().min, nil + return c.checkpoint.min, nil } // Max returns the maximum value in the checkpoint. @@ -104,63 +94,43 @@ func (c *Aggregator) Min() (core.Number, error) { func (c *Aggregator) Max() (core.Number, error) { c.lock.Lock() defer c.lock.Unlock() - if c.checkpoint().count.IsZero(core.Uint64NumberKind) { + if c.checkpoint.count.IsZero(core.Uint64NumberKind) { return c.kind.Zero(), aggregator.ErrNoData } - return c.checkpoint().max, nil + return c.checkpoint.max, nil } // Checkpoint saves the current state and resets the current state to // the empty set. func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) { - c.lock.SwapActiveState(c.resetCheckpoint) + c.lock.Lock() + c.checkpoint, c.current = c.current, c.emptyState() + c.lock.Unlock() } -// checkpoint returns the "cold" state, i.e. state collected prior to the -// most recent Checkpoint() call -func (c *Aggregator) checkpoint() *state { - return &c.states[c.lock.ColdIdx()] -} - -func (c *Aggregator) resetCheckpoint() { - checkpoint := c.checkpoint() - - checkpoint.count.SetUint64(0) - checkpoint.sum.SetNumber(c.kind.Zero()) - checkpoint.min.SetNumber(c.kind.Maximum()) - checkpoint.max.SetNumber(c.kind.Minimum()) +func (c *Aggregator) emptyState() state { + kind := c.kind + return state{ + count: core.NewUint64Number(0), + sum: kind.Zero(), + min: kind.Maximum(), + max: kind.Minimum(), + } } // Update adds the recorded measurement to the current data set. func (c *Aggregator) Update(_ context.Context, number core.Number, desc *metric.Descriptor) error { kind := desc.NumberKind() - cIdx := c.lock.Start() - defer c.lock.End(cIdx) - - current := &c.states[cIdx] - current.count.AddUint64Atomic(1) - current.sum.AddNumberAtomic(kind, number) - - for { - cmin := current.min.AsNumberAtomic() - - if number.CompareNumber(kind, cmin) >= 0 { - break - } - if current.min.CompareAndSwapNumber(cmin, number) { - break - } + c.lock.Lock() + defer c.lock.Unlock() + c.current.count.AddInt64(1) + c.current.sum.AddNumber(kind, number) + if number.CompareNumber(kind, c.current.min) < 0 { + c.current.min = number } - for { - cmax := current.max.AsNumberAtomic() - - if number.CompareNumber(kind, cmax) <= 0 { - break - } - if current.max.CompareAndSwapNumber(cmax, number) { - break - } + if number.CompareNumber(kind, c.current.max) > 0 { + c.current.max = number } return nil } @@ -172,22 +142,14 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error return aggregator.NewInconsistentMergeError(c, oa) } - // Lock() synchronizes Merge() and Checkpoint() to ensure all operations of - // Merge() are performed on the same state. - c.lock.Lock() - defer c.lock.Unlock() + c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count) + c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum) - current := c.checkpoint() - ocheckpoint := o.checkpoint() - - current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count) - current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum) - - if current.min.CompareNumber(desc.NumberKind(), ocheckpoint.min) > 0 { - current.min.SetNumber(ocheckpoint.min) + if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 { + c.checkpoint.min.SetNumber(o.checkpoint.min) } - if current.max.CompareNumber(desc.NumberKind(), ocheckpoint.max) < 0 { - current.max.SetNumber(ocheckpoint.max) + if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 { + c.checkpoint.max.SetNumber(o.checkpoint.max) } return nil } diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go index 80f0cec4e..58262d7ff 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go @@ -18,15 +18,12 @@ import ( "context" "math" "math/rand" - "os" "testing" - "unsafe" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/metric" - ottest "go.opentelemetry.io/otel/internal/testing" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/test" ) @@ -62,37 +59,6 @@ var ( } ) -// Ensure struct alignment prior to running tests. -func TestMain(m *testing.M) { - fields := []ottest.FieldOffset{ - { - Name: "Aggregator.states", - Offset: unsafe.Offsetof(Aggregator{}.states), - }, - { - Name: "state.count", - Offset: unsafe.Offsetof(state{}.count), - }, - { - Name: "state.sum", - Offset: unsafe.Offsetof(state{}.sum), - }, - { - Name: "state.min", - Offset: unsafe.Offsetof(state{}.min), - }, - { - Name: "state.max", - Offset: unsafe.Offsetof(state{}.max), - }, - } - if !ottest.Aligned8Byte(fields, os.Stderr) { - os.Exit(1) - } - - os.Exit(m.Run()) -} - func TestMinMaxSumCountAbsolute(t *testing.T) { test.RunProfiles(t, func(t *testing.T, profile test.Profile) { minMaxSumCount(t, profile, positiveOnly) diff --git a/sdk/metric/minmaxsumcount_stress_test.go b/sdk/metric/minmaxsumcount_stress_test.go index b5a7e6eab..10611f24f 100644 --- a/sdk/metric/minmaxsumcount_stress_test.go +++ b/sdk/metric/minmaxsumcount_stress_test.go @@ -12,10 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// This test is too large for the race detector. This SDK uses no locks -// that the race detector would help with, anyway. -// +build !race - package metric_test import (