Skip to content

Commit 13a47bd

Browse files
committed
API: Hide removeObserver functionality
Fixes #10
1 parent e2be681 commit 13a47bd

13 files changed

+53
-38
lines changed

CHANGELOG.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ Breaking changes in **bold**.
99
* A bunch of class / trait member fields are now `Try[A]` instead of `A`
1010
* Split `fire` into `fireValue` & `fireError`, etc.
1111
* By and large this does not affect existing Airstream usage, just customization by subclassing
12+
* **API: `Observable.removeObserver` and `Transaction.removeExternalObserver` are now private ([#10](https://github.com/raquo/Airstream/issues/10))**
13+
* **Build: Drop Scala 2.11 support**
1214
* New: SignalViewer
1315
* This serves as a warning about my intention to deprecate and eventually remove the entirety of the `State` type in Airstream. Its strictness has not proved useful, and yet has plenty of drawbacks. See [Laminar#37](https://github.com/raquo/Laminar/issues/37) for details. If you want to speak up against that, now is the time.
14-
15-
* **Build: Drop Scala 2.11 support**
1616

1717
#### v0.3 – Sep 2018
1818

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ EventStream is a **lazy** observable. That means that it will not receive or pro
9595

9696
When you add an Observer to a stream, it starts to send events to the observer from now on. Different streams could potentially have custom code in them overriding this behaviour however. We strive for obviousness.
9797

98-
The result of calling `observable.addObserver(observer)(owner)` or `observable.foreach(onNext)(owner)` is a Subscription. To remove the observer you can call `observable.removeObserver(sameObserver)` or `subscription.kill()`. You can ignore the `owner` implicit param for now. Read about it later in the [Ownership](#ownership) section.
98+
The result of calling `observable.addObserver(observer)(owner)` or `observable.foreach(onNext)(owner)` is a Subscription. To remove the observer manually, you can call `subscription.kill()`, but usually it's the `owner`'s job to do that. Hold that though for now, read about owners later in the [Ownership](#ownership) section.
9999

100100

101101
### Laziness
@@ -138,7 +138,7 @@ For extra clarity, while the stream `rap` does depend on `qux`, `rap` itself has
138138

139139
#### Stopping Observables
140140

141-
Just like Observers can be added to streams, they can also be removed, e.g. with `removeObserver`. When you remove the last observer (internal or external) from a stream, it is said to be **stopped**. The same domino effect as when starting streams applies, except the `onStop` method recursively undoes everything that was done by `onStart` – instead of adding an InternalObserver to parent stream, we remove it, and if that causes the grand-parent stream to be stopped, we call its `onStop` method, and the chain continues upstream.
141+
Just like Observers can be added to streams, they can also be removed, e.g. with `subscription.kill()`. When you remove the last observer (internal or external) from a stream, the stream is said to be **stopped**. The same domino effect as when starting streams applies, except the `onStop` method recursively undoes everything that was done by `onStart` – instead of adding an InternalObserver to parent stream, we remove it, and if that causes the grand-parent stream to be stopped, we call its `onStop` method, and the chain continues upstream.
142142

143143
When the dust settles, streams that are now without observers (internal or external) will be stopped, and those that still have observers will otherwise be untouched, except they will stop referencing the now-stopped observables in their lists of internal observers.
144144

src/main/scala/com/raquo/airstream/core/Observable.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ trait Observable[+A] {
5151
subscription
5252
}
5353

54-
// @TODO[Bug] See https://github.com/raquo/Airstream/issues/10
55-
/** Schedule unsubscription from an external observer in the next transaction.
54+
/** Note: this is core-private for subscription safety. See https://github.com/raquo/Airstream/issues/10
55+
*
56+
* Schedule unsubscription from an external observer in the next transaction.
5657
*
5758
* This will still happen synchronously, but will not interfere with
5859
* iteration over the observables' lists of observers during the current
@@ -61,7 +62,7 @@ trait Observable[+A] {
6162
* Note: To completely unsubscribe an Observer from this Observable,
6263
* you need to remove it as many times as you added it to this Observable.
6364
*/
64-
@inline def removeObserver(observer: Observer[A]): Unit = {
65+
@inline private[core] def removeObserver(observer: Observer[A]): Unit = {
6566
Transaction.removeExternalObserver(this, observer)
6667
}
6768

src/main/scala/com/raquo/airstream/core/Transaction.scala

+6-3
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,13 @@ object Transaction { // extends GlobalCounter {
3535

3636
private[this] val pendingObserverRemovals: js.Array[() => Unit] = js.Array()
3737

38-
/** Safely remove external observer (such that it doesn't interfere with iteration over the list of observers).
39-
* Removal still happens synchronously, just at the end of a transaction if one is running right now.
38+
/** Note: this is core-private for subscription safety. See https://github.com/raquo/Airstream/issues/10
39+
*
40+
* Safely remove external observer (such that it doesn't interfere with iteration over the list of observers).
41+
* Removal still happens synchronously, just at the end of a transaction if one is running right now, so that it
42+
* does not interfere with iteration over the observables' lists of observers during the current transaction.
4043
*/
41-
def removeExternalObserver[A](observable: Observable[A], observer: Observer[A]): Unit = {
44+
private[core] def removeExternalObserver[A](observable: Observable[A], observer: Observer[A]): Unit = {
4245
if (isSafeToRemoveObserver) {
4346
// remove right now – useful for efficient recursive removals
4447
observable.removeExternalObserverNow(observer)

src/main/scala/com/raquo/airstream/eventstream/EventStream.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.raquo.airstream.core.AirstreamError.ObserverError
44
import com.raquo.airstream.core.{AirstreamError, LazyObservable, MemoryObservable, Transaction}
55
import com.raquo.airstream.features.CombineObservable
66
import com.raquo.airstream.ownership.Owner
7-
import com.raquo.airstream.signal.{FoldSignal, Signal, SignalFromEventStream}
7+
import com.raquo.airstream.signal.{FoldSignal, Signal, SignalFromEventStream, SignalViewer}
88
import com.raquo.airstream.state.{MapState, State}
99

1010
import scala.concurrent.Future

src/main/scala/com/raquo/airstream/signal/Signal.scala

+7-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,13 @@ trait Signal[+A] extends MemoryObservable[A] with LazyObservable[A] {
7777

7878
override def recoverToTry: Signal[Try[A]] = map(Try(_)).recover[Try[A]] { case err => Some(Failure(err)) }
7979

80-
def observe(implicit owner: Owner): SignalViewer[A] = new SignalViewer[A](this, owner)
80+
/** Add a noop observer to this signal to ensure that it's started.
81+
* This lets you access .now and .tryNow on the resulting SignalViewer.
82+
*
83+
* You can use `myStream.toWeakSignal.observe` to read the last emitted
84+
* value from event streams just as well.
85+
*/
86+
def observe(implicit owner: Owner): SignalViewer[A] = new SignalViewer(this, owner)
8187

8288
/** Initial value is only evaluated if/when needed (when there are observers) */
8389
override protected[airstream] def tryNow(): Try[A] = {

src/main/scala/com/raquo/airstream/signal/SignalViewer.scala

+5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ import com.raquo.airstream.ownership.Owner
55

66
import scala.util.Try
77

8+
/** This class adds a noop observer to `signal`, ensuring that its current value is computed.
9+
* It then lets you query `signal`'s current value with `now` and `tryNow` methods.
10+
*
11+
* This will likely replace the entirety of the `State` type in Airstream.
12+
*/
813
class SignalViewer[+A](signal: Signal[A], owner: Owner) {
914

1015
private[this] val subscription: Subscription = signal.addObserver(Observer.empty)(owner)

src/test/scala/com/raquo/airstream/eventstream/EventBusSpec.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,23 @@ class EventBusSpec extends FunSpec with Matchers {
4040
val obs1 = Observer[Int](newValue => effects += Effect("obs1", newValue))
4141
val obs2 = Observer[Int](newValue => effects += Effect("obs2", newValue))
4242

43-
bus.events.addObserver(obs1)
44-
bus.events.addObserver(obs2)
43+
val sub1 = bus.events.addObserver(obs1)
44+
val sub2 = bus.events.addObserver(obs2)
4545
val subscription3 = bus.events.foreach(newValue => effects += Effect("obs3", newValue))
4646

4747
bus.writer.onNext(5)
4848

4949
effects shouldEqual mutable.Buffer(Effect("obs1", 5), Effect("obs2", 5), Effect("obs3", 5))
5050
effects.clear()
5151

52-
bus.events.removeObserver(obs2)
52+
sub2.kill()
5353

5454
bus.writer.onNext(6)
5555

5656
effects shouldEqual mutable.Buffer(Effect("obs1", 6), Effect("obs3", 6))
5757
effects.clear()
5858

59-
bus.events.removeObserver(obs1)
59+
sub1.kill()
6060

6161
bus.writer.onNext(7)
6262
bus.writer.onNext(8)

src/test/scala/com/raquo/airstream/eventstream/SampleCombineEventStream2Spec.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class SampleCombineEventStream2Spec extends FunSpec with Matchers {
3838

3939
// --
4040

41-
combinedStream.addObserver(combinedObserver)
41+
val subCombined = combinedStream.addObserver(combinedObserver)
4242

4343
calculations shouldEqual mutable.Buffer(
4444
Calculation("signal", 0)
@@ -90,7 +90,7 @@ class SampleCombineEventStream2Spec extends FunSpec with Matchers {
9090

9191
// --
9292

93-
combinedStream.removeObserver(combinedObserver)
93+
subCombined.kill()
9494
sampledSignal.addObserver(signalObserver)
9595

9696
calculations shouldEqual mutable.Buffer()
@@ -168,7 +168,7 @@ class SampleCombineEventStream2Spec extends FunSpec with Matchers {
168168

169169
// --
170170

171-
combinedStream.addObserver(combinedObserver)
171+
val subCombined = combinedStream.addObserver(combinedObserver)
172172

173173
calculations shouldEqual mutable.Buffer()
174174
effects shouldEqual mutable.Buffer()
@@ -218,7 +218,7 @@ class SampleCombineEventStream2Spec extends FunSpec with Matchers {
218218

219219
// --
220220

221-
combinedStream.removeObserver(combinedObserver)
221+
subCombined.kill()
222222

223223
calculations shouldEqual mutable.Buffer()
224224
effects shouldEqual mutable.Buffer()

src/test/scala/com/raquo/airstream/eventstream/SwitchEventStreamSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class SwitchEventStreamSpec extends FunSpec with Matchers {
3232
val flattenStream = $latestNumber
3333
.map(Calculation.log("flattened", calculations))
3434

35-
flattenStream.addObserver(flattenObserver)
35+
val subFlatten = flattenStream.addObserver(flattenObserver)
3636

3737
calculations shouldEqual mutable.Buffer()
3838
effects shouldEqual mutable.Buffer()
@@ -93,7 +93,7 @@ class SwitchEventStreamSpec extends FunSpec with Matchers {
9393
val sourceStream2Observer = Observer[Int](effects += Effect("source-2-obs", _))
9494

9595
sourceStreams(2).addObserver(sourceStream2Observer)
96-
flattenStream.removeObserver(flattenObserver)
96+
subFlatten.kill()
9797

9898
calculations shouldEqual mutable.Buffer()
9999
effects shouldEqual mutable.Buffer()

src/test/scala/com/raquo/airstream/signal/FoldSignalSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class FoldSignalSpec extends FunSpec with Matchers {
3333

3434
// --
3535

36-
signal.addObserver(signalObserver)
36+
val sub = signal.addObserver(signalObserver)
3737

3838
calculations shouldEqual mutable.Buffer(
3939
Calculation("signal", "numbers:")
@@ -61,7 +61,7 @@ class FoldSignalSpec extends FunSpec with Matchers {
6161

6262
// --
6363

64-
signal.removeObserver(signalObserver)
64+
sub.kill()
6565
bus.writer.onNext(3)
6666

6767
signal.addObserver(signalObserver)

src/test/scala/com/raquo/airstream/signal/SignalSpec.scala

+10-10
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class SignalSpec extends FunSpec with Matchers {
4242
// When observer is added, it immediately gets the last evaluated current value
4343
// Note: Because signal had no observer when bus fired value `1`, that event was NOT used to compute new value.
4444
// This is expected. If this is undesirable to your case, use State instead of Signal.
45-
signal.addObserver(signalObserver1)
45+
val sub1 = signal.addObserver(signalObserver1)
4646

4747
calculations shouldEqual mutable.Buffer(
4848
Calculation("map-signal", -1) // First time current value of this signal was needed, so it is now calculated
@@ -86,7 +86,7 @@ class SignalSpec extends FunSpec with Matchers {
8686

8787
// When adding a new observer, it gets the signal's current value.
8888
// Here the current value has been updated by the previous event, and the signal remembers it.
89-
signal.addObserver(signalObserver2)
89+
val sub2 = signal.addObserver(signalObserver2)
9090

9191
calculations shouldEqual mutable.Buffer() // Using cached calculation
9292
effects shouldEqual mutable.Buffer(
@@ -113,7 +113,7 @@ class SignalSpec extends FunSpec with Matchers {
113113

114114
// --
115115

116-
signal.removeObserver(signalObserver1)
116+
sub1.kill()
117117

118118
bus.writer.onNext(4)
119119

@@ -130,7 +130,7 @@ class SignalSpec extends FunSpec with Matchers {
130130

131131
// --
132132

133-
signal.removeObserver(signalObserver2)
133+
sub2.kill()
134134

135135
bus.writer.onNext(5)
136136

@@ -197,7 +197,7 @@ class SignalSpec extends FunSpec with Matchers {
197197

198198
// --
199199

200-
changes.addObserver(changesObserver)
200+
val subChanges1 = changes.addObserver(changesObserver)
201201

202202
bus.writer.onNext(2)
203203

@@ -217,7 +217,7 @@ class SignalSpec extends FunSpec with Matchers {
217217
// --
218218

219219
// Adding observer to signal sends the last evaluated current value to it
220-
signal.addObserver(signalObserver)
220+
val subSignal = signal.addObserver(signalObserver)
221221

222222
calculations shouldEqual mutable.Buffer()
223223
effects shouldEqual mutable.Buffer(
@@ -258,7 +258,7 @@ class SignalSpec extends FunSpec with Matchers {
258258

259259
// --
260260

261-
changes.removeObserver(changesObserver)
261+
subChanges1.kill()
262262

263263
bus.writer.onNext(4)
264264

@@ -275,15 +275,15 @@ class SignalSpec extends FunSpec with Matchers {
275275

276276
// --
277277

278-
changes.addObserver(changesObserver)
278+
val subChanges2 = changes.addObserver(changesObserver)
279279

280280
calculations shouldEqual mutable.Buffer()
281281
effects shouldEqual mutable.Buffer()
282282

283283
// --
284284

285-
changes.removeObserver(changesObserver)
286-
signal.removeObserver(signalObserver)
285+
subChanges2.kill()
286+
subSignal.kill()
287287

288288
bus.writer.onNext(5)
289289

src/test/scala/com/raquo/airstream/state/StateSpec.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class StateSpec extends FunSpec with Matchers {
7575

7676
// --
7777

78-
state1.addObserver(stateObserver1)
78+
val sub1 = state1.addObserver(stateObserver1)
7979

8080
calculations shouldEqual mutable.Buffer()
8181
effects shouldEqual mutable.Buffer(
@@ -102,7 +102,7 @@ class StateSpec extends FunSpec with Matchers {
102102

103103
// --
104104

105-
state2.addObserver(stateObserver2)
105+
val sub2 = state2.addObserver(stateObserver2)
106106

107107
calculations shouldEqual mutable.Buffer()
108108
effects shouldEqual mutable.Buffer(
@@ -130,8 +130,8 @@ class StateSpec extends FunSpec with Matchers {
130130

131131
// --
132132

133-
state2.removeObserver(stateObserver2)
134-
state1.removeObserver(stateObserver1)
133+
sub2.kill()
134+
sub1.kill()
135135

136136
calculations shouldEqual mutable.Buffer()
137137
effects shouldEqual mutable.Buffer()

0 commit comments

Comments
 (0)