Skip to content

Commit

Permalink
AsyncThrottleSequence/Fixed error propagation (#309)
Browse files Browse the repository at this point in the history
  • Loading branch information
tonikocjan authored and borut-t committed May 16, 2024
1 parent 9dfce2c commit c26f043
Showing 1 changed file with 16 additions and 21 deletions.
37 changes: 16 additions & 21 deletions Sources/Async/AsyncThrottleSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,35 +116,30 @@ extension AsyncThrottleSequence: AsyncSequence where C.Duration == Duration {
/// - Throws: An error if the underlying `AsyncSequence` throws an error.
/// - Returns: The next element in the sequence, or `nil` if there are no more elements.
public func next() async throws -> Element? {
var task: Task<Element?, Error>?

lock.withLock {
let task = lock.withLock {
taskInExecution?.cancel()
taskInExecution = nil
let taskA = Task {
try await Task.sleep(until: clock.now.advanced(by: delayBetweenTasks), clock: clock)
let task = Task {
try await Task.sleep(
until: clock.now.advanced(by: delayBetweenTasks),
clock: clock
)
let result = try await baseIterator.next()
do {
// If task was cancelled while a request was being awaited,
// return `nil`!
try Task.checkCancellation()
return result
} catch {
if error is CancellationError {
return nil
}
throw error
}
try Task.checkCancellation()
return result
}
task = taskA
taskInExecution = taskA
taskInExecution = task
return task
}
do {
return try await task?.value
return try await task.value
} catch {
return nil
if error is CancellationError {
return nil
}
throw error
}
}
}
}

/// Creates a new iterator for the `AsyncThrottleSequence`.
Expand Down

0 comments on commit c26f043

Please sign in to comment.