Skip to content

Commit

Permalink
Prepare isExpired to pass a margin for slower scenarios (#20)
Browse files Browse the repository at this point in the history
* move to 250ms for slow suscriptions

* Move margin to variable and set default value to 250ms

* Bump dependencies

* Fix CI

* Try with margin

* Pass margin to isExpired too
  • Loading branch information
sebastianvarela authored Oct 24, 2024
1 parent 806b4e6 commit 4797f57
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 14 deletions.
16 changes: 10 additions & 6 deletions Sources/Publishers/Publishers.RemoveExpired.swift
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Combine

public extension Publisher {
func removeExpired() -> Publishers.RemoveExpired<Self>
func removeExpired(margin: TimeInterval = taskDefaultMargin) -> Publishers.RemoveExpired<Self>
where Output: Taskable {
Publishers.RemoveExpired(upstream: self)
Publishers.RemoveExpired(upstream: self, margin: margin)
}
}

Expand All @@ -15,13 +15,15 @@ public extension Publishers {
public typealias Failure = Upstream.Failure

public let upstream: Upstream
private let margin: TimeInterval

public init(upstream: Upstream) {
public init(upstream: Upstream, margin: TimeInterval) {
self.upstream = upstream
self.margin = margin
}

public func receive<S: Subscriber>(subscriber: S) where Upstream.Failure == S.Failure, Output == S.Input {
upstream.subscribe(Inner(downstream: subscriber))
upstream.subscribe(Inner(downstream: subscriber, margin: margin))
}
}
}
Expand All @@ -31,17 +33,19 @@ extension Publishers.RemoveExpired {
where Downstream.Input == Output, Downstream.Failure == Upstream.Failure, Output: Taskable {
let combineIdentifier = CombineIdentifier()
private let downstream: Downstream
private let margin: TimeInterval

fileprivate init(downstream: Downstream) {
fileprivate init(downstream: Downstream, margin: TimeInterval) {
self.downstream = downstream
self.margin = margin
}

func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}

func receive(_ input: Upstream.Output) -> Subscribers.Demand {
if input.isExpired {
if input.isExpired(margin: margin) {
return .none
}
return downstream.receive(input)
Expand Down
4 changes: 2 additions & 2 deletions Sources/Publishers/Publishers.Scope.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import Foundation

public extension Publisher {
/// From a publisher, we can focus on a task and filter all expired and duplicated task. This publisher don't send value if at suscription moment there is a expired task.
func scope<T: Taskable & Equatable>(_ transform: @escaping (Self.Output) -> T) -> AnyPublisher<T, Failure> {
func scope<T: Taskable & Equatable>(_ transform: @escaping (Self.Output) -> T, margin: TimeInterval = taskDefaultMargin) -> AnyPublisher<T, Failure> {
map(transform)
.removeExpired()
.removeExpired(margin: margin)
.removeDuplicates()
.eraseToAnyPublisher()
}
Expand Down
7 changes: 4 additions & 3 deletions Sources/Task/Task.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import Foundation

public let taskDefaultMargin: TimeInterval = 0.250

public class Task<T: Equatable, E: Error & Equatable>: Taskable, Equatable, CustomDebugStringConvertible {
public typealias Payload = T
public typealias Failure = E
Expand Down Expand Up @@ -50,9 +52,8 @@ public class Task<T: Equatable, E: Error & Equatable>: Taskable, Equatable, Cust
status == .running
}

public var isExpired: Bool {
let margin: TimeInterval = 0.1 // 100ms for suscriptions propagations
return started.timeIntervalSinceNow + expiration.value + margin < 0
public func isExpired(margin: TimeInterval) -> Bool {
started.timeIntervalSinceNow + expiration.value + margin < 0
}

public var isRecentlySucceeded: Bool {
Expand Down
8 changes: 7 additions & 1 deletion Sources/Task/Taskable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public protocol Taskable {

var isIdle: Bool { get }
var isRunning: Bool { get }
var isExpired: Bool { get }
func isExpired(margin: TimeInterval) -> Bool
var isRecentlySucceeded: Bool { get }
var isTerminal: Bool { get }
var isSuccessful: Bool { get }
Expand All @@ -23,6 +23,12 @@ public protocol Taskable {
static func success(_ payload: Payload, started: Date, expiration: TaskExpiration, tag: String?, progress: Decimal?) -> Self
}

public extension Taskable {
var isExpired: Bool {
self.isExpired(margin: taskDefaultMargin)
}
}

public extension Taskable {
static func idle(started: Date = Date(),
tag: String? = nil,
Expand Down
6 changes: 4 additions & 2 deletions Tests/PublishersTests+RemoveExpired.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ extension PublishersTests {

let subject = PassthroughSubject<Task<String, TestError>, Never>()

let margin: TimeInterval = 0.500

subject
.removeExpired() // Filter the 2 expired task
.removeExpired(margin: margin) // Filter the 2 expired task
.removeDuplicates() // Pass only the first success task because the expired they never get here!
.sink { task in
XCTAssertFalse(task.isExpired)
XCTAssertFalse(task.isExpired(margin: margin))
expectation.fulfill()
}
.store(in: &cancellables)
Expand Down

0 comments on commit 4797f57

Please sign in to comment.