From 1b21c01abb8da91a5e1d8d3737b17a588f3e13be Mon Sep 17 00:00:00 2001 From: EYHN Date: Wed, 17 Jan 2024 15:50:11 +0800 Subject: [PATCH] feat(infra): new workspace infra --- .../tinykeys-npm-2.1.0-819feeaed0.patch | 54 +++ packages/common/infra/package.json | 3 +- packages/common/infra/src/index.ts | 9 +- .../src/utils/__tests__/async-queue.spec.ts | 45 ++ .../utils/__tests__/throw-if-aborted.spec.ts | 13 + .../common/infra/src/utils/async-queue.ts | 101 ++++ packages/common/infra/src/utils/index.ts | 5 + .../common/infra/src/utils/merge-updates.ts | 17 + .../common/infra/src/utils/object-pool.ts | 96 ++++ .../infra/src/utils/throw-if-aborted.ts | 9 + .../src/workspace/__tests__/workspace.spec.ts | 38 ++ .../common/infra/src/workspace/context.ts | 76 +++ .../infra/src/workspace/engine/awareness.ts | 25 + .../common/infra/src/workspace/engine/blob.ts | 212 +++++++++ .../infra/src/workspace/engine/index.ts | 71 +++ .../engine/sync/__tests__/engine.spec.ts | 152 ++++++ .../engine/sync/__tests__/peer.spec.ts | 100 ++++ .../engine/sync/__tests__/test-storage.ts | 42 ++ .../infra/src/workspace/engine/sync/consts.ts | 15 + .../infra/src/workspace/engine/sync/engine.ts | 285 +++++++++++ .../infra/src/workspace/engine/sync/index.ts | 20 + .../infra/src/workspace/engine/sync/peer.ts | 444 ++++++++++++++++++ .../src/workspace/engine/sync/storage.ts | 25 + .../common/infra/src/workspace/factory.ts | 15 + .../infra/src/workspace/global-schema.ts | 6 + packages/common/infra/src/workspace/index.ts | 81 ++++ .../common/infra/src/workspace/list/cache.ts | 25 + .../common/infra/src/workspace/list/index.ts | 305 ++++++++++++ .../infra/src/workspace/list/information.ts | 92 ++++ .../common/infra/src/workspace/manager.ts | 193 ++++++++ .../common/infra/src/workspace/metadata.ts | 3 + .../infra/src/workspace/service-scope.ts | 3 + .../common/infra/src/workspace/testing.ts | 244 ++++++++++ .../common/infra/src/workspace/upgrade.ts | 142 ++++++ .../common/infra/src/workspace/workspace.ts 
| 134 ++++++ packages/frontend/electron/package.json | 2 +- yarn.lock | 14 +- 37 files changed, 3110 insertions(+), 6 deletions(-) create mode 100644 .yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch create mode 100644 packages/common/infra/src/utils/__tests__/async-queue.spec.ts create mode 100644 packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts create mode 100644 packages/common/infra/src/utils/async-queue.ts create mode 100644 packages/common/infra/src/utils/index.ts create mode 100644 packages/common/infra/src/utils/merge-updates.ts create mode 100644 packages/common/infra/src/utils/object-pool.ts create mode 100644 packages/common/infra/src/utils/throw-if-aborted.ts create mode 100644 packages/common/infra/src/workspace/__tests__/workspace.spec.ts create mode 100644 packages/common/infra/src/workspace/context.ts create mode 100644 packages/common/infra/src/workspace/engine/awareness.ts create mode 100644 packages/common/infra/src/workspace/engine/blob.ts create mode 100644 packages/common/infra/src/workspace/engine/index.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/__tests__/engine.spec.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/__tests__/peer.spec.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/__tests__/test-storage.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/consts.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/engine.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/index.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/peer.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/storage.ts create mode 100644 packages/common/infra/src/workspace/factory.ts create mode 100644 packages/common/infra/src/workspace/global-schema.ts create mode 100644 packages/common/infra/src/workspace/index.ts create mode 100644 packages/common/infra/src/workspace/list/cache.ts create 
mode 100644 packages/common/infra/src/workspace/list/index.ts create mode 100644 packages/common/infra/src/workspace/list/information.ts create mode 100644 packages/common/infra/src/workspace/manager.ts create mode 100644 packages/common/infra/src/workspace/metadata.ts create mode 100644 packages/common/infra/src/workspace/service-scope.ts create mode 100644 packages/common/infra/src/workspace/testing.ts create mode 100644 packages/common/infra/src/workspace/upgrade.ts create mode 100644 packages/common/infra/src/workspace/workspace.ts diff --git a/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch b/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch new file mode 100644 index 0000000000000..6c0c42ceb4e34 --- /dev/null +++ b/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch @@ -0,0 +1,54 @@ +diff --git a/dist/tinykeys.module.js b/dist/tinykeys.module.js +deleted file mode 100644 +index cc374b8fc33dbd90e5daf525e929fd77c89c9c5d..0000000000000000000000000000000000000000 +--- a/dist/tinykeys.module.js ++++ /dev/null +@@ -1,2 +0,0 @@ +-var t=["Shift","Meta","Alt","Control"],e="object"==typeof navigator?navigator.platform:"",n=/Mac|iPod|iPhone|iPad/.test(e),r=n?"Meta":"Control",o="Win32"===e?["Control","Alt"]:n?["Alt"]:[];function i(t,e){return"function"==typeof t.getModifierState&&(t.getModifierState(e)||o.includes(e)&&t.getModifierState("AltGraph"))}function a(t){return t.trim().split(" ").map(function(t){var e=t.split(/\b\+/),n=e.pop();return[e=e.map(function(t){return"$mod"===t?r:t}),n]})}function u(e,n){var r;void 0===n&&(n={});var o=null!=(r=n.timeout)?r:1e3,u=Object.keys(e).map(function(t){return[a(t),e[t]]}),c=new Map,f=null;return function(e){e instanceof KeyboardEvent&&(u.forEach(function(n){var 
r=n[0],o=n[1],a=c.get(r)||r;!function(e,n){return!(n[1].toUpperCase()!==e.key.toUpperCase()&&n[1]!==e.code||n[0].find(function(t){return!i(e,t)})||t.find(function(t){return!n[0].includes(t)&&n[1]!==t&&i(e,t)}))}(e,a[0])?i(e,e.key)||c.delete(r):a.length>1?c.set(r,a.slice(1)):(c.delete(r),o(e))}),f&&clearTimeout(f),f=setTimeout(c.clear.bind(c),o))}}function c(t,e,n){var r;void 0===n&&(n={});var o=null!=(r=n.event)?r:"keydown",i=u(e,n);return t.addEventListener(o,i),function(){t.removeEventListener(o,i)}}export{u as createKeybindingsHandler,a as parseKeybinding,c as tinykeys}; +-//# sourceMappingURL=tinykeys.module.js.map +diff --git a/dist/tinykeys.module.js.map b/dist/tinykeys.module.js.map +deleted file mode 100644 +index 0959712c165780d0326d8a524518d9577b0d7f2e..0000000000000000000000000000000000000000 +--- a/dist/tinykeys.module.js.map ++++ /dev/null +@@ -1 +0,0 @@ +-{"version":3,"file":"tinykeys.module.js","sources":["../src/tinykeys.ts"],"sourcesContent":["type KeyBindingPress = [string[], string]\n\n/**\n * A map of keybinding strings to event handlers.\n */\nexport interface KeyBindingMap {\n\t[keybinding: string]: (event: KeyboardEvent) => void\n}\n\nexport interface KeyBindingHandlerOptions {\n\t/**\n\t * Keybinding sequences will wait this long between key presses before\n\t * cancelling (default: 1000).\n\t *\n\t * **Note:** Setting this value too low (i.e. 
`300`) will be too fast for many\n\t * of your users.\n\t */\n\ttimeout?: number\n}\n\n/**\n * Options to configure the behavior of keybindings.\n */\nexport interface KeyBindingOptions extends KeyBindingHandlerOptions {\n\t/**\n\t * Key presses will listen to this event (default: \"keydown\").\n\t */\n\tevent?: \"keydown\" | \"keyup\"\n}\n\n/**\n * These are the modifier keys that change the meaning of keybindings.\n *\n * Note: Ignoring \"AltGraph\" because it is covered by the others.\n */\nlet KEYBINDING_MODIFIER_KEYS = [\"Shift\", \"Meta\", \"Alt\", \"Control\"]\n\n/**\n * Keybinding sequences should timeout if individual key presses are more than\n * 1s apart by default.\n */\nlet DEFAULT_TIMEOUT = 1000\n\n/**\n * Keybinding sequences should bind to this event by default.\n */\nlet DEFAULT_EVENT = \"keydown\"\n\n/**\n * Platform detection code.\n * @see https://github.com/jamiebuilds/tinykeys/issues/184\n */\nlet PLATFORM = typeof navigator === \"object\" ? navigator.platform : \"\"\nlet APPLE_DEVICE = /Mac|iPod|iPhone|iPad/.test(PLATFORM)\n\n/**\n * An alias for creating platform-specific keybinding aliases.\n */\nlet MOD = APPLE_DEVICE ? \"Meta\" : \"Control\"\n\n/**\n * Meaning of `AltGraph`, from MDN:\n * - Windows: Both Alt and Ctrl keys are pressed, or AltGr key is pressed\n * - Mac: ⌥ Option key pressed\n * - Linux: Level 3 Shift key (or Level 5 Shift key) pressed\n * - Android: Not supported\n * @see https://github.com/jamiebuilds/tinykeys/issues/185\n */\nlet ALT_GRAPH_ALIASES =\n\tPLATFORM === \"Win32\" ? [\"Control\", \"Alt\"] : APPLE_DEVICE ? [\"Alt\"] : []\n\n/**\n * There's a bug in Chrome that causes event.getModifierState not to exist on\n * KeyboardEvent's for F1/F2/etc keys.\n */\nfunction getModifierState(event: KeyboardEvent, mod: string) {\n\treturn typeof event.getModifierState === \"function\"\n\t\t? 
event.getModifierState(mod) ||\n\t\t\t\t(ALT_GRAPH_ALIASES.includes(mod) && event.getModifierState(\"AltGraph\"))\n\t\t: false\n}\n\n/**\n * Parses a \"Key Binding String\" into its parts\n *\n * grammar = ``\n * = ` ...`\n * = `` or `+`\n * = `++...`\n */\nexport function parseKeybinding(str: string): KeyBindingPress[] {\n\treturn str\n\t\t.trim()\n\t\t.split(\" \")\n\t\t.map(press => {\n\t\t\tlet mods = press.split(/\\b\\+/)\n\t\t\tlet key = mods.pop() as string\n\t\t\tmods = mods.map(mod => (mod === \"$mod\" ? MOD : mod))\n\t\t\treturn [mods, key]\n\t\t})\n}\n\n/**\n * This tells us if a series of events matches a key binding sequence either\n * partially or exactly.\n */\nfunction match(event: KeyboardEvent, press: KeyBindingPress): boolean {\n\t// prettier-ignore\n\treturn !(\n\t\t// Allow either the `event.key` or the `event.code`\n\t\t// MDN event.key: https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/key\n\t\t// MDN event.code: https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/code\n\t\t(\n\t\t\tpress[1].toUpperCase() !== event.key.toUpperCase() &&\n\t\t\tpress[1] !== event.code\n\t\t) ||\n\n\t\t// Ensure all the modifiers in the keybinding are pressed.\n\t\tpress[0].find(mod => {\n\t\t\treturn !getModifierState(event, mod)\n\t\t}) ||\n\n\t\t// KEYBINDING_MODIFIER_KEYS (Shift/Control/etc) change the meaning of a\n\t\t// keybinding. 
So if they are pressed but aren't part of the current\n\t\t// keybinding press, then we don't have a match.\n\t\tKEYBINDING_MODIFIER_KEYS.find(mod => {\n\t\t\treturn !press[0].includes(mod) && press[1] !== mod && getModifierState(event, mod)\n\t\t})\n\t)\n}\n\n/**\n * Creates an event listener for handling keybindings.\n *\n * @example\n * ```js\n * import { createKeybindingsHandler } from \"../src/keybindings\"\n *\n * let handler = createKeybindingsHandler({\n * \t\"Shift+d\": () => {\n * \t\talert(\"The 'Shift' and 'd' keys were pressed at the same time\")\n * \t},\n * \t\"y e e t\": () => {\n * \t\talert(\"The keys 'y', 'e', 'e', and 't' were pressed in order\")\n * \t},\n * \t\"$mod+d\": () => {\n * \t\talert(\"Either 'Control+d' or 'Meta+d' were pressed\")\n * \t},\n * })\n *\n * window.addEvenListener(\"keydown\", handler)\n * ```\n */\nexport function createKeybindingsHandler(\n\tkeyBindingMap: KeyBindingMap,\n\toptions: KeyBindingHandlerOptions = {},\n): EventListener {\n\tlet timeout = options.timeout ?? DEFAULT_TIMEOUT\n\n\tlet keyBindings = Object.keys(keyBindingMap).map(key => {\n\t\treturn [parseKeybinding(key), keyBindingMap[key]] as const\n\t})\n\n\tlet possibleMatches = new Map()\n\tlet timer: number | null = null\n\n\treturn event => {\n\t\t// Ensure and stop any event that isn't a full keyboard event.\n\t\t// Autocomplete option navigation and selection would fire a instanceof Event,\n\t\t// instead of the expected KeyboardEvent\n\t\tif (!(event instanceof KeyboardEvent)) {\n\t\t\treturn\n\t\t}\n\n\t\tkeyBindings.forEach(keyBinding => {\n\t\t\tlet sequence = keyBinding[0]\n\t\t\tlet callback = keyBinding[1]\n\n\t\t\tlet prev = possibleMatches.get(sequence)\n\t\t\tlet remainingExpectedPresses = prev ? 
prev : sequence\n\t\t\tlet currentExpectedPress = remainingExpectedPresses[0]\n\n\t\t\tlet matches = match(event, currentExpectedPress)\n\n\t\t\tif (!matches) {\n\t\t\t\t// Modifier keydown events shouldn't break sequences\n\t\t\t\t// Note: This works because:\n\t\t\t\t// - non-modifiers will always return false\n\t\t\t\t// - if the current keypress is a modifier then it will return true when we check its state\n\t\t\t\t// MDN: https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/getModifierState\n\t\t\t\tif (!getModifierState(event, event.key)) {\n\t\t\t\t\tpossibleMatches.delete(sequence)\n\t\t\t\t}\n\t\t\t} else if (remainingExpectedPresses.length > 1) {\n\t\t\t\tpossibleMatches.set(sequence, remainingExpectedPresses.slice(1))\n\t\t\t} else {\n\t\t\t\tpossibleMatches.delete(sequence)\n\t\t\t\tcallback(event)\n\t\t\t}\n\t\t})\n\n\t\tif (timer) {\n\t\t\tclearTimeout(timer)\n\t\t}\n\n\t\ttimer = setTimeout(possibleMatches.clear.bind(possibleMatches), timeout)\n\t}\n}\n\n/**\n * Subscribes to keybindings.\n *\n * Returns an unsubscribe method.\n *\n * @example\n * ```js\n * import { tinykeys } from \"../src/tinykeys\"\n *\n * tinykeys(window, {\n * \t\"Shift+d\": () => {\n * \t\talert(\"The 'Shift' and 'd' keys were pressed at the same time\")\n * \t},\n * \t\"y e e t\": () => {\n * \t\talert(\"The keys 'y', 'e', 'e', and 't' were pressed in order\")\n * \t},\n * \t\"$mod+d\": () => {\n * \t\talert(\"Either 'Control+d' or 'Meta+d' were pressed\")\n * \t},\n * })\n * ```\n */\nexport function tinykeys(\n\ttarget: Window | HTMLElement,\n\tkeyBindingMap: KeyBindingMap,\n\toptions: KeyBindingOptions = {},\n): () => void {\n\tlet event = options.event ?? 
DEFAULT_EVENT\n\tlet onKeyEvent = createKeybindingsHandler(keyBindingMap, options)\n\n\ttarget.addEventListener(event, onKeyEvent)\n\n\treturn () => {\n\t\ttarget.removeEventListener(event, onKeyEvent)\n\t}\n}\n"],"names":["KEYBINDING_MODIFIER_KEYS","PLATFORM","navigator","platform","APPLE_DEVICE","test","MOD","ALT_GRAPH_ALIASES","getModifierState","event","mod","includes","parseKeybinding","str","trim","split","map","press","mods","key","pop","createKeybindingsHandler","keyBindingMap","options","timeout","_options$timeout","keyBindings","Object","keys","possibleMatches","Map","timer","KeyboardEvent","forEach","keyBinding","sequence","callback","remainingExpectedPresses","get","toUpperCase","code","find","match","length","set","slice","clearTimeout","setTimeout","clear","bind","tinykeys","target","_options$event","onKeyEvent","addEventListener","removeEventListener"],"mappings":"AAmCA,IAAIA,EAA2B,CAAC,QAAS,OAAQ,MAAO,WAiBpDC,EAAgC,iBAAdC,UAAyBA,UAAUC,SAAW,GAChEC,EAAe,uBAAuBC,KAAKJ,GAK3CK,EAAMF,EAAe,OAAS,UAU9BG,EACU,UAAbN,EAAuB,CAAC,UAAW,OAASG,EAAe,CAAC,OAAS,GAMtE,SAASI,EAAiBC,EAAsBC,GAC/C,MAAyC,mBAA3BD,EAAMD,mBACjBC,EAAMD,iBAAiBE,IACtBH,EAAkBI,SAASD,IAAQD,EAAMD,iBAAiB,sBAY/CI,EAAgBC,GAC/B,OAAOA,EACLC,OACAC,MAAM,KACNC,IAAI,SAAAC,GACJ,IAAIC,EAAOD,EAAMF,MAAM,QACnBI,EAAMD,EAAKE,MAEf,MAAO,CADPF,EAAOA,EAAKF,IAAI,SAAAN,SAAgB,SAARA,EAAiBJ,EAAMI,IACjCS,cAuDDE,EACfC,EACAC,kBAAAA,IAAAA,EAAoC,IAEpC,IAAIC,SAAOC,EAAGF,EAAQC,SAAOC,EApHR,IAsHjBC,EAAcC,OAAOC,KAAKN,GAAeN,IAAI,SAAAG,GAChD,MAAO,CAACP,EAAgBO,GAAMG,EAAcH,MAGzCU,EAAkB,IAAIC,IACtBC,EAAuB,KAE3B,gBAAOtB,GAIAA,aAAiBuB,gBAIvBN,EAAYO,QAAQ,SAAAC,GACnB,IAAIC,EAAWD,EAAW,GACtBE,EAAWF,EAAW,GAGtBG,EADOR,EAAgBS,IAAIH,IACcA,GAzEhD,SAAe1B,EAAsBQ,GAEpC,QAKEA,EAAM,GAAGsB,gBAAkB9B,EAAMU,IAAIoB,eACrCtB,EAAM,KAAOR,EAAM+B,MAIpBvB,EAAM,GAAGwB,KAAK,SAAA/B,GACb,OAAQF,EAAiBC,EAAOC,MAMjCV,EAAyByC,KAAK,SAAA/B,GAC7B,OAAQO,EAAM,GAAGN,SAASD,IAAQO,EAAM,KAAOP,GAAOF,EAAiBC,EAAOC,MAwDhEgC,CAAMjC,EAFO4B,EAAyB,IAU9C7B,EAAiBC,EAAOA,EAAMU,MAClCU,SAAuBM,GAEdE,E
AAyBM,OAAS,EAC5Cd,EAAgBe,IAAIT,EAAUE,EAAyBQ,MAAM,KAE7DhB,SAAuBM,GACvBC,EAAS3B,MAIPsB,GACHe,aAAaf,GAGdA,EAAQgB,WAAWlB,EAAgBmB,MAAMC,KAAKpB,GAAkBL,cA0BlD0B,EACfC,EACA7B,EACAC,kBAAAA,IAAAA,EAA6B,IAE7B,IAAId,SAAK2C,EAAG7B,EAAQd,OAAK2C,EA9LN,UA+LfC,EAAahC,EAAyBC,EAAeC,GAIzD,OAFA4B,EAAOG,iBAAiB7C,EAAO4C,cAG9BF,EAAOI,oBAAoB9C,EAAO4C"} +\ No newline at end of file +diff --git a/dist/tinykeys.module.mjs b/dist/tinykeys.module.mjs +new file mode 100644 +index 0000000000000000000000000000000000000000..cc374b8fc33dbd90e5daf525e929fd77c89c9c5d +--- /dev/null ++++ b/dist/tinykeys.module.mjs +@@ -0,0 +1,2 @@ ++var t=["Shift","Meta","Alt","Control"],e="object"==typeof navigator?navigator.platform:"",n=/Mac|iPod|iPhone|iPad/.test(e),r=n?"Meta":"Control",o="Win32"===e?["Control","Alt"]:n?["Alt"]:[];function i(t,e){return"function"==typeof t.getModifierState&&(t.getModifierState(e)||o.includes(e)&&t.getModifierState("AltGraph"))}function a(t){return t.trim().split(" ").map(function(t){var e=t.split(/\b\+/),n=e.pop();return[e=e.map(function(t){return"$mod"===t?r:t}),n]})}function u(e,n){var r;void 0===n&&(n={});var o=null!=(r=n.timeout)?r:1e3,u=Object.keys(e).map(function(t){return[a(t),e[t]]}),c=new Map,f=null;return function(e){e instanceof KeyboardEvent&&(u.forEach(function(n){var r=n[0],o=n[1],a=c.get(r)||r;!function(e,n){return!(n[1].toUpperCase()!==e.key.toUpperCase()&&n[1]!==e.code||n[0].find(function(t){return!i(e,t)})||t.find(function(t){return!n[0].includes(t)&&n[1]!==t&&i(e,t)}))}(e,a[0])?i(e,e.key)||c.delete(r):a.length>1?c.set(r,a.slice(1)):(c.delete(r),o(e))}),f&&clearTimeout(f),f=setTimeout(c.clear.bind(c),o))}}function c(t,e,n){var r;void 0===n&&(n={});var o=null!=(r=n.event)?r:"keydown",i=u(e,n);return t.addEventListener(o,i),function(){t.removeEventListener(o,i)}}export{u as createKeybindingsHandler,a as parseKeybinding,c as tinykeys}; ++//# sourceMappingURL=tinykeys.module.js.map +diff --git a/dist/tinykeys.module.mjs.map b/dist/tinykeys.module.mjs.map +new file 
mode 100644 +index 0000000000000000000000000000000000000000..0959712c165780d0326d8a524518d9577b0d7f2e +--- /dev/null ++++ b/dist/tinykeys.module.mjs.map +@@ -0,0 +1 @@ ++{"version":3,"file":"tinykeys.module.js","sources":["../src/tinykeys.ts"],"sourcesContent":["type KeyBindingPress = [string[], string]\n\n/**\n * A map of keybinding strings to event handlers.\n */\nexport interface KeyBindingMap {\n\t[keybinding: string]: (event: KeyboardEvent) => void\n}\n\nexport interface KeyBindingHandlerOptions {\n\t/**\n\t * Keybinding sequences will wait this long between key presses before\n\t * cancelling (default: 1000).\n\t *\n\t * **Note:** Setting this value too low (i.e. `300`) will be too fast for many\n\t * of your users.\n\t */\n\ttimeout?: number\n}\n\n/**\n * Options to configure the behavior of keybindings.\n */\nexport interface KeyBindingOptions extends KeyBindingHandlerOptions {\n\t/**\n\t * Key presses will listen to this event (default: \"keydown\").\n\t */\n\tevent?: \"keydown\" | \"keyup\"\n}\n\n/**\n * These are the modifier keys that change the meaning of keybindings.\n *\n * Note: Ignoring \"AltGraph\" because it is covered by the others.\n */\nlet KEYBINDING_MODIFIER_KEYS = [\"Shift\", \"Meta\", \"Alt\", \"Control\"]\n\n/**\n * Keybinding sequences should timeout if individual key presses are more than\n * 1s apart by default.\n */\nlet DEFAULT_TIMEOUT = 1000\n\n/**\n * Keybinding sequences should bind to this event by default.\n */\nlet DEFAULT_EVENT = \"keydown\"\n\n/**\n * Platform detection code.\n * @see https://github.com/jamiebuilds/tinykeys/issues/184\n */\nlet PLATFORM = typeof navigator === \"object\" ? navigator.platform : \"\"\nlet APPLE_DEVICE = /Mac|iPod|iPhone|iPad/.test(PLATFORM)\n\n/**\n * An alias for creating platform-specific keybinding aliases.\n */\nlet MOD = APPLE_DEVICE ? 
\"Meta\" : \"Control\"\n\n/**\n * Meaning of `AltGraph`, from MDN:\n * - Windows: Both Alt and Ctrl keys are pressed, or AltGr key is pressed\n * - Mac: ⌥ Option key pressed\n * - Linux: Level 3 Shift key (or Level 5 Shift key) pressed\n * - Android: Not supported\n * @see https://github.com/jamiebuilds/tinykeys/issues/185\n */\nlet ALT_GRAPH_ALIASES =\n\tPLATFORM === \"Win32\" ? [\"Control\", \"Alt\"] : APPLE_DEVICE ? [\"Alt\"] : []\n\n/**\n * There's a bug in Chrome that causes event.getModifierState not to exist on\n * KeyboardEvent's for F1/F2/etc keys.\n */\nfunction getModifierState(event: KeyboardEvent, mod: string) {\n\treturn typeof event.getModifierState === \"function\"\n\t\t? event.getModifierState(mod) ||\n\t\t\t\t(ALT_GRAPH_ALIASES.includes(mod) && event.getModifierState(\"AltGraph\"))\n\t\t: false\n}\n\n/**\n * Parses a \"Key Binding String\" into its parts\n *\n * grammar = ``\n * = ` ...`\n * = `` or `+`\n * = `++...`\n */\nexport function parseKeybinding(str: string): KeyBindingPress[] {\n\treturn str\n\t\t.trim()\n\t\t.split(\" \")\n\t\t.map(press => {\n\t\t\tlet mods = press.split(/\\b\\+/)\n\t\t\tlet key = mods.pop() as string\n\t\t\tmods = mods.map(mod => (mod === \"$mod\" ? 
MOD : mod))\n\t\t\treturn [mods, key]\n\t\t})\n}\n\n/**\n * This tells us if a series of events matches a key binding sequence either\n * partially or exactly.\n */\nfunction match(event: KeyboardEvent, press: KeyBindingPress): boolean {\n\t// prettier-ignore\n\treturn !(\n\t\t// Allow either the `event.key` or the `event.code`\n\t\t// MDN event.key: https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/key\n\t\t// MDN event.code: https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/code\n\t\t(\n\t\t\tpress[1].toUpperCase() !== event.key.toUpperCase() &&\n\t\t\tpress[1] !== event.code\n\t\t) ||\n\n\t\t// Ensure all the modifiers in the keybinding are pressed.\n\t\tpress[0].find(mod => {\n\t\t\treturn !getModifierState(event, mod)\n\t\t}) ||\n\n\t\t// KEYBINDING_MODIFIER_KEYS (Shift/Control/etc) change the meaning of a\n\t\t// keybinding. So if they are pressed but aren't part of the current\n\t\t// keybinding press, then we don't have a match.\n\t\tKEYBINDING_MODIFIER_KEYS.find(mod => {\n\t\t\treturn !press[0].includes(mod) && press[1] !== mod && getModifierState(event, mod)\n\t\t})\n\t)\n}\n\n/**\n * Creates an event listener for handling keybindings.\n *\n * @example\n * ```js\n * import { createKeybindingsHandler } from \"../src/keybindings\"\n *\n * let handler = createKeybindingsHandler({\n * \t\"Shift+d\": () => {\n * \t\talert(\"The 'Shift' and 'd' keys were pressed at the same time\")\n * \t},\n * \t\"y e e t\": () => {\n * \t\talert(\"The keys 'y', 'e', 'e', and 't' were pressed in order\")\n * \t},\n * \t\"$mod+d\": () => {\n * \t\talert(\"Either 'Control+d' or 'Meta+d' were pressed\")\n * \t},\n * })\n *\n * window.addEvenListener(\"keydown\", handler)\n * ```\n */\nexport function createKeybindingsHandler(\n\tkeyBindingMap: KeyBindingMap,\n\toptions: KeyBindingHandlerOptions = {},\n): EventListener {\n\tlet timeout = options.timeout ?? 
DEFAULT_TIMEOUT\n\n\tlet keyBindings = Object.keys(keyBindingMap).map(key => {\n\t\treturn [parseKeybinding(key), keyBindingMap[key]] as const\n\t})\n\n\tlet possibleMatches = new Map()\n\tlet timer: number | null = null\n\n\treturn event => {\n\t\t// Ensure and stop any event that isn't a full keyboard event.\n\t\t// Autocomplete option navigation and selection would fire a instanceof Event,\n\t\t// instead of the expected KeyboardEvent\n\t\tif (!(event instanceof KeyboardEvent)) {\n\t\t\treturn\n\t\t}\n\n\t\tkeyBindings.forEach(keyBinding => {\n\t\t\tlet sequence = keyBinding[0]\n\t\t\tlet callback = keyBinding[1]\n\n\t\t\tlet prev = possibleMatches.get(sequence)\n\t\t\tlet remainingExpectedPresses = prev ? prev : sequence\n\t\t\tlet currentExpectedPress = remainingExpectedPresses[0]\n\n\t\t\tlet matches = match(event, currentExpectedPress)\n\n\t\t\tif (!matches) {\n\t\t\t\t// Modifier keydown events shouldn't break sequences\n\t\t\t\t// Note: This works because:\n\t\t\t\t// - non-modifiers will always return false\n\t\t\t\t// - if the current keypress is a modifier then it will return true when we check its state\n\t\t\t\t// MDN: https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/getModifierState\n\t\t\t\tif (!getModifierState(event, event.key)) {\n\t\t\t\t\tpossibleMatches.delete(sequence)\n\t\t\t\t}\n\t\t\t} else if (remainingExpectedPresses.length > 1) {\n\t\t\t\tpossibleMatches.set(sequence, remainingExpectedPresses.slice(1))\n\t\t\t} else {\n\t\t\t\tpossibleMatches.delete(sequence)\n\t\t\t\tcallback(event)\n\t\t\t}\n\t\t})\n\n\t\tif (timer) {\n\t\t\tclearTimeout(timer)\n\t\t}\n\n\t\ttimer = setTimeout(possibleMatches.clear.bind(possibleMatches), timeout)\n\t}\n}\n\n/**\n * Subscribes to keybindings.\n *\n * Returns an unsubscribe method.\n *\n * @example\n * ```js\n * import { tinykeys } from \"../src/tinykeys\"\n *\n * tinykeys(window, {\n * \t\"Shift+d\": () => {\n * \t\talert(\"The 'Shift' and 'd' keys were pressed at the same time\")\n * 
\t},\n * \t\"y e e t\": () => {\n * \t\talert(\"The keys 'y', 'e', 'e', and 't' were pressed in order\")\n * \t},\n * \t\"$mod+d\": () => {\n * \t\talert(\"Either 'Control+d' or 'Meta+d' were pressed\")\n * \t},\n * })\n * ```\n */\nexport function tinykeys(\n\ttarget: Window | HTMLElement,\n\tkeyBindingMap: KeyBindingMap,\n\toptions: KeyBindingOptions = {},\n): () => void {\n\tlet event = options.event ?? DEFAULT_EVENT\n\tlet onKeyEvent = createKeybindingsHandler(keyBindingMap, options)\n\n\ttarget.addEventListener(event, onKeyEvent)\n\n\treturn () => {\n\t\ttarget.removeEventListener(event, onKeyEvent)\n\t}\n}\n"],"names":["KEYBINDING_MODIFIER_KEYS","PLATFORM","navigator","platform","APPLE_DEVICE","test","MOD","ALT_GRAPH_ALIASES","getModifierState","event","mod","includes","parseKeybinding","str","trim","split","map","press","mods","key","pop","createKeybindingsHandler","keyBindingMap","options","timeout","_options$timeout","keyBindings","Object","keys","possibleMatches","Map","timer","KeyboardEvent","forEach","keyBinding","sequence","callback","remainingExpectedPresses","get","toUpperCase","code","find","match","length","set","slice","clearTimeout","setTimeout","clear","bind","tinykeys","target","_options$event","onKeyEvent","addEventListener","removeEventListener"],"mappings":"AAmCA,IAAIA,EAA2B,CAAC,QAAS,OAAQ,MAAO,WAiBpDC,EAAgC,iBAAdC,UAAyBA,UAAUC,SAAW,GAChEC,EAAe,uBAAuBC,KAAKJ,GAK3CK,EAAMF,EAAe,OAAS,UAU9BG,EACU,UAAbN,EAAuB,CAAC,UAAW,OAASG,EAAe,CAAC,OAAS,GAMtE,SAASI,EAAiBC,EAAsBC,GAC/C,MAAyC,mBAA3BD,EAAMD,mBACjBC,EAAMD,iBAAiBE,IACtBH,EAAkBI,SAASD,IAAQD,EAAMD,iBAAiB,sBAY/CI,EAAgBC,GAC/B,OAAOA,EACLC,OACAC,MAAM,KACNC,IAAI,SAAAC,GACJ,IAAIC,EAAOD,EAAMF,MAAM,QACnBI,EAAMD,EAAKE,MAEf,MAAO,CADPF,EAAOA,EAAKF,IAAI,SAAAN,SAAgB,SAARA,EAAiBJ,EAAMI,IACjCS,cAuDDE,EACfC,EACAC,kBAAAA,IAAAA,EAAoC,IAEpC,IAAIC,SAAOC,EAAGF,EAAQC,SAAOC,EApHR,IAsHjBC,EAAcC,OAAOC,KAAKN,GAAeN,IAAI,SAAAG,GAChD,MAAO,CAACP,EAAgBO,GAAMG,EAAcH,MAGzCU,EAAkB,IAAIC,IACtBC,EAAuB,KAE3B,gBAAOtB,GAIAA,aAAiBuB,gBAIvB
N,EAAYO,QAAQ,SAAAC,GACnB,IAAIC,EAAWD,EAAW,GACtBE,EAAWF,EAAW,GAGtBG,EADOR,EAAgBS,IAAIH,IACcA,GAzEhD,SAAe1B,EAAsBQ,GAEpC,QAKEA,EAAM,GAAGsB,gBAAkB9B,EAAMU,IAAIoB,eACrCtB,EAAM,KAAOR,EAAM+B,MAIpBvB,EAAM,GAAGwB,KAAK,SAAA/B,GACb,OAAQF,EAAiBC,EAAOC,MAMjCV,EAAyByC,KAAK,SAAA/B,GAC7B,OAAQO,EAAM,GAAGN,SAASD,IAAQO,EAAM,KAAOP,GAAOF,EAAiBC,EAAOC,MAwDhEgC,CAAMjC,EAFO4B,EAAyB,IAU9C7B,EAAiBC,EAAOA,EAAMU,MAClCU,SAAuBM,GAEdE,EAAyBM,OAAS,EAC5Cd,EAAgBe,IAAIT,EAAUE,EAAyBQ,MAAM,KAE7DhB,SAAuBM,GACvBC,EAAS3B,MAIPsB,GACHe,aAAaf,GAGdA,EAAQgB,WAAWlB,EAAgBmB,MAAMC,KAAKpB,GAAkBL,cA0BlD0B,EACfC,EACA7B,EACAC,kBAAAA,IAAAA,EAA6B,IAE7B,IAAId,SAAK2C,EAAG7B,EAAQd,OAAK2C,EA9LN,UA+LfC,EAAahC,EAAyBC,EAAeC,GAIzD,OAFA4B,EAAOG,iBAAiB7C,EAAO4C,cAG9BF,EAAOI,oBAAoB9C,EAAO4C"} +\ No newline at end of file +diff --git a/package.json b/package.json +index d6a93bb7cce03b4836cf96deba308de0417079a3..ce382afc534607d67b04d877cdf6990769c59d1b 100644 +--- a/package.json ++++ b/package.json +@@ -7,7 +7,7 @@ + "repository": "jamiebuilds/tinykeys", + "source": "src/tinykeys.ts", + "main": "dist/tinykeys.js", +- "module": "dist/tinykeys.module.js", ++ "module": "dist/tinykeys.module.mjs", + "unpkg": "dist/tinykeys.umd.js", + "types": "dist/tinykeys.d.ts", + "files": [ +@@ -15,7 +15,7 @@ + ], + "exports": { + ".": { +- "import": "./dist/tinykeys.module.js", ++ "import": "./dist/tinykeys.module.mjs", + "require": "./dist/tinykeys.js" + } + }, diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index befd21c0c1c59..127d52cf9aac2 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -23,9 +23,10 @@ "foxact": "^0.2.20", "jotai": "^2.5.1", "jotai-effect": "^0.2.3", + "lodash-es": "^4.17.21", "nanoid": "^5.0.3", "react": "18.2.0", - "tinykeys": "^2.1.0", + "tinykeys": "patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch", "yjs": "^13.6.10", "zod": "^3.22.4" }, diff --git a/packages/common/infra/src/index.ts 
b/packages/common/infra/src/index.ts index d228f9670c8b4..d4d5815909371 100644 --- a/packages/common/infra/src/index.ts +++ b/packages/common/infra/src/index.ts @@ -5,17 +5,24 @@ export * from './command'; export * from './di'; export * from './livedata'; export * from './storage'; +export * from './utils'; +export * from './workspace'; import type { ServiceCollection } from './di'; import { CleanupService } from './lifecycle'; import { GlobalCache, GlobalState, MemoryMemento } from './storage'; +import { + configureTestingWorkspaceServices, + configureWorkspaceServices, +} from './workspace'; export function configureInfraServices(services: ServiceCollection) { services.add(CleanupService); + configureWorkspaceServices(services); } export function configureTestingInfraServices(services: ServiceCollection) { - configureInfraServices(services); + configureTestingWorkspaceServices(services); services.addImpl(GlobalCache, MemoryMemento); services.addImpl(GlobalState, MemoryMemento); } diff --git a/packages/common/infra/src/utils/__tests__/async-queue.spec.ts b/packages/common/infra/src/utils/__tests__/async-queue.spec.ts new file mode 100644 index 0000000000000..017401ec84eb9 --- /dev/null +++ b/packages/common/infra/src/utils/__tests__/async-queue.spec.ts @@ -0,0 +1,45 @@ +import { describe, expect, test, vi } from 'vitest'; + +import { AsyncQueue } from '../async-queue'; + +describe('async-queue', () => { + test('push & pop', async () => { + const queue = new AsyncQueue(); + queue.push(1, 2, 3); + expect(queue.length).toBe(3); + expect(await queue.next()).toBe(1); + expect(await queue.next()).toBe(2); + expect(await queue.next()).toBe(3); + expect(queue.length).toBe(0); + }); + + test('await', async () => { + const queue = new AsyncQueue(); + queue.push(1, 2); + expect(await queue.next()).toBe(1); + expect(await queue.next()).toBe(2); + + let v = -1; + + // setup 2 pop tasks + queue.next().then(next => { + v = next; + }); + queue.next().then(next => { + v = next; + 
}); + + // Wait for 100ms + await new Promise(resolve => setTimeout(resolve, 100)); + // v should not be changed + expect(v).toBe(-1); + + // push 3, should trigger the first pop task + queue.push(3); + await vi.waitFor(() => v === 3); + + // push 4, should trigger the second pop task + queue.push(4); + await vi.waitFor(() => v === 4); + }); +}); diff --git a/packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts b/packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts new file mode 100644 index 0000000000000..137f748a6b849 --- /dev/null +++ b/packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts @@ -0,0 +1,13 @@ +import { describe, expect, test } from 'vitest'; + +import { throwIfAborted } from '../throw-if-aborted'; + +describe('throw-if-aborted', () => { + test('basic', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + expect(throwIfAborted(abortSignal)).toBe(true); + abortController.abort('TEST_ABORT'); + expect(() => throwIfAborted(abortSignal)).toThrowError('TEST_ABORT'); + }); +}); diff --git a/packages/common/infra/src/utils/async-queue.ts b/packages/common/infra/src/utils/async-queue.ts new file mode 100644 index 0000000000000..e7f994a39bbdb --- /dev/null +++ b/packages/common/infra/src/utils/async-queue.ts @@ -0,0 +1,101 @@ +export class AsyncQueue { + private _queue: T[]; + + private _resolveUpdate: (() => void) | null = null; + private _waitForUpdate: Promise | null = null; + + constructor(init: T[] = []) { + this._queue = init; + } + + get length() { + return this._queue.length; + } + + async next( + abort?: AbortSignal, + dequeue: (arr: T[]) => T | undefined = a => a.shift() + ): Promise { + const update = dequeue(this._queue); + if (update) { + return update; + } else { + if (!this._waitForUpdate) { + this._waitForUpdate = new Promise(resolve => { + this._resolveUpdate = resolve; + }); + } + + await Promise.race([ + this._waitForUpdate, + new Promise((_, 
reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + + return this.next(abort, dequeue); + } + } + + push(...updates: T[]) { + this._queue.push(...updates); + if (this._resolveUpdate) { + const resolve = this._resolveUpdate; + this._resolveUpdate = null; + this._waitForUpdate = null; + resolve(); + } + } + + remove(predicate: (update: T) => boolean) { + const index = this._queue.findIndex(predicate); + if (index !== -1) { + this._queue.splice(index, 1); + } + } + + find(predicate: (update: T) => boolean) { + return this._queue.find(predicate); + } + + clear() { + this._queue = []; + } +} + +export class PriorityAsyncQueue< + T extends { id: string }, +> extends AsyncQueue { + constructor( + init: T[] = [], + public readonly priorityTarget: SharedPriorityTarget = new SharedPriorityTarget() + ) { + super(init); + } + + override next(abort?: AbortSignal | undefined): Promise { + return super.next(abort, arr => { + if (this.priorityTarget.priorityRule !== null) { + const index = arr.findIndex( + update => this.priorityTarget.priorityRule?.(update.id) + ); + if (index !== -1) { + return arr.splice(index, 1)[0]; + } + } + return arr.shift(); + }); + } +} + +/** + * Shared priority target can be shared by multiple queues. 
+ */ +export class SharedPriorityTarget { + public priorityRule: ((id: string) => boolean) | null = null; +} diff --git a/packages/common/infra/src/utils/index.ts b/packages/common/infra/src/utils/index.ts new file mode 100644 index 0000000000000..08fe1ee8ca425 --- /dev/null +++ b/packages/common/infra/src/utils/index.ts @@ -0,0 +1,5 @@ +export * from './async-queue'; +export * from './merge-updates'; +export * from './object-pool'; +export * from './stable-hash'; +export * from './throw-if-aborted'; diff --git a/packages/common/infra/src/utils/merge-updates.ts b/packages/common/infra/src/utils/merge-updates.ts new file mode 100644 index 0000000000000..e3c8a4a06ac6a --- /dev/null +++ b/packages/common/infra/src/utils/merge-updates.ts @@ -0,0 +1,17 @@ +import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs'; + +export function mergeUpdates(updates: Uint8Array[]) { + if (updates.length === 0) { + return new Uint8Array(); + } + if (updates.length === 1) { + return updates[0]; + } + const doc = new Doc(); + doc.transact(() => { + updates.forEach(update => { + applyUpdate(doc, update); + }); + }); + return encodeStateAsUpdate(doc); +} diff --git a/packages/common/infra/src/utils/object-pool.ts b/packages/common/infra/src/utils/object-pool.ts new file mode 100644 index 0000000000000..a8569ee3e7342 --- /dev/null +++ b/packages/common/infra/src/utils/object-pool.ts @@ -0,0 +1,96 @@ +import { Unreachable } from '@affine/env/constant'; + +export interface RcRef { + obj: T; + release: () => void; +} + +export class ObjectPool { + objects = new Map(); + timeoutToGc: NodeJS.Timeout | null = null; + + constructor( + private readonly options: { + onDelete?: (obj: T) => void; + onDangling?: (obj: T) => boolean; + } = {} + ) {} + + get(key: Key): RcRef | null { + const exist = this.objects.get(key); + if (exist) { + exist.rc++; + let released = false; + return { + obj: exist.obj, + release: () => { + // avoid double release + if (released) { + return; + } + released = true; + 
exist.rc--; + this.requestGc(); + }, + }; + } + return null; + } + + put(key: Key, obj: T) { + const ref = { obj, rc: 0 }; + this.objects.set(key, ref); + + const r = this.get(key); + if (!r) { + throw new Unreachable(); + } + + return r; + } + + private requestGc() { + if (this.timeoutToGc) { + clearInterval(this.timeoutToGc); + } + + // do gc every 1s + this.timeoutToGc = setInterval(() => { + this.gc(); + }, 1000); + } + + private gc() { + for (const [key, { obj, rc }] of new Map( + this.objects /* clone the map, because the origin will be modified during iteration */ + )) { + if ( + rc === 0 && + (!this.options.onDangling || this.options.onDangling(obj)) + ) { + this.options.onDelete?.(obj); + + this.objects.delete(key); + } + } + + for (const [_, { rc }] of this.objects) { + if (rc === 0) { + return; + } + } + + // if all object has referrer, stop gc + if (this.timeoutToGc) { + clearInterval(this.timeoutToGc); + } + } + + clear() { + for (const { obj } of this.objects.values()) { + this.options.onDelete?.(obj); + } + + this.objects.clear(); + } +} diff --git a/packages/common/infra/src/utils/throw-if-aborted.ts b/packages/common/infra/src/utils/throw-if-aborted.ts new file mode 100644 index 0000000000000..54e2c81ac9c0f --- /dev/null +++ b/packages/common/infra/src/utils/throw-if-aborted.ts @@ -0,0 +1,9 @@ +// because AbortSignal.throwIfAborted is not available in abortcontroller-polyfill +export function throwIfAborted(abort?: AbortSignal) { + if (abort?.aborted) { + throw new Error(abort.reason); + } + return true; +} + +export const MANUALLY_STOP = 'manually-stop'; diff --git a/packages/common/infra/src/workspace/__tests__/workspace.spec.ts b/packages/common/infra/src/workspace/__tests__/workspace.spec.ts new file mode 100644 index 0000000000000..8d15d1196c94f --- /dev/null +++ b/packages/common/infra/src/workspace/__tests__/workspace.spec.ts @@ -0,0 +1,38 @@ +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { describe, expect, test } from 
'vitest'; + +import { configureInfraServices, configureTestingInfraServices } from '../..'; +import { ServiceCollection } from '../../di'; +import { WorkspaceListService, WorkspaceManager } from '../'; + +describe('Workspace System', () => { + test('create workspace', async () => { + const services = new ServiceCollection(); + configureInfraServices(services); + configureTestingInfraServices(services); + + const provider = services.provider(); + const workspaceManager = provider.get(WorkspaceManager); + const workspaceListService = provider.get(WorkspaceListService); + expect(workspaceListService.workspaceList.value.length).toBe(0); + + const { workspace } = workspaceManager.open( + await workspaceManager.createWorkspace(WorkspaceFlavour.LOCAL) + ); + + expect(workspaceListService.workspaceList.value.length).toBe(1); + + const page = workspace.blockSuiteWorkspace.createPage({ + id: 'page0', + }); + await page.load(); + page.addBlock('affine:page', { + title: new page.Text('test-page'), + }); + + expect(workspace.blockSuiteWorkspace.pages.size).toBe(1); + expect( + (page!.getBlockByFlavour('affine:page')[0] as any).title.toString() + ).toBe('test-page'); + }); +}); diff --git a/packages/common/infra/src/workspace/context.ts b/packages/common/infra/src/workspace/context.ts new file mode 100644 index 0000000000000..41e2f8d2481ce --- /dev/null +++ b/packages/common/infra/src/workspace/context.ts @@ -0,0 +1,76 @@ +/** + * This module contains the context of the workspace scope. + * You can use those context when declare workspace service. + * + * Is helpful when implement workspace low level providers, like `SyncEngine`, + * which need to access workspace low level components. + * + * Normally, business service should depend on `Workspace` service, not workspace context. 
+ * + * @example + * ```ts + * import { declareWorkspaceService } from '@toeverything/infra'; + * declareWorkspaceService(XXXService, { + * factory: declareFactory( + * [BlockSuiteWorkspaceContext, RootYDocContext], // <== inject workspace context + * (bs, rootDoc) => new XXXService(bs.value, rootDoc.value) + * ), + * }) + */ + +import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { nanoid } from 'nanoid'; +import type { Awareness } from 'y-protocols/awareness.js'; +import type { Doc as YDoc } from 'yjs'; + +import { createIdentifier, type ServiceCollection } from '../di'; +import { BlobEngine } from './engine/blob'; +import { globalBlockSuiteSchema } from './global-schema'; +import type { WorkspaceMetadata } from './metadata'; +import { WorkspaceScope } from './service-scope'; + +export const BlockSuiteWorkspaceContext = createIdentifier( + 'BlockSuiteWorkspaceContext' +); + +export const RootYDocContext = createIdentifier('RootYDocContext'); + +export const AwarenessContext = createIdentifier('AwarenessContext'); + +export const WorkspaceMetadataContext = createIdentifier( + 'WorkspaceMetadataContext' +); + +export const WorkspaceIdContext = + createIdentifier('WorkspaceIdContext'); + +export function configureWorkspaceContext( + services: ServiceCollection, + workspaceMetadata: WorkspaceMetadata +) { + services + .scope(WorkspaceScope) + .addImpl(WorkspaceMetadataContext, workspaceMetadata) + .addImpl(WorkspaceIdContext, workspaceMetadata.id) + .addImpl(BlockSuiteWorkspaceContext, provider => { + return new BlockSuiteWorkspace({ + id: workspaceMetadata.id, + blobStorages: [ + () => ({ + crud: provider.get(BlobEngine), + }), + ], + idGenerator: () => nanoid(), + schema: globalBlockSuiteSchema, + }); + }) + .addImpl( + AwarenessContext, + provider => + provider.get(BlockSuiteWorkspaceContext).awarenessStore.awareness + ) + .addImpl( + RootYDocContext, + provider => provider.get(BlockSuiteWorkspaceContext).doc + ); +} diff --git 
a/packages/common/infra/src/workspace/engine/awareness.ts b/packages/common/infra/src/workspace/engine/awareness.ts new file mode 100644 index 0000000000000..4964b264f3ec1 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/awareness.ts @@ -0,0 +1,25 @@ +import { createIdentifier } from '../../di'; + +export interface AwarenessProvider { + connect(): void; + disconnect(): void; +} + +export const AwarenessProvider = + createIdentifier('AwarenessProvider'); + +export class AwarenessEngine { + constructor(public readonly providers: AwarenessProvider[]) {} + + static get EMPTY() { + return new AwarenessEngine([]); + } + + connect() { + this.providers.forEach(provider => provider.connect()); + } + + disconnect() { + this.providers.forEach(provider => provider.disconnect()); + } +} diff --git a/packages/common/infra/src/workspace/engine/blob.ts b/packages/common/infra/src/workspace/engine/blob.ts new file mode 100644 index 0000000000000..ec3113e2904cb --- /dev/null +++ b/packages/common/infra/src/workspace/engine/blob.ts @@ -0,0 +1,212 @@ +import { DebugLogger } from '@affine/debug'; +import { difference } from 'lodash-es'; + +import { createIdentifier } from '../../di'; + +const logger = new DebugLogger('affine:blob-engine'); + +export interface BlobStorage { + name: string; + readonly: boolean; + get: (key: string) => Promise; + set: (key: string, value: Blob) => Promise; + delete: (key: string) => Promise; + list: () => Promise; +} + +export const LocalBlobStorage = + createIdentifier('LocalBlobStorage'); + +export const RemoteBlobStorage = + createIdentifier('RemoteBlobStorage'); + +/** + * # BlobEngine + * + * sync blobs between storages in background. + * + * all operations priority use local, then use remote. 
+ */ +export class BlobEngine { + private abort: AbortController | null = null; + + constructor( + private readonly local: BlobStorage, + private readonly remotes: BlobStorage[] + ) {} + + static get EMPTY() { + return new BlobEngine(createEmptyBlobStorage(), []); + } + + start() { + if (this.abort) { + return; + } + this.abort = new AbortController(); + const abortSignal = this.abort.signal; + + const sync = () => { + if (abortSignal.aborted) { + return; + } + + this.sync() + .catch(error => { + logger.error('sync blob error', error); + }) + .finally(() => { + // sync every 1 minute + setTimeout(sync, 60000); + }); + }; + + sync(); + } + + stop() { + this.abort?.abort(); + this.abort = null; + } + + get storages() { + return [this.local, ...this.remotes]; + } + + async sync() { + if (this.local.readonly) { + return; + } + logger.debug('start syncing blob...'); + for (const remote of this.remotes) { + let localList: string[] = []; + let remoteList: string[] = []; + + if (!remote.readonly) { + try { + localList = await this.local.list(); + remoteList = await remote.list(); + } catch (err) { + logger.error(`error when sync`, err); + continue; + } + + const needUpload = difference(localList, remoteList); + for (const key of needUpload) { + try { + const data = await this.local.get(key); + if (data) { + await remote.set(key, data); + } + } catch (err) { + logger.error( + `error when sync ${key} from [${this.local.name}] to [${remote.name}]`, + err + ); + } + } + } + + const needDownload = difference(remoteList, localList); + + for (const key of needDownload) { + try { + const data = await remote.get(key); + if (data) { + await this.local.set(key, data); + } + } catch (err) { + logger.error( + `error when sync ${key} from [${remote.name}] to [${this.local.name}]`, + err + ); + } + } + } + + logger.debug('finish syncing blob'); + } + + async get(key: string) { + logger.debug('get blob', key); + for (const storage of this.storages) { + const data = await storage.get(key); 
+ if (data) { + return data; + } + } + return null; + } + + async set(key: string, value: Blob) { + if (this.local.readonly) { + throw new Error('local peer is readonly'); + } + + // await upload to the local peer + await this.local.set(key, value); + + // uploads to other peers in the background + Promise.allSettled( + this.remotes + .filter(r => !r.readonly) + .map(peer => + peer.set(key, value).catch(err => { + logger.error('error when upload to peer', err); + }) + ) + ) + .then(result => { + if (result.some(({ status }) => status === 'rejected')) { + logger.error( + `blob ${key} update finish, but some peers failed to update` + ); + } else { + logger.debug(`blob ${key} update finish`); + } + }) + .catch(() => { + // Promise.allSettled never reject + }); + + return key; + } + + async delete(_key: string) { + // not supported + } + + async list() { + const blobList = new Set(); + + for (const peer of this.storages) { + const list = await peer.list(); + if (list) { + for (const blob of list) { + blobList.add(blob); + } + } + } + + return Array.from(blobList); + } +} + +export function createEmptyBlobStorage() { + return { + name: 'empty', + readonly: true, + async get(_key: string) { + return null; + }, + async set(_key: string, _value: Blob) { + throw new Error('not supported'); + }, + async delete(_key: string) { + throw new Error('not supported'); + }, + async list() { + return []; + }, + } satisfies BlobStorage; +} diff --git a/packages/common/infra/src/workspace/engine/index.ts b/packages/common/infra/src/workspace/engine/index.ts new file mode 100644 index 0000000000000..597fd3d863c80 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/index.ts @@ -0,0 +1,71 @@ +import { Slot } from '@blocksuite/global/utils'; + +import { throwIfAborted } from '../../utils/throw-if-aborted'; +import type { AwarenessEngine } from './awareness'; +import type { BlobEngine } from './blob'; +import type { SyncEngine } from './sync'; +import { type SyncEngineStatus } 
from './sync'; + +export interface WorkspaceEngineStatus { + sync: SyncEngineStatus; +} + +/** + * # WorkspaceEngine + * + * sync ydoc, blob, awareness together + */ +export class WorkspaceEngine { + _status: WorkspaceEngineStatus; + onStatusChange = new Slot(); + + get status() { + return this._status; + } + + set status(status: WorkspaceEngineStatus) { + this._status = status; + this.onStatusChange.emit(status); + } + + constructor( + public blob: BlobEngine, + public sync: SyncEngine, + public awareness: AwarenessEngine + ) { + this._status = { + sync: sync.status, + }; + sync.onStatusChange.on(status => { + this.status = { + sync: status, + }; + }); + } + + start() { + this.sync.start(); + this.awareness.connect(); + this.blob.start(); + } + + canGracefulStop() { + return this.sync.canGracefulStop(); + } + + async waitForGracefulStop(abort?: AbortSignal) { + await this.sync.waitForGracefulStop(abort); + throwIfAborted(abort); + this.forceStop(); + } + + forceStop() { + this.sync.forceStop(); + this.awareness.disconnect(); + this.blob.stop(); + } +} + +export * from './awareness'; +export * from './blob'; +export * from './sync'; diff --git a/packages/common/infra/src/workspace/engine/sync/__tests__/engine.spec.ts b/packages/common/infra/src/workspace/engine/sync/__tests__/engine.spec.ts new file mode 100644 index 0000000000000..f29fac912255e --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/__tests__/engine.spec.ts @@ -0,0 +1,152 @@ +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { Workspace } from '@blocksuite/store'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; +import { Doc } from 'yjs'; + +import { MemoryMemento } from '../../../../storage'; +import { globalBlockSuiteSchema } from '../../../global-schema'; +import { TestingSyncStorage } from '../../../testing'; +import { SyncEngineStep, SyncPeerStep } from '../consts'; +import { SyncEngine } from '../engine'; +import { createTestStorage } from 
'./test-storage'; + +beforeEach(() => { + vi.useFakeTimers({ toFake: ['requestIdleCallback'] }); +}); + +const testMeta = { + id: 'test', + flavour: WorkspaceFlavour.LOCAL, +}; + +describe('SyncEngine', () => { + test('basic - indexeddb', async () => { + const storage = new MemoryMemento(); + const storage1 = new MemoryMemento(); + const storage2 = new MemoryMemento(); + let prev: any; + { + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + + const syncEngine = new SyncEngine( + workspace.doc, + new TestingSyncStorage(testMeta, storage), + [ + new TestingSyncStorage(testMeta, storage1), + new TestingSyncStorage(testMeta, storage2), + ] + ); + syncEngine.start(); + + const page = workspace.createPage({ + id: 'page0', + }); + await page.load(); + const pageBlockId = page.addBlock('affine:page', { + title: new page.Text(''), + }); + page.addBlock('affine:surface', {}, pageBlockId); + const frameId = page.addBlock('affine:note', {}, pageBlockId); + page.addBlock('affine:paragraph', {}, frameId); + await syncEngine.waitForSynced(); + syncEngine.forceStop(); + prev = workspace.doc.toJSON(); + } + + for (const current of [storage, storage1, storage2]) { + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + const syncEngine = new SyncEngine( + workspace.doc, + new TestingSyncStorage(testMeta, current), + [] + ); + syncEngine.start(); + await syncEngine.waitForSynced(); + expect(workspace.doc.toJSON()).toEqual({ + ...prev, + }); + syncEngine.forceStop(); + } + }); + + test('status', async () => { + const ydoc = new Doc({ guid: 'test' }); + + const storage1 = new MemoryMemento(); + const storage2 = new MemoryMemento(); + + const localStorage = createTestStorage( + new TestingSyncStorage(testMeta, storage1) + ); + const remoteStorage = createTestStorage( + new TestingSyncStorage(testMeta, storage2) + ); + + localStorage.pausePull(); + localStorage.pausePush(); + remoteStorage.pausePull(); + 
remoteStorage.pausePush(); + + const syncEngine = new SyncEngine(ydoc, localStorage, [remoteStorage]); + expect(syncEngine.status.step).toEqual(SyncEngineStep.Stopped); + + syncEngine.start(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual( + SyncPeerStep.LoadingRootDoc + ); + }); + + localStorage.resumePull(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual( + SyncPeerStep.LoadingRootDoc + ); + }); + + remoteStorage.resumePull(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + }); + + ydoc.getArray('test').insert(0, [1, 2, 3]); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Syncing); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing); + }); + + localStorage.resumePush(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing); + }); + + remoteStorage.resumePush(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced); + }); + }); +}); diff --git a/packages/common/infra/src/workspace/engine/sync/__tests__/peer.spec.ts b/packages/common/infra/src/workspace/engine/sync/__tests__/peer.spec.ts new file mode 100644 index 
0000000000000..b9c53e444febb --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/__tests__/peer.spec.ts @@ -0,0 +1,100 @@ +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { Workspace } from '@blocksuite/store'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; + +import { MemoryMemento } from '../../../../storage'; +import { globalBlockSuiteSchema } from '../../../global-schema'; +import { TestingSyncStorage } from '../../../testing'; +import { SyncPeerStep } from '../consts'; +import { SyncPeer } from '../peer'; + +beforeEach(() => { + vi.useFakeTimers({ toFake: ['requestIdleCallback'] }); +}); + +const testMeta = { + id: 'test', + flavour: WorkspaceFlavour.LOCAL, +}; + +describe('SyncPeer', () => { + test('basic - indexeddb', async () => { + const storage = new MemoryMemento(); + + let prev: any; + { + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + + const syncPeer = new SyncPeer( + workspace.doc, + new TestingSyncStorage(testMeta, storage) + ); + await syncPeer.waitForLoaded(); + + const page = workspace.createPage({ + id: 'page0', + }); + await page.load(); + const pageBlockId = page.addBlock('affine:page', { + title: new page.Text(''), + }); + page.addBlock('affine:surface', {}, pageBlockId); + const frameId = page.addBlock('affine:note', {}, pageBlockId); + page.addBlock('affine:paragraph', {}, frameId); + await syncPeer.waitForSynced(); + syncPeer.stop(); + prev = workspace.doc.toJSON(); + } + + { + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + const syncPeer = new SyncPeer( + workspace.doc, + new TestingSyncStorage(testMeta, storage) + ); + await syncPeer.waitForSynced(); + expect(workspace.doc.toJSON()).toEqual({ + ...prev, + }); + syncPeer.stop(); + } + }); + + test('status', async () => { + const storage = new MemoryMemento(); + + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + 
}); + + const syncPeer = new SyncPeer( + workspace.doc, + new TestingSyncStorage(testMeta, storage) + ); + expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingRootDoc); + await syncPeer.waitForSynced(); + expect(syncPeer.status.step).toBe(SyncPeerStep.Synced); + + const page = workspace.createPage({ + id: 'page0', + }); + expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingSubDoc); + await page.load(); + await syncPeer.waitForSynced(); + page.addBlock('affine:page', { + title: new page.Text(''), + }); + expect(syncPeer.status.step).toBe(SyncPeerStep.Syncing); + syncPeer.stop(); + }); +}); diff --git a/packages/common/infra/src/workspace/engine/sync/__tests__/test-storage.ts b/packages/common/infra/src/workspace/engine/sync/__tests__/test-storage.ts new file mode 100644 index 0000000000000..733cd3ee1cf39 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/__tests__/test-storage.ts @@ -0,0 +1,42 @@ +import type { SyncStorage } from '../storage'; + +export function createTestStorage(origin: SyncStorage) { + const controler = { + pausedPull: Promise.resolve(), + resumePull: () => {}, + pausedPush: Promise.resolve(), + resumePush: () => {}, + }; + + return { + name: `${origin.name}(testing)`, + pull(docId: string, state: Uint8Array) { + return controler.pausedPull.then(() => origin.pull(docId, state)); + }, + push(docId: string, data: Uint8Array) { + return controler.pausedPush.then(() => origin.push(docId, data)); + }, + subscribe( + cb: (docId: string, data: Uint8Array) => void, + disconnect: (reason: string) => void + ) { + return origin.subscribe(cb, disconnect); + }, + pausePull() { + controler.pausedPull = new Promise(resolve => { + controler.resumePull = resolve; + }); + }, + resumePull() { + controler.resumePull?.(); + }, + pausePush() { + controler.pausedPush = new Promise(resolve => { + controler.resumePush = resolve; + }); + }, + resumePush() { + controler.resumePush?.(); + }, + }; +} diff --git 
a/packages/common/infra/src/workspace/engine/sync/consts.ts b/packages/common/infra/src/workspace/engine/sync/consts.ts new file mode 100644 index 0000000000000..e5fd2e8718905 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/consts.ts @@ -0,0 +1,15 @@ +export enum SyncEngineStep { + Stopped = 0, + Syncing = 1, + Synced = 2, +} + +export enum SyncPeerStep { + Stopped = 0, + Retrying = 1, + LoadingRootDoc = 2, + LoadingSubDoc = 3, + Loaded = 4.5, + Syncing = 5, + Synced = 6, +} diff --git a/packages/common/infra/src/workspace/engine/sync/engine.ts b/packages/common/infra/src/workspace/engine/sync/engine.ts new file mode 100644 index 0000000000000..996d422a91119 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/engine.ts @@ -0,0 +1,285 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import type { Doc } from 'yjs'; + +import { createIdentifier } from '../../../di'; +import { SharedPriorityTarget } from '../../../utils/async-queue'; +import { MANUALLY_STOP, throwIfAborted } from '../../../utils/throw-if-aborted'; +import { SyncEngineStep, SyncPeerStep } from './consts'; +import { SyncPeer, type SyncPeerStatus } from './peer'; +import { type SyncStorage } from './storage'; + +export interface SyncEngineStatus { + step: SyncEngineStep; + local: SyncPeerStatus | null; + remotes: (SyncPeerStatus | null)[]; + retrying: boolean; +} + +export const LocalSyncStorage = + createIdentifier('LocalSyncStorage'); + +export const RemoteSyncStorage = + createIdentifier('RemoteSyncStorage'); + +/** + * # SyncEngine + * + * ``` + * ┌────────────┐ + * │ SyncEngine │ + * └─────┬──────┘ + * │ + * ▼ + * ┌────────────┐ + * │ SyncPeer │ + * ┌─────────┤ local ├─────────┐ + * │ └─────┬──────┘ │ + * │ │ │ + * ▼ ▼ ▼ + * ┌────────────┐ ┌────────────┐ ┌────────────┐ + * │ SyncPeer │ │ SyncPeer │ │ SyncPeer │ + * │ Remote │ │ Remote │ │ Remote │ + * └────────────┘ └────────────┘ └────────────┘ + * ``` + * + * 
Sync engine manage sync peers + * + * Sync steps: + * 1. start local sync + * 2. wait for local sync complete + * 3. start remote sync + * 4. continuously sync local and remote + */ +export class SyncEngine { + get rootDocId() { + return this.rootDoc.guid; + } + + logger = new DebugLogger('affine:sync-engine:' + this.rootDocId); + private _status: SyncEngineStatus; + onStatusChange = new Slot(); + private set status(s: SyncEngineStatus) { + this.logger.debug('status change', s); + this._status = s; + this.onStatusChange.emit(s); + } + + priorityTarget = new SharedPriorityTarget(); + + get status() { + return this._status; + } + + private abort = new AbortController(); + + constructor( + private readonly rootDoc: Doc, + private readonly local: SyncStorage, + private readonly remotes: SyncStorage[] + ) { + this._status = { + step: SyncEngineStep.Stopped, + local: null, + remotes: remotes.map(() => null), + retrying: false, + }; + } + + start() { + if (this.status.step !== SyncEngineStep.Stopped) { + this.forceStop(); + } + this.abort = new AbortController(); + + this.sync(this.abort.signal).catch(err => { + // should never reach here + this.logger.error(err); + }); + } + + canGracefulStop() { + return !!this.status.local && this.status.local.pendingPushUpdates === 0; + } + + async waitForGracefulStop(abort?: AbortSignal) { + await Promise.race([ + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + new Promise(resolve => { + this.onStatusChange.on(() => { + if (this.canGracefulStop()) { + resolve(); + } + }); + }), + ]); + throwIfAborted(abort); + this.forceStop(); + } + + forceStop() { + this.abort.abort(MANUALLY_STOP); + this._status = { + step: SyncEngineStep.Stopped, + local: null, + remotes: this.remotes.map(() => null), + retrying: false, + }; + } + + // main sync process, should never return until abort + async sync(signal: AbortSignal) { + const 
state: { + localPeer: SyncPeer | null; + remotePeers: (SyncPeer | null)[]; + } = { + localPeer: null, + remotePeers: this.remotes.map(() => null), + }; + + const cleanUp: (() => void)[] = []; + try { + // Step 1: start local sync peer + state.localPeer = new SyncPeer( + this.rootDoc, + this.local, + this.priorityTarget + ); + + cleanUp.push( + state.localPeer.onStatusChange.on(() => { + if (!signal.aborted) + this.updateSyncingState(state.localPeer, state.remotePeers); + }).dispose + ); + + this.updateSyncingState(state.localPeer, state.remotePeers); + + // Step 2: wait for local sync complete + await state.localPeer.waitForLoaded(signal); + + // Step 3: start remote sync peer + state.remotePeers = this.remotes.map(remote => { + const peer = new SyncPeer(this.rootDoc, remote, this.priorityTarget); + cleanUp.push( + peer.onStatusChange.on(() => { + if (!signal.aborted) + this.updateSyncingState(state.localPeer, state.remotePeers); + }).dispose + ); + return peer; + }); + + this.updateSyncingState(state.localPeer, state.remotePeers); + + // Step 4: continuously sync local and remote + + // wait for abort + await new Promise((_, reject) => { + if (signal.aborted) { + reject(signal.reason); + } + signal.addEventListener('abort', () => { + reject(signal.reason); + }); + }); + } catch (error) { + if (error === MANUALLY_STOP || signal.aborted) { + return; + } + throw error; + } finally { + // stop peers + state.localPeer?.stop(); + for (const remotePeer of state.remotePeers) { + remotePeer?.stop(); + } + for (const clean of cleanUp) { + clean(); + } + } + } + + updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) { + let step = SyncEngineStep.Synced; + const allPeer = [local, ...remotes]; + for (const peer of allPeer) { + if (!peer || peer.status.step !== SyncPeerStep.Synced) { + step = SyncEngineStep.Syncing; + break; + } + } + this.status = { + step, + local: local?.status ?? null, + remotes: remotes.map(peer => peer?.status ?? 
null), + retrying: allPeer.some( + peer => peer?.status.step === SyncPeerStep.Retrying + ), + }; + } + + async waitForSynced(abort?: AbortSignal) { + if (this.status.step === SyncEngineStep.Synced) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status.step === SyncEngineStep.Synced) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } + + async waitForLoadedRootDoc(abort?: AbortSignal) { + function isLoadedRootDoc(status: SyncEngineStatus) { + return ![status.local, ...status.remotes].some( + peer => !peer || peer.step <= SyncPeerStep.LoadingRootDoc + ); + } + if (isLoadedRootDoc(this.status)) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (isLoadedRootDoc(status)) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } + + setPriorityRule(target: ((id: string) => boolean) | null) { + this.priorityTarget.priorityRule = target; + } +} diff --git a/packages/common/infra/src/workspace/engine/sync/index.ts b/packages/common/infra/src/workspace/engine/sync/index.ts new file mode 100644 index 0000000000000..0e3d766d79620 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/index.ts @@ -0,0 +1,20 @@ +/** + * + * **SyncEngine** + * + * Manages one local storage and multiple remote storages. + * + * Responsible for creating SyncPeers for synchronization, following the local-first strategy. + * + * **SyncPeer** + * + * Responsible for synchronizing a single storage with Y.Doc. + * + * Carries the main synchronization logic. 
+ * + */ + +export * from './consts'; +export * from './engine'; +export * from './peer'; +export * from './storage'; diff --git a/packages/common/infra/src/workspace/engine/sync/peer.ts b/packages/common/infra/src/workspace/engine/sync/peer.ts new file mode 100644 index 0000000000000..fd465d7728069 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/peer.ts @@ -0,0 +1,444 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import { isEqual } from '@blocksuite/global/utils'; +import type { Doc } from 'yjs'; +import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs'; + +import { + PriorityAsyncQueue, + SharedPriorityTarget, +} from '../../../utils/async-queue'; +import { mergeUpdates } from '../../../utils/merge-updates'; +import { MANUALLY_STOP, throwIfAborted } from '../../../utils/throw-if-aborted'; +import { SyncPeerStep } from './consts'; +import type { SyncStorage } from './storage'; + +export interface SyncPeerStatus { + step: SyncPeerStep; + totalDocs: number; + loadedDocs: number; + pendingPullUpdates: number; + pendingPushUpdates: number; +} + +/** + * # SyncPeer + * A SyncPeer is responsible for syncing one Storage with one Y.Doc and its subdocs. + * + * ``` + * ┌─────┐ + * │Start│ + * └──┬──┘ + * │ + * ┌──────┐ ┌─────▼──────┐ ┌────┐ + * │listen◄─────┤pull rootdoc│ │peer│ + * └──┬───┘ └─────┬──────┘ └──┬─┘ + * │ │ onLoad() │ + * ┌──▼───┐ ┌─────▼──────┐ ┌────▼────┐ + * │listen◄─────┤pull subdocs│ │subscribe│ + * └──┬───┘ └─────┬──────┘ └────┬────┘ + * │ │ onReady() │ + * ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐ + * │queue├──────►apply updates◄───────┤queue│ + * └─────┘ └─────────────┘ └─────┘ + * ``` + * + * listen: listen for updates from ydoc, typically from user modifications. + * subscribe: listen for updates from storage, typically from other users. 
+ * + */ +export class SyncPeer { + private _status: SyncPeerStatus = { + step: SyncPeerStep.LoadingRootDoc, + totalDocs: 1, + loadedDocs: 0, + pendingPullUpdates: 0, + pendingPushUpdates: 0, + }; + onStatusChange = new Slot(); + readonly abort = new AbortController(); + get name() { + return this.storage.name; + } + logger = new DebugLogger('affine:sync-peer:' + this.name); + + constructor( + private readonly rootDoc: Doc, + private readonly storage: SyncStorage, + private readonly priorityTarget = new SharedPriorityTarget() + ) { + this.logger.debug('peer start'); + + this.syncRetryLoop(this.abort.signal).catch(err => { + // should not reach here + console.error(err); + }); + } + + private set status(s: SyncPeerStatus) { + if (!isEqual(s, this._status)) { + this.logger.debug('status change', s); + this._status = s; + this.onStatusChange.emit(s); + } + } + + get status() { + return this._status; + } + + /** + * stop sync + * + * SyncPeer is one-time use, this peer should be discarded after call stop(). 
+ */ + stop() { + this.logger.debug('peer stop'); + this.abort.abort(MANUALLY_STOP); + } + + /** + * auto retry after 5 seconds if sync failed + */ + async syncRetryLoop(abort: AbortSignal) { + while (abort.aborted === false) { + try { + await this.sync(abort); + } catch (err) { + if (err === MANUALLY_STOP || abort.aborted) { + return; + } + + this.logger.error('sync error', err); + } + try { + this.logger.error('retry after 5 seconds'); + this.status = { + step: SyncPeerStep.Retrying, + totalDocs: 1, + loadedDocs: 0, + pendingPullUpdates: 0, + pendingPushUpdates: 0, + }; + await Promise.race([ + new Promise(resolve => { + setTimeout(resolve, 5 * 1000); + }), + new Promise((_, reject) => { + // exit if manually stopped + if (abort.aborted) { + reject(abort.reason); + } + abort.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } catch (err) { + if (err === MANUALLY_STOP || abort.aborted) { + return; + } + + // should never reach here + throw err; + } + } + } + + private readonly state: { + connectedDocs: Map; + pushUpdatesQueue: PriorityAsyncQueue<{ + id: string; + data: Uint8Array[]; + }>; + pushingUpdate: boolean; + pullUpdatesQueue: PriorityAsyncQueue<{ + id: string; + data: Uint8Array; + }>; + subdocLoading: boolean; + subdocsLoadQueue: PriorityAsyncQueue<{ id: string; doc: Doc }>; + } = { + connectedDocs: new Map(), + pushUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget), + pushingUpdate: false, + pullUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget), + subdocLoading: false, + subdocsLoadQueue: new PriorityAsyncQueue([], this.priorityTarget), + }; + + initState() { + this.state.connectedDocs.clear(); + this.state.pushUpdatesQueue.clear(); + this.state.pullUpdatesQueue.clear(); + this.state.subdocsLoadQueue.clear(); + this.state.pushingUpdate = false; + this.state.subdocLoading = false; + } + + /** + * main synchronization logic + */ + async sync(abortOuter: AbortSignal) { + this.initState(); + const abortInner 
= new AbortController(); + + abortOuter.addEventListener('abort', reason => { + abortInner.abort(reason); + }); + + let dispose: (() => void) | null = null; + try { + this.reportSyncStatus(); + + // start listen storage updates + dispose = await this.storage.subscribe( + this.handleStorageUpdates, + reason => { + // abort if storage disconnect, should trigger retry loop + abortInner.abort('subscribe disconnect:' + reason); + } + ); + throwIfAborted(abortInner.signal); + + // Step 1: load root doc + await this.connectDoc(this.rootDoc, abortInner.signal); + + // Step 2: load subdocs + this.state.subdocsLoadQueue.push( + ...Array.from(this.rootDoc.getSubdocs()).map(doc => ({ + id: doc.guid, + doc, + })) + ); + this.reportSyncStatus(); + + this.rootDoc.on('subdocs', this.handleSubdocsUpdate); + + // Finally: start sync + await Promise.all([ + // load subdocs + (async () => { + while (throwIfAborted(abortInner.signal)) { + const subdoc = await this.state.subdocsLoadQueue.next( + abortInner.signal + ); + this.state.subdocLoading = true; + this.reportSyncStatus(); + await this.connectDoc(subdoc.doc, abortInner.signal); + this.state.subdocLoading = false; + this.reportSyncStatus(); + } + })(), + // pull updates + (async () => { + while (throwIfAborted(abortInner.signal)) { + const { id, data } = await this.state.pullUpdatesQueue.next( + abortInner.signal + ); + // don't apply empty data or Uint8Array([0, 0]) + if ( + !( + data.byteLength === 0 || + (data.byteLength === 2 && data[0] === 0 && data[1] === 0) + ) + ) { + const subdoc = this.state.connectedDocs.get(id); + if (subdoc) { + applyUpdate(subdoc, data, this.name); + } + } + this.reportSyncStatus(); + } + })(), + // push updates + (async () => { + while (throwIfAborted(abortInner.signal)) { + const { id, data } = await this.state.pushUpdatesQueue.next( + abortInner.signal + ); + this.state.pushingUpdate = true; + this.reportSyncStatus(); + + const merged = mergeUpdates(data); + + // don't push empty data or 
Uint8Array([0, 0]) + if ( + !( + merged.byteLength === 0 || + (merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0) + ) + ) { + await this.storage.push(id, merged); + } + + this.state.pushingUpdate = false; + this.reportSyncStatus(); + } + })(), + ]); + } finally { + dispose?.(); + for (const docs of this.state.connectedDocs.values()) { + this.disconnectDoc(docs); + } + this.rootDoc.off('subdocs', this.handleSubdocsUpdate); + } + } + + async connectDoc(doc: Doc, abort: AbortSignal) { + const { data: docData, state: inStorageState } = + (await this.storage.pull(doc.guid, encodeStateVector(doc))) ?? {}; + throwIfAborted(abort); + + if (docData) { + applyUpdate(doc, docData, 'load'); + } + + // diff root doc and in-storage, save updates to pendingUpdates + this.state.pushUpdatesQueue.push({ + id: doc.guid, + data: [encodeStateAsUpdate(doc, inStorageState)], + }); + + this.state.connectedDocs.set(doc.guid, doc); + + // start listen root doc changes + doc.on('update', this.handleYDocUpdates); + + // mark rootDoc as loaded + doc.emit('sync', [true]); + + this.reportSyncStatus(); + } + + disconnectDoc(doc: Doc) { + doc.off('update', this.handleYDocUpdates); + this.state.connectedDocs.delete(doc.guid); + this.reportSyncStatus(); + } + + // handle updates from ydoc + handleYDocUpdates = (update: Uint8Array, origin: string, doc: Doc) => { + // don't push updates from storage + if (origin === this.name) { + return; + } + + const exist = this.state.pushUpdatesQueue.find(({ id }) => id === doc.guid); + if (exist) { + exist.data.push(update); + } else { + this.state.pushUpdatesQueue.push({ + id: doc.guid, + data: [update], + }); + } + + this.reportSyncStatus(); + }; + + // handle subdocs changes, append new subdocs to queue, remove subdocs from queue + handleSubdocsUpdate = ({ + added, + removed, + }: { + added: Set; + removed: Set; + }) => { + for (const subdoc of added) { + this.state.subdocsLoadQueue.push({ id: subdoc.guid, doc: subdoc }); + } + + for (const subdoc 
of removed) { + this.disconnectDoc(subdoc); + this.state.subdocsLoadQueue.remove(doc => doc.doc === subdoc); + } + this.reportSyncStatus(); + }; + + // handle updates from storage + handleStorageUpdates = (id: string, data: Uint8Array) => { + this.state.pullUpdatesQueue.push({ + id, + data, + }); + this.reportSyncStatus(); + }; + + reportSyncStatus() { + let step; + if (this.state.connectedDocs.size === 0) { + step = SyncPeerStep.LoadingRootDoc; + } else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) { + step = SyncPeerStep.LoadingSubDoc; + } else if ( + this.state.pullUpdatesQueue.length || + this.state.pushUpdatesQueue.length || + this.state.pushingUpdate + ) { + step = SyncPeerStep.Syncing; + } else { + step = SyncPeerStep.Synced; + } + + this.status = { + step: step, + totalDocs: + this.state.connectedDocs.size + this.state.subdocsLoadQueue.length, + loadedDocs: this.state.connectedDocs.size, + pendingPullUpdates: + this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0), + pendingPushUpdates: + this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 
1 : 0), + }; + } + + async waitForSynced(abort?: AbortSignal) { + if (this.status.step >= SyncPeerStep.Synced) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status.step >= SyncPeerStep.Synced) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } + + async waitForLoaded(abort?: AbortSignal) { + if (this.status.step > SyncPeerStep.Loaded) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status.step > SyncPeerStep.Loaded) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } +} diff --git a/packages/common/infra/src/workspace/engine/sync/storage.ts b/packages/common/infra/src/workspace/engine/sync/storage.ts new file mode 100644 index 0000000000000..34784f1d40f34 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/storage.ts @@ -0,0 +1,25 @@ +export interface SyncStorage { + /** + * for debug + */ + name: string; + + pull( + docId: string, + state: Uint8Array + ): Promise<{ data: Uint8Array; state?: Uint8Array } | null>; + push(docId: string, data: Uint8Array): Promise; + + /** + * Subscribe to updates from peer + * + * @param cb callback to handle updates + * @param disconnect callback to handle disconnect, reason can be something like 'network-error' + * + * @returns unsubscribe function + */ + subscribe( + cb: (docId: string, data: Uint8Array) => void, + disconnect: (reason: string) => void + ): Promise<() => void>; +} diff --git a/packages/common/infra/src/workspace/factory.ts b/packages/common/infra/src/workspace/factory.ts new file mode 100644 index 0000000000000..fd3d85d13f0f9 --- /dev/null +++ 
b/packages/common/infra/src/workspace/factory.ts @@ -0,0 +1,15 @@ +import { createIdentifier, type ServiceCollection } from '../di'; + +export interface WorkspaceFactory { + name: string; + + configureWorkspace(services: ServiceCollection): void; + + /** + * get blob without open workspace + */ + getWorkspaceBlob(id: string, blobKey: string): Promise; +} + +export const WorkspaceFactory = + createIdentifier('WorkspaceFactory'); diff --git a/packages/common/infra/src/workspace/global-schema.ts b/packages/common/infra/src/workspace/global-schema.ts new file mode 100644 index 0000000000000..e03dc9a7c29c4 --- /dev/null +++ b/packages/common/infra/src/workspace/global-schema.ts @@ -0,0 +1,6 @@ +import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; +import { Schema } from '@blocksuite/store'; + +export const globalBlockSuiteSchema = new Schema(); + +globalBlockSuiteSchema.register(AffineSchemas).register(__unstableSchemas); diff --git a/packages/common/infra/src/workspace/index.ts b/packages/common/infra/src/workspace/index.ts new file mode 100644 index 0000000000000..74629684f7a99 --- /dev/null +++ b/packages/common/infra/src/workspace/index.ts @@ -0,0 +1,81 @@ +export * from './context'; +export * from './engine'; +export * from './factory'; +export * from './global-schema'; +export * from './list'; +export * from './manager'; +export * from './metadata'; +export * from './service-scope'; +export * from './testing'; +export * from './upgrade'; +export * from './workspace'; + +import { type ServiceCollection, ServiceProvider } from '../di'; +import { CleanupService } from '../lifecycle'; +import { GlobalCache, GlobalState } from '../storage'; +import { + BlockSuiteWorkspaceContext, + RootYDocContext, + WorkspaceMetadataContext, +} from './context'; +import { + AwarenessEngine, + AwarenessProvider, + BlobEngine, + LocalBlobStorage, + LocalSyncStorage, + RemoteBlobStorage, + RemoteSyncStorage, + SyncEngine, + WorkspaceEngine, +} from './engine'; 
+import { WorkspaceFactory } from './factory'; +import { WorkspaceListProvider, WorkspaceListService } from './list'; +import { WorkspaceManager } from './manager'; +import { WorkspaceScope } from './service-scope'; +import { + TestingLocalWorkspaceFactory, + TestingLocalWorkspaceListProvider, +} from './testing'; +import { WorkspaceUpgradeController } from './upgrade'; +import { Workspace } from './workspace'; + +export function configureWorkspaceServices(services: ServiceCollection) { + // global scope + services + .add(WorkspaceManager, [ + WorkspaceListService, + [WorkspaceFactory], + ServiceProvider, + ]) + .add(WorkspaceListService, [[WorkspaceListProvider], GlobalCache]); + + // workspace scope + services + .scope(WorkspaceScope) + .add(CleanupService) + .add(Workspace, [ + WorkspaceMetadataContext, + WorkspaceEngine, + BlockSuiteWorkspaceContext, + WorkspaceUpgradeController, + ServiceProvider, + ]) + .add(WorkspaceEngine, [BlobEngine, SyncEngine, AwarenessEngine]) + .add(AwarenessEngine, [[AwarenessProvider]]) + .add(BlobEngine, [LocalBlobStorage, [RemoteBlobStorage]]) + .add(SyncEngine, [RootYDocContext, LocalSyncStorage, [RemoteSyncStorage]]) + .add(WorkspaceUpgradeController, [ + BlockSuiteWorkspaceContext, + SyncEngine, + WorkspaceMetadataContext, + ]); +} + +export function configureTestingWorkspaceServices(services: ServiceCollection) { + services + .addImpl(WorkspaceListProvider, TestingLocalWorkspaceListProvider, [ + GlobalState, + ]) + .addImpl(WorkspaceFactory, TestingLocalWorkspaceFactory, [GlobalState]); +} diff --git a/packages/common/infra/src/workspace/list/cache.ts b/packages/common/infra/src/workspace/list/cache.ts new file mode 100644 index 0000000000000..ecf1005037f2e --- /dev/null +++ b/packages/common/infra/src/workspace/list/cache.ts @@ -0,0 +1,25 @@ +import type { GlobalCache } from '../../storage'; +import { type WorkspaceMetadata } from '../metadata'; + +const CACHE_STORAGE_KEY = 'jotai-workspaces'; + +export function 
readWorkspaceListCache(cache: GlobalCache) { + const metadata = cache.get(CACHE_STORAGE_KEY); + if (metadata) { + try { + const items = metadata as WorkspaceMetadata[]; + return [...items]; + } catch (e) { + console.error('cannot parse workspace', e); + } + return []; + } + return []; +} + +export function writeWorkspaceListCache( + cache: GlobalCache, + metadata: WorkspaceMetadata[] +) { + cache.set(CACHE_STORAGE_KEY, metadata); +} diff --git a/packages/common/infra/src/workspace/list/index.ts b/packages/common/infra/src/workspace/list/index.ts new file mode 100644 index 0000000000000..8f956cf9d1ca3 --- /dev/null +++ b/packages/common/infra/src/workspace/list/index.ts @@ -0,0 +1,305 @@ +import { DebugLogger } from '@affine/debug'; +import type { WorkspaceFlavour } from '@affine/env/workspace'; +import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { differenceWith } from 'lodash-es'; +import { map } from 'rxjs'; + +import { createIdentifier } from '../../di'; +import { LiveData } from '../../livedata'; +import type { GlobalCache } from '../../storage'; +import type { BlobStorage } from '../engine'; +import type { WorkspaceMetadata } from '../metadata'; +import { readWorkspaceListCache, writeWorkspaceListCache } from './cache'; +import { type WorkspaceInfo, WorkspaceInformation } from './information'; + +export * from './information'; + +const logger = new DebugLogger('affine:workspace:list'); + +export interface WorkspaceListProvider { + name: WorkspaceFlavour; + + /** + * get workspaces list + */ + getList(): Promise; + + /** + * delete workspace by id + */ + delete(workspaceId: string): Promise; + + /** + * create workspace + * @param initial callback to put initial data to workspace + */ + create( + initial: ( + workspace: BlockSuiteWorkspace, + blobStorage: BlobStorage + ) => Promise + ): Promise; + + /** + * Start subscribing to the workspaces list + * + * @returns unsubscribe function + */ + subscribe( + callback: (changed: { + added?: 
WorkspaceMetadata[]; + deleted?: WorkspaceMetadata[]; + }) => void + ): () => void; + + /** + * get workspace avatar and name by id + * + * @param id workspace id + */ + getInformation(id: string): Promise; +} + +export const WorkspaceListProvider = createIdentifier( + 'WorkspaceListProvider' +); + +export interface WorkspaceListStatus { + /** + * is workspace list doing first loading. + * if false, UI can display workspace not found page. + */ + loading: boolean; + workspaceList: WorkspaceMetadata[]; +} + +/** + * # WorkspaceList + * + * manage multiple workspace metadata list providers. + * provide a __cache-first__ and __offline useable__ workspace list. + */ +export class WorkspaceListService { + private readonly abortController = new AbortController(); + + private readonly workspaceInformationList = new Map< + string, + WorkspaceInformation + >(); + + status = new LiveData({ + loading: true, + workspaceList: [], + }); + + setStatus(status: WorkspaceListStatus) { + this.status.next(status); + // update cache + writeWorkspaceListCache(this.cache, status.workspaceList); + } + + workspaceList = LiveData.from( + this.status.pipe(map(x => x.workspaceList)), + [] + ); + + constructor( + private readonly providers: WorkspaceListProvider[], + private readonly cache: GlobalCache + ) { + // initialize workspace list from cache + const cached = readWorkspaceListCache(cache); + const workspaceList = cached; + this.status.next({ + ...this.status.value, + workspaceList, + }); + + // start first load + this.startLoad(); + } + + /** + * create workspace + * @param flavour workspace flavour + * @param initial callback to put initial data to workspace + * @returns workspace id + */ + async create( + flavour: WorkspaceFlavour, + initial: ( + workspace: BlockSuiteWorkspace, + blobStorage: BlobStorage + ) => Promise = () => Promise.resolve() + ) { + const provider = this.providers.find(x => x.name === flavour); + if (!provider) { + throw new Error(`Unknown workspace flavour: 
${flavour}`); + } + const metadata = await provider.create(initial); + // update workspace list + this.setStatus(this.addWorkspace(this.status.value, metadata)); + return metadata; + } + + /** + * delete workspace + * @param workspaceMetadata + */ + async delete(workspaceMetadata: WorkspaceMetadata) { + logger.info( + `delete workspace [${workspaceMetadata.flavour}] ${workspaceMetadata.id}` + ); + const provider = this.providers.find( + x => x.name === workspaceMetadata.flavour + ); + if (!provider) { + throw new Error( + `Unknown workspace flavour: ${workspaceMetadata.flavour}` + ); + } + await provider.delete(workspaceMetadata.id); + + // delete workspace from list + this.setStatus(this.deleteWorkspace(this.status.value, workspaceMetadata)); + } + + /** + * add workspace to list + */ + private addWorkspace( + status: WorkspaceListStatus, + workspaceMetadata: WorkspaceMetadata + ) { + if (status.workspaceList.some(x => x.id === workspaceMetadata.id)) { + return status; + } + return { + ...status, + workspaceList: status.workspaceList.concat(workspaceMetadata), + }; + } + + /** + * delete workspace from list + */ + private deleteWorkspace( + status: WorkspaceListStatus, + workspaceMetadata: WorkspaceMetadata + ) { + if (!status.workspaceList.some(x => x.id === workspaceMetadata.id)) { + return status; + } + return { + ...status, + workspaceList: status.workspaceList.filter( + x => x.id !== workspaceMetadata.id + ), + }; + } + + /** + * callback for subscribe workspaces list + */ + private handleWorkspaceChange(changed: { + added?: WorkspaceMetadata[]; + deleted?: WorkspaceMetadata[]; + }) { + let status = this.status.value; + + for (const added of changed.added ?? []) { + status = this.addWorkspace(status, added); + } + for (const deleted of changed.deleted ?? 
[]) { + status = this.deleteWorkspace(status, deleted); + } + + this.setStatus(status); + } + + /** + * start first load workspace list + */ + private startLoad() { + for (const provider of this.providers) { + // subscribe workspace list change + const unsubscribe = provider.subscribe(changed => { + this.handleWorkspaceChange(changed); + }); + + // unsubscribe when abort + if (this.abortController.signal.aborted) { + unsubscribe(); + return; + } + this.abortController.signal.addEventListener('abort', () => { + unsubscribe(); + }); + } + + this.revalidate() + .catch(error => { + logger.error('load workspace list error: ' + error); + }) + .finally(() => { + this.setStatus({ + ...this.status.value, + loading: false, + }); + }); + } + + async revalidate() { + await Promise.allSettled( + this.providers.map(async provider => { + try { + const list = await provider.getList(); + const oldList = this.workspaceList.value.filter( + w => w.flavour === provider.name + ); + this.handleWorkspaceChange({ + added: differenceWith(list, oldList, (a, b) => a.id === b.id), + deleted: differenceWith(oldList, list, (a, b) => a.id === b.id), + }); + } catch (error) { + logger.error('load workspace list error: ' + error); + } + }) + ); + } + + /** + * get workspace information, if not exists, create it. 
+ */ + getInformation(meta: WorkspaceMetadata) { + const exists = this.workspaceInformationList.get(meta.id); + if (exists) { + return exists; + } + + return this.createInformation(meta); + } + + private createInformation(workspaceMetadata: WorkspaceMetadata) { + const provider = this.providers.find( + x => x.name === workspaceMetadata.flavour + ); + if (!provider) { + throw new Error( + `Unknown workspace flavour: ${workspaceMetadata.flavour}` + ); + } + const information = new WorkspaceInformation( + workspaceMetadata, + provider, + this.cache + ); + information.fetch(); + this.workspaceInformationList.set(workspaceMetadata.id, information); + return information; + } + + dispose() { + this.abortController.abort(); + } +} diff --git a/packages/common/infra/src/workspace/list/information.ts b/packages/common/infra/src/workspace/list/information.ts new file mode 100644 index 0000000000000..e143d72e5011f --- /dev/null +++ b/packages/common/infra/src/workspace/list/information.ts @@ -0,0 +1,92 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; + +import type { Memento } from '../..'; +import type { WorkspaceMetadata } from '../metadata'; +import type { Workspace } from '../workspace'; +import type { WorkspaceListProvider } from '.'; + +const logger = new DebugLogger('affine:workspace:list:information'); + +const WORKSPACE_INFORMATION_CACHE_KEY = 'workspace-information:'; + +export interface WorkspaceInfo { + avatar?: string; + name?: string; +} + +/** + * # WorkspaceInformation + * + * This class take care of workspace avatar and name + * + * The class will try to get from 3 places: + * - local cache + * - fetch from `WorkspaceListProvider`, which will fetch from database or server + * - sync with active workspace + */ +export class WorkspaceInformation { + private _info: WorkspaceInfo = {}; + + public set info(info: WorkspaceInfo) { + if (info.avatar !== this._info.avatar || info.name !== this._info.name) { + 
this.cache.set(WORKSPACE_INFORMATION_CACHE_KEY + this.meta.id, info); + this._info = info; + this.onUpdated.emit(info); + } + } + + public get info() { + return this._info; + } + + public onUpdated = new Slot(); + + constructor( + public meta: WorkspaceMetadata, + public provider: WorkspaceListProvider, + public cache: Memento + ) { + const cached = this.getCachedInformation(); + // init with cached information + this.info = { ...cached }; + } + + /** + * sync information with workspace + */ + syncWithWorkspace(workspace: Workspace) { + this.info = { + avatar: workspace.blockSuiteWorkspace.meta.avatar ?? this.info.avatar, + name: workspace.blockSuiteWorkspace.meta.name ?? this.info.name, + }; + workspace.blockSuiteWorkspace.meta.commonFieldsUpdated.on(() => { + this.info = { + avatar: workspace.blockSuiteWorkspace.meta.avatar ?? this.info.avatar, + name: workspace.blockSuiteWorkspace.meta.name ?? this.info.name, + }; + }); + } + + getCachedInformation() { + return this.cache.get( + WORKSPACE_INFORMATION_CACHE_KEY + this.meta.id + ); + } + + /** + * fetch information from provider + */ + fetch() { + this.provider + .getInformation(this.meta.id) + .then(info => { + if (info) { + this.info = info; + } + }) + .catch(err => { + logger.warn('get workspace information error: ' + err); + }); + } +} diff --git a/packages/common/infra/src/workspace/manager.ts b/packages/common/infra/src/workspace/manager.ts new file mode 100644 index 0000000000000..66f6ca970cafd --- /dev/null +++ b/packages/common/infra/src/workspace/manager.ts @@ -0,0 +1,193 @@ +import { DebugLogger } from '@affine/debug'; +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { assertEquals } from '@blocksuite/global/utils'; +import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { applyUpdate, encodeStateAsUpdate } from 'yjs'; + +import { fixWorkspaceVersion } from '../blocksuite'; +import type { ServiceProvider } from '../di'; +import { ServiceCollection } from 
'../di'; +import { ObjectPool } from '../utils/object-pool'; +import { configureWorkspaceContext } from './context'; +import type { BlobStorage } from './engine'; +import type { WorkspaceFactory } from './factory'; +import type { WorkspaceListService } from './list'; +import type { WorkspaceMetadata } from './metadata'; +import { WorkspaceScope } from './service-scope'; +import { Workspace } from './workspace'; + +const logger = new DebugLogger('affine:workspace-manager'); + +/** + * # `WorkspaceManager` + * + * This class acts as the central hub for managing various aspects of workspaces. + * It is structured as follows: + * + * ``` + * ┌───────────┐ + * │ Workspace │ + * │ Manager │ + * └─────┬─────┘ + * ┌─────────────┼─────────────┐ + * ┌───┴───┐ ┌───┴───┐ ┌─────┴─────┐ + * │ List │ │ Pool │ │ Factories │ + * └───────┘ └───────┘ └───────────┘ + * ``` + * + * Manages everything about workspaces + * + * # List + * + * The `WorkspaceList` component stores metadata for all workspaces, also including workspace avatar and custom name. + * + * # Factories + * + * This class contains a collection of `WorkspaceFactory`. + * We utilize `metadata.flavour` to identify the appropriate factory for opening a workspace. + * Once opened, workspaces are stored in the `WorkspacePool`. + * + * # Pool + * + * The `WorkspacePool` uses reference counting to manage active workspaces. + * Call `use()` to create a reference to the workspace. Call `release()` to release the reference. + * When the reference count is 0, it will close the workspace. + * + */ +export class WorkspaceManager { + pool = new ObjectPool({ + onDelete(workspace) { + workspace.forceStop(); + }, + onDangling(workspace) { + return workspace.canGracefulStop(); + }, + }); + + constructor( + public readonly list: WorkspaceListService, + public readonly factories: WorkspaceFactory[], + private readonly serviceProvider: ServiceProvider + ) {} + + /** + * get workspace reference by metadata. 
+ * + * You basically don't need to call this function directly, use the react hook `useWorkspace(metadata)` instead. + * + * @returns the workspace reference and a release function, don't forget to call release function when you don't + * need the workspace anymore. + */ + open(metadata: WorkspaceMetadata): { + workspace: Workspace; + release: () => void; + } { + const exist = this.pool.get(metadata.id); + if (exist) { + return { + workspace: exist.obj, + release: exist.release, + }; + } + + const workspace = this.instantiate(metadata); + const ref = this.pool.put(workspace.meta.id, workspace); + + return { + workspace: ref.obj, + release: ref.release, + }; + } + + createWorkspace( + flavour: WorkspaceFlavour, + initial?: ( + workspace: BlockSuiteWorkspace, + blobStorage: BlobStorage + ) => Promise + ): Promise { + logger.info(`create workspace [${flavour}]`); + return this.list.create(flavour, initial); + } + + /** + * delete workspace by metadata, same as `WorkspaceList.deleteWorkspace` + */ + async deleteWorkspace(metadata: WorkspaceMetadata) { + await this.list.delete(metadata); + } + + /** + * helper function to transform local workspace to cloud workspace + */ + async transformLocalToCloud(local: Workspace): Promise { + assertEquals(local.flavour, WorkspaceFlavour.LOCAL); + + await local.engine.sync.waitForSynced(); + + const newId = await this.list.create( + WorkspaceFlavour.AFFINE_CLOUD, + async (ws, bs) => { + applyUpdate(ws.doc, encodeStateAsUpdate(local.blockSuiteWorkspace.doc)); + + for (const subdoc of local.blockSuiteWorkspace.doc.getSubdocs()) { + for (const newSubdoc of ws.doc.getSubdocs()) { + if (newSubdoc.guid === subdoc.guid) { + applyUpdate(newSubdoc, encodeStateAsUpdate(subdoc)); + } + } + } + + const blobList = await local.engine.blob.list(); + + for (const blobKey of blobList) { + const blob = await local.engine.blob.get(blobKey); + if (blob) { + await bs.set(blobKey, blob); + } + } + } + ); + + await this.list.delete(local.meta); + + 
return newId; + } + + /** + * helper function to get a blob without opening the workspace; it is used for downloading workspace avatars. + */ + getWorkspaceBlob(metadata: WorkspaceMetadata, blobKey: string) { + const factory = this.factories.find(x => x.name === metadata.flavour); + if (!factory) { + throw new Error(`Unknown workspace flavour: ${metadata.flavour}`); + } + return factory.getWorkspaceBlob(metadata.id, blobKey); + } + + private instantiate(metadata: WorkspaceMetadata) { + logger.info(`open workspace [${metadata.flavour}] ${metadata.id} `); + const factory = this.factories.find(x => x.name === metadata.flavour); + if (!factory) { + throw new Error(`Unknown workspace flavour: ${metadata.flavour}`); + } + const serviceCollection = this.serviceProvider + .get(ServiceCollection) + .clone(); + factory.configureWorkspace(serviceCollection); + configureWorkspaceContext(serviceCollection, metadata); + const provider = serviceCollection.provider( + WorkspaceScope, + this.serviceProvider + ); + const workspace = provider.get(Workspace); + + // sync information with the workspace list; when the workspace's avatar or name changes, the information will be updated + this.list.getInformation(metadata).syncWithWorkspace(workspace); + + // apply compatibility fix + fixWorkspaceVersion(workspace.blockSuiteWorkspace.doc); + + return workspace; + } +} diff --git a/packages/common/infra/src/workspace/metadata.ts b/packages/common/infra/src/workspace/metadata.ts new file mode 100644 index 0000000000000..d73b79f8a65e2 --- /dev/null +++ b/packages/common/infra/src/workspace/metadata.ts @@ -0,0 +1,3 @@ +import type { WorkspaceFlavour } from '@affine/env/workspace'; + +export type WorkspaceMetadata = { id: string; flavour: WorkspaceFlavour }; diff --git a/packages/common/infra/src/workspace/service-scope.ts b/packages/common/infra/src/workspace/service-scope.ts new file mode 100644 index 0000000000000..4212cf9ed7823 --- /dev/null +++ b/packages/common/infra/src/workspace/service-scope.ts @@ -0,0 
+1,3 @@ +import { createScope } from '../di'; + +export const WorkspaceScope = createScope('workspace'); diff --git a/packages/common/infra/src/workspace/testing.ts b/packages/common/infra/src/workspace/testing.ts new file mode 100644 index 0000000000000..6f2b364b0f738 --- /dev/null +++ b/packages/common/infra/src/workspace/testing.ts @@ -0,0 +1,244 @@ +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { differenceBy } from 'lodash-es'; +import { nanoid } from 'nanoid'; +import { applyUpdate, encodeStateAsUpdate } from 'yjs'; + +import { type ServiceCollection } from '../di'; +import { GlobalState, type Memento } from '../storage'; +import { mergeUpdates } from '../utils/merge-updates'; +import { WorkspaceMetadataContext } from './context'; +import { + AwarenessProvider, + type BlobStorage, + LocalBlobStorage, + LocalSyncStorage, + type SyncStorage, +} from './engine'; +import type { WorkspaceFactory } from './factory'; +import { globalBlockSuiteSchema } from './global-schema'; +import type { WorkspaceListProvider } from './list'; +import { type WorkspaceInfo } from './list'; +import { type WorkspaceMetadata } from './metadata'; +import { WorkspaceScope } from './service-scope'; + +const LIST_STORE_KEY = 'testing-workspace-list'; + +export class TestingLocalWorkspaceListProvider + implements WorkspaceListProvider +{ + name = WorkspaceFlavour.LOCAL; + + constructor(private readonly state: Memento) {} + + getList(): Promise { + const list = this.state.get(LIST_STORE_KEY); + return Promise.resolve(list ?? []); + } + delete(workspaceId: string): Promise { + const list = this.state.get(LIST_STORE_KEY) ?? 
[]; + const newList = list.filter(meta => meta.id !== workspaceId); + this.state.set(LIST_STORE_KEY, newList); + return Promise.resolve(); + } + async create( + initial: ( + workspace: BlockSuiteWorkspace, + blobStorage: BlobStorage + ) => Promise + ): Promise { + const id = nanoid(); + const meta = { id, flavour: WorkspaceFlavour.LOCAL }; + + const blobStorage = new TestingBlobStorage(meta, this.state); + const syncStorage = new TestingSyncStorage(meta, this.state); + + const workspace = new BlockSuiteWorkspace({ + id: id, + idGenerator: () => nanoid(), + schema: globalBlockSuiteSchema, + }); + + // apply initial state + await initial(workspace, blobStorage); + + // save workspace to storage + await syncStorage.push(id, encodeStateAsUpdate(workspace.doc)); + for (const subdocs of workspace.doc.getSubdocs()) { + await syncStorage.push(subdocs.guid, encodeStateAsUpdate(subdocs)); + } + + const list = this.state.get(LIST_STORE_KEY) ?? []; + this.state.set(LIST_STORE_KEY, [...list, meta]); + + return { id, flavour: WorkspaceFlavour.LOCAL }; + } + subscribe( + callback: (changed: { + added?: WorkspaceMetadata[] | undefined; + deleted?: WorkspaceMetadata[] | undefined; + }) => void + ): () => void { + let lastWorkspaces: WorkspaceMetadata[] = + this.state.get(LIST_STORE_KEY) ?? 
[]; + + const sub = this.state + .watch(LIST_STORE_KEY) + .subscribe(allWorkspaces => { + if (allWorkspaces) { + const added = differenceBy(allWorkspaces, lastWorkspaces, v => v.id); + const deleted = differenceBy( + lastWorkspaces, + allWorkspaces, + v => v.id + ); + lastWorkspaces = allWorkspaces; + if (added.length || deleted.length) { + callback({ added, deleted }); + } + } + }); + return () => { + sub.unsubscribe(); + }; + } + async getInformation(id: string): Promise { + // get information from root doc + const storage = new TestingSyncStorage( + { + flavour: WorkspaceFlavour.LOCAL, + id, + }, + this.state + ); + const data = await storage.pull(id, new Uint8Array([])); + + if (!data) { + return; + } + + const bs = new BlockSuiteWorkspace({ + id, + schema: globalBlockSuiteSchema, + }); + + applyUpdate(bs.doc, data.data); + + return { + name: bs.meta.name, + avatar: bs.meta.avatar, + }; + } +} + +export class TestingLocalWorkspaceFactory implements WorkspaceFactory { + constructor(private readonly state: Memento) {} + + name = WorkspaceFlavour.LOCAL; + + configureWorkspace(services: ServiceCollection): void { + services + .scope(WorkspaceScope) + .addImpl(LocalBlobStorage, TestingBlobStorage, [ + WorkspaceMetadataContext, + GlobalState, + ]) + .addImpl(LocalSyncStorage, TestingSyncStorage, [ + WorkspaceMetadataContext, + GlobalState, + ]) + .addImpl(AwarenessProvider, TestingAwarenessProvider); + } + + getWorkspaceBlob(id: string, blobKey: string): Promise { + return new TestingBlobStorage( + { + flavour: WorkspaceFlavour.LOCAL, + id, + }, + this.state + ).get(blobKey); + } +} + +export class TestingSyncStorage implements SyncStorage { + constructor( + private readonly metadata: WorkspaceMetadata, + private readonly state: Memento + ) {} + name: string = 'testing'; + async pull( + docId: string, + _: Uint8Array + ): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> { + const key = 'testing-sync/' + this.metadata.id + '/' + docId; + const data 
= this.state.get(key); + if (data) { + return { data }; + } else { + return null; + } + } + async push(docId: string, data: Uint8Array): Promise { + const key = 'testing-sync/' + this.metadata.id + '/' + docId; + const oldData = this.state.get(key); + const update = mergeUpdates(oldData ? [oldData, data] : [data]); + this.state.set(key, update); + } + async subscribe( + _cb: (docId: string, data: Uint8Array) => void, + _disconnect: (reason: string) => void + ): Promise<() => void> { + return () => {}; + } +} + +export class TestingBlobStorage implements BlobStorage { + name = 'testing'; + readonly = false; + + constructor( + private readonly metadata: WorkspaceMetadata, + private readonly state: Memento + ) {} + + get(key: string) { + const storeKey = 'testing-blob/' + this.metadata.id + '/' + key; + return Promise.resolve(this.state.get(storeKey) ?? null); + } + set(key: string, value: Blob) { + const storeKey = 'testing-blob/' + this.metadata.id + '/' + key; + this.state.set(storeKey, value); + + const listKey = 'testing-blob-list/' + this.metadata.id; + const list = this.state.get>(listKey) ?? new Set(); + list.add(key); + this.state.set(listKey, list); + + return Promise.resolve(key); + } + delete(key: string) { + this.state.set(key, null); + + const listKey = 'testing-blob-list/' + this.metadata.id; + const list = this.state.get>(listKey) ?? new Set(); + list.delete(key); + this.state.set(listKey, list); + + return Promise.resolve(); + } + list() { + const listKey = 'testing-blob-list/' + this.metadata.id; + const list = this.state.get>(listKey); + return Promise.resolve(list ? 
Array.from(list) : []); + } +} + +export class TestingAwarenessProvider implements AwarenessProvider { + connect(): void { + /* do nothing */ + } + disconnect(): void { + /* do nothing */ + } +} diff --git a/packages/common/infra/src/workspace/upgrade.ts b/packages/common/infra/src/workspace/upgrade.ts new file mode 100644 index 0000000000000..2b57dafce4a01 --- /dev/null +++ b/packages/common/infra/src/workspace/upgrade.ts @@ -0,0 +1,142 @@ +import { Unreachable } from '@affine/env/constant'; +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { Slot } from '@blocksuite/global/utils'; +import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs'; + +import { checkWorkspaceCompatibility, MigrationPoint } from '../blocksuite'; +import { forceUpgradePages, upgradeV1ToV2 } from '../blocksuite'; +import { migrateGuidCompatibility } from '../blocksuite'; +import type { SyncEngine } from './engine/sync'; +import type { WorkspaceManager } from './manager'; +import { type WorkspaceMetadata } from './metadata'; + +export interface WorkspaceUpgradeStatus { + needUpgrade: boolean; + upgrading: boolean; +} + +export class WorkspaceUpgradeController { + _status: Readonly = { + needUpgrade: false, + upgrading: false, + }; + readonly onStatusChange = new Slot(); + + get status() { + return this._status; + } + + set status(value) { + if ( + value.needUpgrade !== this._status.needUpgrade || + value.upgrading !== this._status.upgrading + ) { + this._status = value; + this.onStatusChange.emit(value); + } + } + + constructor( + private readonly blockSuiteWorkspace: BlockSuiteWorkspace, + private readonly sync: SyncEngine, + private readonly workspaceMetadata: WorkspaceMetadata + ) { + blockSuiteWorkspace.doc.on('update', () => { + this.checkIfNeedUpgrade(); + }); + } + + checkIfNeedUpgrade() { + const needUpgrade = !!checkWorkspaceCompatibility( + this.blockSuiteWorkspace, + 
this.workspaceMetadata.flavour === WorkspaceFlavour.AFFINE_CLOUD + ); + this.status = { + ...this.status, + needUpgrade, + }; + return needUpgrade; + } + + async upgrade( + workspaceManager: WorkspaceManager + ): Promise { + if (this.status.upgrading) { + return null; + } + + this.status = { ...this.status, upgrading: true }; + + try { + await this.sync.waitForSynced(); + + const step = checkWorkspaceCompatibility( + this.blockSuiteWorkspace, + this.workspaceMetadata.flavour === WorkspaceFlavour.AFFINE_CLOUD + ); + + if (!step) { + return null; + } + + // Clone a new doc to prevent change events. + const clonedDoc = new YDoc({ + guid: this.blockSuiteWorkspace.doc.guid, + }); + applyDoc(clonedDoc, this.blockSuiteWorkspace.doc); + + if (step === MigrationPoint.SubDoc) { + const newWorkspace = await workspaceManager.createWorkspace( + WorkspaceFlavour.LOCAL, + async (workspace, blobStorage) => { + await upgradeV1ToV2(clonedDoc, workspace.doc); + migrateGuidCompatibility(clonedDoc); + await forceUpgradePages( + workspace.doc, + this.blockSuiteWorkspace.schema + ); + const blobList = await this.blockSuiteWorkspace.blob.list(); + + for (const blobKey of blobList) { + const blob = await this.blockSuiteWorkspace.blob.get(blobKey); + if (blob) { + await blobStorage.set(blobKey, blob); + } + } + } + ); + await workspaceManager.deleteWorkspace(this.workspaceMetadata); + return newWorkspace; + } else if (step === MigrationPoint.GuidFix) { + migrateGuidCompatibility(clonedDoc); + await forceUpgradePages(clonedDoc, this.blockSuiteWorkspace.schema); + applyDoc(this.blockSuiteWorkspace.doc, clonedDoc); + await this.sync.waitForSynced(); + return null; + } else if (step === MigrationPoint.BlockVersion) { + await forceUpgradePages(clonedDoc, this.blockSuiteWorkspace.schema); + applyDoc(this.blockSuiteWorkspace.doc, clonedDoc); + await this.sync.waitForSynced(); + return null; + } else { + throw new Unreachable(); + } + } finally { + this.status = { ...this.status, upgrading: false 
}; + } + } +} + +function applyDoc(target: YDoc, result: YDoc) { + applyUpdate(target, encodeStateAsUpdate(result)); + for (const targetSubDoc of target.subdocs.values()) { + const resultSubDocs = Array.from(result.subdocs.values()); + const resultSubDoc = resultSubDocs.find( + item => item.guid === targetSubDoc.guid + ); + if (resultSubDoc) { + applyDoc(targetSubDoc, resultSubDoc); + } + } +} diff --git a/packages/common/infra/src/workspace/workspace.ts b/packages/common/infra/src/workspace/workspace.ts new file mode 100644 index 0000000000000..4874f1a44cdab --- /dev/null +++ b/packages/common/infra/src/workspace/workspace.ts @@ -0,0 +1,134 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; + +import type { ServiceProvider } from '../di'; +import { CleanupService } from '../lifecycle'; +import type { WorkspaceEngine} from './engine'; +import { type WorkspaceEngineStatus } from './engine'; +import { type WorkspaceMetadata } from './metadata'; +import type { + WorkspaceUpgradeController} from './upgrade'; +import { + type WorkspaceUpgradeStatus, +} from './upgrade'; + +const logger = new DebugLogger('affine:workspace'); + +export type WorkspaceStatus = { + mode: 'ready' | 'closed'; + engine: WorkspaceEngineStatus; + upgrade: WorkspaceUpgradeStatus; +}; + +/** + * # Workspace + * + * ``` + * ┌───────────┐ + * │ Workspace │ + * └─────┬─────┘ + * │ + * │ + * ┌──────────────┼─────────────┐ + * │ │ │ + * ┌───┴─────┐ ┌──────┴─────┐ ┌───┴────┐ + * │ Upgrade │ │ blocksuite │ │ Engine │ + * └─────────┘ └────────────┘ └───┬────┘ + * │ + * ┌──────┼─────────┐ + * │ │ │ + * ┌──┴─┐ ┌──┴─┐ ┌─────┴───┐ + * │sync│ │blob│ │awareness│ + * └────┘ └────┘ └─────────┘ + * ``` + * + * This class contains all the components needed to run a workspace. 
+ */ +export class Workspace { + get id() { + return this.meta.id; + } + get flavour() { + return this.meta.flavour; + } + + private _status: WorkspaceStatus; + + onStatusChange = new Slot(); + get status() { + return this._status; + } + + set status(status: WorkspaceStatus) { + this._status = status; + this.onStatusChange.emit(status); + } + + constructor( + public meta: WorkspaceMetadata, + public engine: WorkspaceEngine, + public blockSuiteWorkspace: BlockSuiteWorkspace, + public upgrade: WorkspaceUpgradeController, + public services: ServiceProvider + ) { + this._status = { + mode: 'closed', + engine: engine.status, + upgrade: this.upgrade.status, + }; + this.engine.onStatusChange.on(status => { + this.status = { + ...this.status, + engine: status, + }; + }); + this.upgrade.onStatusChange.on(status => { + this.status = { + ...this.status, + upgrade: status, + }; + }); + + this.start(); + } + + /** + * workspace start when create and workspace is one-time use + */ + private start() { + if (this.status.mode === 'ready') { + return; + } + logger.info('start workspace', this.id); + this.engine.start(); + this.status = { + ...this.status, + mode: 'ready', + engine: this.engine.status, + }; + } + + canGracefulStop() { + return this.engine.canGracefulStop() && !this.status.upgrade.upgrading; + } + + forceStop() { + if (this.status.mode === 'closed') { + return; + } + logger.info('stop workspace', this.id); + this.engine.forceStop(); + this.status = { + ...this.status, + mode: 'closed', + engine: this.engine.status, + }; + this.services.get(CleanupService).cleanup(); + } + + // same as `WorkspaceEngine.sync.setPriorityRule` + setPriorityRule(target: ((id: string) => boolean) | null) { + this.engine.sync.setPriorityRule(target); + } +} diff --git a/packages/frontend/electron/package.json b/packages/frontend/electron/package.json index b2cd94fc190bb..8eb7d2a9a68e1 100644 --- a/packages/frontend/electron/package.json +++ b/packages/frontend/electron/package.json @@ -55,7 
+55,7 @@ "lodash-es": "^4.17.21", "rxjs": "^7.8.1", "semver": "^7.5.4", - "tinykeys": "^2.1.0", + "tinykeys": "patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch", "tree-kill": "^1.2.2", "ts-node": "^10.9.1", "undici": "^6.0.0", diff --git a/yarn.lock b/yarn.lock index 3db0f7499b76e..8fe422d33a4e8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -477,7 +477,7 @@ __metadata: nanoid: "npm:^5.0.3" rxjs: "npm:^7.8.1" semver: "npm:^7.5.4" - tinykeys: "npm:^2.1.0" + tinykeys: "patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch" tree-kill: "npm:^1.2.2" ts-node: "npm:^10.9.1" undici: "npm:^6.0.0" @@ -12816,10 +12816,11 @@ __metadata: foxact: "npm:^0.2.20" jotai: "npm:^2.5.1" jotai-effect: "npm:^0.2.3" + lodash-es: "npm:^4.17.21" nanoid: "npm:^5.0.3" react: "npm:^18.2.0" rxjs: "npm:^7.8.1" - tinykeys: "npm:^2.1.0" + tinykeys: "patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch" vite: "npm:^5.0.6" vite-plugin-dts: "npm:3.7.0" vitest: "npm:1.1.3" @@ -32982,13 +32983,20 @@ __metadata: languageName: node linkType: hard -"tinykeys@npm:^2.1.0": +"tinykeys@npm:2.1.0": version: 2.1.0 resolution: "tinykeys@npm:2.1.0" checksum: 64d222e08472d1a55e42fb1f20f8c4587f7fab633cba0a23754eea3bf477044ae3160f203fd1061435352ed3df900fb49ecf83e829c090e847d7fa45f54491ad languageName: node linkType: hard +"tinykeys@patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch": + version: 2.1.0 + resolution: "tinykeys@patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch::version=2.1.0&hash=822e37" + checksum: 0adb15480be7747d73032959b8f04c541a5bc99e8200461887aea0c42984fae591daa1dac1bd848dfb3b7d0674b28cb2d0573775b0044e5b82b7f50f1e1dcd16 + languageName: node + linkType: hard + "tinypool@npm:^0.8.1": version: 0.8.1 resolution: "tinypool@npm:0.8.1"