From d0d040d1faff97022e8de0e608ad620c5f9660a8 Mon Sep 17 00:00:00 2001 From: EYHN Date: Tue, 23 Jan 2024 23:23:15 +0800 Subject: [PATCH] feat(infra): livadata --- packages/common/infra/package.json | 4 + .../src/livedata/__tests__/livedata.spec.ts | 188 +++++++++++ .../src/livedata/__tests__/react.spec.tsx | 60 ++++ packages/common/infra/src/livedata/index.ts | 299 ++++++++++++++++++ packages/common/infra/src/livedata/react.ts | 44 +++ yarn.lock | 2 + 6 files changed, 597 insertions(+) create mode 100644 packages/common/infra/src/livedata/__tests__/livedata.spec.ts create mode 100644 packages/common/infra/src/livedata/__tests__/react.spec.tsx create mode 100644 packages/common/infra/src/livedata/index.ts create mode 100644 packages/common/infra/src/livedata/react.ts diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index f64d9baf8b664..093f2cf22f149 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -7,6 +7,7 @@ "./command": "./src/command/index.ts", "./atom": "./src/atom/index.ts", "./app-config-storage": "./src/app-config-storage.ts", + "./livedata": "./src/livedata/index.ts", ".": "./src/index.ts" }, "dependencies": { @@ -16,9 +17,11 @@ "@blocksuite/blocks": "0.12.0-nightly-202401290223-b6302df", "@blocksuite/global": "0.12.0-nightly-202401290223-b6302df", "@blocksuite/store": "0.12.0-nightly-202401290223-b6302df", + "foxact": "^0.2.20", "jotai": "^2.5.1", "jotai-effect": "^0.2.3", "nanoid": "^5.0.3", + "react": "18.2.0", "tinykeys": "^2.1.0", "yjs": "^13.6.10", "zod": "^3.22.4" @@ -28,6 +31,7 @@ "@affine/templates": "workspace:*", "@blocksuite/lit": "0.12.0-nightly-202401290223-b6302df", "@blocksuite/presets": "0.12.0-nightly-202401290223-b6302df", + "@testing-library/react": "^14.0.0", "async-call-rpc": "^6.3.1", "react": "^18.2.0", "rxjs": "^7.8.1", diff --git a/packages/common/infra/src/livedata/__tests__/livedata.spec.ts b/packages/common/infra/src/livedata/__tests__/livedata.spec.ts new file mode 100644 index 0000000000000..a48545a9a24a1 --- /dev/null +++ b/packages/common/infra/src/livedata/__tests__/livedata.spec.ts @@ -0,0 +1,188 @@ +import type { Subscriber } from 'rxjs'; +import { combineLatest, Observable, of } from 'rxjs'; +import { describe, expect, test, vitest } from 'vitest'; + +import { LiveData } from '..'; + +describe('livedata', () => { + test('LiveData', async () => { + const livedata = new LiveData(0); + expect(livedata.value).toBe(0); + livedata.next(1); + expect(livedata.value).toBe(1); + let subscribed = 0; + livedata.subscribe(v => { + subscribed = v; + }); + livedata.next(2); + expect(livedata.value).toBe(2); + await vitest.waitFor(() => subscribed === 2); + }); + + test('from', async () => { + { + const livedata = LiveData.from(of(1, 2, 3, 4), 0); + expect(livedata.value).toBe(4); + } + + { + let subscriber: Subscriber = null!; + const observable = new Observable(s => { + subscriber = s; + }); + const livedata = LiveData.from(observable, 0); + let value = 0; + livedata.subscribe(v => { + value = v; + }); + + expect(value).toBe(0); + subscriber.next(1); + expect(value).toBe(1); + subscriber.next(2); + expect(value).toBe(2); + } + + { + let observableSubscribed = false; + let observableClosed = false; + const observable = new Observable(subscriber => { + observableSubscribed = true; + subscriber.next(1); + return () => { + observableClosed = true; + }; + }); + const livedata = LiveData.from(observable, 0); + expect(observableSubscribed).toBe(false); + const subscription = livedata.subscribe(_ => {}); + expect(observableSubscribed).toBe(true); + expect(observableClosed).toBe(false); + subscription.unsubscribe(); + expect(observableClosed).toBe(true); + } + + { + let subscriber: Subscriber = null!; + const observable = new Observable(s => { + subscriber = s; + }); + const livedata = LiveData.from(observable, 0); + let value1 = 0; + livedata.subscribe(v => { + value1 = v; + }); + + let value2 = 0; + livedata.subscribe(v => { + value2 = v; + }); + + expect(value1).toBe(0); + expect(value2).toBe(0); + subscriber.next(1); + expect(value1).toBe(1); + expect(value2).toBe(1); + subscriber.next(2); + expect(value1).toBe(2); + expect(value2).toBe(2); + } + + { + let observableSubscribed = false; + let observableClosed = false; + const observable = new Observable(subscriber => { + observableSubscribed = true; + subscriber.next(1); + return () => { + observableClosed = true; + }; + }); + const livedata = LiveData.from(observable, 0); + expect(observableSubscribed).toBe(false); + const subscription1 = livedata.subscribe(_ => {}); + const subscription2 = livedata.subscribe(_ => {}); + expect(observableSubscribed).toBe(true); + expect(observableClosed).toBe(false); + subscription1.unsubscribe(); + expect(observableClosed).toBe(false); + subscription2.unsubscribe(); + expect(observableClosed).toBe(true); + } + + { + let observerCount = 0; + const observable = new Observable(_ => { + observerCount++; + }); + const livedata = LiveData.from(observable, 0); + livedata.subscribe(_ => {}); + livedata.subscribe(_ => {}); + expect(observerCount).toBe(1); + } + + { + let value = 0; + const observable = new Observable(subscriber => { + subscriber.next(value); + }); + const livedata = LiveData.from(observable, 0); + expect(livedata.value).toBe(0); + value = 1; + expect(livedata.value).toBe(1); + } + }); + + test('map', () => { + { + const livedata = new LiveData(0); + const mapped = livedata.map(v => v + 1); + expect(mapped.value).toBe(1); + livedata.next(1); + expect(mapped.value).toBe(2); + } + + { + const livedata = new LiveData(0); + const mapped = livedata.map(v => v + 1); + let value = 0; + mapped.subscribe(v => { + value = v; + }); + expect(value).toBe(1); + livedata.next(1); + expect(value).toBe(2); + } + + { + let observableSubscribed = false; + let observableClosed = false; + const observable = new Observable(subscriber => { + observableSubscribed = true; + subscriber.next(1); + return () => { + observableClosed = true; + }; + }); + + const livedata = LiveData.from(observable, 0); + const mapped = livedata.map(v => v + 1); + + expect(observableSubscribed).toBe(false); + const subscription = mapped.subscribe(_ => {}); + expect(observableSubscribed).toBe(true); + expect(observableClosed).toBe(false); + subscription.unsubscribe(); + expect(observableClosed).toBe(true); + } + }); + + test('interop with rxjs', () => { + const ob = combineLatest([new LiveData(1)]); + let value = 0; + ob.subscribe(v => { + value = v[0]; + }); + expect(value).toBe(1); + }); +}); diff --git a/packages/common/infra/src/livedata/__tests__/react.spec.tsx b/packages/common/infra/src/livedata/__tests__/react.spec.tsx new file mode 100644 index 0000000000000..74e9ff25bdde3 --- /dev/null +++ b/packages/common/infra/src/livedata/__tests__/react.spec.tsx @@ -0,0 +1,60 @@ +/** + * @vitest-environment happy-dom + */ +import { render, screen } from '@testing-library/react'; +import { useRef } from 'react'; +import { Observable } from 'rxjs'; +import { describe, expect, test, vi } from 'vitest'; + +import { LiveData, useLiveData } from '..'; + +describe('livedata', () => { + test('react', () => { + const livedata = new LiveData(0); + const Component = () => { + const renderCount = useRef(0); + renderCount.current++; + const value = useLiveData(livedata); + return ( +
+ {renderCount.current}:{value} +
+ ); + }; + const { rerender } = render(); + expect(screen.getByRole('main').innerText).toBe('1:0'); + livedata.next(1); + rerender(); + expect(screen.getByRole('main').innerText).toBe('3:1'); + }); + + test('lifecycle', async () => { + let observableSubscribed = false; + let observableClosed = false; + const observable = new Observable(subscriber => { + observableSubscribed = true; + subscriber.next(1); + console.log(1); + return () => { + observableClosed = true; + }; + }); + + const livedata = LiveData.from(observable, 0); + const Component1 = () => { + const value = useLiveData(livedata); + return
{value}
; + }; + + expect(observableSubscribed).toBe(false); + const { rerender } = render(); + expect(observableSubscribed).toBe(true); + + expect(observableClosed).toBe(false); + const Component2 = () => { + return
; + }; + rerender(); + await vi.waitUntil(() => observableClosed); + }); +}); diff --git a/packages/common/infra/src/livedata/index.ts b/packages/common/infra/src/livedata/index.ts new file mode 100644 index 0000000000000..49aeeb7babab0 --- /dev/null +++ b/packages/common/infra/src/livedata/index.ts @@ -0,0 +1,299 @@ +import { DebugLogger } from '@affine/debug'; +import { + distinctUntilChanged, + EMPTY, + filter, + type InteropObservable, + map, + Observable, + type Observer, + of, + type OperatorFunction, + scan, + skip, + type Subscription, + switchMap, +} from 'rxjs'; +import { BehaviorSubject, Subject } from 'rxjs'; + +export * from './react'; + +const logger = new DebugLogger('livedata'); + +/** + * LiveData is a reactive data type. + * + * ## basic usage + * + * @example + * ```ts + * const livedata = new LiveData(0); // create livedata with initial value + * + * livedata.next(1); // update value + * + * console.log(livedata.value); // get current value + * + * livedata.subscribe(v => { // subscribe to value changes + * console.log(v); // 1 + * }); + * ``` + * + * ## observable + * + * LiveData is a rxjs observable, you can use rxjs operators. + * + * @example + * ```ts + * new LiveData(0).pipe( + * map(v => v + 1), + * filter(v => v > 1), + * ... + * ) + * ``` + * + * NOTICE: different from normal observable, LiveData will always emit the latest value when you subscribe to it. + * + * ## from observable + * + * LiveData can be created from observable or from other livedata. + * + * @example + * ```ts + * const A = LiveData.from( + * of(1, 2, 3, 4), // from observable + * 0 // initial value + * ); + * + * const B = LiveData.from( + * A.pipe(map(v => 'from a ' + v)), // from other livedata + * '' // initial value + * ); + * ``` + * + * ## Why is it called LiveData + * + * This API is very similar to LiveData in Android, as both are based on Observable, so I named it LiveData. + * + * @see {@link https://rxjs.dev/api/index/class/BehaviorSubject} + * @see {@link https://developer.android.com/topic/libraries/architecture/livedata} + */ +export class LiveData implements InteropObservable { + static from( + upstream: + | Observable + | InteropObservable + | ((stream: Observable) => Observable), + initialValue: T + ): LiveData { + const data = new LiveData( + initialValue, + typeof upstream === 'function' + ? upstream + : stream => + stream.pipe( + filter( + (op): op is Exclude => op !== 'set' + ), + switchMap(v => { + if (v === 'get') { + return of('watch' as const, 'unwatch' as const); + } else { + return of(v); + } + }), + scan((acc, op) => { + if (op === 'watch') { + return acc + 1; + } else if (op === 'unwatch') { + return acc - 1; + } else { + return acc; + } + }, 0), + map(count => { + if (count > 0) { + return 'watch'; + } else { + return 'unwatch'; + } + }), + distinctUntilChanged(), + switchMap(op => { + if (op === 'watch') { + return upstream; + } else { + return EMPTY; + } + }) + ) + ); + + return data; + } + + private readonly raw: BehaviorSubject; + private readonly ops = new Subject(); + private readonly upstreamSubscription: Subscription | undefined; + + constructor( + initialValue: T, + upstream: + | ((upstream: Observable) => Observable) + | undefined = undefined + ) { + this.raw = new BehaviorSubject(initialValue); + if (upstream) { + this.upstreamSubscription = upstream(this.ops).subscribe({ + next: v => { + this.raw.next(v); + }, + complete: () => { + if (!this.raw.closed) { + logger.error('livedata upstream unexpected complete'); + } + }, + error: err => { + logger.error('uncatched error in livedata', err); + }, + }); + } + } + + getValue(): T { + this.ops.next('get'); + return this.raw.value; + } + + setValue(v: T) { + this.raw.next(v); + this.ops.next('set'); + } + + get value(): T { + return this.getValue(); + } + + set value(v: T) { + this.setValue(v); + } + + next(v: T) { + this.setValue(v); + } + + subscribe( + observer: Partial> | ((value: T) => void) | undefined + ): Subscription { + this.ops.next('watch'); + const subscription = this.raw.subscribe(observer); + subscription.add(() => { + this.ops.next('unwatch'); + }); + return subscription; + } + + map(mapper: (v: T) => R): LiveData { + const sub = LiveData.from( + new Observable(subscriber => + this.subscribe({ + next: v => { + subscriber.next(mapper(v)); + }, + complete: () => { + sub.complete(); + }, + }) + ), + undefined as R // is safe + ); + + return sub; + } + + asObservable(): Observable { + return new Observable(subscriber => { + return this.subscribe(subscriber); + }); + } + + pipe(): Observable; + pipe(op1: OperatorFunction): Observable; + pipe( + op1: OperatorFunction, + op2: OperatorFunction + ): Observable; + pipe( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction + ): Observable; + pipe( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction + ): Observable; + pipe( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction, + op5: OperatorFunction + ): Observable; + pipe( + op1: OperatorFunction, + op2: OperatorFunction, + op3: OperatorFunction, + op4: OperatorFunction, + op5: OperatorFunction, + op6: OperatorFunction + ): Observable; + pipe(...args: any[]) { + return new Observable(subscriber => { + this.ops.next('watch'); + // eslint-disable-next-line prefer-spread + const subscription = this.raw.pipe + .apply(this.raw, args as any) + .subscribe(subscriber); + subscription.add(() => { + this.ops.next('unwatch'); + }); + return subscription; + }); + } + + complete() { + this.ops.complete(); + this.raw.complete(); + this.upstreamSubscription?.unsubscribe(); + } + + reactSubscribe = (cb: () => void) => { + this.ops.next('watch'); + const subscription = this.raw + .pipe(distinctUntilChanged(), skip(1)) + .subscribe(cb); + subscription.add(() => { + this.ops.next('unwatch'); + }); + return () => subscription.unsubscribe(); + }; + + reactGetSnapshot = () => { + this.ops.next('watch'); + setImmediate(() => { + this.ops.next('unwatch'); + }); + return this.raw.value; + }; + + [Symbol.observable || '@@observable']() { + return this; + } + + [Symbol.observable]() { + return this; + } +} + +export type LiveDataOperation = 'set' | 'get' | 'watch' | 'unwatch'; diff --git a/packages/common/infra/src/livedata/react.ts b/packages/common/infra/src/livedata/react.ts new file mode 100644 index 0000000000000..004f21ab7da3b --- /dev/null +++ b/packages/common/infra/src/livedata/react.ts @@ -0,0 +1,44 @@ +import { use } from 'foxact/use'; +import { useSyncExternalStore } from 'react'; + +import type { LiveData } from './index'; + +/** + * subscribe LiveData and return the value. + */ +export function useLiveData(liveData: LiveData): T { + return useSyncExternalStore( + liveData.reactSubscribe, + liveData.reactGetSnapshot + ); +} + +/** + * subscribe LiveData and return the value. If the value is nullish, will suspends until the value is not nullish. + */ +export function useEnsureLiveData(liveData: LiveData): NonNullable { + const data = useLiveData(liveData); + + if (data === null || data === undefined) { + return use( + new Promise((resolve, reject) => { + const subscription = liveData.subscribe({ + next(value) { + if (value === null || value === undefined) { + resolve(value); + subscription.unsubscribe(); + } + }, + error(err) { + reject(err); + }, + complete() { + reject(new Error('Unexpected completion')); + }, + }); + }) + ); + } + + return data; +} diff --git a/yarn.lock b/yarn.lock index 0f456c132df41..31b402e731eef 100644 --- a/yarn.lock +++ b/yarn.lock @@ -13049,7 +13049,9 @@ __metadata: "@blocksuite/lit": "npm:0.12.0-nightly-202401290223-b6302df" "@blocksuite/presets": "npm:0.12.0-nightly-202401290223-b6302df" "@blocksuite/store": "npm:0.12.0-nightly-202401290223-b6302df" + "@testing-library/react": "npm:^14.0.0" async-call-rpc: "npm:^6.3.1" + foxact: "npm:^0.2.20" jotai: "npm:^2.5.1" jotai-effect: "npm:^0.2.3" nanoid: "npm:^5.0.3"