-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmiddleware-registry.ts
202 lines (172 loc) · 6.34 KB
/
middleware-registry.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import type { Middleware, MiddlewareDecoratorOptions, MiddlewareRegistrarOptions } from './index.js'
import type { ComponentLogger, Logger, Startable, StreamHandler, StreamHandlerRecord, StreamHandlerOptions, Topology, IncomingStreamData } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'
/**
* A Registrar implementation that decorates protocol handlers with middleware,
* and adds them to standard registrar.
*/
export class MiddlewareRegistrar implements Registrar, Startable {
private readonly registrar: Registrar
private readonly middleware: Middleware
private readonly log: Logger
private readonly decoratedHandlers: Map<string, StreamHandler>
private readonly handlerOptions: Map<string, MiddlewareDecoratorOptions>
private readonly options: MiddlewareRegistrarOptions
private started: boolean
constructor (registrar: Registrar, middleware: Middleware, logger: ComponentLogger, options: MiddlewareRegistrarOptions = {}) {
this.registrar = registrar
this.middleware = middleware
this.log = logger.forComponent('libp2p:middleware-registrar')
this.decoratedHandlers = new Map()
this.handlerOptions = new Map()
this.options = options
this.started = false
if (this.options.exclude != null && this.options.include != null) {
throw new Error('Cannot use both include and exclude options')
}
}
readonly [Symbol.toStringTag] = '@libp2p/middleware-registrar'
/**
* Set middleware options for a specific protocol
*/
setProtocolOptions (protocol: string, options: MiddlewareDecoratorOptions): void {
this.handlerOptions.set(protocol, options)
}
/**
* Start the registry and its provider
*/
async start (): Promise<void> {
if (this.started) {
return
}
await this.middleware.start()
this.started = true
this.log('Middleware registry started')
}
/**
* Stop the registry and its provider
*/
async stop (): Promise<void> {
if (!this.started) {
return
}
await this.middleware.stop()
this.started = false
this.log('Middleware registry stopped')
}
/**
* Check if registry is started
*/
isStarted (): boolean {
return this.started
}
/**
* Get all registered protocols
*/
getProtocols (): string[] {
return this.registrar.getProtocols()
}
/**
* Get a handler for a specific protocol
*/
getHandler (protocol: string): StreamHandlerRecord {
return this.registrar.getHandler(protocol)
}
/**
* Register a handler with middleware decorating
*/
async handle (protocol: string, handler: StreamHandler, options?: StreamHandlerOptions): Promise<void> {
if (this.middleware.protocol != null) {
if (protocol === this.middleware.protocol) {
this.log(`Skipping middleware for ${protocol}, registering with standard registrar`)
await this.registrar.handle(protocol, handler, options)
return
}
}
if (this.options.include != null) {
if (!this.options.include.includes(protocol)) {
this.log(`Skipping middleware for ${protocol}, registering with standard registrar`)
await this.registrar.handle(protocol, handler, options)
return
}
}
if (this.options.exclude != null) {
if (this.options.exclude.includes(protocol)) {
this.log(`Excluding middleware for ${protocol}, registering with standard registrar`)
await this.registrar.handle(protocol, handler, options)
return
}
}
this.log(`Registering handler for ${protocol} with middleware decorator`)
// Store the original handler
this.decoratedHandlers.set(protocol, handler)
// Create a decorated handler that checks the connection's status
const decoratedHandler: StreamHandler = (data: IncomingStreamData): void => {
void this.decorateAndHandleStream(protocol, handler, data)
}
// Register the decorated handler with the original registrar
await this.registrar.handle(protocol, decoratedHandler, options)
this.log(`Successfully registered decorated handler for ${protocol}`)
}
/**
* Apply middleware and handle the stream
*/
private async decorateAndHandleStream (protocol: string, handler: StreamHandler, data: IncomingStreamData): Promise<void> {
try {
// Check if the connection has middleware already applied
// Use type assertion to handle the id property which might be missing in some Connection implementations
const connectionId = (data.connection as any).id
if (!this.middleware.isDecorated(connectionId)) {
this.log(`Applying middleware to connection ${connectionId}`)
try {
// Apply middleware to the connection
const applied = await this.middleware.decorate(connectionId)
if (!applied) {
this.log.error(`Failed to apply middleware to connection ${(data.connection as any).id}, closing connection`)
// data.stream.abort(new Error('Middleware failed'))
data.connection.abort(new Error('Middleware failed, aborted connection'))
return
}
} catch (err) {
this.log.error(`Error applying middleware to connection ${(data.connection as any).id}: ${err}`)
// data.stream.abort(new Error('Middleware failed'))
data.connection.abort(new Error('Middleware failed, aborted connection'))
return
}
}
// Connection is decorated, call the original handler
handler(data)
} catch (err) {
this.log.error(`Error in decorated handler for ${protocol}: ${err}`)
data.stream.abort(err instanceof Error ? err : new Error(String(err)))
}
}
/**
* Unregister a handler
*/
async unhandle (protocol: string): Promise<void> {
// Clean up our handler tracking
this.decoratedHandlers.delete(protocol)
this.handlerOptions.delete(protocol)
// Unregister from the original registrar
await this.registrar.unhandle(protocol)
}
/**
* Register a topology for a protocol
*/
async register (protocol: string, topology: Topology): Promise<string> {
return this.registrar.register(protocol, topology)
}
/**
* Unregister a topology
*/
unregister (id: string): void {
this.registrar.unregister(id)
}
/**
* Get registrar topologies
*/
getTopologies (protocol: string): Topology[] {
return this.registrar.getTopologies(protocol)
}
}