Skip to content

Commit

Permalink
feat(infra): new workspace infra
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Jan 19, 2024
1 parent f00bda7 commit 1b21c01
Show file tree
Hide file tree
Showing 37 changed files with 3,110 additions and 6 deletions.
54 changes: 54 additions & 0 deletions .yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion packages/common/infra/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
"foxact": "^0.2.20",
"jotai": "^2.5.1",
"jotai-effect": "^0.2.3",
"lodash-es": "^4.17.21",
"nanoid": "^5.0.3",
"react": "18.2.0",
"tinykeys": "^2.1.0",
"tinykeys": "patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch",
"yjs": "^13.6.10",
"zod": "^3.22.4"
},
Expand Down
9 changes: 8 additions & 1 deletion packages/common/infra/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@ export * from './command';
export * from './di';
export * from './livedata';
export * from './storage';
export * from './utils';
export * from './workspace';

import type { ServiceCollection } from './di';
import { CleanupService } from './lifecycle';
import { GlobalCache, GlobalState, MemoryMemento } from './storage';
import {
configureTestingWorkspaceServices,
configureWorkspaceServices,
} from './workspace';

export function configureInfraServices(services: ServiceCollection) {
services.add(CleanupService);
configureWorkspaceServices(services);
}

export function configureTestingInfraServices(services: ServiceCollection) {
configureInfraServices(services);
configureTestingWorkspaceServices(services);
services.addImpl(GlobalCache, MemoryMemento);
services.addImpl(GlobalState, MemoryMemento);
}
45 changes: 45 additions & 0 deletions packages/common/infra/src/utils/__tests__/async-queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { describe, expect, test, vi } from 'vitest';

import { AsyncQueue } from '../async-queue';

describe('async-queue', () => {
test('push & pop', async () => {
const queue = new AsyncQueue();
queue.push(1, 2, 3);
expect(queue.length).toBe(3);
expect(await queue.next()).toBe(1);
expect(await queue.next()).toBe(2);
expect(await queue.next()).toBe(3);
expect(queue.length).toBe(0);
});

test('await', async () => {
const queue = new AsyncQueue<number>();
queue.push(1, 2);
expect(await queue.next()).toBe(1);
expect(await queue.next()).toBe(2);

let v = -1;

// setup 2 pop tasks
queue.next().then(next => {
v = next;
});
queue.next().then(next => {
v = next;
});

// Wait for 100ms
await new Promise(resolve => setTimeout(resolve, 100));
// v should not be changed
expect(v).toBe(-1);

// push 3, should trigger the first pop task
queue.push(3);
await vi.waitFor(() => v === 3);

// push 4, should trigger the second pop task
queue.push(4);
await vi.waitFor(() => v === 4);
});
});
13 changes: 13 additions & 0 deletions packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { describe, expect, test } from 'vitest';

import { throwIfAborted } from '../throw-if-aborted';

describe('throw-if-aborted', () => {
test('basic', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;
expect(throwIfAborted(abortSignal)).toBe(true);
abortController.abort('TEST_ABORT');
expect(() => throwIfAborted(abortSignal)).toThrowError('TEST_ABORT');
});
});
101 changes: 101 additions & 0 deletions packages/common/infra/src/utils/async-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
export class AsyncQueue<T> {
private _queue: T[];

private _resolveUpdate: (() => void) | null = null;
private _waitForUpdate: Promise<void> | null = null;

constructor(init: T[] = []) {
this._queue = init;
}

get length() {
return this._queue.length;
}

async next(
abort?: AbortSignal,
dequeue: (arr: T[]) => T | undefined = a => a.shift()
): Promise<T> {
const update = dequeue(this._queue);
if (update) {
return update;
} else {
if (!this._waitForUpdate) {
this._waitForUpdate = new Promise(resolve => {
this._resolveUpdate = resolve;
});
}

await Promise.race([
this._waitForUpdate,
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);

return this.next(abort, dequeue);
}
}

push(...updates: T[]) {
this._queue.push(...updates);
if (this._resolveUpdate) {
const resolve = this._resolveUpdate;
this._resolveUpdate = null;
this._waitForUpdate = null;
resolve();
}
}

remove(predicate: (update: T) => boolean) {
const index = this._queue.findIndex(predicate);
if (index !== -1) {
this._queue.splice(index, 1);
}
}

find(predicate: (update: T) => boolean) {
return this._queue.find(predicate);
}

clear() {
this._queue = [];
}
}

export class PriorityAsyncQueue<
T extends { id: string },
> extends AsyncQueue<T> {
constructor(
init: T[] = [],
public readonly priorityTarget: SharedPriorityTarget = new SharedPriorityTarget()
) {
super(init);
}

override next(abort?: AbortSignal | undefined): Promise<T> {
return super.next(abort, arr => {
if (this.priorityTarget.priorityRule !== null) {
const index = arr.findIndex(
update => this.priorityTarget.priorityRule?.(update.id)
);
if (index !== -1) {
return arr.splice(index, 1)[0];
}
}
return arr.shift();
});
}
}

/**
* Shared priority target can be shared by multiple queues.
*/
export class SharedPriorityTarget {
public priorityRule: ((id: string) => boolean) | null = null;
}
5 changes: 5 additions & 0 deletions packages/common/infra/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export * from './async-queue';
export * from './merge-updates';
export * from './object-pool';
export * from './stable-hash';
export * from './throw-if-aborted';
17 changes: 17 additions & 0 deletions packages/common/infra/src/utils/merge-updates.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs';

export function mergeUpdates(updates: Uint8Array[]) {
if (updates.length === 0) {
return new Uint8Array();
}
if (updates.length === 1) {
return updates[0];
}
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
});
return encodeStateAsUpdate(doc);
}
96 changes: 96 additions & 0 deletions packages/common/infra/src/utils/object-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { Unreachable } from '@affine/env/constant';

export interface RcRef<T> {
obj: T;
release: () => void;
}

export class ObjectPool<Key, T> {
objects = new Map<Key, { obj: T; rc: number }>();
timeoutToGc: NodeJS.Timeout | null = null;

constructor(
private readonly options: {
onDelete?: (obj: T) => void;
onDangling?: (obj: T) => boolean;
} = {}
) {}

get(key: Key): RcRef<T> | null {
const exist = this.objects.get(key);
if (exist) {
exist.rc++;
let released = false;
return {
obj: exist.obj,
release: () => {
// avoid double release
if (released) {
return;
}
released = true;
exist.rc--;
this.requestGc();
},
};
}
return null;
}

put(key: Key, obj: T) {
const ref = { obj, rc: 0 };
this.objects.set(key, ref);

const r = this.get(key);
if (!r) {
throw new Unreachable();
}

return r;
}

private requestGc() {
if (this.timeoutToGc) {
clearInterval(this.timeoutToGc);
}

// do gc every 1s
this.timeoutToGc = setInterval(() => {
this.gc();
}, 1000);
}

private gc() {
for (const [key, { obj, rc }] of new Map(
this.objects /* clone the map, because the origin will be modified during iteration */
)) {
if (
rc === 0 &&
(!this.options.onDangling || this.options.onDangling(obj))
) {
this.options.onDelete?.(obj);

this.objects.delete(key);
}
}

for (const [_, { rc }] of this.objects) {
if (rc === 0) {
return;
}
}

// if all object has referrer, stop gc
if (this.timeoutToGc) {
clearInterval(this.timeoutToGc);
}
}

clear() {
for (const { obj } of this.objects.values()) {
this.options.onDelete?.(obj);
}

this.objects.clear();
}
}
9 changes: 9 additions & 0 deletions packages/common/infra/src/utils/throw-if-aborted.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// because AbortSignal.throwIfAborted is not available in abortcontroller-polyfill
export function throwIfAborted(abort?: AbortSignal) {
if (abort?.aborted) {
throw new Error(abort.reason);
}
return true;
}

export const MANUALLY_STOP = 'manually-stop';
38 changes: 38 additions & 0 deletions packages/common/infra/src/workspace/__tests__/workspace.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { WorkspaceFlavour } from '@affine/env/workspace';
import { describe, expect, test } from 'vitest';

import { configureInfraServices, configureTestingInfraServices } from '../..';
import { ServiceCollection } from '../../di';
import { WorkspaceListService, WorkspaceManager } from '../';

describe('Workspace System', () => {
test('create workspace', async () => {
const services = new ServiceCollection();
configureInfraServices(services);
configureTestingInfraServices(services);

const provider = services.provider();
const workspaceManager = provider.get(WorkspaceManager);
const workspaceListService = provider.get(WorkspaceListService);
expect(workspaceListService.workspaceList.value.length).toBe(0);

const { workspace } = workspaceManager.open(
await workspaceManager.createWorkspace(WorkspaceFlavour.LOCAL)
);

expect(workspaceListService.workspaceList.value.length).toBe(1);

const page = workspace.blockSuiteWorkspace.createPage({
id: 'page0',
});
await page.load();
page.addBlock('affine:page', {
title: new page.Text('test-page'),
});

expect(workspace.blockSuiteWorkspace.pages.size).toBe(1);
expect(
(page!.getBlockByFlavour('affine:page')[0] as any).title.toString()
).toBe('test-page');
});
});
Loading

0 comments on commit 1b21c01

Please sign in to comment.