// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
	"context"
	"errors"
	"fmt"
	"runtime/trace"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pingcap/log"

	"github.com/tikv/pd/pkg/election"
	"github.com/tikv/pd/pkg/errs"
	mcsutils "github.com/tikv/pd/pkg/mcs/utils"
	"github.com/tikv/pd/pkg/mcs/utils/constant"
	"github.com/tikv/pd/pkg/member"
	"github.com/tikv/pd/pkg/storage/endpoint"
	"github.com/tikv/pd/pkg/utils/keypath"
	"github.com/tikv/pd/pkg/utils/logutil"
)

const (
	// maxUpdateTSORetryCount is the max retry count for updating TSO.
	// When encountering a network partition, manually retrying may help the next request
	// succeed with a new endpoint, according to https://github.com/etcd-io/etcd/issues/8711.
	maxUpdateTSORetryCount = 3
	// The etcd client retries with `roundRobinQuorumBackoff` (https://github.com/etcd-io/etcd/blob/d62cdeee4863001b09e772ed013eb1342a1d0f89/client/v3/client.go#L488),
	// whose default interval is 25ms, so we sleep 50ms here (https://github.com/etcd-io/etcd/blob/d62cdeee4863001b09e772ed013eb1342a1d0f89/client/v3/options.go#L53).
	updateTSORetryInterval = 50 * time.Millisecond
)

// Allocator is the global single point TSO allocator.
type Allocator struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	cfg Config
	// keyspaceGroupID is the keyspace group ID of the allocator.
	keyspaceGroupID uint32
	// member is used for the election.
	member member.Election
	// expectedPrimaryLease is used to store the expected primary lease.
	expectedPrimaryLease atomic.Value // stored as *election.Lease
	timestampOracle      *timestampOracle

	// observability
	tsoAllocatorRoleGauge prometheus.Gauge
	logFields             []zap.Field
}

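// Deployment note: an Allocator is used in two ways. Embedded in the PD server, PD runs
// its own leader election and only the update daemon started by NewAllocator is active;
// in the TSO microservice, startPrimaryElectionLoop is additionally called to maintain a
// per-keyspace-group primary election.
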
// NewAllocator creates a new TSO allocator.
func NewAllocator(
	ctx context.Context,
	keyspaceGroupID uint32,
	member member.Election,
	storage endpoint.TSOStorage,
	cfg Config,
) *Allocator {
	ctx, cancel := context.WithCancel(ctx)
	keyspaceGroupIDStr := strconv.FormatUint(uint64(keyspaceGroupID), 10)
	a := &Allocator{
		ctx:             ctx,
		cancel:          cancel,
		cfg:             cfg,
		keyspaceGroupID: keyspaceGroupID,
		member:          member,
		timestampOracle: &timestampOracle{
			keyspaceGroupID:        keyspaceGroupID,
			member:                 member,
			storage:                storage,
			saveInterval:           cfg.GetTSOSaveInterval(),
			updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(),
			maxResetTSGap:          cfg.GetMaxResetTSGap,
			tsoMux:                 &tsoObject{},
			metrics:                newTSOMetrics(keyspaceGroupIDStr),
		},
		tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(keyspaceGroupIDStr),
		logFields: []zap.Field{
			logutil.CondUint32("keyspace-group-id", keyspaceGroupID, keyspaceGroupID > 0),
			zap.String("name", member.Name()),
			zap.Uint64("id", member.ID()),
		},
	}

	a.wg.Add(1)
	go a.allocatorUpdater()

	return a
}

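// A minimal usage sketch from a caller's point of view (electionMember, tsoStorage, and
// cfg are placeholders supplied by the caller; timestamps can only be generated once this
// node is serving and Initialize has succeeded, which campaignPrimary below, or PD's own
// leader loop, is expected to take care of):
//
//	alloc := tso.NewAllocator(ctx, 0 /* default keyspace group */, electionMember, tsoStorage, cfg)
//	defer alloc.Close()
//	ts, err := alloc.GenerateTSO(ctx, 1)
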
// allocatorUpdater is used to run the TSO Allocator updating daemon.
func (a *Allocator) allocatorUpdater() {
	defer logutil.LogPanic()
	defer a.wg.Done()

	tsTicker := time.NewTicker(a.cfg.GetTSOUpdatePhysicalInterval())
	failpoint.Inject("fastUpdatePhysicalInterval", func() {
		tsTicker.Reset(time.Millisecond)
	})
	defer tsTicker.Stop()

	log.Info("entering into allocator update loop", a.logFields...)
	for {
		select {
		case <-tsTicker.C:
			// Only try to update when the member is serving and the allocator is initialized.
			if !a.isServing() || !a.IsInitialize() {
				continue
			}
			if err := a.UpdateTSO(); err != nil {
				log.Warn("failed to update allocator's timestamp", append(a.logFields, errs.ZapError(err))...)
				a.Reset(true)
				// To wait for the allocator to be re-initialized next time.
				continue
			}
		case <-a.ctx.Done():
			a.Reset(false)
			log.Info("exit the allocator update loop", a.logFields...)
			return
		}
	}
}

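// In tests, the `fastUpdatePhysicalInterval` failpoint above can be enabled so the ticker
// fires every millisecond; the failpoint path below is assumed from this package's import
// path:
//
//	failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")
//	defer failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")
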
// Close is used to close the allocator and shut down all the daemon loops.
// The TSO service calls this function to shut down the loops here, while PD manages its own loop.
func (a *Allocator) Close() {
	log.Info("closing the allocator", a.logFields...)
	a.cancel()
	a.wg.Wait()
	log.Info("closed the allocator", a.logFields...)
}

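// Note that Close cancels a.ctx, so allocatorUpdater (and, in the TSO microservice,
// primaryElectionLoop) observes ctx.Done() and returns, which is what lets wg.Wait()
// above unblock.
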
// Initialize will initialize the created TSO allocator.
func (a *Allocator) Initialize() error {
	a.tsoAllocatorRoleGauge.Set(1)
	return a.timestampOracle.syncTimestamp()
}

// IsInitialize is used to indicate whether this allocator is initialized.
func (a *Allocator) IsInitialize() bool {
	return a.timestampOracle.isInitialized()
}

// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (a *Allocator) UpdateTSO() (err error) {
	for i := range maxUpdateTSORetryCount {
		err = a.timestampOracle.updateTimestamp()
		if err == nil {
			return nil
		}
		log.Warn("try to update the tso but failed",
			zap.Int("retry-count", i), zap.Duration("retry-interval", updateTSORetryInterval), errs.ZapError(err))
		time.Sleep(updateTSORetryInterval)
	}
	return
}

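// With the constants above, a persistently failing update delays the caller by at most
// maxUpdateTSORetryCount * updateTSORetryInterval = 3 * 50ms = 150ms of retry sleep
// (plus the time spent in the update attempts themselves) before the error is returned.
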
// SetTSO sets the physical part with the given TSO.
func (a *Allocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
	return a.timestampOracle.resetUserTimestamp(tso, ignoreSmaller, skipUpperBoundCheck)
}

// GenerateTSO is used to generate the given number of TSOs.
// Make sure you have initialized the TSO allocator before calling this method.
func (a *Allocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
	defer trace.StartRegion(ctx, "Allocator.GenerateTSO").End()
	if !a.isServing() {
		// "leader" is not a suitable name here, but we keep it for compatibility.
		a.getMetrics().notLeaderEvent.Inc()
		return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
	}

	return a.timestampOracle.getTS(ctx, count)
}

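// Callers usually fold the returned pdpb.Timestamp into a single 64-bit TSO with 18
// logical bits, along these lines (a sketch of typical client-side usage, not an API of
// this package):
//
//	ts, err := alloc.GenerateTSO(ctx, 1)
//	if err == nil {
//		tso := uint64(ts.GetPhysical())<<18 + uint64(ts.GetLogical())
//		_ = tso
//	}
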
// Reset is used to reset the TSO allocator. It will also resign the leadership if the `resetLeadership` flag is true.
func (a *Allocator) Reset(resetLeadership bool) {
	a.tsoAllocatorRoleGauge.Set(0)
	a.timestampOracle.resetTimestamp()
	// Resign only if it still holds the leadership. Otherwise, a data race may occur because of the re-campaigning.
	if resetLeadership && a.isServing() {
		a.member.Resign()
	}
}

// The PD server conducts its own leadership election independently of the TSO allocator,
// while the TSO service manages its leadership election within the TSO allocator.
// This function is used to manually initiate the TSO allocator's primary election loop.
func (a *Allocator) startPrimaryElectionLoop() {
	a.wg.Add(1)
	go a.primaryElectionLoop()
}

// primaryElectionLoop is used to maintain the TSO primary election and TSO's
// running allocator. It is only used in the microservice environment.
func (a *Allocator) primaryElectionLoop() {
	defer logutil.LogPanic()
	defer a.wg.Done()

	for {
		select {
		case <-a.ctx.Done():
			log.Info("exit the tso primary election loop", a.logFields...)
			return
		default:
		}
		m := a.member.(*member.Participant)
		primary, checkAgain := m.CheckPrimary()
		if checkAgain {
			continue
		}
		if primary != nil {
			log.Info("start to watch the primary",
				append(a.logFields, zap.Stringer("tso-primary", primary))...)
			// Watch will keep looping and never return unless the primary has changed.
			primary.Watch(a.ctx)
			log.Info("the tso primary has changed, try to re-campaign a primary",
				append(a.logFields, zap.Stringer("old-tso-primary", primary))...)
		}

		// Make sure the expected primary (if it exists) and the new primary are on the same server.
		expectedPrimary := mcsutils.GetExpectedPrimaryFlag(a.member.Client(), &keypath.MsParam{
			ServiceName: constant.TSOServiceName,
			GroupID:     a.keyspaceGroupID,
		})
		// Skip campaigning for the primary if the expected primary is not empty and does not match the current member value.
		// The expected primary is ONLY SET BY the `{service}/primary/transfer` API.
		if len(expectedPrimary) > 0 && !strings.Contains(a.member.MemberValue(), expectedPrimary) {
			log.Info("skip campaigning of tso primary and check later", append(a.logFields,
				zap.String("expected-primary-id", expectedPrimary),
				zap.String("cur-member-value", m.ParticipantString()))...)
			time.Sleep(200 * time.Millisecond)
			continue
		}

		a.campaignPrimary()
	}
}

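// campaignPrimary campaigns for the TSO primary of this keyspace group: it campaigns
// through etcd, keeps the lease alive, initializes the timestamp oracle, and then keeps
// serving until the lease expires, the allocator is closed, or the expected primary changes.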
func (a *Allocator) campaignPrimary() {
	log.Info("start to campaign the primary", a.logFields...)
	lease := a.cfg.GetLease()
	if err := a.member.Campaign(a.ctx, lease); err != nil {
		if errors.Is(err, errs.ErrEtcdTxnConflict) {
			log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully",
				a.logFields...)
		} else if errors.Is(err, errs.ErrCheckCampaign) {
			log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split",
				a.logFields...)
		} else {
			log.Error("campaign tso primary meets error due to etcd error", append(a.logFields, errs.ZapError(err))...)
		}
		return
	}

	// Start to keep the leadership alive and enable the TSO service.
	// The TSO service is strictly enabled/disabled by the leader lease for two reasons:
	// 1. The lease-based approach is not affected by thread pauses, slow runtime scheduling, etc.
	// 2. Loading regions could be slow. Based on the lease, we can recover the TSO service faster.
	ctx, cancel := context.WithCancel(a.ctx)
	// resetPrimaryOnce guarantees the cancel/resign cleanup runs exactly once, no matter
	// which of the two deferred calls below fires first.
	var resetPrimaryOnce sync.Once
	defer resetPrimaryOnce.Do(func() {
		cancel()
		a.member.Resign()
	})

	// Maintain the leadership. After this, TSO can be served.
	a.member.GetLeadership().Keep(ctx)
	log.Info("campaign tso primary ok", a.logFields...)

	log.Info("initializing the tso allocator")
	if err := a.Initialize(); err != nil {
		log.Error("failed to initialize the tso allocator", append(a.logFields, errs.ZapError(err))...)
		return
	}
	defer func() {
		// The primary will be reset in `resetPrimaryOnce` later.
		a.Reset(false)
	}()

	// Check the expected primary and watch the primary.
	exitPrimary := make(chan struct{})
	primaryLease, err := mcsutils.KeepExpectedPrimaryAlive(ctx, a.member.Client(), exitPrimary,
		lease, &keypath.MsParam{
			ServiceName: constant.TSOServiceName,
			GroupID:     a.keyspaceGroupID,
		}, a.member.(*member.Participant))
	if err != nil {
		log.Error("prepare tso primary watch error", append(a.logFields, errs.ZapError(err))...)
		return
	}
	a.expectedPrimaryLease.Store(primaryLease)
	a.member.PromoteSelf()

	tsoLabel := fmt.Sprintf("TSO Service Group %d", a.keyspaceGroupID)
	member.ServiceMemberGauge.WithLabelValues(tsoLabel).Set(1)
	defer resetPrimaryOnce.Do(func() {
		cancel()
		a.member.Resign()
		member.ServiceMemberGauge.WithLabelValues(tsoLabel).Set(0)
	})

	log.Info("tso primary is ready to serve", a.logFields...)

	primaryTicker := time.NewTicker(constant.PrimaryTickInterval)
	defer primaryTicker.Stop()

	for {
		select {
		case <-primaryTicker.C:
			if !a.isServing() {
				log.Info("no longer a primary because lease has expired, the tso primary will step down", a.logFields...)
				return
			}
		case <-ctx.Done():
			// The server is closed, so just exit.
			log.Info("exit primary campaign", a.logFields...)
			return
		case <-exitPrimary:
			log.Info("no longer the primary because the primary has been updated, the TSO primary will step down", a.logFields...)
			return
		}
	}
}

// GetPrimaryAddr returns the address of the primary in the election group.
func (a *Allocator) GetPrimaryAddr() string {
	if a == nil || a.member == nil {
		return ""
	}
	primaryAddrs := a.member.GetServingUrls()
	if len(primaryAddrs) < 1 {
		return ""
	}
	return primaryAddrs[0]
}

// GetMember returns the member of the allocator.
func (a *Allocator) GetMember() member.Election {
	return a.member
}

// isServing returns whether the member is serving or not.
// For PD, it means whether the member is the leader.
// For microservices, it means whether the participant is the primary.
func (a *Allocator) isServing() bool {
	if a == nil || a.member == nil {
		return false
	}
	return a.member.IsServing()
}

// isPrimaryElected returns whether a primary has been elected in the participant group.
func (a *Allocator) isPrimaryElected() bool {
	if a == nil || a.member == nil {
		return false
	}
	return a.member.(*member.Participant).IsPrimaryElected()
}

// GetExpectedPrimaryLease returns the expected primary lease.
func (a *Allocator) GetExpectedPrimaryLease() *election.Lease {
	l := a.expectedPrimaryLease.Load()
	if l == nil {
		return nil
	}
	return l.(*election.Lease)
}

// getMetrics returns the TSO metrics of the underlying timestamp oracle.
func (a *Allocator) getMetrics() *tsoMetrics {
	return a.timestampOracle.metrics
}