Skip to content

fix(admin): fix snwatcher handle heartbeat timeout #1070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/varlogadm/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func newStartCommand() *cli.Command {
flagMRConnTimeout.DurationFlag(false, mrmanager.DefaultMRConnTimeout),
flagMRCallTimeout.DurationFlag(false, mrmanager.DefaultMRCallTimeout),

flagSNWatcherHeartbeatCheckDeadline.DurationFlag(false, snwatcher.DefaultHeartbeatDeadline),
flagSNWatcherHeartbeatCheckDeadline.DurationFlag(false, snwatcher.DefaultHeartbeatCheckDeadline),
flagSNWatcherHeartbeatTimeout.DurationFlag(false, snwatcher.DefaultHeartbeatTimeout),
flagSNWatcherReportDeadline.DurationFlag(false, snwatcher.DefaultReportDeadline),

flags.GRPCServerReadBufferSize,
Expand Down Expand Up @@ -177,6 +178,7 @@ func start(c *cli.Context) error {
admin.WithMetadataRepositoryManager(mrMgr),
admin.WithStorageNodeManager(snMgr),
admin.WithStorageNodeWatcherOptions(
snwatcher.WithHeartbeatTimeout(c.Duration(flagSNWatcherHeartbeatTimeout.Name)),
snwatcher.WithHeartbeatCheckDeadline(c.Duration(flagSNWatcherHeartbeatCheckDeadline.Name)),
snwatcher.WithReportDeadline(c.Duration(flagSNWatcherReportDeadline.Name)),
),
Expand Down
15 changes: 11 additions & 4 deletions cmd/varlogadm/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,18 @@ var (
}

flagSNWatcherHeartbeatCheckDeadline = flags.FlagDesc{
Name: "sn-watcher-heartbeat-check-deadline",
Envs: []string{"SN_WATCHER_HEARTBEAT_CHECK_DEADLINE"},
Name: "sn-watcher-heartbeat-check-deadline",
Envs: []string{"SN_WATCHER_HEARTBEAT_CHECK_DEADLINE"},
Usage: "dealine for heartbeat check request to storage node",
}
flagSNWatcherReportDeadline = flags.FlagDesc{
Name: "sn-watcher-report-deadline",
Envs: []string{"SN_WATCHER_REPORT_DEADLINE"},
Name: "sn-watcher-report-deadline",
Envs: []string{"SN_WATCHER_REPORT_DEADLINE"},
Usage: "dealine for report request to storage node",
}
flagSNWatcherHeartbeatTimeout = flags.FlagDesc{
Name: "sn-watcher-heartbeat-timeout",
Envs: []string{"SN_WATCHER_HEARTBEAT_TIMEOUT"},
Usage: "dealine to decide whether a storage node is live",
}
)
5 changes: 3 additions & 2 deletions cmd/varlogadm/testdata/varlogadm.ct
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ OPTIONS:
--mr-call-timeout value mr call timeout (default: 3s) [$MR_CALL_TIMEOUT]
--mr-conn-timeout value mr connection timeout (default: 1s) [$MR_CONN_TIMEOUT]
--replica-selector value, --repsel value random | lfu (default: "lfu") [$REPLICA_SELECTOR]
--sn-watcher-heartbeat-check-deadline value (default: 3s) [$SN_WATCHER_HEARTBEAT_CHECK_DEADLINE]
--sn-watcher-report-deadline value (default: 3s) [$SN_WATCHER_REPORT_DEADLINE]
--sn-watcher-heartbeat-check-deadline value dealine for heartbeat check request to storage node (default: 1s) [$SN_WATCHER_HEARTBEAT_CHECK_DEADLINE]
--sn-watcher-heartbeat-timeout value dealine to decide whether a storage node is live (default: 5s) [$SN_WATCHER_HEARTBEAT_TIMEOUT]
--sn-watcher-report-deadline value dealine for report request to storage node (default: 1s) [$SN_WATCHER_REPORT_DEADLINE]

Cluster:

Expand Down
45 changes: 42 additions & 3 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"math/rand"
"net"
"slices"
"sort"
Expand Down Expand Up @@ -903,6 +904,22 @@
return adm.sealInternal(ctx, tpid, lsid)
}

// sealMeta seals the logstream metadata in metadata repository only.
func (adm *Admin) sealMeta(ctx context.Context, lsid types.LogStreamID) (types.GLSN, error) {
adm.lockLogStreamStatus(lsid)
defer adm.unlockLogStreamStatus(lsid)

adm.statRepository.SetLogStreamStatus(lsid, varlogpb.LogStreamStatusSealing)

lastGLSN, err := adm.mrmgr.Seal(ctx, lsid)
if err != nil {
adm.statRepository.SetLogStreamStatus(lsid, varlogpb.LogStreamStatusRunning)
return types.InvalidGLSN, err
}

Check warning on line 918 in internal/admin/admin.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin.go#L916-L918

Added lines #L916 - L918 were not covered by tests

return lastGLSN, nil
}

func (adm *Admin) sealInternal(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) ([]snpb.LogStreamReplicaMetadataDescriptor, types.GLSN, error) {
adm.statRepository.SetLogStreamStatus(lsid, varlogpb.LogStreamStatusSealing)

Expand Down Expand Up @@ -1093,19 +1110,41 @@
return nil
}

// HandleHeartbeatTimeout is called by snwatcher.
// It seals all logstreams belonging to sn defined by snid.
// Seal on metarepos first, as seal request on sn may be delayed.
// Sealing metarepos is a high priority.
func (adm *Admin) HandleHeartbeatTimeout(ctx context.Context, snid types.StorageNodeID) {
meta, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return
}

// TODO: store sn status
target := make([]*varlogpb.LogStreamDescriptor, 0, len(meta.GetLogStreams()))
for _, ls := range meta.GetLogStreams() {
if ls.IsReplica(snid) {
adm.logger.Debug("seal due to heartbeat timeout", zap.Any("snid", snid), zap.Any("lsid", ls.LogStreamID))
_, _, _ = adm.seal(ctx, ls.TopicID, ls.LogStreamID)
target = append(target, ls)
}
}
rand.Shuffle(len(target), func(i, j int) { target[i], target[j] = target[j], target[i] })

for _, ls := range target {
_, err = adm.sealMeta(ctx, ls.LogStreamID)
if err != nil {
adm.logger.Error("seal to metarepos failed",
zap.Any("snid", snid),
zap.Any("lsid", ls.LogStreamID),
zap.Error(err))
}

Check warning on line 1138 in internal/admin/admin.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin.go#L1134-L1138

Added lines #L1134 - L1138 were not covered by tests
}

hctx, cancel := context.WithTimeout(ctx, adm.storagenodeFailureHandleTimeout)
defer cancel()

for _, ls := range target {
adm.logger.Debug("seal due to heartbeat timeout", zap.Any("snid", snid), zap.Any("lsid", ls.LogStreamID))
_, _, _ = adm.seal(hctx, ls.TopicID, ls.LogStreamID)
}
}

func (adm *Admin) checkLogStreamStatus(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, mrStatus, replicaStatus varlogpb.LogStreamStatus) {
Expand Down
Loading