Skip to content

Commit

Permalink
feat(storage): enable parallel writes by using per-repo and per-diges…
Browse files Browse the repository at this point in the history
…t locking

- lock per repo on pushes/pulls/retention, in short index operations
- lock per digest when using multiple operations affecting the cachedb and storage
(blob writes/deletes/moves/links in storage which need to be in accordance with cachedb content)

Do not lock multiple repos at the same time in the same goroutine! It will cause deadlocks.
Same applies to digests.

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
  • Loading branch information
andaaron committed Mar 11, 2025
1 parent 0930e57 commit b649d07
Show file tree
Hide file tree
Showing 15 changed files with 895 additions and 720 deletions.
10 changes: 5 additions & 5 deletions pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,12 +956,12 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
if userCanMount {
ok, blen, err = imgStore.CheckBlob(name, digest)
} else {
var lockLatency time.Time
err = imgStore.WithRepoReadLock(name, func() error {
var err error
ok, blen, _, err = imgStore.StatBlob(name, digest)

imgStore.RLock(&lockLatency)
defer imgStore.RUnlock(&lockLatency)

ok, blen, _, err = imgStore.StatBlob(name, digest)
return err
})
}

if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions pkg/extensions/sync/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"os"
"path"
"strings"
"time"

"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -99,8 +98,6 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
registry.log.Info().Str("syncTempDir", path.Join(tempImageStore.RootDir(), repo)).Str("reference", reference).
Msg("pushing synced local image to local registry")

var lockLatency time.Time

manifestBlob, manifestDigest, mediaType, err := tempImageStore.GetImageManifest(repo, reference)
if err != nil {
registry.log.Error().Str("errorType", common.TypeOf(err)).
Expand Down Expand Up @@ -136,10 +133,14 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
}

for _, manifest := range indexManifest.Manifests {
tempImageStore.RLock(&lockLatency)
manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest)
tempImageStore.RUnlock(&lockLatency)
var manifestBuf []byte

err := tempImageStore.WithRepoReadLock(repo, func() error {
var err error
manifestBuf, err = tempImageStore.GetBlobContent(repo, manifest.Digest)

return err
})
if err != nil {
registry.log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Str("dir", path.Join(tempImageStore.RootDir(), repo)).Str("digest", manifest.Digest.String()).
Expand Down
160 changes: 78 additions & 82 deletions pkg/meta/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"time"

godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -108,66 +107,65 @@ func getReposToBeDeleted(allStorageRepos []string, allMetaDBRepos []string) []st
func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreController, log log.Logger) error {
imageStore := storeController.GetImageStore(repo)

var lockLatency time.Time
err := imageStore.WithRepoReadLock(repo, func() error {
indexBlob, err := imageStore.GetIndexContent(repo)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to read index.json for repo")

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
return err
}

indexBlob, err := imageStore.GetIndexContent(repo)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to read index.json for repo")
var indexContent ispec.Index

return err
}
err = json.Unmarshal(indexBlob, &indexContent)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to unmarshal index.json for repo")

var indexContent ispec.Index
return err
}

err = json.Unmarshal(indexBlob, &indexContent)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to unmarshal index.json for repo")
err = metaDB.ResetRepoReferences(repo)
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
log.Error().Err(err).Str("repository", repo).Msg("failed to reset tag field in RepoMetadata for repo")

return err
}
return err
}

err = metaDB.ResetRepoReferences(repo)
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
log.Error().Err(err).Str("repository", repo).Msg("failed to reset tag field in RepoMetadata for repo")
for _, manifest := range indexContent.Manifests {
tag := manifest.Annotations[ispec.AnnotationRefName]

return err
}
if zcommon.IsReferrersTag(tag) {
continue
}

for _, manifest := range indexContent.Manifests {
tag := manifest.Annotations[ispec.AnnotationRefName]
manifestBlob, _, _, err := imageStore.GetImageManifest(repo, manifest.Digest.String())
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("digest", manifest.Digest.String()).
Msg("failed to get blob for image")

if zcommon.IsReferrersTag(tag) {
continue
}
return err
}

manifestBlob, _, _, err := imageStore.GetImageManifest(repo, manifest.Digest.String())
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("digest", manifest.Digest.String()).
Msg("failed to get blob for image")
reference := tag

return err
}
if tag == "" {
reference = manifest.Digest.String()
}

reference := tag
err = SetImageMetaFromInput(context.Background(), repo, reference, manifest.MediaType, manifest.Digest, manifestBlob,
imageStore, metaDB, log)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("tag", tag).
Msg("failed to set metadata for image")

if tag == "" {
reference = manifest.Digest.String()
return err
}
}

err = SetImageMetaFromInput(context.Background(), repo, reference, manifest.MediaType, manifest.Digest, manifestBlob,
imageStore, metaDB, log)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("tag", tag).
Msg("failed to set metadata for image")

return err
}
}
return nil
})

return nil
return err
}

func getAllRepos(storeController stypes.StoreController, log log.Logger) ([]string, error) {
Expand Down Expand Up @@ -222,34 +220,33 @@ func getCosignSignatureLayersInfo(
return layers, err
}

var lockLatency time.Time

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
err := imageStore.WithRepoReadLock(repo, func() error {
for _, layer := range manifestContent.Layers {
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).
Msg("failed to get cosign signature layer content")

for _, layer := range manifestContent.Layers {
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).Msg(
"failed to get cosign signature layer content")
return err
}

return layers, err
}
layerSigKey, ok := layer.Annotations[zcommon.CosignSigKey]
if !ok {
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).
Msg("failed to get specific annotation of cosign signature")
}

layerSigKey, ok := layer.Annotations[zcommon.CosignSigKey]
if !ok {
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).Msg(
"failed to get specific annotation of cosign signature")
layers = append(layers, mTypes.LayerInfo{
LayerDigest: layer.Digest.String(),
LayerContent: layerContent,
SignatureKey: layerSigKey,
})
}

layers = append(layers, mTypes.LayerInfo{
LayerDigest: layer.Digest.String(),
LayerContent: layerContent,
SignatureKey: layerSigKey,
})
}
return nil
})

return layers, nil
return layers, err
}

func getNotationSignatureLayersInfo(
Expand Down Expand Up @@ -279,28 +276,27 @@ func getNotationSignatureLayersInfo(

layer := manifestContent.Layers[0].Digest

var lockLatency time.Time
err := imageStore.WithRepoReadLock(repo, func() error {
layerContent, err := imageStore.GetBlobContent(repo, layer)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("reference", manifestDigest).Str("layerDigest", layer.String()).
Msg("failed to get notation signature blob content")

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
return err
}

layerContent, err := imageStore.GetBlobContent(repo, layer)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("reference", manifestDigest).Str("layerDigest", layer.String()).Msg(
"failed to get notation signature blob content")
layerSigKey := manifestContent.Layers[0].MediaType

return layers, err
}

layerSigKey := manifestContent.Layers[0].MediaType
layers = append(layers, mTypes.LayerInfo{
LayerDigest: layer.String(),
LayerContent: layerContent,
SignatureKey: layerSigKey,
})

layers = append(layers, mTypes.LayerInfo{
LayerDigest: layer.String(),
LayerContent: layerContent,
SignatureKey: layerSigKey,
return nil
})

return layers, nil
return layers, err
}

// SetMetadataFromInput tries to set manifest metadata and update repo metadata by adding the current tag
Expand Down
13 changes: 4 additions & 9 deletions pkg/storage/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,9 +876,6 @@ type DedupeTaskGenerator struct {
ImgStore storageTypes.ImageStore
// storage dedupe value
Dedupe bool
// store blobs paths grouped by digest
digest godigest.Digest
duplicateBlobs []string
/* store processed digest, used for iterating duplicateBlobs one by one
and generating a task for each unprocessed one*/
lastDigests []godigest.Digest
Expand Down Expand Up @@ -917,15 +914,15 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
}

// get all blobs from storage.imageStore and group them by digest
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
digest, duplicateBlobs, err := gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
if err != nil {
gen.Log.Error().Err(err).Str("component", "dedupe").Msg("failed to get next digest")

return nil, err
}

// if no digests left, then mark the task generator as done
if gen.digest == "" {
if digest == "" {
gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished")

gen.done = true
Expand All @@ -934,10 +931,10 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
}

// mark digest as processed before running its task
gen.lastDigests = append(gen.lastDigests, gen.digest)
gen.lastDigests = append(gen.lastDigests, digest)

// generate rebuild dedupe task for this digest
return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil
return newDedupeTask(gen.ImgStore, digest, gen.Dedupe, duplicateBlobs, gen.Log), nil
}

func (gen *DedupeTaskGenerator) IsDone() bool {
Expand All @@ -950,9 +947,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool {

func (gen *DedupeTaskGenerator) Reset() {
gen.lastDigests = []godigest.Digest{}
gen.duplicateBlobs = []string{}
gen.repos = []string{}
gen.digest = ""
gen.done = false
}

Expand Down
Loading

0 comments on commit b649d07

Please sign in to comment.