Skip to content

Commit

Permalink
Merge pull request #205 from oliver-oloughlin/runtime-interop
Browse files Browse the repository at this point in the history
Runtime interop
  • Loading branch information
oliver-oloughlin authored Apr 14, 2024
2 parents a28d4cd + 06dd479 commit 357312e
Show file tree
Hide file tree
Showing 22 changed files with 1,383 additions and 1,402 deletions.
30 changes: 20 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies by default. It's purpose is to enhance the experience of using
Deno's KV store through additional features such as indexing, strongly typed
collections and serialization/compression, while maintaining as much of the
native functionality as possible, like atomic operations, real-time data updates
and queue listeners.
and queue listeners. Also works with other runtimes such as Node.js and Bun.

_Supported Deno verisons:_ **^1.42.0**

Expand Down Expand Up @@ -271,8 +271,13 @@ const db = kvdex(kv, {

Specify serialization for the collection. This lets large objects that exceed
the native size limit of 64kb to be stored, by serializing, compressing and
dividing the value across multiple key/value entries. There is a tradeoff
between speed and storage efficiency.
dividing the value across multiple key/value entries. When serialization is
used, there is a tradeoff between speed and storage efficiency. If the serialize
option is not set, or if a custom serialize configuration is used, then JSON
serialization is used by default for any unset serialize functions, while Brotli
compression is used for any unset compress functions. V8 serialization and
Brotli compression rely on runtime implementations, and are therefore only
compatible with runtimes that implement them (Deno, Node.js).

```ts
import { kvdex, collection, model } from "jsr:@olli/kvdex"
Expand All @@ -281,13 +286,18 @@ const kv = await Deno.openKv()

const db = kvdex(kv, {
users: collection(model<User>(), {
// Use the custom json-serializer, compatible with every runtime
// Custom JSON serializer + Brotli compression
serialize: "json",

// Use the faster built-in V8 serializer,
// only works in runtimes that implement the v8 node module
// Custom JSON serializer and no compression (best runtime compatibility)
serialize: "json-uncompressed",

// Built-in V8 serializer + Brotli compression,
serialize: "v8",

// Built-in V8 serializer and no compression
serialize: "v8-uncompressed",

// Set custom serialize, deserialize, compress and decompress functions
serialize: {
serialize: ...,
Expand Down Expand Up @@ -1405,9 +1415,9 @@ await migrate({
## Blob Storage

To store large blob sizes, and bypass the data limit of a single atomic
operation, a combination of serialized collections and batching atomic
operations can be used. By default, batching is disabled to ensure consistency
and improve performance.
operation, a combination of serialized collections and batched atomic operations
can be used. By default, batching is disabled to ensure consistency and improve
performance.

```ts
import { collection, kvdex, model } from "jsr:@olli/kvdex"
Expand All @@ -1419,7 +1429,7 @@ const db = kvdex(kv, {

const blob = // read from disk, etc.

const result = await db.blobs.add(blob, { batching: true })
const result = await db.blobs.add(blob, { batched: true })
```

## Development
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@olli/kvdex",
"version": "0.36.1",
"version": "1.0.0",
"exports": {
".": "./mod.ts",
"./ext/zod": "./ext/zod.ts",
Expand Down
26 changes: 26 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 25 additions & 20 deletions src/atomic_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import type {
AtomicSetOptions,
CollectionOptions,
CollectionSelector,
DenoAtomicCheck,
DenoKv,
DenoKvCommitError,
DenoKvCommitResult,
DenoKvU64,
EnqueueOptions,
HistoryEntry,
KvId,
Expand Down Expand Up @@ -37,21 +42,21 @@ export class AtomicBuilder<
const TInput,
const TOutput extends KvValue,
> {
private kv: Deno.Kv
private kv: DenoKv
private schema: TSchema
private operations: Operations
private collection: Collection<TInput, TOutput, CollectionOptions<TOutput>>

/**
* Create a new AtomicBuilder for building and executing atomic operations in the KV store.
*
* @param kv - The Deno KV instance to be used.
* @param kv - The DenoKV instance to be used.
* @param schema - The database schema containing all accessible collections.
* @param collection - The collection currently in context for building atomic operations.
* @param operations - List of prepared operations from previous instance.
*/
constructor(
kv: Deno.Kv,
kv: DenoKv,
schema: TSchema,
collection: Collection<TInput, TOutput, CollectionOptions<TOutput>>,
operations?: Operations,
Expand Down Expand Up @@ -181,11 +186,11 @@ export class AtomicBuilder<

// Add delete preperation function to prepeare delete functions list
this.operations.prepareDeleteFns.push(async (kv) => {
const doc = await kv.get<KvObject>(idKey)
const doc = await kv.get(idKey)

return {
id,
data: doc.value ?? {},
data: doc.value as KvObject ?? {},
}
})
}
Expand Down Expand Up @@ -223,8 +228,8 @@ export class AtomicBuilder<
* @returns Current AtomicBuilder instance.
*/
check(...atomicChecks: AtomicCheck<TOutput>[]): this {
// Create Deno atomic checks from atomci checks input list
const checks: Deno.AtomicCheck[] = atomicChecks.map(
// Create Denoatomic checks from atomci checks input list
const checks: DenoAtomicCheck[] = atomicChecks.map(
({ id, versionstamp }) => {
const key = extendKey(this.collection._keys.id, id)
return {
Expand All @@ -243,20 +248,20 @@ export class AtomicBuilder<

/**
* Adds the given value to the value of the document with the given id.
* Sum only works for documents of type Deno.KvU64 and will throw an error for documents of any other type.
* Sum only works for documents of type DenoKvU64 and will throw an error for documents of any other type.
*
* @example
* ```ts
* db
* .atomic(schema => schema.u64s) // Select collection of Deno.KvU64 values
* .atomic(schema => schema.u64s) // Select collection of DenoKvU64 values
* .sum("num1", 100n)
* ```
*
* @param id - Id of document that contains the value to be updated.
* @param value - The value to add to the document value.
* @returns Current AtomicBuilder instance.
*/
sum(id: KvId, value: TOutput extends Deno.KvU64 ? bigint : never): this {
sum(id: KvId, value: TOutput extends DenoKvU64 ? bigint : never): this {
const idKey = extendKey(this.collection._keys.id, id)
this.operations.atomic.sum(idKey, value)
return this
Expand All @@ -265,20 +270,20 @@ export class AtomicBuilder<
/**
* Sets the document value to the minimum of the existing and the given value.
*
* min only works for documents of type Deno.KvU64 and will throw an error for documents of any other type.
* min only works for documents of type DenoKvU64 and will throw an error for documents of any other type.
*
* @example
* ```ts
* db
* .atomic(schema => schema.u64s) // Select collection of Deno.KvU64 values
* .atomic(schema => schema.u64s) // Select collection of DenoKvU64 values
* .min("num1", 100n)
* ```
*
* @param id - Id of document that contains the value to be updated.
* @param value - The value to compare with the existing value.
* @returns Current AtomicBuilder instance.
*/
min(id: KvId, value: TOutput extends Deno.KvU64 ? bigint : never): this {
min(id: KvId, value: TOutput extends DenoKvU64 ? bigint : never): this {
const idKey = extendKey(this.collection._keys.id, id)
this.operations.atomic.min(idKey, value)
return this
Expand All @@ -287,20 +292,20 @@ export class AtomicBuilder<
/**
* Sets the document value to the maximum of the existing and the given value.
*
* max only works for documents of type Deno.KvU64 and will throw an error for documents of any other type.
* max only works for documents of type DenoKvU64 and will throw an error for documents of any other type.
*
* @example
* ```ts
* db
* .atomic(schema => schema.u64s) // Select collection of Deno.KvU64 values
* .atomic(schema => schema.u64s) // Select collection of DenoKvU64 values
* .max("num1", 100n)
* ```
*
* @param id - Id of document that contains the value to be updated.
* @param value - The value to compare with the existing value.
* @returns Current AtomicBuilder instance.
*/
max(id: KvId, value: TOutput extends Deno.KvU64 ? bigint : never): this {
max(id: KvId, value: TOutput extends DenoKvU64 ? bigint : never): this {
const idKey = extendKey(this.collection._keys.id, id)
this.operations.atomic.max(idKey, value)
return this
Expand All @@ -321,7 +326,7 @@ export class AtomicBuilder<
* {
* type: "set",
* id: "num2",
* value: new Deno.KvU64(200n)
* value: new DenoKvU64(200n)
* }
* )
* ```
Expand Down Expand Up @@ -386,7 +391,7 @@ export class AtomicBuilder<
*
* @param data - Data to be added to the collection queue.
* @param options - Enqueue options, optional.
* @returns A promise resolving to Deno.KvCommitResult.
* @returns A promise resolving to DenoKvCommitResult.
*/
enqueue(data: KvValue, options?: EnqueueOptions): this {
// Prepare and add enqueue operation
Expand All @@ -407,9 +412,9 @@ export class AtomicBuilder<
* Executes the built atomic operation.
* Will always fail if trying to delete and add/set to the same indexable collection in the same operation.
*
* @returns A promise that resolves to a Deno.KvCommitResult if the operation is successful, or Deno.KvCommitError if not.
* @returns A promise that resolves to a DenoKvCommitResult if the operation is successful, or DenoKvCommitError if not.
*/
async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
async commit(): Promise<DenoKvCommitResult | DenoKvCommitError> {
// Perform async mutations
for (const mut of this.operations.asyncMutations) {
await mut()
Expand Down
38 changes: 20 additions & 18 deletions src/atomic_pool.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,46 @@
import type { AtomicSetOptions } from "./types.ts"
import type {
AtomicSetOptions,
DenoAtomicCheck,
DenoAtomicOperation,
DenoKvCommitError,
DenoKvCommitResult,
DenoKvStrictKey,
} from "./types.ts"

/** Holds atomic mutations in a pool until bound to an actual atomic operation */
export class AtomicPool implements Deno.AtomicOperation {
private pool: Array<(op: Deno.AtomicOperation) => Deno.AtomicOperation>
export class AtomicPool implements DenoAtomicOperation {
private pool: Array<(op: DenoAtomicOperation) => DenoAtomicOperation>

constructor() {
this.pool = []
}

set(key: Deno.KvKey, value: unknown, options?: AtomicSetOptions) {
set(key: DenoKvStrictKey, value: unknown, options?: AtomicSetOptions) {
this.pool.push((op) => op.set(key, value, options))
return this
}

delete(key: Deno.KvKey) {
delete(key: DenoKvStrictKey) {
this.pool.push((op) => op.delete(key))
return this
}

mutate(...mutations: Deno.KvMutation[]) {
this.pool.push((op) => op.mutate(...mutations))
return this
}

check(...checks: Deno.AtomicCheck[]) {
check(...checks: DenoAtomicCheck[]) {
this.pool.push((op) => op.check(...checks))
return this
}

sum(key: Deno.KvKey, n: bigint) {
sum(key: DenoKvStrictKey, n: bigint) {
this.pool.push((op) => op.sum(key, n))
return this
}

max(key: Deno.KvKey, n: bigint) {
max(key: DenoKvStrictKey, n: bigint) {
this.pool.push((op) => op.max(key, n))
return this
}

min(key: Deno.KvKey, n: bigint): this {
min(key: DenoKvStrictKey, n: bigint): this {
this.pool.push((op) => op.min(key, n))
return this
}
Expand All @@ -47,18 +49,18 @@ export class AtomicPool implements Deno.AtomicOperation {
value: unknown,
options?: {
delay?: number | undefined
keysIfUndelivered?: Deno.KvKey[] | undefined
} | undefined,
keysIfUndelivered?: DenoKvStrictKey[]
},
) {
this.pool.push((op) => op.enqueue(value, options))
return this
}

commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
commit(): Promise<DenoKvCommitResult | DenoKvCommitError> {
throw Error("Not Implemented")
}

bindTo(atomic: Deno.AtomicOperation) {
bindTo(atomic: DenoAtomicOperation) {
this.pool.forEach((mutation) => mutation(atomic))
}
}
Loading

0 comments on commit 357312e

Please sign in to comment.