Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer - Snapshot may be incomplete #1016

Merged
merged 3 commits into from
Nov 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions artifactory/commands/transferfiles/fulltransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,15 @@ func (m *fullTransferPhase) run() error {
if ShouldStop(&m.phaseBase, &delayHelper, errorsChannelMng) {
return nil
}
folderHandler := m.createFolderFullTransferHandlerFunc(*pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
_, err := pcWrapper.chunkBuilderProducerConsumer.AddTaskWithError(folderHandler(folderParams{relativePath: "."}), pcWrapper.errorsQueue.AddError)

// Get the directory's node from the snapshot manager, and use information from previous transfer attempts if such exist.
node, done, err := m.getAndHandleDirectoryNode(".")
if err != nil || done {
return err
}

folderHandler := m.createFolderFullTransferHandlerFunc(node, *pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
_, err = pcWrapper.chunkBuilderProducerConsumer.AddTaskWithError(folderHandler(folderParams{relativePath: "."}), pcWrapper.errorsQueue.AddError)
return err
}
delayAction := func(phase phaseBase, addedDelayFiles []string) error {
Expand All @@ -117,17 +124,17 @@ type folderParams struct {
relativePath string
}

func (m *fullTransferPhase) createFolderFullTransferHandlerFunc(pcWrapper producerConsumerWrapper, uploadChunkChan chan UploadedChunk,
func (m *fullTransferPhase) createFolderFullTransferHandlerFunc(node *reposnapshot.Node, pcWrapper producerConsumerWrapper, uploadChunkChan chan UploadedChunk,
delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng) folderFullTransferHandlerFunc {
return func(params folderParams) parallel.TaskFunc {
return func(threadId int) error {
logMsgPrefix := clientUtils.GetLogMsgPrefix(threadId, false)
return m.transferFolder(params, logMsgPrefix, pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
return m.transferFolder(node, params, logMsgPrefix, pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
}
}
}

func (m *fullTransferPhase) transferFolder(params folderParams, logMsgPrefix string, pcWrapper producerConsumerWrapper,
func (m *fullTransferPhase) transferFolder(node *reposnapshot.Node, params folderParams, logMsgPrefix string, pcWrapper producerConsumerWrapper,
uploadChunkChan chan UploadedChunk, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng) (err error) {
log.Debug(logMsgPrefix+"Handling folder:", path.Join(m.repoKey, params.relativePath))

Expand All @@ -139,12 +146,6 @@ func (m *fullTransferPhase) transferFolder(params folderParams, logMsgPrefix str
return
}

// Get the directory's node from the snapshot manager, and use information from previous transfer attempts if such exist.
node, done, err := m.getAndHandleDirectoryNode(params, logMsgPrefix)
if err != nil || done {
return err
}

curUploadChunk, err := m.searchAndHandleFolderContents(params, pcWrapper,
uploadChunkChan, delayHelper, errorsChannelMng, node)
if err != nil {
Expand Down Expand Up @@ -227,7 +228,13 @@ func (m *fullTransferPhase) handleFoundChildFolder(params folderParams, pcWrappe
item servicesUtils.ResultItem) (err error) {
newRelativePath := getFolderRelativePath(item.Name, params.relativePath)

folderHandler := m.createFolderFullTransferHandlerFunc(pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
// Get the directory's node from the snapshot manager, and use information from previous transfer attempts if such exist.
node, done, err := m.getAndHandleDirectoryNode(newRelativePath)
if err != nil || done {
return err
}

folderHandler := m.createFolderFullTransferHandlerFunc(node, pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
_, err = pcWrapper.chunkBuilderProducerConsumer.AddTaskWithError(folderHandler(folderParams{relativePath: newRelativePath}), pcWrapper.errorsQueue.AddError)
return
}
Expand Down Expand Up @@ -289,15 +296,14 @@ func generateFolderContentAqlQuery(repoKey, relativePath string, paginationOffse
// node - A node in the repository snapshot tree, which represents the current directory.
// completed - Whether handling the node directory was completed. If it wasn't fully transferred, we start exploring and transferring it from scratch.
// previousChildren - If the directory requires exploring, previously known children will be added from this map in order to preserve their states and references.
func (m *fullTransferPhase) getAndHandleDirectoryNode(params folderParams, logMsgPrefix string) (node *reposnapshot.Node, completed bool, err error) {
node, err = m.stateManager.LookUpNode(params.relativePath)
func (m *fullTransferPhase) getAndHandleDirectoryNode(relativePath string) (node *reposnapshot.Node, completed bool, err error) {
node, err = m.stateManager.LookUpNode(relativePath)
if err != nil {
return
}

// If data was not loaded from snapshot, we know that the node is visited for the first time and was not explored.
loadedFromSnapshot, err := m.stateManager.WasSnapshotLoaded()
if err != nil || !loadedFromSnapshot {
if !m.stateManager.WasSnapshotLoaded() {
return
}

Expand All @@ -306,7 +312,7 @@ func (m *fullTransferPhase) getAndHandleDirectoryNode(params folderParams, logMs
return
}
if completed {
log.Debug(logMsgPrefix+"Skipping completed folder:", path.Join(m.repoKey, params.relativePath))
log.Debug("Skipping completed folder:", path.Join(m.repoKey, relativePath))
return
}
// If the node was not completed, we will start exploring it from the beginning.
Expand Down
8 changes: 2 additions & 6 deletions artifactory/commands/transferfiles/state/transfersnapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,8 @@ func (ts *TransferStateManager) LookUpNode(relativePath string) (requestedNode *
return
}

func (ts *TransferStateManager) WasSnapshotLoaded() (wasLoaded bool, err error) {
err = ts.snapshotAction(func(rts *RepoTransferSnapshot) error {
wasLoaded = rts.loadedFromSnapshot
return nil
})
return
func (ts *TransferStateManager) WasSnapshotLoaded() bool {
return ts.repoTransferSnapshot.loadedFromSnapshot
}

func (ts *TransferStateManager) GetDirectorySnapshotNodeWithLru(relativePath string) (node *reposnapshot.Node, err error) {
Expand Down
5 changes: 3 additions & 2 deletions utils/reposnapshot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ func (node *Node) IncrementFilesCount() error {

func (node *Node) DecrementFilesCount() error {
return node.action(func(node *Node) error {
if node.filesCount > 0 {
node.filesCount--
if node.filesCount == 0 {
return errorutils.CheckErrorf("attempting to decrease file count in node '%s', but the files count is already 0", node.name)
}
node.filesCount--
return nil
})
}
Expand Down