mirror of https://github.com/tikv/client-go.git
P-DML: for AlreadyExist error, attach the value from flushingMemDB (#1218)
* feat: for AlreadyExist error during Flush, attach the value from flushingMemDB to it Signed-off-by: ekexium <eke@fastmail.com> * style: apply comment suggestion Signed-off-by: ekexium <eke@fastmail.com> --------- Signed-off-by: ekexium <eke@fastmail.com> Co-authored-by: cfzjywxk <lsswxrxr@163.com>
This commit is contained in:
parent
0416875f55
commit
d59fea5757
|
|
@ -141,6 +141,11 @@ func (d *PDError) Error() string {
|
|||
// ErrKeyExist wraps *pdpb.AlreadyExist to implement the error interface.
|
||||
type ErrKeyExist struct {
|
||||
*kvrpcpb.AlreadyExist
|
||||
// Value stores the value that is being written when the conflict is detected.
|
||||
// It was supposed to be an Insert mutation so the value should exist.
|
||||
// The value is not in the kv protocol,
|
||||
// it is introduced for error message handling in pipeliend-DML.
|
||||
Value []byte
|
||||
}
|
||||
|
||||
func (k *ErrKeyExist) Error() string {
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -52,7 +52,7 @@ require (
|
|||
golang.org/x/net v0.22.0 // indirect
|
||||
golang.org/x/sys v0.18.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/tools v0.19.0 // indirect
|
||||
golang.org/x/tools v0.18.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240308144416-29370a3891b7 // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -192,8 +192,8 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn
|
|||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
|
||||
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
|
||||
golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ=
|
||||
golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
|
|||
|
|
@ -16,15 +16,18 @@ package unionstore
|
|||
|
||||
import (
|
||||
"context"
|
||||
stderrors "errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
tikverr "github.com/tikv/client-go/v2/error"
|
||||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
"github.com/tikv/client-go/v2/kv"
|
||||
"github.com/tikv/client-go/v2/metrics"
|
||||
"github.com/tikv/client-go/v2/util"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// PipelinedMemDB is a Store which contains
|
||||
|
|
@ -216,6 +219,10 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
|
|||
}
|
||||
if p.flushingMemDB != nil {
|
||||
if err := <-p.errCh; err != nil {
|
||||
if err != nil {
|
||||
err = p.handleAlreadyExistErr(err)
|
||||
}
|
||||
p.flushingMemDB = nil
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
|
@ -261,6 +268,9 @@ func (p *PipelinedMemDB) needFlush() bool {
|
|||
func (p *PipelinedMemDB) FlushWait() error {
|
||||
if p.flushingMemDB != nil {
|
||||
err := <-p.errCh
|
||||
if err != nil {
|
||||
err = p.handleAlreadyExistErr(err)
|
||||
}
|
||||
// cleanup the flushingMemDB so the next call of FlushWait will not wait for the error channel.
|
||||
p.flushingMemDB = nil
|
||||
return err
|
||||
|
|
@ -268,6 +278,25 @@ func (p *PipelinedMemDB) FlushWait() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *PipelinedMemDB) handleAlreadyExistErr(err error) error {
|
||||
var existErr *tikverr.ErrKeyExist
|
||||
if stderrors.As(err, &existErr) {
|
||||
v, err2 := p.flushingMemDB.Get(existErr.GetKey())
|
||||
if err2 != nil {
|
||||
// TODO: log more info like start_ts, also for other logs
|
||||
logutil.BgLogger().Warn(
|
||||
"[pipelined-dml] Getting value from flushingMemDB when"+
|
||||
" AlreadyExist error occurs failed", zap.Error(err2),
|
||||
zap.Uint64("generation", p.generation),
|
||||
)
|
||||
} else {
|
||||
existErr.Value = v
|
||||
}
|
||||
return existErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Iter implements the Retriever interface.
|
||||
func (p *PipelinedMemDB) Iter([]byte, []byte) (Iterator, error) {
|
||||
return nil, errors.New("pipelined memdb does not support Iter")
|
||||
|
|
|
|||
Loading…
Reference in New Issue