Skip to content

Commit 6129c45

Browse files
author
Swapnil Mhamane
committed
Update garbage collection logic
Signed-off-by: Swapnil Mhamane <swapnil.mhamane@sap.com>
1 parent 819788e commit 6129c45

12 files changed

+180
-75
lines changed

CHANGELOG.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
### Added
66

7-
- Integration test for AWS
7+
- Incremental backup of etcd, where full snapshot is taken first and then we apply watch and persist the logs accumulated over certain period to snapshot store. Restore process, restores from the full snapshot, start the embedded etcd and apply the logged events one by one.
8+
9+
- Initial setup for Integration test for AWS
810

911
## 0.2.3 - 2018-05-22
1012

cmd/server.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
110110
maxBackups,
111111
deltaSnapshotIntervalSeconds,
112112
time.Duration(etcdConnectionTimeout),
113+
time.Duration(garbageCollectionPeriodSeconds),
113114
tlsConfig)
114115
if err != nil {
115116
logger.Fatalf("Failed to create snapshotter from configured storage provider: %v", err)
@@ -128,7 +129,8 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
128129
handler.Status = http.StatusServiceUnavailable
129130
continue
130131
}
131-
132+
gcStopCh := make(chan bool)
133+
go ssr.GarbageCollector(gcStopCh)
132134
if err := ssr.Run(stopCh); err != nil {
133135
handler.Status = http.StatusServiceUnavailable
134136
if etcdErr, ok := err.(*errors.EtcdError); ok == true {
@@ -139,6 +141,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
139141
} else {
140142
handler.Status = http.StatusOK
141143
}
144+
gcStopCh <- true
142145
}
143146
},
144147
}

cmd/snapshot.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,17 @@ storing snapshots on various cloud storage providers as well as local disk locat
5555
maxBackups,
5656
deltaSnapshotIntervalSeconds,
5757
time.Duration(etcdConnectionTimeout),
58+
time.Duration(garbageCollectionPeriodSeconds),
5859
tlsConfig)
5960
if err != nil {
6061
logger.Fatalf("Failed to create snapshotter: %v", err)
6162
}
62-
63+
gcStopCh := make(chan bool)
64+
go ssr.GarbageCollector(gcStopCh)
6365
if err := ssr.Run(stopCh); err != nil {
6466
logger.Fatalf("Snapshotter failed with error: %v", err)
6567
}
68+
gcStopCh <- true
6669
logger.Info("Shutting down...")
6770
return
6871
},
@@ -79,6 +82,7 @@ func initializeSnapshotterFlags(cmd *cobra.Command) {
7982
cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-interval-seconds", "i", 10, "Interval in no. of seconds after which delta snapshot will be persisted")
8083
cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", 7, "maximum number of previous backups to keep")
8184
cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout")
85+
cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 30, "Period in seconds for garbage collecting old backups")
8286
cmd.Flags().BoolVar(&insecureTransport, "insecure-transport", true, "disable transport security for client connections")
8387
cmd.Flags().BoolVar(&insecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification")
8488
cmd.Flags().StringVar(&certFile, "cert", "", "identify secure client using this TLS certificate file")

cmd/types.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ var (
2929
logger = logrus.New()
3030

3131
//snapshotter flags
32-
schedule string
33-
etcdEndpoints []string
34-
deltaSnapshotIntervalSeconds int
35-
maxBackups int
36-
etcdConnectionTimeout int
37-
insecureTransport bool
38-
insecureSkipVerify bool
39-
certFile string
40-
keyFile string
41-
caFile string
32+
schedule string
33+
etcdEndpoints []string
34+
deltaSnapshotIntervalSeconds int
35+
maxBackups int
36+
etcdConnectionTimeout int
37+
garbageCollectionPeriodSeconds int
38+
insecureTransport bool
39+
insecureSkipVerify bool
40+
certFile string
41+
keyFile string
42+
caFile string
4243

4344
//server flags
4445
port int

pkg/retry/retry.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func Do(retryFunc func() error, config *Config) error {
2727
for n < config.Attempts {
2828
delayTime := config.Delay * (1 << (n - 1))
2929
time.Sleep((time.Duration)(delayTime) * config.Units)
30-
config.Logger.Infof("Job attempt: %d", n)
30+
config.Logger.Infof("Job attempt: %d", n+1)
3131
err = retryFunc()
3232
if err == nil {
3333
return nil

pkg/snapshot/restorer/restorer.go

+45-4
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
6060
if err := r.restoreFromBaseSnapshot(ro); err != nil {
6161
return fmt.Errorf("failed to restore from the base snapshot :%v", err)
6262
}
63-
63+
if len(ro.DeltaSnapList) == 0 {
64+
r.logger.Infof("No delta snapshots present over base snapshot.")
65+
return nil
66+
}
6467
r.logger.Infof("Starting embedded etcd server...")
6568
e, err := startEmbeddedEtcd(ro)
6669
if err != nil {
@@ -73,7 +76,8 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
7376
return err
7477
}
7578
defer client.Close()
76-
r.logger.Infof("Applying incremental snapshots...")
79+
80+
r.logger.Infof("Applying delta snapshots...")
7781
return r.applyDeltaSnapshots(client, ro.DeltaSnapList)
7882
}
7983

@@ -84,7 +88,7 @@ func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error {
8488
r.logger.Warnf("Base snapshot path not provided. Will do nothing.")
8589
return nil
8690
}
87-
91+
r.logger.Infof("Restoring from base snapshot: %s", path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName))
8892
cfg := etcdserver.ServerConfig{
8993
InitialClusterToken: ro.ClusterToken,
9094
InitialPeerURLsMap: ro.ClusterURLs,
@@ -311,16 +315,52 @@ func startEmbeddedEtcd(ro RestoreOptions) (*embed.Etcd, error) {
311315

312316
// applyDeltaSnapshot applies thw events from time sorted list of delta snapshot to etcd sequentially
313317
func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, snapList snapstore.SnapList) error {
314-
for _, snap := range snapList {
318+
firstDeltaSnap := snapList[0]
319+
if err := r.applyFirstDeltaSnapshot(client, *firstDeltaSnap); err != nil {
320+
return err
321+
}
322+
for _, snap := range snapList[1:] {
315323
if err := r.applyDeltaSnapshot(client, *snap); err != nil {
316324
return err
317325
}
318326
}
319327
return nil
320328
}
321329

330+
// applyFirstDeltaSnapshot applies thw events from first delta snapshot to etcd
331+
func (r *Restorer) applyFirstDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot) error {
332+
r.logger.Infof("Applying first delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName))
333+
events, err := getEventsFromDeltaSnapshot(r.store, snap)
334+
if err != nil {
335+
return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err)
336+
}
337+
338+
// Note: Since revision in full snapshot file name might be lower than actual revision stored in snapshot.
339+
// This is because of issue refereed below. So, as per workaround used in our logic of taking delta snapshot,
340+
// latest revision from full snapshot may overlap with first few revision on first delta snapshot
341+
// Hence, we have to additionally take care of that.
342+
// Refer: https://github.com/coreos/etcd/issues/9037
343+
ctx := context.TODO()
344+
resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
345+
if err != nil {
346+
return fmt.Errorf("failed to get etcd latest revision: %v", err)
347+
}
348+
lastRevision := resp.Header.Revision
349+
350+
var newRevisionIndex int
351+
for index, event := range events {
352+
if event.EtcdEvent.Kv.ModRevision > lastRevision {
353+
newRevisionIndex = index
354+
break
355+
}
356+
}
357+
358+
return applyEventsToEtcd(client, events[newRevisionIndex:])
359+
}
360+
322361
// applyDeltaSnapshot applies thw events from delta snapshot to etcd
323362
func (r *Restorer) applyDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot) error {
363+
r.logger.Infof("Applying delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName))
324364
events, err := getEventsFromDeltaSnapshot(r.store, snap)
325365
if err != nil {
326366
return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err)
@@ -355,6 +395,7 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error {
355395
ops = []clientv3.Op{}
356396
ctx = context.TODO()
357397
)
398+
358399
for _, e := range events {
359400
ev := e.EtcdEvent
360401
nextRev := ev.Kv.ModRevision
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package snapshotter
16+
17+
import (
18+
"path"
19+
"time"
20+
21+
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
22+
)
23+
24+
// GarbageCollector basically consider the older backups as garbage and deletes it
25+
func (ssr *Snapshotter) GarbageCollector(stopCh <-chan bool) {
26+
for {
27+
select {
28+
case <-stopCh:
29+
return
30+
case <-time.After(ssr.garbageCollectionPeriodSeconds * time.Second):
31+
32+
ssr.logger.Infoln("GC: Executing garbage collection...")
33+
snapList, err := ssr.store.List()
34+
if err != nil {
35+
ssr.logger.Warnf("GC: Failed to list snapshots: %v", err)
36+
continue
37+
}
38+
39+
snapLen := len(snapList)
40+
var snapStreamIndexList []int
41+
snapStreamIndexList = append(snapStreamIndexList, 0)
42+
for index := 1; index < snapLen; index++ {
43+
if snapList[index].Kind == snapstore.SnapshotKindFull {
44+
snapStreamIndexList = append(snapStreamIndexList, index)
45+
}
46+
}
47+
48+
for snapStreamIndex := 0; snapStreamIndex < len(snapStreamIndexList)-ssr.maxBackups; snapStreamIndex++ {
49+
for i := snapStreamIndexList[snapStreamIndex+1] - 1; i >= snapStreamIndex; i-- {
50+
ssr.logger.Infof("Deleting old snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName))
51+
if err := ssr.store.Delete(*snapList[i]); err != nil {
52+
ssr.logger.Warnf("Failed to delete snapshot %s: %v", path.Join(snapList[i].SnapDir, snapList[i].SnapName), err)
53+
}
54+
}
55+
}
56+
}
57+
}
58+
}

pkg/snapshot/snapshotter/snapshotter.go

+17-31
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,25 @@ import (
3434
)
3535

3636
// NewSnapshotter returns the snapshotter object.
37-
func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) {
37+
func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) {
3838
logger.Printf("Validating schedule...")
3939
sdl, err := cron.ParseStandard(schedule)
4040
if err != nil {
4141
return nil, fmt.Errorf("invalid schedule provied %s : %v", schedule, err)
4242
}
43+
if maxBackups < 1 {
44+
return nil, fmt.Errorf("maximum backups limit should be greater than zero. Input MaxBackups: %s", maxBackups)
45+
}
4346

4447
return &Snapshotter{
45-
logger: logger,
46-
schedule: sdl,
47-
store: store,
48-
maxBackups: maxBackups,
49-
etcdConnectionTimeout: etcdConnectionTimeout,
50-
tlsConfig: tlsConfig,
51-
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
52-
//currentDeltaSnapshotCount: 0,
48+
logger: logger,
49+
schedule: sdl,
50+
store: store,
51+
maxBackups: maxBackups,
52+
etcdConnectionTimeout: etcdConnectionTimeout,
53+
garbageCollectionPeriodSeconds: garbageCollectionPeriodSeconds,
54+
tlsConfig: tlsConfig,
55+
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
5356
}, nil
5457
}
5558

@@ -135,9 +138,6 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}) error {
135138

136139
now = time.Now()
137140
effective = ssr.schedule.Next(now)
138-
139-
// TODO: move this garbage collector to paraller thread
140-
ssr.garbageCollector()
141141
if effective.IsZero() {
142142
ssr.logger.Infoln("There are no backup scheduled for future. Stopping now.")
143143
return nil
@@ -166,6 +166,9 @@ func (ssr *Snapshotter) TakeFullSnapshot() error {
166166

167167
ctx, cancel := context.WithTimeout(context.TODO(), ssr.etcdConnectionTimeout*time.Second)
168168
defer cancel()
169+
// Note: Although Get and snapshot call are not atomic, so revision number in snapshot file
170+
// may be ahead of the revision found from GET call. But currently this is the only workaround available
171+
// Refer: https://github.com/coreos/etcd/issues/9037
169172
resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
170173
if err != nil {
171174
return &errors.EtcdError{
@@ -199,23 +202,6 @@ func (ssr *Snapshotter) TakeFullSnapshot() error {
199202
return nil
200203
}
201204

202-
// garbageCollector basically consider the older backups as garbage and deletes it
203-
func (ssr *Snapshotter) garbageCollector() {
204-
ssr.logger.Infoln("Executing garbage collection...")
205-
snapList, err := ssr.store.List()
206-
if err != nil {
207-
ssr.logger.Warnf("Failed to list snapshots: %v", err)
208-
return
209-
}
210-
snapLen := len(snapList)
211-
for i := 0; i < (snapLen - ssr.maxBackups); i++ {
212-
ssr.logger.Infof("Deleting old snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName))
213-
if err := ssr.store.Delete(*snapList[i]); err != nil {
214-
ssr.logger.Warnf("Failed to delete snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName))
215-
}
216-
}
217-
}
218-
219205
// GetTLSClientForEtcd creates an etcd client using the TLS config params.
220206
func GetTLSClientForEtcd(tlsConfig *TLSConfig) (*clientv3.Client, error) {
221207
// set tls if any one tls option set
@@ -272,13 +258,14 @@ func (ssr *Snapshotter) applyWatch(wg *sync.WaitGroup, fullSnapshotCh chan<- tim
272258
Message: fmt.Sprintf("failed to create etcd client: %v", err),
273259
}
274260
}
275-
go ssr.processWatch(wg, client, fullSnapshotCh, stopCh)
276261
wg.Add(1)
262+
go ssr.processWatch(wg, client, fullSnapshotCh, stopCh)
277263
return nil
278264
}
279265

280266
// processWatch processess watch to take delta snapshot periodically by collecting set of events within period
281267
func (ssr *Snapshotter) processWatch(wg *sync.WaitGroup, client *clientv3.Client, fullSnapshotCh chan<- time.Time, stopCh <-chan bool) {
268+
defer wg.Done()
282269
ctx := context.TODO()
283270
watchCh := client.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(ssr.prevSnapshot.LastRevision+1))
284271
ssr.logger.Infof("Applied watch on etcd from revision: %8d", ssr.prevSnapshot.LastRevision+1)
@@ -292,7 +279,6 @@ func (ssr *Snapshotter) processWatch(wg *sync.WaitGroup, client *clientv3.Client
292279
select {
293280
case <-stopCh:
294281
ssr.logger.Infoln("Received stop signal. Terminating current watch...")
295-
wg.Done()
296282
return
297283

298284
case wr, ok := <-watchCh:

0 commit comments

Comments
 (0)