diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index 27ec8991a9..4a354d75d4 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -237,56 +237,6 @@ func TestAdaptiveUpdateTSInterval(t *testing.T) { assert.Equal(t, adaptiveUpdateTSIntervalStateNormal, o.adaptiveUpdateIntervalState.state) } -func TestValidateReadTS(t *testing.T) { - testImpl := func(staleRead bool) { - pdClient := MockPdClient{} - o, err := NewPdOracle(&pdClient, &PDOracleOptions{ - UpdateInterval: time.Second * 2, - }) - assert.NoError(t, err) - defer o.Close() - - ctx := context.Background() - opt := &oracle.Option{TxnScope: oracle.GlobalTxnScope} - - // Always returns error for MaxUint64 - err = o.ValidateReadTS(ctx, math.MaxUint64, staleRead, opt) - if staleRead { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - - ts, err := o.GetTimestamp(ctx, opt) - assert.NoError(t, err) - assert.GreaterOrEqual(t, ts, uint64(1)) - - err = o.ValidateReadTS(ctx, 1, staleRead, opt) - assert.NoError(t, err) - ts, err = o.GetTimestamp(ctx, opt) - assert.NoError(t, err) - // The readTS exceeds the latest ts, so it first fails the check with the low resolution ts. Then it fallbacks to - // the fetching-from-PD path, and it can get the previous ts + 1, which can allow this validation to pass. - err = o.ValidateReadTS(ctx, ts+1, staleRead, opt) - assert.NoError(t, err) - // It can't pass if the readTS is newer than previous ts + 2. - ts, err = o.GetTimestamp(ctx, opt) - assert.NoError(t, err) - err = o.ValidateReadTS(ctx, ts+2, staleRead, opt) - assert.Error(t, err) - - // Simulate other PD clients requests a timestamp. - ts, err = o.GetTimestamp(ctx, opt) - assert.NoError(t, err) - pdClient.logicalTimestamp.Add(2) - err = o.ValidateReadTS(ctx, ts+3, staleRead, opt) - assert.NoError(t, err) - } - - testImpl(true) - testImpl(false) -} - type MockPDClientWithPause struct { MockPdClient mu sync.Mutex @@ -306,105 +256,6 @@ func (c *MockPDClientWithPause) Resume() { c.mu.Unlock() } -func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) { - pdClient := &MockPDClientWithPause{} - o, err := NewPdOracle(pdClient, &PDOracleOptions{ - UpdateInterval: time.Second * 2, - NoUpdateTS: true, - }) - assert.NoError(t, err) - defer o.Close() - - asyncValidate := func(ctx context.Context, readTS uint64) chan error { - ch := make(chan error, 1) - go func() { - err := o.ValidateReadTS(ctx, readTS, true, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - ch <- err - }() - return ch - } - - noResult := func(ch chan error) { - select { - case <-ch: - assert.FailNow(t, "a ValidateReadTS operation is not blocked while it's expected to be blocked") - default: - } - } - - cancelIndices := []int{-1, -1, 0, 1} - for i, ts := range []uint64{100, 200, 300, 400} { - // Note: the ts is the result that the next GetTS will return. Any validation with readTS <= ts should pass, otherwise fail. - - // We will cancel the cancelIndex-th validation call. This is for testing that canceling some of the calls - // doesn't affect other calls that are waiting - cancelIndex := cancelIndices[i] - - pdClient.Pause() - - results := make([]chan error, 0, 5) - - ctx, cancel := context.WithCancel(context.Background()) - - getCtx := func(index int) context.Context { - if cancelIndex == index { - return ctx - } - return context.Background() - } - - results = append(results, asyncValidate(getCtx(0), ts-2)) - results = append(results, asyncValidate(getCtx(1), ts+2)) - results = append(results, asyncValidate(getCtx(2), ts-1)) - results = append(results, asyncValidate(getCtx(3), ts+1)) - results = append(results, asyncValidate(getCtx(4), ts)) - - expectedSucceeds := []bool{true, false, true, false, true} - - time.Sleep(time.Millisecond * 50) - for _, ch := range results { - noResult(ch) - } - - cancel() - - for i, ch := range results { - if i == cancelIndex { - select { - case err := <-ch: - assert.Errorf(t, err, "index: %v", i) - assert.Containsf(t, err.Error(), "context canceled", "index: %v", i) - case <-time.After(time.Second): - assert.FailNowf(t, "expected result to be ready but still blocked", "index: %v", i) - } - } else { - noResult(ch) - } - } - - // ts will be the next ts returned to these validation calls. - pdClient.logicalTimestamp.Store(int64(ts - 1)) - pdClient.Resume() - for i, ch := range results { - if i == cancelIndex { - continue - } - - select { - case err = <-ch: - case <-time.After(time.Second): - assert.FailNowf(t, "expected result to be ready but still blocked", "index: %v", i) - } - if expectedSucceeds[i] { - assert.NoErrorf(t, err, "index: %v", i) - } else { - assert.Errorf(t, err, "index: %v", i) - assert.NotContainsf(t, err.Error(), "context canceled", "index: %v", i) - } - } - } -} - func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) { oracleInterface, err := NewPdOracle(&MockPdClient{}, &PDOracleOptions{ UpdateInterval: time.Second * 2, @@ -435,9 +286,9 @@ func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) { assert.NoError(t, err) mustNoNotify() - // It loads `ts + 1` from the mock PD, and the check cannot pass. + // It loads `ts + 1` from the mock PD, and the check cannot pass. But the error is ignored. err = o.ValidateReadTS(ctx, ts+2, false, opt) - assert.Error(t, err) + assert.NoError(t, err) mustNoNotify() // Do the check again. It loads `ts + 2` from the mock PD, and the check passes.