@@ -11,7 +11,20 @@ import { WriteConcern } from '../../../src/write_concern';
11
11
import { ReadPreference } from '../../../src/read_preference' ;
12
12
import { ClientSession } from '../../../src/sessions' ;
13
13
import { ChangeStream } from '../../../src/change_stream' ;
14
+ import { FindCursor } from '../../../src/cursor/find_cursor' ;
14
15
import type { ClientEntity , EntityDescription } from './schema' ;
16
+ import type {
17
+ ConnectionPoolCreatedEvent ,
18
+ ConnectionPoolClosedEvent ,
19
+ ConnectionCreatedEvent ,
20
+ ConnectionReadyEvent ,
21
+ ConnectionClosedEvent ,
22
+ ConnectionCheckOutStartedEvent ,
23
+ ConnectionCheckOutFailedEvent ,
24
+ ConnectionCheckedOutEvent ,
25
+ ConnectionCheckedInEvent ,
26
+ ConnectionPoolClearedEvent
27
+ } from '../../../src/cmap/connection_pool_events' ;
15
28
import type {
16
29
CommandFailedEvent ,
17
30
CommandStartedEvent ,
@@ -26,6 +39,17 @@ interface UnifiedChangeStream extends ChangeStream {
26
39
}
27
40
28
41
export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent ;
42
+ export type CmapEvent =
43
+ | ConnectionPoolCreatedEvent
44
+ | ConnectionPoolClosedEvent
45
+ | ConnectionCreatedEvent
46
+ | ConnectionReadyEvent
47
+ | ConnectionClosedEvent
48
+ | ConnectionCheckOutStartedEvent
49
+ | ConnectionCheckOutFailedEvent
50
+ | ConnectionCheckedOutEvent
51
+ | ConnectionCheckedInEvent
52
+ | ConnectionPoolClearedEvent ;
29
53
30
54
function serverApiConfig ( ) {
31
55
if ( process . env . MONGODB_API_VERSION ) {
@@ -38,52 +62,105 @@ function getClient(address) {
38
62
return new MongoClient ( `mongodb://${ address } ` , serverApi ? { serverApi } : { } ) ;
39
63
}
40
64
65
+ type PushFunction = ( e : CommandEvent | CmapEvent ) => void ;
66
+
41
67
export class UnifiedMongoClient extends MongoClient {
42
- events : CommandEvent [ ] ;
68
+ commandEvents : CommandEvent [ ] ;
69
+ cmapEvents : CmapEvent [ ] ;
43
70
failPoints : Document [ ] ;
44
71
ignoredEvents : string [ ] ;
45
- observedEvents : ( 'commandStarted' | 'commandSucceeded' | 'commandFailed' ) [ ] ;
72
+ observedCommandEvents : ( 'commandStarted' | 'commandSucceeded' | 'commandFailed' ) [ ] ;
73
+ observedCmapEvents : (
74
+ | 'connectionPoolCreated'
75
+ | 'connectionPoolClosed'
76
+ | 'connectionPoolCleared'
77
+ | 'connectionCreated'
78
+ | 'connectionReady'
79
+ | 'connectionClosed'
80
+ | 'connectionCheckOutStarted'
81
+ | 'connectionCheckOutFailed'
82
+ | 'connectionCheckedOut'
83
+ | 'connectionCheckedIn'
84
+ ) [ ] ;
46
85
47
- static EVENT_NAME_LOOKUP = {
86
+ static COMMAND_EVENT_NAME_LOOKUP = {
48
87
commandStartedEvent : 'commandStarted' ,
49
88
commandSucceededEvent : 'commandSucceeded' ,
50
89
commandFailedEvent : 'commandFailed'
51
90
} as const ;
52
91
92
+ static CMAP_EVENT_NAME_LOOKUP = {
93
+ poolCreatedEvent : 'connectionPoolCreated' ,
94
+ poolClosedEvent : 'connectionPoolClosed' ,
95
+ poolClearedEvent : 'connectionPoolCleared' ,
96
+ connectionCreatedEvent : 'connectionCreated' ,
97
+ connectionReadyEvent : 'connectionReady' ,
98
+ connectionClosedEvent : 'connectionClosed' ,
99
+ connectionCheckOutStartedEvent : 'connectionCheckOutStarted' ,
100
+ connectionCheckOutFailedEvent : 'connectionCheckOutFailed' ,
101
+ connectionCheckedOutEvent : 'connectionCheckedOut' ,
102
+ connectionCheckedInEvent : 'connectionCheckedIn'
103
+ } as const ;
104
+
53
105
constructor ( url : string , description : ClientEntity ) {
54
106
super ( url , {
55
107
monitorCommands : true ,
56
108
...description . uriOptions ,
57
109
serverApi : description . serverApi ? description . serverApi : serverApiConfig ( )
58
110
} ) ;
59
- this . events = [ ] ;
111
+ this . commandEvents = [ ] ;
112
+ this . cmapEvents = [ ] ;
60
113
this . failPoints = [ ] ;
61
114
this . ignoredEvents = [
62
115
...( description . ignoreCommandMonitoringEvents ?? [ ] ) ,
63
116
'configureFailPoint'
64
117
] ;
65
- // apm
66
- this . observedEvents = ( description . observeEvents ?? [ ] ) . map (
67
- e => UnifiedMongoClient . EVENT_NAME_LOOKUP [ e ]
68
- ) ;
69
- for ( const eventName of this . observedEvents ) {
70
- this . on ( eventName , this . pushEvent ) ;
118
+ this . observedCommandEvents = ( description . observeEvents ?? [ ] )
119
+ . map ( e => UnifiedMongoClient . COMMAND_EVENT_NAME_LOOKUP [ e ] )
120
+ . filter ( e => ! ! e ) ;
121
+ this . observedCmapEvents = ( description . observeEvents ?? [ ] )
122
+ . map ( e => UnifiedMongoClient . CMAP_EVENT_NAME_LOOKUP [ e ] )
123
+ . filter ( e => ! ! e ) ;
124
+ for ( const eventName of this . observedCommandEvents ) {
125
+ this . on ( eventName , this . pushCommandEvent ) ;
126
+ }
127
+ for ( const eventName of this . observedCmapEvents ) {
128
+ this . on ( eventName , this . pushCmapEvent ) ;
71
129
}
72
130
}
73
131
74
- // NOTE: pushEvent must be an arrow function
75
- pushEvent : ( e : CommandEvent ) => void = e => {
76
- if ( ! this . ignoredEvents . includes ( e . commandName ) ) {
77
- this . events . push ( e ) ;
132
+ isIgnored ( e : CommandEvent | CmapEvent ) : boolean {
133
+ return this . ignoredEvents . includes ( e . commandName ) ;
134
+ }
135
+
136
+ // NOTE: pushCommandEvent must be an arrow function
137
+ pushCommandEvent : ( e : CommandEvent ) => void = e => {
138
+ if ( ! this . isIgnored ( e ) ) {
139
+ this . commandEvents . push ( e ) ;
78
140
}
79
141
} ;
80
142
81
- /** Disables command monitoring for the client and returns a list of the captured events. */
82
- stopCapturingEvents ( ) : CommandEvent [ ] {
83
- for ( const eventName of this . observedEvents ) {
84
- this . off ( eventName , this . pushEvent ) ;
143
+ // NOTE: pushCmapEvent must be an arrow function
144
+ pushCmapEvent : ( e : CmapEvent ) => void = e => {
145
+ this . cmapEvents . push ( e ) ;
146
+ } ;
147
+
148
+ stopCapturingEvents ( pushFn : PushFunction ) : void {
149
+ const observedEvents = this . observedCommandEvents . concat ( this . observedCmapEvents ) ;
150
+ for ( const eventName of observedEvents ) {
151
+ this . off ( eventName , pushFn ) ;
85
152
}
86
- return this . events ;
153
+ }
154
+
155
+ /** Disables command monitoring for the client and returns a list of the captured events. */
156
+ stopCapturingCommandEvents ( ) : CommandEvent [ ] {
157
+ this . stopCapturingEvents ( this . pushCommandEvent ) ;
158
+ return this . commandEvents ;
159
+ }
160
+
161
+ stopCapturingCmapEvents ( ) : CmapEvent [ ] {
162
+ this . stopCapturingEvents ( this . pushCmapEvent ) ;
163
+ return this . cmapEvents ;
87
164
}
88
165
}
89
166
@@ -137,6 +214,7 @@ export type Entity =
137
214
| Db
138
215
| Collection
139
216
| ClientSession
217
+ | FindCursor
140
218
| UnifiedChangeStream
141
219
| GridFSBucket
142
220
| Document ; // Results from operations
@@ -147,16 +225,25 @@ export type EntityCtor =
147
225
| typeof Collection
148
226
| typeof ClientSession
149
227
| typeof ChangeStream
228
+ | typeof FindCursor
150
229
| typeof GridFSBucket ;
151
230
152
- export type EntityTypeId = 'client' | 'db' | 'collection' | 'session' | 'bucket' | 'stream' ;
231
+ export type EntityTypeId =
232
+ | 'client'
233
+ | 'db'
234
+ | 'collection'
235
+ | 'session'
236
+ | 'bucket'
237
+ | 'cursor'
238
+ | 'stream' ;
153
239
154
240
const ENTITY_CTORS = new Map < EntityTypeId , EntityCtor > ( ) ;
155
241
ENTITY_CTORS . set ( 'client' , UnifiedMongoClient ) ;
156
242
ENTITY_CTORS . set ( 'db' , Db ) ;
157
243
ENTITY_CTORS . set ( 'collection' , Collection ) ;
158
244
ENTITY_CTORS . set ( 'session' , ClientSession ) ;
159
245
ENTITY_CTORS . set ( 'bucket' , GridFSBucket ) ;
246
+ ENTITY_CTORS . set ( 'cursor' , FindCursor ) ;
160
247
ENTITY_CTORS . set ( 'stream' , ChangeStream ) ;
161
248
162
249
export class EntitiesMap < E = Entity > extends Map < string , E > {
@@ -172,6 +259,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
172
259
mapOf ( type : 'collection' ) : EntitiesMap < Collection > ;
173
260
mapOf ( type : 'session' ) : EntitiesMap < ClientSession > ;
174
261
mapOf ( type : 'bucket' ) : EntitiesMap < GridFSBucket > ;
262
+ mapOf ( type : 'cursor' ) : EntitiesMap < FindCursor > ;
175
263
mapOf ( type : 'stream' ) : EntitiesMap < UnifiedChangeStream > ;
176
264
mapOf ( type : EntityTypeId ) : EntitiesMap < Entity > {
177
265
const ctor = ENTITY_CTORS . get ( type ) ;
@@ -186,6 +274,7 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
186
274
getEntity ( type : 'collection' , key : string , assertExists ?: boolean ) : Collection ;
187
275
getEntity ( type : 'session' , key : string , assertExists ?: boolean ) : ClientSession ;
188
276
getEntity ( type : 'bucket' , key : string , assertExists ?: boolean ) : GridFSBucket ;
277
+ getEntity ( type : 'cursor' , key : string , assertExists ?: boolean ) : FindCursor ;
189
278
getEntity ( type : 'stream' , key : string , assertExists ?: boolean ) : UnifiedChangeStream ;
190
279
getEntity ( type : EntityTypeId , key : string , assertExists = true ) : Entity {
191
280
const entity = this . get ( key ) ;
@@ -205,11 +294,17 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
205
294
206
295
async cleanup ( ) : Promise < void > {
207
296
await this . failPoints . disableFailPoints ( ) ;
208
- for ( const [ , client ] of this . mapOf ( 'client' ) ) {
209
- await client . close ( ) ;
297
+ for ( const [ , cursor ] of this . mapOf ( 'cursor' ) ) {
298
+ await cursor . close ( ) ;
299
+ }
300
+ for ( const [ , stream ] of this . mapOf ( 'stream' ) ) {
301
+ await stream . close ( ) ;
210
302
}
211
303
for ( const [ , session ] of this . mapOf ( 'session' ) ) {
212
- await session . endSession ( ) ;
304
+ await session . endSession ( { force : true } ) ;
305
+ }
306
+ for ( const [ , client ] of this . mapOf ( 'client' ) ) {
307
+ await client . close ( ) ;
213
308
}
214
309
this . clear ( ) ;
215
310
}
@@ -222,7 +317,8 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
222
317
for ( const entity of entities ?? [ ] ) {
223
318
if ( 'client' in entity ) {
224
319
const useMultipleMongoses =
225
- config . topologyType === 'Sharded' && entity . client . useMultipleMongoses ;
320
+ ( config . topologyType === 'LoadBalanced' || config . topologyType === 'Sharded' ) &&
321
+ entity . client . useMultipleMongoses ;
226
322
const uri = config . url ( { useMultipleMongoses } ) ;
227
323
const client = new UnifiedMongoClient ( uri , entity . client ) ;
228
324
await client . connect ( ) ;
0 commit comments