Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bad URL delete #1892

Merged
merged 6 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/zt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,15 @@ func deleteGCPBucket(c *chk.C, client *gcpUtils.Client, bucketName string, waitQ
break
}

c.Log("Failed to clear GCS bucket:", err) // todo: maybe this code should be more resilient
// Failure during listing
c.Assert(err, chk.Equals, nil)
return
}
if err == nil {
err = bucket.Object(attrs.Name).Delete(nil)
if err != nil {
c.Log("Could not clear GCS Buckets:", err) // todo: maybe this code should be more resilient
// Failure cleaning bucket
c.Assert(err, chk.Equals, nil)
return
}
}
Expand Down
64 changes: 38 additions & 26 deletions common/folderDeletionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,35 +103,51 @@ type standardFolderDeletionManager struct {
ctx context.Context
}

func (s *standardFolderDeletionManager) clean(u *url.URL) string {
sasless := strings.Split(u.String(), "?")[0] // first ?, if it exists, is always start of query
cleaned, err := url.PathUnescape(sasless)
if err != nil {
panic("uncleanable url") // should never happen
func (s standardFolderDeletionManager) copyURL(u *url.URL) *url.URL {
out := *u
if u.User != nil {
user := *u.User
out.User = &user
}
return cleaned

return &out
}

func (s *standardFolderDeletionManager) clean(u *url.URL) *url.URL {
out := s.copyURL(u)
out.RawQuery = "" // no SAS

return out
}

// getParent drops final part of path (not using use path.Dir because it messes with the // in URLs)
func (s *standardFolderDeletionManager) getParent(u *url.URL) (string, bool) {
if len(u.Path) == 0 {
return "", false // path is already empty, so we can't go up another level
func (s *standardFolderDeletionManager) getParent(u *url.URL) (*url.URL, bool) {
if len(u.Path) == 0 || u.Path == "/" {
return u, false // path is already empty, so we can't go up another level
}

// trim off last portion of path (or all of the path, if it only has one component)
c := s.clean(u)
lastSlash := strings.LastIndex(c, "/")
return c[0:lastSlash], true
out := s.clean(u)
out.Path = out.Path[:strings.LastIndex(out.Path, "/")]
if out.RawPath != "" {
out.RawPath = out.Path[:strings.LastIndex(out.RawPath, "/")]
}
return out, true
}

func (s standardFolderDeletionManager) getMapKey(u *url.URL) string {
return url.PathEscape(u.Path)
}

// getStateAlreadyLocked assumes the lock is already held
func (s *standardFolderDeletionManager) getStateAlreadyLocked(folder string) *folderDeletionState {
state, alreadyKnown := s.contents[folder]
func (s *standardFolderDeletionManager) getStateAlreadyLocked(folder *url.URL) *folderDeletionState {
fmapKey := s.getMapKey(folder)
state, alreadyKnown := s.contents[fmapKey]
if alreadyKnown {
return state
} else {
state = &folderDeletionState{}
s.contents[folder] = state
s.contents[fmapKey] = state
return state
}
}
Expand All @@ -155,7 +171,7 @@ func (s *standardFolderDeletionManager) RecordChildDeleted(childFile *url.URL) {
}

s.mu.Lock()
folderStatePtr, alreadyKnown := s.contents[folder]
folderStatePtr, alreadyKnown := s.contents[s.getMapKey(folder)]
if !alreadyKnown {
// we are not tracking this child, so there is nothing that we should do in response
// to its deletion (may happen in the recursive calls from tryDeletion, when they recurse up to parent dirs)
Expand All @@ -177,33 +193,29 @@ func (s *standardFolderDeletionManager) RecordChildDeleted(childFile *url.URL) {
}

func (s *standardFolderDeletionManager) RequestDeletion(folder *url.URL, deletionFunc FolderDeletionFunc) {
folderStr := s.clean(folder)
folder = s.clean(folder)

s.mu.Lock()
folderStatePtr := s.getStateAlreadyLocked(folderStr)
folderStatePtr := s.getStateAlreadyLocked(folder)
folderStatePtr.deleter = deletionFunc
shouldDel := folderStatePtr.shouldDeleteNow() // test now in case there are no children
s.mu.Unlock() // release lock before expensive deletion attempt

if shouldDel {
s.tryDeletion(folderStr, deletionFunc)
s.tryDeletion(folder, deletionFunc)
}
}

func (s *standardFolderDeletionManager) tryDeletion(folder string, deletionFunc FolderDeletionFunc) {
func (s *standardFolderDeletionManager) tryDeletion(folder *url.URL, deletionFunc FolderDeletionFunc) {
success := deletionFunc(s.ctx, s.logger) // for safety, deletionFunc should be coded to do nothing, and return false, if the directory is not empty

if success {
s.mu.Lock()
delete(s.contents, folder)
delete(s.contents, s.getMapKey(folder))
s.mu.Unlock()

// folder is, itself, a child of its parent. So recurse. This is the only place that RecordChildDeleted should be called with a FOLDER parameter
u, err := url.Parse(folder)
if err != nil {
panic("folder url not parsable") // should never happen, because we started with a URL
}
s.RecordChildDeleted(u)
s.RecordChildDeleted(folder)
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/zt_folderDeletionManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *folderDeletionManagerSuite) TestGetParent(c *chk.C) {
c.Assert(ok, chk.Equals, false)
} else {
c.Assert(ok, chk.Equals, true)
c.Assert(p, chk.Equals, expectedParent)
c.Assert(p.String(), chk.Equals, expectedParent)
}
}

Expand Down