Skip to content

Commit

Permalink
kvclient: filter the old value if old-value is disabled (#1304) (#1347)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Jan 26, 2021
1 parent f7db19c commit eb66cf3
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 72 deletions.
57 changes: 16 additions & 41 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,19 +1360,9 @@ func (s *eventFeedSession) singleEventFeed(
}
metricPullEventInitializedCounter.Inc()
initialized = true
for _, cacheEntry := range matcher.cachedCommit {
value, ok := matcher.matchRow(cacheEntry)
if !ok {
// when cdc receives a commit log without a corresponding
// prewrite log before initialized, a committed log with
// the same key and start-ts must have been received.
log.Info("ignore commit event without prewrite",
zap.Binary("key", cacheEntry.GetKey()),
zap.Uint64("ts", cacheEntry.GetStartTs()))
continue
}

revent, err := assembleCommitEvent(regionID, cacheEntry, value)
cachedEvents := matcher.matchCachedRow()
for _, cachedEvent := range cachedEvents {
revent, err := assembleRowEvent(regionID, cachedEvent, s.enableOldValue)
if err != nil {
return lastResolvedTs, errors.Trace(err)
}
Expand All @@ -1383,30 +1373,11 @@ func (s *eventFeedSession) singleEventFeed(
return lastResolvedTs, errors.Trace(ctx.Err())
}
}
matcher.clearCacheCommit()
case cdcpb.Event_COMMITTED:
metricPullEventCommittedCounter.Inc()
var opType model.OpType
switch entry.GetOpType() {
case cdcpb.Event_Row_DELETE:
opType = model.OpTypeDelete
case cdcpb.Event_Row_PUT:
opType = model.OpTypePut
default:
return lastResolvedTs, cerror.ErrUnknownKVEventType.GenWithStackByArgs(entry.GetOpType(), entry)
}

revent := &model.RegionFeedEvent{
RegionID: regionID,
Val: &model.RawKVEntry{
OpType: opType,
Key: entry.Key,
Value: entry.GetValue(),
OldValue: entry.GetOldValue(),
StartTs: entry.StartTs,
CRTs: entry.CommitTs,
RegionID: regionID,
},
revent, err := assembleRowEvent(regionID, entry, s.enableOldValue)
if err != nil {
return lastResolvedTs, errors.Trace(err)
}

if entry.CommitTs <= lastResolvedTs {
Expand Down Expand Up @@ -1434,8 +1405,7 @@ func (s *eventFeedSession) singleEventFeed(
zap.Uint64("resolvedTs", lastResolvedTs),
zap.Uint64("regionID", regionID))
}
// emit a value
value, ok := matcher.matchRow(entry)
ok := matcher.matchRow(entry)
if !ok {
if !initialized {
matcher.cacheCommitRow(entry)
Expand All @@ -1444,7 +1414,7 @@ func (s *eventFeedSession) singleEventFeed(
return lastResolvedTs, cerror.ErrPrewriteNotMatch.GenWithStackByArgs(entry.GetKey(), entry.GetStartTs())
}

revent, err := assembleCommitEvent(regionID, entry, value)
revent, err := assembleRowEvent(regionID, entry, s.enableOldValue)
if err != nil {
return lastResolvedTs, errors.Trace(err)
}
Expand Down Expand Up @@ -1479,7 +1449,7 @@ func (s *eventFeedSession) singleEventFeed(
}
}

func assembleCommitEvent(regionID uint64, entry *cdcpb.Event_Row, value *pendingValue) (*model.RegionFeedEvent, error) {
func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (*model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
case cdcpb.Event_Row_DELETE:
Expand All @@ -1495,13 +1465,18 @@ func assembleCommitEvent(regionID uint64, entry *cdcpb.Event_Row, value *pending
Val: &model.RawKVEntry{
OpType: opType,
Key: entry.Key,
Value: value.value,
OldValue: value.oldValue,
Value: entry.GetValue(),
StartTs: entry.StartTs,
CRTs: entry.CommitTs,
RegionID: regionID,
},
}

// when old-value is disabled, it is still possible for tikv to send an event containing the old value,
// so we need to avoid sending an old value downstream when old-value is disabled
if enableOldValue {
revent.Val.OldValue = entry.GetOldValue()
}
return revent, nil
}

Expand Down
117 changes: 117 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,123 @@ func (s *clientSuite) TestNewClose(c *check.C) {
c.Assert(err, check.IsNil)
}

// TestAssembleRowEvent is a table-driven test for assembleRowEvent. It covers:
//   - PUT and DELETE op types being mapped to the corresponding model op types,
//   - OldValue being dropped from the result when enableOldValue is false, even
//     if the incoming row carries one,
//   - OldValue being kept when enableOldValue is true,
//   - an unknown op type producing an error and a nil event.
func (s *clientSuite) TestAssembleRowEvent(c *check.C) {
	defer testleak.AfterTest(c)()
	testCases := []struct {
		regionID       uint64
		entry          *cdcpb.Event_Row
		enableOldValue bool
		expected       *model.RegionFeedEvent
		// err is the exact expected error message; empty means no error is expected.
		err string
	}{{
		regionID: 1,
		entry: &cdcpb.Event_Row{
			StartTs:  1,
			CommitTs: 2,
			Key:      []byte("k1"),
			Value:    []byte("v1"),
			OpType:   cdcpb.Event_Row_PUT,
		},
		enableOldValue: false,
		expected: &model.RegionFeedEvent{
			RegionID: 1,
			Val: &model.RawKVEntry{
				OpType:   model.OpTypePut,
				StartTs:  1,
				CRTs:     2,
				Key:      []byte("k1"),
				Value:    []byte("v1"),
				RegionID: 1,
			},
		},
	}, {
		regionID: 2,
		entry: &cdcpb.Event_Row{
			StartTs:  1,
			CommitTs: 2,
			Key:      []byte("k2"),
			Value:    []byte("v2"),
			OpType:   cdcpb.Event_Row_DELETE,
		},
		enableOldValue: false,
		expected: &model.RegionFeedEvent{
			RegionID: 2,
			Val: &model.RawKVEntry{
				OpType:   model.OpTypeDelete,
				StartTs:  1,
				CRTs:     2,
				Key:      []byte("k2"),
				Value:    []byte("v2"),
				RegionID: 2,
			},
		},
	}, {
		// The row carries an old value, but old-value is disabled: it must be filtered out.
		regionID: 3,
		entry: &cdcpb.Event_Row{
			StartTs:  1,
			CommitTs: 2,
			Key:      []byte("k2"),
			Value:    []byte("v2"),
			OldValue: []byte("ov2"),
			OpType:   cdcpb.Event_Row_PUT,
		},
		enableOldValue: false,
		expected: &model.RegionFeedEvent{
			RegionID: 3,
			Val: &model.RawKVEntry{
				OpType:   model.OpTypePut,
				StartTs:  1,
				CRTs:     2,
				Key:      []byte("k2"),
				Value:    []byte("v2"),
				RegionID: 3,
			},
		},
	}, {
		// Old-value is enabled: the old value must be passed through.
		regionID: 4,
		entry: &cdcpb.Event_Row{
			StartTs:  1,
			CommitTs: 2,
			Key:      []byte("k3"),
			Value:    []byte("v3"),
			OldValue: []byte("ov3"),
			OpType:   cdcpb.Event_Row_PUT,
		},
		enableOldValue: true,
		expected: &model.RegionFeedEvent{
			RegionID: 4,
			Val: &model.RawKVEntry{
				OpType:   model.OpTypePut,
				StartTs:  1,
				CRTs:     2,
				Key:      []byte("k3"),
				Value:    []byte("v3"),
				OldValue: []byte("ov3"),
				RegionID: 4,
			},
		},
	}, {
		// Unknown op type: an error is expected and the event stays nil.
		regionID: 2,
		entry: &cdcpb.Event_Row{
			StartTs:  1,
			CommitTs: 2,
			Key:      []byte("k2"),
			Value:    []byte("v2"),
			OpType:   cdcpb.Event_Row_UNKNOWN,
		},
		enableOldValue: false,
		err:            "[CDC:ErrUnknownKVEventType]unknown kv event type: UNKNOWN, entry: start_ts:1 commit_ts:2 key:\"k2\" value:\"v2\" ",
	}}

	for _, tc := range testCases {
		event, err := assembleRowEvent(tc.regionID, tc.entry, tc.enableOldValue)
		c.Assert(event, check.DeepEquals, tc.expected)
		// Check the error outcome in both directions so that a missing expected
		// error, or an unexpected one, fails the test explicitly.
		if tc.err == "" {
			c.Assert(err, check.IsNil)
			continue
		}
		c.Assert(err, check.NotNil)
		c.Assert(err.Error(), check.Equals, tc.err)
	}
}

func mockInitializedEvent(regionID, requestID uint64) *cdcpb.ChangeDataEvent {
initialized := &cdcpb.ChangeDataEvent{
Events: []*cdcpb.Event{
Expand Down
60 changes: 40 additions & 20 deletions cdc/kv/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ package kv

import (
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"go.uber.org/zap"
)

type pendingValue struct {
value []byte
oldValue []byte
}

type matcher struct {
// TODO : clear the single prewrite
unmatchedValue map[matchKey]*pendingValue
unmatchedValue map[matchKey]*cdcpb.Event_Row
cachedCommit []*cdcpb.Event_Row
}

Expand All @@ -39,39 +36,62 @@ func newMatchKey(row *cdcpb.Event_Row) matchKey {

func newMatcher() *matcher {
return &matcher{
unmatchedValue: make(map[matchKey]*pendingValue),
unmatchedValue: make(map[matchKey]*cdcpb.Event_Row),
}
}

func (m *matcher) putPrewriteRow(row *cdcpb.Event_Row) {
key := newMatchKey(row)
value := row.GetValue()
oldvalue := row.GetOldValue()
// tikv may send a prewrite event with empty value (txn heartbeat)
// here we need to avoid the invalid prewrite event overwrite the value
if _, exist := m.unmatchedValue[key]; exist && len(value) == 0 {
// tikv may send a fake prewrite event with empty value caused by txn heartbeat.
// here we need to avoid the fake prewrite event overwrite the prewrite value.

// when the old-value is disabled, the value of the fake prewrite event is empty.
// when the old-value is enabled, the value of the fake prewrite event is also empty,
// but the old value of the fake prewrite event is not empty.
// We can distinguish fake prewrite events by whether the value is empty,
// regardless of whether old-value is enabled or disabled.
if _, exist := m.unmatchedValue[key]; exist && len(row.GetValue()) == 0 {
return
}
m.unmatchedValue[key] = &pendingValue{
value: value,
oldValue: oldvalue,
}
m.unmatchedValue[key] = row
}

func (m *matcher) matchRow(row *cdcpb.Event_Row) (*pendingValue, bool) {
// matchRow matches the commit event with the cached prewrite event
// the Value and OldValue will be assigned if a matched prewrite event exists.
func (m *matcher) matchRow(row *cdcpb.Event_Row) bool {
if value, exist := m.unmatchedValue[newMatchKey(row)]; exist {
row.Value = value.GetValue()
row.OldValue = value.GetOldValue()
delete(m.unmatchedValue, newMatchKey(row))
return value, true
return true
}
return nil, false
return false
}

// cacheCommitRow appends the given commit row to the matcher's cachedCommit
// list so that it can be matched against a prewrite later.
func (m *matcher) cacheCommitRow(row *cdcpb.Event_Row) {
m.cachedCommit = append(m.cachedCommit, row)
}

func (m *matcher) clearCacheCommit() {
// matchCachedRow drains the cached commit rows and tries to match each of them
// against a pending prewrite via matchRow. Rows that match are returned in
// their original order; rows without a matching prewrite are logged and
// dropped. The matcher's cache is reset to nil before matching starts, and the
// returned slice reuses the backing array of the drained cache.
func (m *matcher) matchCachedRow() []*cdcpb.Event_Row {
	pending := m.cachedCommit
	m.cachedCommit = nil

	// Filter in place: matched rows are compacted to the front of pending.
	matched := pending[:0]
	for _, commit := range pending {
		if m.matchRow(commit) {
			matched = append(matched, commit)
			continue
		}
		// when cdc receives a commit log without a corresponding
		// prewrite log before initialized, a committed log with
		// the same key and start-ts must have been received.
		log.Info("ignore commit event without prewrite",
			zap.Binary("key", commit.GetKey()),
			zap.Uint64("ts", commit.GetStartTs()))
	}
	return matched
}

func (m *matcher) rollbackRow(row *cdcpb.Event_Row) {
Expand Down
Loading

0 comments on commit eb66cf3

Please sign in to comment.