Skip to content

Commit 25e97a2

Browse files
author
Swapnil Mhamane
committed
Update garbage collection logic
Signed-off-by: Swapnil Mhamane <swapnil.mhamane@sap.com>
1 parent 819788e commit 25e97a2

File tree

9 files changed

+135
-60
lines changed

9 files changed

+135
-60
lines changed

cmd/server.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
110110
maxBackups,
111111
deltaSnapshotIntervalSeconds,
112112
time.Duration(etcdConnectionTimeout),
113+
time.Duration(garbageCollectionPeriodSeconds),
113114
tlsConfig)
114115
if err != nil {
115116
logger.Fatalf("Failed to create snapshotter from configured storage provider: %v", err)
@@ -128,7 +129,8 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
128129
handler.Status = http.StatusServiceUnavailable
129130
continue
130131
}
131-
132+
gcStopCh := make(chan bool)
133+
go ssr.GarbageCollector(gcStopCh)
132134
if err := ssr.Run(stopCh); err != nil {
133135
handler.Status = http.StatusServiceUnavailable
134136
if etcdErr, ok := err.(*errors.EtcdError); ok == true {
@@ -139,6 +141,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
139141
} else {
140142
handler.Status = http.StatusOK
141143
}
144+
gcStopCh <- true
142145
}
143146
},
144147
}

cmd/snapshot.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,17 @@ storing snapshots on various cloud storage providers as well as local disk locat
5555
maxBackups,
5656
deltaSnapshotIntervalSeconds,
5757
time.Duration(etcdConnectionTimeout),
58+
time.Duration(garbageCollectionPeriodSeconds),
5859
tlsConfig)
5960
if err != nil {
6061
logger.Fatalf("Failed to create snapshotter: %v", err)
6162
}
62-
63+
gcStopCh := make(chan bool)
64+
go ssr.GarbageCollector(gcStopCh)
6365
if err := ssr.Run(stopCh); err != nil {
6466
logger.Fatalf("Snapshotter failed with error: %v", err)
6567
}
68+
gcStopCh <- true
6669
logger.Info("Shutting down...")
6770
return
6871
},
@@ -79,6 +82,7 @@ func initializeSnapshotterFlags(cmd *cobra.Command) {
7982
cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-interval-seconds", "i", 10, "Interval in no. of seconds after which delta snapshot will be persisted")
8083
cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", 7, "maximum number of previous backups to keep")
8184
cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout")
85+
cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 30, "Period in seconds for garbage collecting old backups")
8286
cmd.Flags().BoolVar(&insecureTransport, "insecure-transport", true, "disable transport security for client connections")
8387
cmd.Flags().BoolVar(&insecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification")
8488
cmd.Flags().StringVar(&certFile, "cert", "", "identify secure client using this TLS certificate file")

cmd/types.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ var (
2929
logger = logrus.New()
3030

3131
//snapshotter flags
32-
schedule string
33-
etcdEndpoints []string
34-
deltaSnapshotIntervalSeconds int
35-
maxBackups int
36-
etcdConnectionTimeout int
37-
insecureTransport bool
38-
insecureSkipVerify bool
39-
certFile string
40-
keyFile string
41-
caFile string
32+
schedule string
33+
etcdEndpoints []string
34+
deltaSnapshotIntervalSeconds int
35+
maxBackups int
36+
etcdConnectionTimeout int
37+
garbageCollectionPeriodSeconds int
38+
insecureTransport bool
39+
insecureSkipVerify bool
40+
certFile string
41+
keyFile string
42+
caFile string
4243

4344
//server flags
4445
port int

pkg/retry/retry.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func Do(retryFunc func() error, config *Config) error {
2727
for n < config.Attempts {
2828
delayTime := config.Delay * (1 << (n - 1))
2929
time.Sleep((time.Duration)(delayTime) * config.Units)
30-
config.Logger.Infof("Job attempt: %d", n)
30+
config.Logger.Infof("Job attempt: %d", n+1)
3131
err = retryFunc()
3232
if err == nil {
3333
return nil

pkg/snapshot/restorer/restorer.go

+24-4
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/coreos/etcd/store"
4444
"github.com/coreos/etcd/wal"
4545
"github.com/coreos/etcd/wal/walpb"
46+
"github.com/gardener/etcd-backup-restore/pkg/errors"
4647
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
4748
"github.com/sirupsen/logrus"
4849
)
@@ -60,7 +61,10 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
6061
if err := r.restoreFromBaseSnapshot(ro); err != nil {
6162
return fmt.Errorf("failed to restore from the base snapshot :%v", err)
6263
}
63-
64+
if len(ro.DeltaSnapList) == 0 {
65+
r.logger.Infof("No delta snapshots present over base snapshot.")
66+
return nil
67+
}
6468
r.logger.Infof("Starting embedded etcd server...")
6569
e, err := startEmbeddedEtcd(ro)
6670
if err != nil {
@@ -73,7 +77,8 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
7377
return err
7478
}
7579
defer client.Close()
76-
r.logger.Infof("Applying incremental snapshots...")
80+
81+
r.logger.Infof("Applying delta snapshots...")
7782
return r.applyDeltaSnapshots(client, ro.DeltaSnapList)
7883
}
7984

@@ -84,7 +89,7 @@ func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error {
8489
r.logger.Warnf("Base snapshot path not provided. Will do nothing.")
8590
return nil
8691
}
87-
92+
r.logger.Infof("Restoring from base snapshot: %s", path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName))
8893
cfg := etcdserver.ServerConfig{
8994
InitialClusterToken: ro.ClusterToken,
9095
InitialPeerURLsMap: ro.ClusterURLs,
@@ -321,6 +326,7 @@ func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, snapList snapsto
321326

322327
// applyDeltaSnapshot applies thw events from delta snapshot to etcd
323328
func (r *Restorer) applyDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot) error {
329+
r.logger.Infof("Applying delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName))
324330
events, err := getEventsFromDeltaSnapshot(r.store, snap)
325331
if err != nil {
326332
return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err)
@@ -355,6 +361,20 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error {
355361
ops = []clientv3.Op{}
356362
ctx = context.TODO()
357363
)
364+
365+
// Note: Since revision in full snapshot file name might be lower than actual revision stored in snapshot.
366+
// This is because of issue refereed below. So, as per workaround used in our logic of taking delta snapshot,
367+
// latest revision from full snapshot may overlap with first few revision on first delta snapshot
368+
// Hence, we have to additionally take care of that.
369+
// Refer: https://github.com/coreos/etcd/issues/9037
370+
resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
371+
if err != nil {
372+
return &errors.EtcdError{
373+
Message: fmt.Sprintf("failed to get etcd latest revision: %v", err),
374+
}
375+
}
376+
lastRev = resp.Header.Revision
377+
358378
for _, e := range events {
359379
ev := e.EtcdEvent
360380
nextRev := ev.Kv.ModRevision
@@ -375,6 +395,6 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error {
375395
return fmt.Errorf("Unexpected event type")
376396
}
377397
}
378-
_, err := client.Txn(ctx).Then(ops...).Commit()
398+
_, err = client.Txn(ctx).Then(ops...).Commit()
379399
return err
380400
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package snapshotter
16+
17+
import (
18+
"path"
19+
"time"
20+
21+
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
22+
)
23+
24+
// GarbageCollector basically consider the older backups as garbage and deletes it
25+
func (ssr *Snapshotter) GarbageCollector(stopCh <-chan bool) {
26+
for {
27+
select {
28+
case <-stopCh:
29+
return
30+
case <-time.After(ssr.garbageCollectionPeriodSeconds * time.Second):
31+
32+
ssr.logger.Infoln("GC: Executing garbage collection...")
33+
snapList, err := ssr.store.List()
34+
if err != nil {
35+
ssr.logger.Warnf("GC: Failed to list snapshots: %v", err)
36+
continue
37+
}
38+
39+
snapLen := len(snapList)
40+
var snapStreamIndexList []int
41+
snapStreamIndexList = append(snapStreamIndexList, 0)
42+
for index := 1; index < snapLen; index++ {
43+
if snapList[index].Kind == snapstore.SnapshotKindFull {
44+
snapStreamIndexList = append(snapStreamIndexList, index)
45+
}
46+
}
47+
48+
for snapStreamIndex := 0; snapStreamIndex < len(snapStreamIndexList)-ssr.maxBackups; snapStreamIndex++ {
49+
for i := snapStreamIndexList[snapStreamIndex+1] - 1; i >= snapStreamIndex; i-- {
50+
ssr.logger.Infof("Deleting old snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName))
51+
if err := ssr.store.Delete(*snapList[i]); err != nil {
52+
ssr.logger.Warnf("Failed to delete snapshot %s: %v", path.Join(snapList[i].SnapDir, snapList[i].SnapName), err)
53+
}
54+
}
55+
}
56+
}
57+
}
58+
}

pkg/snapshot/snapshotter/snapshotter.go

+15-29
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,25 @@ import (
3434
)
3535

3636
// NewSnapshotter returns the snapshotter object.
37-
func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) {
37+
func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) {
3838
logger.Printf("Validating schedule...")
3939
sdl, err := cron.ParseStandard(schedule)
4040
if err != nil {
4141
return nil, fmt.Errorf("invalid schedule provied %s : %v", schedule, err)
4242
}
43+
if maxBackups < 1 {
44+
return nil, fmt.Errorf("maximum backups limit should be greater than zero. Input MaxBackups: %s", maxBackups)
45+
}
4346

4447
return &Snapshotter{
45-
logger: logger,
46-
schedule: sdl,
47-
store: store,
48-
maxBackups: maxBackups,
49-
etcdConnectionTimeout: etcdConnectionTimeout,
50-
tlsConfig: tlsConfig,
51-
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
52-
//currentDeltaSnapshotCount: 0,
48+
logger: logger,
49+
schedule: sdl,
50+
store: store,
51+
maxBackups: maxBackups,
52+
etcdConnectionTimeout: etcdConnectionTimeout,
53+
garbageCollectionPeriodSeconds: garbageCollectionPeriodSeconds,
54+
tlsConfig: tlsConfig,
55+
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
5356
}, nil
5457
}
5558

@@ -135,9 +138,6 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}) error {
135138

136139
now = time.Now()
137140
effective = ssr.schedule.Next(now)
138-
139-
// TODO: move this garbage collector to paraller thread
140-
ssr.garbageCollector()
141141
if effective.IsZero() {
142142
ssr.logger.Infoln("There are no backup scheduled for future. Stopping now.")
143143
return nil
@@ -166,6 +166,9 @@ func (ssr *Snapshotter) TakeFullSnapshot() error {
166166

167167
ctx, cancel := context.WithTimeout(context.TODO(), ssr.etcdConnectionTimeout*time.Second)
168168
defer cancel()
169+
// Note: Although Get and snapshot call are not atomic, so revision number in snapshot file
170+
// may be ahead of the revision found from GET call. But currently this is the only workaround available
171+
// Refer: https://github.com/coreos/etcd/issues/9037
169172
resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
170173
if err != nil {
171174
return &errors.EtcdError{
@@ -199,23 +202,6 @@ func (ssr *Snapshotter) TakeFullSnapshot() error {
199202
return nil
200203
}
201204

202-
// garbageCollector basically consider the older backups as garbage and deletes it
203-
func (ssr *Snapshotter) garbageCollector() {
204-
ssr.logger.Infoln("Executing garbage collection...")
205-
snapList, err := ssr.store.List()
206-
if err != nil {
207-
ssr.logger.Warnf("Failed to list snapshots: %v", err)
208-
return
209-
}
210-
snapLen := len(snapList)
211-
for i := 0; i < (snapLen - ssr.maxBackups); i++ {
212-
ssr.logger.Infof("Deleting old snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName))
213-
if err := ssr.store.Delete(*snapList[i]); err != nil {
214-
ssr.logger.Warnf("Failed to delete snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName))
215-
}
216-
}
217-
}
218-
219205
// GetTLSClientForEtcd creates an etcd client using the TLS config params.
220206
func GetTLSClientForEtcd(tlsConfig *TLSConfig) (*clientv3.Client, error) {
221207
// set tls if any one tls option set

pkg/snapshot/snapshotter/types.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@ import (
2525

2626
// Snapshotter is a struct for etcd snapshot taker
2727
type Snapshotter struct {
28-
logger *logrus.Logger
29-
schedule cron.Schedule
30-
store snapstore.SnapStore
31-
maxBackups int
32-
etcdConnectionTimeout time.Duration
33-
garbageCollectionTimeout time.Duration
34-
tlsConfig *TLSConfig
35-
deltaSnapshotIntervalSeconds int
36-
deltaEventCount int
37-
prevSnapshot snapstore.Snapshot
28+
logger *logrus.Logger
29+
schedule cron.Schedule
30+
store snapstore.SnapStore
31+
maxBackups int
32+
etcdConnectionTimeout time.Duration
33+
garbageCollectionPeriodSeconds time.Duration
34+
tlsConfig *TLSConfig
35+
deltaSnapshotIntervalSeconds int
36+
deltaEventCount int
37+
prevSnapshot snapstore.Snapshot
3838
}
3939

4040
// TLSConfig holds cert information and settings for TLS.

pkg/snapstore/local_snapstore.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"os"
2222
"path"
2323
"sort"
24+
"syscall"
2425
)
2526

2627
// LocalSnapStore is snapstore with local disk as backend
@@ -96,12 +97,14 @@ func (s *LocalSnapStore) List() (SnapList, error) {
9697

9798
// Delete should delete the snapshot file from store
9899
func (s *LocalSnapStore) Delete(snap Snapshot) error {
99-
err := os.Remove(path.Join(s.prefix, snap.SnapDir, snap.SnapName))
100-
if err != nil {
100+
if err := os.Remove(path.Join(s.prefix, snap.SnapDir, snap.SnapName)); err != nil {
101+
return err
102+
}
103+
err := os.Remove(path.Join(s.prefix, snap.SnapDir))
104+
if pathErr, ok := err.(*os.PathError); ok == true && pathErr.Err != syscall.ENOTEMPTY {
101105
return err
102106
}
103-
err = os.Remove(path.Join(s.prefix, snap.SnapDir))
104-
return err
107+
return nil
105108
}
106109

107110
// Size should return size of the snapshot file from store

0 commit comments

Comments
 (0)