Skip to content

Commit 0a3406a

Browse files
committed
feat: add debounce and repeating task to utils (#2795)
This functionality is required in multiple places so add it to the utils module.
1 parent ad5cfd6 commit 0a3406a

File tree

6 files changed

+238
-0
lines changed

6 files changed

+238
-0
lines changed

packages/utils/package.json

+8
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@
6868
"types": "./dist/src/close-source.d.ts",
6969
"import": "./dist/src/close-source.js"
7070
},
71+
"./debounce": {
72+
"types": "./dist/src/debounce.d.ts",
73+
"import": "./dist/src/debounce.js"
74+
},
7175
"./filters": {
7276
"types": "./dist/src/filters/index.d.ts",
7377
"import": "./dist/src/filters/index.js"
@@ -112,6 +116,10 @@
112116
"types": "./dist/src/rate-limiter.d.ts",
113117
"import": "./dist/src/rate-limiter.js"
114118
},
119+
"./repeating-task": {
120+
"types": "./dist/src/repeating-task.d.ts",
121+
"import": "./dist/src/repeating-task.js"
122+
},
115123
"./stream-to-ma-conn": {
116124
"types": "./dist/src/stream-to-ma-conn.d.ts",
117125
"import": "./dist/src/stream-to-ma-conn.js"

packages/utils/src/debounce.ts

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import type { Startable } from '@libp2p/interface'
2+
3+
export interface DebouncedFunction extends Startable {
4+
(): void
5+
}
6+
7+
/**
8+
* Returns a function wrapper that will only call the passed function once
9+
*
10+
* Important - the passed function should not throw or reject
11+
*/
12+
export function debounce (func: () => void | Promise<void>, wait: number): DebouncedFunction {
13+
let timeout: ReturnType<typeof setTimeout> | undefined
14+
15+
const output = function (): void {
16+
const later = function (): void {
17+
timeout = undefined
18+
void func()
19+
}
20+
21+
clearTimeout(timeout)
22+
timeout = setTimeout(later, wait)
23+
}
24+
output.start = () => {}
25+
output.stop = () => {
26+
clearTimeout(timeout)
27+
}
28+
29+
return output
30+
}

packages/utils/src/repeating-task.ts

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import { setMaxListeners } from '@libp2p/interface'
2+
import { anySignal } from 'any-signal'
3+
import type { AbortOptions } from '@libp2p/interface'
4+
5+
export interface RepeatingTask {
6+
start(): void
7+
stop(): void
8+
}
9+
10+
export interface RepeatingTaskOptions {
11+
/**
12+
* How long the task is allowed to run before the passed AbortSignal fires an
13+
* abort event
14+
*/
15+
timeout?: number
16+
17+
/**
18+
* Whether to schedule the task to run immediately
19+
*/
20+
runImmediately?: boolean
21+
}
22+
23+
export function repeatingTask (fn: (options?: AbortOptions) => void | Promise<void>, interval: number, options?: RepeatingTaskOptions): RepeatingTask {
24+
let timeout: ReturnType<typeof setTimeout>
25+
let shutdownController: AbortController
26+
27+
function runTask (): void {
28+
const opts: AbortOptions = {
29+
signal: shutdownController.signal
30+
}
31+
32+
if (options?.timeout != null) {
33+
const signal = anySignal([shutdownController.signal, AbortSignal.timeout(options.timeout)])
34+
setMaxListeners(Infinity, signal)
35+
36+
opts.signal = signal
37+
}
38+
39+
Promise.resolve().then(async () => {
40+
await fn(opts)
41+
})
42+
.catch(() => {})
43+
.finally(() => {
44+
if (shutdownController.signal.aborted) {
45+
// task has been cancelled, bail
46+
return
47+
}
48+
49+
// reschedule
50+
timeout = setTimeout(runTask, interval)
51+
})
52+
}
53+
54+
let started = false
55+
56+
return {
57+
start: () => {
58+
if (started) {
59+
return
60+
}
61+
62+
started = true
63+
shutdownController = new AbortController()
64+
setMaxListeners(Infinity, shutdownController.signal)
65+
66+
// run now
67+
if (options?.runImmediately === true) {
68+
queueMicrotask(() => {
69+
runTask()
70+
})
71+
} else {
72+
// run later
73+
timeout = setTimeout(runTask, interval)
74+
}
75+
},
76+
stop: () => {
77+
clearTimeout(timeout)
78+
shutdownController?.abort()
79+
started = false
80+
}
81+
}
82+
}

packages/utils/test/debounce.spec.ts

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { stop } from '@libp2p/interface'
2+
import { expect } from 'aegir/chai'
3+
import delay from 'delay'
4+
import { debounce } from '../src/debounce.js'
5+
6+
describe('debounce', () => {
7+
it('should debounce function', async () => {
8+
let invocations = 0
9+
const fn = (): void => {
10+
invocations++
11+
}
12+
13+
const debounced = debounce(fn, 10)
14+
15+
debounced()
16+
debounced()
17+
debounced()
18+
debounced()
19+
debounced()
20+
21+
await delay(500)
22+
23+
expect(invocations).to.equal(1)
24+
})
25+
26+
it('should cancel debounced function', async () => {
27+
let invocations = 0
28+
const fn = (): void => {
29+
invocations++
30+
}
31+
32+
const debounced = debounce(fn, 10000)
33+
34+
debounced()
35+
debounced()
36+
debounced()
37+
debounced()
38+
debounced()
39+
40+
await stop(debounced)
41+
42+
await delay(500)
43+
44+
expect(invocations).to.equal(0)
45+
})
46+
})
+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { expect } from 'aegir/chai'
2+
import delay from 'delay'
3+
import pDefer from 'p-defer'
4+
import { repeatingTask } from '../src/repeating-task.js'
5+
6+
describe('repeating-task', () => {
7+
it('should repeat a task', async () => {
8+
let count = 0
9+
10+
const task = repeatingTask(() => {
11+
count++
12+
}, 100)
13+
task.start()
14+
15+
await delay(1000)
16+
17+
task.stop()
18+
19+
expect(count).to.be.greaterThan(1)
20+
})
21+
22+
it('should run a task immediately', async () => {
23+
let count = 0
24+
25+
const task = repeatingTask(() => {
26+
count++
27+
}, 60000, {
28+
runImmediately: true
29+
})
30+
task.start()
31+
32+
await delay(10)
33+
34+
task.stop()
35+
36+
expect(count).to.equal(1)
37+
})
38+
39+
it('should time out a task', async () => {
40+
const deferred = pDefer()
41+
42+
const task = repeatingTask((opts) => {
43+
opts?.signal?.addEventListener('abort', () => {
44+
deferred.resolve()
45+
})
46+
}, 100, {
47+
timeout: 10
48+
})
49+
task.start()
50+
51+
await deferred.promise
52+
task.stop()
53+
})
54+
55+
it('should repeat a task that throws', async () => {
56+
let count = 0
57+
58+
const task = repeatingTask(() => {
59+
count++
60+
throw new Error('Urk!')
61+
}, 100)
62+
task.start()
63+
64+
await delay(1000)
65+
66+
task.stop()
67+
68+
expect(count).to.be.greaterThan(1)
69+
})
70+
})

packages/utils/typedoc.json

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"./src/array-equals.ts",
88
"./src/close.ts",
99
"./src/close-source.ts",
10+
"./src/debounce.ts",
1011
"./src/filters/index.ts",
1112
"./src/ip-port-to-multiaddr.ts",
1213
"./src/is-promise.ts",
@@ -18,6 +19,7 @@
1819
"./src/private-ip.ts",
1920
"./src/queue/index.ts",
2021
"./src/rate-limiter.ts",
22+
"./src/repeating-task.ts",
2123
"./src/stream-to-ma-conn.ts",
2224
"./src/tracked-list.ts",
2325
"./src/tracked-map.ts"

0 commit comments

Comments
 (0)