diff --git a/examples/middleware.ts b/examples/middleware.ts new file mode 100644 index 00000000..936a6230 --- /dev/null +++ b/examples/middleware.ts @@ -0,0 +1,260 @@ +import { observable, registerMiddleware, MiddlewareEvent, MiddlewareHandler } from '../index'; + +// Import necessary types +import type { NodeInfo } from '../src/observableInterfaces'; + +// Create an observable store for our examples +const store = observable({ count: 0, user: { name: 'John' } }); +// Get the root node for middleware registration +const rootNode = (store as any).__node as NodeInfo; + +// Example 1: Register for listener-added events on the root node +const listenersAddedHandler = registerMiddleware(rootNode, 'listener-added', (event: MiddlewareEvent) => { + console.log('Listener added to node:', event.node); +}); + +// Example 2: Register for listener-removed events on the root node +const listenersRemovedHandler = registerMiddleware(rootNode, 'listener-removed', (event: MiddlewareEvent) => { + console.log('Listener removed from node:', event.node); +}); + +// Example 3: Register for listeners-cleared events on the root node +const listenersClearedHandler = registerMiddleware(rootNode, 'listeners-cleared', (event: MiddlewareEvent) => { + console.log('All listeners cleared from node:', event.node); +}); + +// Example 4: Register for a specific event type on a specific node - Listener Added +function createListenerAddedLogger(node: NodeInfo) { + return registerMiddleware( + node, + 'listener-added', + // eslint-disable-next-line @typescript-eslint/no-unused-vars + (() => { + console.log(`[${new Date().toISOString()}] Listener added to node`); + }) as MiddlewareHandler, + ); +} + +// Example 5: Register handlers for multiple specific event types +function createListenerTracker(node: NodeInfo) { + let stats = { + listenersAdded: 0, + listenersRemoved: 0, + nodesCleared: 0, + }; + + const addHandler = registerMiddleware( + node, + 'listener-added', + // eslint-disable-next-line @typescript-eslint/no-unused-vars + (() => { + stats.listenersAdded++; + console.log(`Listener added to node`); + }) as MiddlewareHandler, + ); + + const removeHandler = registerMiddleware( + node, + 'listener-removed', + // eslint-disable-next-line @typescript-eslint/no-unused-vars + (() => { + stats.listenersRemoved++; + console.log(`Listener removed from node`); + }) as MiddlewareHandler, + ); + + const clearHandler = registerMiddleware( + node, + 'listeners-cleared', + // eslint-disable-next-line @typescript-eslint/no-unused-vars + (() => { + stats.nodesCleared++; + console.log(`All listeners cleared from node`); + }) as MiddlewareHandler, + ); + + return { + getStats: () => ({ ...stats }), + reset: () => { + stats = { listenersAdded: 0, listenersRemoved: 0, nodesCleared: 0 }; + }, + unregister: () => { + addHandler(); + removeHandler(); + clearHandler(); + }, + }; +} + +// Example 6: Demonstrating the listeners-cleared event +function demonstrateListenersCleared() { + console.log('\n--- Demonstrating listeners-cleared event ---'); + + // Create a specific node for this example + const demoNode = (store.count as any).__node as NodeInfo; + + // Register a handler specifically for the listeners-cleared event + const clearedHandler = registerMiddleware( + demoNode, + 'listeners-cleared', + // eslint-disable-next-line @typescript-eslint/no-unused-vars + (_event: MiddlewareEvent) => { + console.log(`CLEARED EVENT: All listeners have been removed from node at ${new Date().toISOString()}`); + }, + ); + + console.log('Adding first listener...'); + const unsubscribe1 = store.count.onChange(({ value }) => console.log('First listener:', value)); + + console.log('Adding second listener...'); + const unsubscribe2 = store.count.onChange(({ value }) => console.log('Second listener:', value)); + + console.log('Removing first listener...'); + unsubscribe1(); + + console.log('Removing second listener (should trigger listeners-cleared)...'); + unsubscribe2(); + + // Clean up the middleware handler + clearedHandler(); + console.log('--- End of listeners-cleared demo ---\n'); +} + +// Example 7: Demonstrating microtask batching behavior +function demonstrateMicrotaskBatching() { + console.log('\n--- Demonstrating microtask batching ---'); + + // Create a specific node for this example + const demoNode = (store.user.name as any).__node as NodeInfo; + + // Register handlers for all event types + const eventLog: string[] = []; + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const addedHandler = registerMiddleware(demoNode, 'listener-added', (event: MiddlewareEvent) => { + eventLog.push(`Added listener at ${new Date().toISOString()}`); + }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const removedHandler = registerMiddleware(demoNode, 'listener-removed', (event: MiddlewareEvent) => { + eventLog.push(`Removed listener at ${new Date().toISOString()}`); + }); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const clearedHandler = registerMiddleware(demoNode, 'listeners-cleared', (event: MiddlewareEvent) => { + eventLog.push(`Cleared listeners at ${new Date().toISOString()}`); + }); + + console.log('Adding and immediately removing a listener (should not trigger any events):'); + const unsubscribe = store.user.name.onChange(({ value }) => console.log('Temp listener:', value)); + unsubscribe(); // Immediately unsubscribe + + console.log('Adding a persistent listener:'); + const persistentUnsubscribe = store.user.name.onChange(({ value }) => console.log('Persistent listener:', value)); + + // Use a setTimeout to see what events were actually dispatched after the microtask + setTimeout(() => { + console.log('Events that occurred (should only show Added listener):'); + eventLog.forEach((log) => console.log(` - ${log}`)); + eventLog.length = 0; // Clear the log + + console.log('Now removing the persistent listener:'); + persistentUnsubscribe(); + + // Use another setTimeout to see what events were dispatched + setTimeout(() => { + console.log('Events that occurred after removing persistent listener:'); + eventLog.forEach((log) => console.log(` - ${log}`)); + + // Clean up + addedHandler(); + removedHandler(); + clearedHandler(); + console.log('--- End of microtask batching demo ---\n'); + }, 0); + }, 0); +} + +// Example 8: Registering middleware on different nodes in the tree +function exampleMultiNodeMiddleware() { + // Get nodes for different parts of the store + const countNode = (store.count as any).__node as NodeInfo; + const userNode = (store.user as any).__node as NodeInfo; + + // Register middleware specific to the count node + const countLogger = createListenerAddedLogger(countNode); + + // Register middleware specific to the user node + const userTracker = createListenerTracker(userNode); + + // Now events will only be dispatched to the appropriate node's middleware + + // Add a listener to count + const countUnsubscribe = store.count.onChange(({ value }) => console.log('Count is now:', value)); + + // Add a listener to user.name + const nameUnsubscribe = store.user.name.onChange(({ value }) => console.log('Name is now:', value)); + + // Only count middleware would have been triggered for the count listener + // Only user middleware would have been triggered for the name listener + + console.log('User node stats:', userTracker.getStats()); // Should show no listeners on user node + + // Clean up + countUnsubscribe(); + nameUnsubscribe(); + countLogger(); + userTracker.unregister(); +} + +// Example 9: Usage in an application +function exampleUsage() { + // Register node-specific middleware + const addedLogger = createListenerAddedLogger(rootNode); + const listenerTracker = createListenerTracker(rootNode); + + // Add a listener + const unsubscribe = store.count.onChange(({ value }) => { + console.log('Count changed:', value); + }); + + // Change values + store.count.set(1); + store.count.set(2); + store.user.name.set('Alice'); + + // Get stats from middleware after a microtask + setTimeout(() => { + console.log('Listener stats:', listenerTracker.getStats()); + + // Remove listener + unsubscribe(); + + // Change more values + store.count.set(3); + + // Get updated stats after another microtask + setTimeout(() => { + console.log('Updated listener stats:', listenerTracker.getStats()); + + // Demonstrate the listeners-cleared event + demonstrateListenersCleared(); + + // Demonstrate batching behavior + demonstrateMicrotaskBatching(); + + // Demonstrate multi-node middleware + exampleMultiNodeMiddleware(); + + // Clean up all middleware + addedLogger(); + listenerTracker.unregister(); + listenersAddedHandler(); + listenersRemovedHandler(); + listenersClearedHandler(); + }, 0); + }, 0); +} + +// Run the example +exampleUsage(); diff --git a/index.ts b/index.ts index fb640f3f..96b796a5 100644 --- a/index.ts +++ b/index.ts @@ -76,6 +76,7 @@ import { import { deepMerge, getValueAtPath, initializePathType, setAtPath } from './src/helpers'; import { tracking } from './src/tracking'; import { ObservablePrimitiveClass } from './src/ObservablePrimitive'; +import { registerMiddleware } from './src/middleware'; export const internal = { createPreviousHandler, @@ -97,6 +98,7 @@ export const internal = { observableFns, optimized, peek, + registerMiddleware, safeParse, safeStringify, set, diff --git a/src/middleware.ts b/src/middleware.ts new file mode 100644 index 00000000..636087dc --- /dev/null +++ b/src/middleware.ts @@ -0,0 +1,181 @@ +import type { NodeInfo, NodeListener } from './observableInterfaces'; + +// Types for middleware events and handlers +export type MiddlewareEventType = 'listener-added' | 'listener-removed' | 'listeners-cleared'; + +export interface MiddlewareEvent { + type: MiddlewareEventType; + node: NodeInfo; + listener?: NodeListener; // Optional because listeners-cleared event doesn't have a specific listener + timestamp: number; +} + +// Generic middleware handler that can handle specific event types +export type MiddlewareHandler = (event: MiddlewareEvent) => void; + +// Store middleware handlers in a WeakMap keyed by node +const nodeMiddlewareHandlers = new WeakMap>>(); + +// Queued events - use arrays instead of Sets for better performance in this case +const queuedNodes: NodeInfo[] = []; +const queuedListeners: (NodeListener | undefined)[] = []; +const queuedTypes: MiddlewareEventType[] = []; +let queueSize = 0; +let isMicrotaskScheduled = false; + +/** + * Register a middleware handler for a specific node and event type + * @param node The node to register the middleware handler for + * @param type The event type to handle + * @param handler The middleware handler function + * @returns A function to remove the handler + */ +export function registerMiddleware(node: NodeInfo, type: MiddlewareEventType, handler: MiddlewareHandler): () => void { + // Get or create handlers map for this node + let handlersMap = nodeMiddlewareHandlers.get(node); + if (!handlersMap) { + handlersMap = new Map(); + nodeMiddlewareHandlers.set(node, handlersMap); + } + + // Get or create handlers set for this event type + let handlers = handlersMap.get(type); + if (!handlers) { + handlers = new Set(); + handlersMap.set(type, handlers); + } + + // Add handler to the set + handlers.add(handler); + + // Return a function to remove the handler + return () => { + const handlersMap = nodeMiddlewareHandlers.get(node); + if (!handlersMap) return; + + const handlers = handlersMap.get(type); + if (!handlers) return; + + handlers.delete(handler); + + // Cleanup empty sets and maps + if (handlers.size === 0) { + handlersMap.delete(type); + if (handlersMap.size === 0) { + nodeMiddlewareHandlers.delete(node); + } + } + }; +} + +/** + * Queue a middleware event for a specific node to be processed in a microtask + * @param node The node to queue the event for + * @param listener The listener that was added or removed (optional for listeners-cleared) + * @param type The type of event + */ +export function dispatchMiddlewareEvent( + node: NodeInfo, + listener: NodeListener | undefined, + type: MiddlewareEventType, +): void { + // Fast path: Skip if there are no handlers for this node or event type + const handlersMap = nodeMiddlewareHandlers.get(node); + if (!handlersMap || !handlersMap.has(type)) { + return; + } + + // Check if handlers exist (avoid empty sets) + const handlers = handlersMap.get(type); + if (!handlers || handlers.size === 0) { + return; + } + + // Queue the event in parallel arrays for better performance + queuedNodes[queueSize] = node; + queuedListeners[queueSize] = listener; + queuedTypes[queueSize] = type; + queueSize++; + + // Schedule microtask if not already scheduled + if (!isMicrotaskScheduled) { + isMicrotaskScheduled = true; + queueMicrotask(processQueuedEvents); + } +} + +// Reusable event object to avoid allocation during processing +const eventObj: MiddlewareEvent = { + type: 'listener-added', + node: null as any, + listener: undefined, + timestamp: 0, +}; + +/** + * Process all queued middleware events in a microtask + * Using a single function for validation and processing improves performance + */ +function processQueuedEvents(): void { + isMicrotaskScheduled = false; + + // Use performance.now() if available for more precise timing + const timestamp = typeof performance !== 'undefined' ? performance.now() : Date.now(); + eventObj.timestamp = timestamp; + + // Process each queued event + for (let i = 0; i < queueSize; i++) { + const node = queuedNodes[i]; + const listener = queuedListeners[i]; + const type = queuedTypes[i]; + + // Fast check for handlers without re-fetching from WeakMap if possible + const handlersMap = nodeMiddlewareHandlers.get(node); + if (!handlersMap) continue; + + const handlers = handlersMap.get(type); + if (!handlers || handlers.size === 0) continue; + + // Get node's listener sets - avoid creating empty sets + const nodeListeners = node.listeners; + const nodeListenersImmediate = node.listenersImmediate; + + if (!nodeListeners && !nodeListenersImmediate) { + continue; + } + + // Validate event based on type (optimized validation logic) + let isValid = false; + + // Use cached string constants for faster comparison + if (type === 'listener-added') { + isValid = !!nodeListeners?.has(listener!) || !!nodeListenersImmediate?.has(listener!); + } else if (type === 'listener-removed') { + isValid = !nodeListeners?.has(listener!) && !nodeListenersImmediate?.has(listener!); + } else { + // type === 'listener-cleared' + isValid = !nodeListeners?.size && !nodeListenersImmediate?.size; + } + + // Only dispatch if the event is valid + if (isValid) { + // Update properties of the reused event object + eventObj.type = type; + eventObj.node = node; + eventObj.listener = listener; + + // Iterator optimization for Sets + const iterableHandlers = Array.from(handlers); + for (let j = 0; j < iterableHandlers.length; j++) { + try { + iterableHandlers[j](eventObj); + } catch (error) { + console.error(`Error in middleware handler for ${type}:`, error); + } + } + } + } + + // Clear the queue by resetting size rather than reallocating arrays + queueSize = 0; +} diff --git a/src/onChange.ts b/src/onChange.ts index 03dc3579..de1be9aa 100644 --- a/src/onChange.ts +++ b/src/onChange.ts @@ -1,5 +1,6 @@ import { getNodeValue } from './globals'; import { deconstructObjectWithPath } from './helpers'; +import { dispatchMiddlewareEvent } from './middleware'; import type { ListenerFn, ListenerParams, NodeInfo, NodeListener, TrackingType } from './observableInterfaces'; export function onChange( @@ -76,6 +77,7 @@ export function onChange( node.numListenersRecursive++; let parent = node.parent; let pathParent: string[] = [node!.key!]; + while (parent) { if (parent.linkedFromNodes) { for (const linkedFromNode of parent.linkedFromNodes) { @@ -91,16 +93,30 @@ export function onChange( parent = parent.parent; } + // Queue middleware event for listener added + dispatchMiddlewareEvent(node, listener, 'listener-added'); + return () => { - listeners!.delete(listener); + // Remove the listener from the set + listeners.delete(listener); + + // Clean up linked node listeners extraDisposes?.forEach((fn) => fn()); + // Update listener counts up the tree let parent = node; while (parent) { parent.numListenersRecursive--; - parent = parent.parent!; } + + // Queue middleware event for listener removed + dispatchMiddlewareEvent(node, listener, 'listener-removed'); + + // If there are no more listeners in this set, queue the listeners-cleared event + if (listeners.size === 0) { + dispatchMiddlewareEvent(node, undefined, 'listeners-cleared'); + } }; } diff --git a/src/sync/syncObservable.ts b/src/sync/syncObservable.ts index ccf3aadf..88bf3efd 100644 --- a/src/sync/syncObservable.ts +++ b/src/sync/syncObservable.ts @@ -56,8 +56,17 @@ import type { import { waitForSet } from './waitForSet'; import { createRevertChanges } from './revertChanges'; -const { clone, deepMerge, getNode, getNodeValue, getValueAtPath, globalState, symbolLinked, createPreviousHandler } = - internal; +const { + clone, + createPreviousHandler, + deepMerge, + getNode, + getNodeValue, + getValueAtPath, + globalState, + registerMiddleware, + symbolLinked, +} = internal; export const mapSyncPlugins: WeakMap< ClassConstructor | ObservablePersistPlugin, @@ -1050,268 +1059,269 @@ export function syncObservable( const lastSync = metadatas.get(obs$)?.lastSync; const pending = localState.pendingChanges; - if (get || subscribe) { - const { waitFor } = syncOptions; + const { waitFor } = syncOptions; - const runGet = () => { - const onChange = async ({ value, mode, lastSync }: UpdateFnParams) => { - mode = mode || syncOptions.mode || 'set'; - if (value !== undefined) { - value = transformLoadData(value, syncOptions, true, 'get'); - if (isPromise(value)) { - value = await (value as Promise); - } - - const pending = localState.pendingChanges; - const currentValue = obs$.peek(); - if (pending) { - let didChangeMetadata = false; - // Merge pending values onto remote changes - Object.keys(pending).forEach((key) => { - const p = key.split('/').filter((k) => k !== ''); - const { v, t } = pending[key]; + const runGet = () => { + const onChange = async ({ value, mode, lastSync }: UpdateFnParams) => { + mode = mode || syncOptions.mode || 'set'; + if (value !== undefined) { + value = transformLoadData(value, syncOptions, true, 'get'); + if (isPromise(value)) { + value = await (value as Promise); + } - if (t.length === 0 || !value) { + const pending = localState.pendingChanges; + const currentValue = obs$.peek(); + if (pending) { + let didChangeMetadata = false; + // Merge pending values onto remote changes + Object.keys(pending).forEach((key) => { + const p = key.split('/').filter((k) => k !== ''); + const { v, t } = pending[key]; + + if (t.length === 0 || !value) { + // Update pending previous value with result + const oldValue = clone(value); + pending[key].p = key ? oldValue[key] : oldValue; + + if (isObject(value) && isObject(v)) { + Object.assign(value, key ? { [key]: v } : v); + } else if (!key) { + value = v; + } + } else if ((value as any)[p[0]] !== undefined) { + const curValue = getValueAtPath(currentValue as object, p); + const newValue = getValueAtPath(value as object, p); + if (JSON.stringify(curValue) === JSON.stringify(newValue)) { + delete pending[key]; + didChangeMetadata = true; + } else { // Update pending previous value with result const oldValue = clone(value); - pending[key].p = key ? oldValue[key] : oldValue; - - if (isObject(value) && isObject(v)) { - Object.assign(value, key ? { [key]: v } : v); - } else if (!key) { - value = v; - } - } else if ((value as any)[p[0]] !== undefined) { - const curValue = getValueAtPath(currentValue as object, p); - const newValue = getValueAtPath(value as object, p); - if (JSON.stringify(curValue) === JSON.stringify(newValue)) { - delete pending[key]; - didChangeMetadata = true; - } else { - // Update pending previous value with result - const oldValue = clone(value); - pending[key].p = getValueAtPath(oldValue, p); - - didChangeMetadata = true; - (value as any) = setAtPath( - value as any, - p, - t, - v, - 'merge', - obs$.peek(), - (path: string[], value: any) => { - delete pending[key]; - pending[path.join('/')] = { - p: null, - v: value, - t: t.slice(0, path.length), - }; - }, - ); - } + pending[key].p = getValueAtPath(oldValue, p); + + didChangeMetadata = true; + (value as any) = setAtPath( + value as any, + p, + t, + v, + 'merge', + obs$.peek(), + (path: string[], value: any) => { + delete pending[key]; + pending[path.join('/')] = { + p: null, + v: value, + t: t.slice(0, path.length), + }; + }, + ); } - }); - - if (didChangeMetadata && syncOptions.persist) { - updateMetadataImmediate(obs$, localState, syncState$, syncOptions, { - pending, - }); } + }); + + if (didChangeMetadata && syncOptions.persist) { + updateMetadataImmediate(obs$, localState, syncState$, syncOptions, { + pending, + }); } + } - onChangeRemote(() => { - if (isPlainObject(value)) { - value = ObservableHint.plain(value); + onChangeRemote(() => { + if (isPlainObject(value)) { + value = ObservableHint.plain(value); + } + if (mode === 'assign') { + (obs$ as unknown as Observable).assign(value); + } else if (mode === 'append') { + if ( + (process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'test') && + !isArray(value) + ) { + console.error('[legend-state] mode:append expects the value to be an array'); } - if (mode === 'assign') { - (obs$ as unknown as Observable).assign(value); - } else if (mode === 'append') { - if ( - (process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'test') && - !isArray(value) - ) { - console.error('[legend-state] mode:append expects the value to be an array'); - } - (obs$ as unknown as Observable).push(...value); - } else if (mode === 'prepend') { - if ( - (process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'test') && - !isArray(value) - ) { - console.error('[legend-state] mode:prepend expects the value to be an array'); - } - (obs$ as unknown as Observable).splice(0, 0, ...value); - } else if (mode === 'merge') { - mergeIntoObservable(obs$, value); - } else { - obs$.set(value); + (obs$ as unknown as Observable).push(...value); + } else if (mode === 'prepend') { + if ( + (process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'test') && + !isArray(value) + ) { + console.error('[legend-state] mode:prepend expects the value to be an array'); } - }); - } - if (lastSync && syncOptions.persist) { - updateMetadata(obs$, localState, syncState$, syncOptions, { - lastSync, - }); - } - }; - if (node.activationState) { - node.activationState!.onChange = onChange; + (obs$ as unknown as Observable).splice(0, 0, ...value); + } else if (mode === 'merge') { + mergeIntoObservable(obs$, value); + } else { + obs$.set(value); + } + }); } - // Subscribe before getting to ensure we don't miss updates between now and the get returning - if (!isSubscribed && syncOptions.subscribe) { - const subscribe = syncOptions.subscribe; - isSubscribed = true; - const doSubscribe = () => { - const subscribeParams: SyncedSubscribeParams = { - node, - value$: obs$, - lastSync, - update: (params: UpdateFnParams) => { - when( - () => !get || syncState$.isLoaded.get(), - () => { - when(waitFor || true, () => { - params.mode ||= syncOptions.mode || 'merge'; - onChange(params); - - // If no get then we need to set the loaded state - if (!syncState$.isLoaded.peek()) { - syncState$.assign({ - isLoaded: syncStateValue.numPendingRemoteLoads! < 1, - error: undefined, - isGetting: syncStateValue.numPendingGets! > 0, - }); - } - }); - }, - ); - }, - refresh: () => when(syncState$.isLoaded, sync), - onError: (error: Error) => - onGetError(error, { - source: 'subscribe', - subscribeParams, - type: 'get', - retry: {} as OnErrorRetryParams, - }), - }; - unsubscribe = subscribe(subscribeParams); - }; - - if (waitFor) { - whenReady(waitFor, doSubscribe); - } else { - doSubscribe(); - } + if (lastSync && syncOptions.persist) { + updateMetadata(obs$, localState, syncState$, syncOptions, { + lastSync, + }); } - const existingValue = getNodeValue(node); - - if (get) { - const getParams: SyncedGetParams = { + }; + if (node.activationState) { + node.activationState!.onChange = onChange; + } + // Subscribe before getting to ensure we don't miss updates between now and the get returning + if (!isSubscribed && syncOptions.subscribe) { + const subscribe = syncOptions.subscribe; + const doSubscribe = () => { + isSubscribed = true; + const subscribeParams: SyncedSubscribeParams = { node, value$: obs$, - value: - isFunction(existingValue) || existingValue?.[symbolLinked] ? undefined : existingValue, - mode: syncOptions.mode!, - refresh: sync, - options: syncOptions, lastSync, - updateLastSync: (lastSync: number) => (getParams.lastSync = lastSync), - onError: onGetError, - retryNum: 0, - cancelRetry: false, + update: (params: UpdateFnParams) => { + when( + () => !get || syncState$.isLoaded.get(), + () => { + when(waitFor || true, () => { + params.mode ||= syncOptions.mode || 'merge'; + onChange(params); + + // If no get then we need to set the loaded state + if (!syncState$.isLoaded.peek()) { + syncState$.assign({ + isLoaded: syncStateValue.numPendingRemoteLoads! < 1, + error: undefined, + isGetting: syncStateValue.numPendingGets! > 0, + }); + } + }); + }, + ); + }, + refresh: () => when(syncState$.isLoaded, sync), + onError: (error: Error) => + onGetError(error, { + source: 'subscribe', + subscribeParams, + type: 'get', + retry: {} as OnErrorRetryParams, + }), }; + unsubscribe = subscribe(subscribeParams); + registerMiddleware(node, 'listeners-cleared', () => { + if (unsubscribe) { + isSubscribed = false; + unsubscribe(); + unsubscribe = undefined; + } + }); + registerMiddleware(node, 'listener-added', () => { + if (!isSubscribed) { + doSubscribe(); + } + }); + }; - let modeBeforeReset: GetMode | undefined = undefined; + if (waitFor) { + whenReady(waitFor, doSubscribe); + } else { + doSubscribe(); + } + } + const existingValue = getNodeValue(node); + + if (get) { + const getParams: SyncedGetParams = { + node, + value$: obs$, + value: isFunction(existingValue) || existingValue?.[symbolLinked] ? undefined : existingValue, + mode: syncOptions.mode!, + refresh: sync, + options: syncOptions, + lastSync, + updateLastSync: (lastSync: number) => (getParams.lastSync = lastSync), + onError: onGetError, + retryNum: 0, + cancelRetry: false, + }; - const beforeGetParams: Parameters>['onBeforeGet']>[0] = { - value: getParams.value, - lastSync, - pendingChanges: pending && !isEmpty(pending) ? pending : undefined, - clearPendingChanges: async () => { - localState.pendingChanges = {}; - await updateMetadataImmediate(obs$, localState, syncState$, syncOptions, { - pending: localState.pendingChanges, + let modeBeforeReset: GetMode | undefined = undefined; + + const beforeGetParams: Parameters>['onBeforeGet']>[0] = { + value: getParams.value, + lastSync, + pendingChanges: pending && !isEmpty(pending) ? pending : undefined, + clearPendingChanges: async () => { + localState.pendingChanges = {}; + await updateMetadataImmediate(obs$, localState, syncState$, syncOptions, { + pending: localState.pendingChanges, + }); + }, + resetCache: () => { + modeBeforeReset = getParams.mode; + getParams.mode = 'set'; + return syncStateValue.resetPersistence?.(); + }, + cancel: false, + }; + + syncOptions.onBeforeGet?.(beforeGetParams); + + if (!beforeGetParams.cancel) { + syncState$.assign({ + numPendingGets: (syncStateValue.numPendingGets! || 0) + 1, + isGetting: true, + }); + const got = runWithRetry(getParams, syncOptions.retry, node, (retryEvent) => { + const params = getParams as SyncedGetParams; + params.cancelRetry = retryEvent.cancelRetry; + params.retryNum = retryEvent.retryNum; + return get(params); + }); + const numGets = (node.numGets = (node.numGets || 0) + 1); + const handle = (value: any) => { + syncState$.numPendingGets.set((v) => v! - 1); + if (isWaitingForLoad) { + isWaitingForLoad = false; + syncStateValue.numPendingRemoteLoads!--; + } + // If this is from an older Promise than one that resolved already, + // ignore it as the newer one wins + if (numGets >= (node.getNumResolved || 0)) { + node.getNumResolved = node.numGets; + + onChange({ + value, + lastSync: getParams.lastSync, + mode: getParams.mode!, }); - }, - resetCache: () => { - modeBeforeReset = getParams.mode; - getParams.mode = 'set'; - return syncStateValue.resetPersistence?.(); - }, - cancel: false, - }; + } - syncOptions.onBeforeGet?.(beforeGetParams); + if (modeBeforeReset) { + getParams.mode = modeBeforeReset; + modeBeforeReset = undefined; + } - if (!beforeGetParams.cancel) { syncState$.assign({ - numPendingGets: (syncStateValue.numPendingGets! || 0) + 1, - isGetting: true, + isLoaded: syncStateValue.numPendingRemoteLoads! < 1, + error: undefined, + isGetting: syncStateValue.numPendingGets! > 0, }); - const got = runWithRetry(getParams, syncOptions.retry, node, (retryEvent) => { - const params = getParams as SyncedGetParams; - params.cancelRetry = retryEvent.cancelRetry; - params.retryNum = retryEvent.retryNum; - return get(params); + }; + if (isPromise(got)) { + got.then(handle).catch((error) => { + onGetError(error, { getParams, source: 'get', type: 'get', retry: getParams }, true); }); - const numGets = (node.numGets = (node.numGets || 0) + 1); - const handle = (value: any) => { - syncState$.numPendingGets.set((v) => v! - 1); - if (isWaitingForLoad) { - isWaitingForLoad = false; - syncStateValue.numPendingRemoteLoads!--; - } - // If this is from an older Promise than one that resolved already, - // ignore it as the newer one wins - if (numGets >= (node.getNumResolved || 0)) { - node.getNumResolved = node.numGets; - - onChange({ - value, - lastSync: getParams.lastSync, - mode: getParams.mode!, - }); - } - - if (modeBeforeReset) { - getParams.mode = modeBeforeReset; - modeBeforeReset = undefined; - } - - syncState$.assign({ - isLoaded: syncStateValue.numPendingRemoteLoads! < 1, - error: undefined, - isGetting: syncStateValue.numPendingGets! > 0, - }); - }; - if (isPromise(got)) { - got.then(handle).catch((error) => { - onGetError( - error, - { getParams, source: 'get', type: 'get', retry: getParams }, - true, - ); - }); - } else { - handle(got); - } + } else { + handle(got); } } - }; - - if (waitFor) { - whenReady(waitFor, () => trackSelector(runGet, sync)); - } else { - trackSelector(runGet, sync); } + }; + + if (waitFor) { + whenReady(waitFor, () => trackSelector(runGet, sync)); } else { - syncState$.assign({ - isLoaded: true, - error: undefined, - }); + trackSelector(runGet, sync); } + if (!isSynced) { isSynced = true; isApplyingPendingAfterSync = true; diff --git a/tests/middleware.test.ts b/tests/middleware.test.ts new file mode 100644 index 00000000..8d9a6c5e --- /dev/null +++ b/tests/middleware.test.ts @@ -0,0 +1,402 @@ +import { getNode, Observable, observable } from '@legendapp/state'; +import { registerMiddleware, dispatchMiddlewareEvent } from '../src/middleware'; +import type { MiddlewareEvent } from '../src/middleware'; +import type { NodeInfo } from '../src/observableInterfaces'; + +jest?.setTimeout?.(1000); + +describe('Middleware System', () => { + let store: Observable<{ count: number; user: { name: string } }>; + let rootNode: NodeInfo; + let countNode: NodeInfo; + let userNameNode: NodeInfo; + + // Setup before each test + beforeEach(() => { + // Create a fresh store for each test + store = observable({ count: 0, user: { name: 'John' } }); + rootNode = getNode(store); + countNode = getNode(store.count); + userNameNode = getNode(store.user.name); + + // Reset any microtasks + jest.useRealTimers(); + }); + + describe('Middleware Registration', () => { + test('should register a middleware handler for a specific event type', () => { + const handler = jest.fn(); + const unregister = registerMiddleware(countNode, 'listener-added', handler); + + // Make sure the returned function is callable + expect(typeof unregister).toBe('function'); + + // Add a listener to trigger the middleware + store.count.onChange(() => {}); + + // Wait for microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(handler).toHaveBeenCalledTimes(1); + const event = handler.mock.calls[0][0] as MiddlewareEvent; + expect(event.type).toBe('listener-added'); + expect(event.node).toBe(countNode); + expect(event.timestamp).toBeDefined(); + resolve(null); + }, 0); + }); + }); + + test('should unregister a middleware handler', () => { + const handler = jest.fn(); + const unregister = registerMiddleware(countNode, 'listener-added', handler); + + // Unregister the handler + unregister(); + + // Add a listener which would trigger the middleware if still registered + store.count.onChange(() => {}); + + // Wait for microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(handler).not.toHaveBeenCalled(); + resolve(null); + }, 0); + }); + }); + + test('should support multiple handlers for the same event type', () => { + const handler1 = jest.fn(); + const handler2 = jest.fn(); + + registerMiddleware(countNode, 'listener-added', handler1); + registerMiddleware(countNode, 'listener-added', handler2); + + // Add a listener to trigger both middleware handlers + store.count.onChange(() => {}); + + // Wait for microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(handler1).toHaveBeenCalledTimes(1); + expect(handler2).toHaveBeenCalledTimes(1); + resolve(null); + }, 0); + }); + }); + + test('should support handlers for different event types', () => { + const addedHandler = jest.fn(); + const removedHandler = jest.fn(); + + registerMiddleware(countNode, 'listener-added', addedHandler); + registerMiddleware(countNode, 'listener-removed', removedHandler); + + // Add and then remove a listener to trigger both middleware handlers + const unsubscribe = store.count.onChange(() => {}); + + // Wait for microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(addedHandler).toHaveBeenCalledTimes(1); + expect(removedHandler).toHaveBeenCalledTimes(0); + + setTimeout(() => { + unsubscribe(); + setTimeout(() => { + expect(addedHandler).toHaveBeenCalledTimes(1); + expect(removedHandler).toHaveBeenCalledTimes(1); + resolve(null); + }, 0); + }, 0); + }, 0); + }); + }); + }); + + describe('Event Validation and Batching', () => { + test('should batch events in a microtask', () => { + const countHandler = jest.fn(); + const nameHandler = jest.fn(); + + registerMiddleware(countNode, 'listener-added', countHandler); + registerMiddleware(userNameNode, 'listener-added', nameHandler); + + // Add multiple listeners + store.count.onChange(() => {}); + store.user.name.onChange(() => {}); + + // Handlers should not be called synchronously + expect(countHandler).not.toHaveBeenCalled(); + expect(nameHandler).not.toHaveBeenCalled(); + + // Wait for microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(countHandler).toHaveBeenCalledTimes(1); + expect(nameHandler).toHaveBeenCalledTimes(1); + resolve(null); + }, 0); + }); + }); + + test('should validate listener-added events before dispatching', () => { + const handler = jest.fn(); + registerMiddleware(countNode, 'listener-added', handler); + + // Create a listener and get the NodeListener object + const unsubscribe = store.count.onChange(() => {}); + + // Extract the listener from the node's listener set + const listener = Array.from(countNode.listeners || [])[0]; + + // Directly dispatch a listener-added event with a listener that exists + dispatchMiddlewareEvent(countNode, listener, 'listener-added'); + + // Wait for first microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + // Should be called twice - once from onChange and once from direct dispatch + expect(handler).toHaveBeenCalledTimes(2); + + // Now remove the listener + unsubscribe(); + + // Reset the mock + handler.mockReset(); + + // Try to dispatch for a listener that no longer exists + dispatchMiddlewareEvent(countNode, listener, 'listener-added'); + + // Wait for second microtask + setTimeout(() => { + // Should not be called because listener is no longer valid + expect(handler).not.toHaveBeenCalled(); + resolve(null); + }, 0); + }, 0); + }); + }); + + test('should validate listener-removed events before dispatching', () => { + const handler = jest.fn(); + registerMiddleware(countNode, 'listener-removed', handler); + + // Create a listener and immediately capture its reference + const unsubscribe = store.count.onChange(() => {}); + const listener = Array.from(countNode.listeners || [])[0]; + + // Remove the listener + unsubscribe(); + + // Wait for first microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(handler).toHaveBeenCalledTimes(1); + + // Reset the mock + handler.mockReset(); + + // Try to dispatch another removed event for the same listener + dispatchMiddlewareEvent(countNode, listener, 'listener-removed'); + + // Wait for second microtask + setTimeout(() => { + // Should be called because the listener is indeed removed + expect(handler).toHaveBeenCalledTimes(1); + resolve(null); + }, 0); + }, 0); + }); + }); + + test('should validate listeners-cleared events before dispatching', () => { + const handler = jest.fn(); + registerMiddleware(countNode, 'listeners-cleared', handler); + + // Add and remove a single listener to trigger listeners-cleared + const unsubscribe = store.count.onChange(() => {}); + unsubscribe(); + + // Wait for first microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(handler).toHaveBeenCalledTimes(1); + + // Reset the mock + handler.mockReset(); + + // Add another listener so the node is not empty + store.count.onChange(() => {}); + + // Try to dispatch a cleared event when there are still listeners + dispatchMiddlewareEvent(countNode, undefined, 'listeners-cleared'); + + // Wait for second microtask + setTimeout(() => { + // Should not be called because node still has listeners + expect(handler).not.toHaveBeenCalled(); + resolve(null); + }, 0); + }, 0); + }); + }); + }); + + describe('Event Types', () => { + test('should detect when all listeners are cleared from a node', () => { + const clearedHandler = jest.fn(); + registerMiddleware(countNode, 'listeners-cleared', clearedHandler); + + // Add multiple listeners + const unsubscribe1 = store.count.onChange(() => {}); + const unsubscribe2 = store.count.onChange(() => {}); + + // Remove just one listener - should not trigger cleared event + unsubscribe1(); + + // Wait for first microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(clearedHandler).not.toHaveBeenCalled(); + + // Remove the last listener - should trigger cleared event + unsubscribe2(); + + // Wait for second microtask + setTimeout(() => { + expect(clearedHandler).toHaveBeenCalledTimes(1); + resolve(null); + }, 0); + }, 0); + }); + }); + + test('should handle immediate and non-immediate listeners correctly', () => { + const addedHandler = jest.fn(); + const removedHandler = jest.fn(); + const clearedHandler = jest.fn(); + + registerMiddleware(countNode, 'listener-added', addedHandler); + registerMiddleware(countNode, 'listener-removed', removedHandler); + registerMiddleware(countNode, 'listeners-cleared', clearedHandler); + + // Add an immediate and non-immediate listener + const unsubscribe1 = store.count.onChange(() => {}, { immediate: false }); + const unsubscribe2 = store.count.onChange(() => {}, { immediate: true }); + + // Wait for first microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + expect(addedHandler).toHaveBeenCalledTimes(2); + + // Remove one of each type of listener + unsubscribe1(); + unsubscribe2(); + + // Wait for second microtask + setTimeout(() => { + expect(removedHandler).toHaveBeenCalledTimes(2); + // Both listener types were cleared + expect(clearedHandler).toHaveBeenCalledTimes(2); + resolve(null); + }, 0); + }, 0); + }); + }); + }); + + describe('Node-specific Middleware', () => { + test('should only trigger middleware for the registered node', () => { + const rootHandler = jest.fn(); + const countHandler = jest.fn(); + const userNameHandler = jest.fn(); + + registerMiddleware(rootNode, 'listener-added', rootHandler); + registerMiddleware(countNode, 'listener-added', countHandler); + registerMiddleware(userNameNode, 'listener-added', userNameHandler); + + // Add listeners to different nodes + store.count.onChange(() => {}); + store.user.name.onChange(() => {}); + + // Wait for microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + // Root handler should NOT receive events from child nodes + expect(rootHandler).toHaveBeenCalledTimes(0); + + // Node-specific handlers should only see their own events + expect(countHandler).toHaveBeenCalledTimes(1); + expect(userNameHandler).toHaveBeenCalledTimes(1); + + resolve(null); + }, 0); + }); + }); + + test('should verify that events do not bubble up the tree', () => { + // Set up handlers at different levels + const parentHandler = jest.fn(); + const childHandler = jest.fn(); + + // Register middleware on parent and child nodes + registerMiddleware(rootNode, 'listener-added', parentHandler); + registerMiddleware(countNode, 'listener-added', childHandler); + + // Add a listener to the child node + store.count.onChange(() => {}); + + // Add a listener to the parent node + store.onChange(() => {}); + + // Wait for microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + // Parent handler should only be called for events on itself + expect(parentHandler).toHaveBeenCalledTimes(1); + + // Child handler should only be called for events on itself + expect(childHandler).toHaveBeenCalledTimes(1); + + resolve(null); + }, 0); + }); + }); + }); + + describe('Error Handling', () => { + test('should continue processing events if a handler throws', () => { + const errorHandler = jest.fn().mockImplementation(() => { + throw new Error('Test error'); + }); + const normalHandler = jest.fn(); + + // Spy on console.error + const consoleErrorSpy = jest.spyOn(console, 'error').mockImplementation(() => {}); + + registerMiddleware(countNode, 'listener-added', errorHandler); + registerMiddleware(countNode, 'listener-added', normalHandler); + + // Add a listener to trigger the middleware + store.count.onChange(() => {}); + + // Wait for microtask to complete + return new Promise((resolve) => { + setTimeout(() => { + // Error should be caught and logged + expect(consoleErrorSpy).toHaveBeenCalled(); + + // Both handlers should be called + expect(errorHandler).toHaveBeenCalledTimes(1); + expect(normalHandler).toHaveBeenCalledTimes(1); + + // Clean up + consoleErrorSpy.mockRestore(); + resolve(null); + }, 0); + }); + }); + }); +}); diff --git a/tests/synced.test.ts b/tests/synced.test.ts new file mode 100644 index 00000000..1ea6e068 --- /dev/null +++ b/tests/synced.test.ts @@ -0,0 +1,58 @@ +import { observable, observe } from '@legendapp/state'; +import { synced } from '@legendapp/state/sync'; +import { promiseTimeout } from './testglobals'; + +describe('unsubscribe', () => { + test('Canceling observe unsubscribes', async () => { + let numObserves = 0; + let numSubscribes = 0; + let numUnsubscribes = 0; + + const obs$ = observable( + synced({ + get: () => 'foo', + subscribe: () => { + numSubscribes++; + + return () => { + numUnsubscribes++; + }; + }, + }), + ); + + const unsubscribe = observe(() => { + numObserves++; + obs$.get(); + }); + + expect(numObserves).toEqual(1); + expect(numSubscribes).toEqual(1); + expect(numUnsubscribes).toEqual(0); + + unsubscribe(); + + await promiseTimeout(0); + + expect(numObserves).toEqual(1); + expect(numSubscribes).toEqual(1); + expect(numUnsubscribes).toEqual(1); + + obs$.get(); + + expect(numObserves).toEqual(1); + expect(numSubscribes).toEqual(1); + expect(numUnsubscribes).toEqual(1); + + observe(() => { + numObserves++; + obs$.get(); + }); + + await promiseTimeout(0); + + expect(numObserves).toEqual(2); + expect(numSubscribes).toEqual(2); + expect(numUnsubscribes).toEqual(1); + }); +});