Skip to content

Commit 647b540

Browse files
author
Jan Kryl
committed
fix(moac): serialize volume creation
As part of that, a request cache has been implemented, which is used to detect duplicate CSI requests. All of this improves the stability of moac when it is under load. e2e stress tests are out of scope and will be implemented under a different ticket. A minor improvement loosely related to the changes is that workq has been converted to TypeScript to take advantage of type checking. Resolves: CAS-673
1 parent d23adb4 commit 647b540

14 files changed

+431
-116
lines changed

csi/moac/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
/volumes.js
1313
/volume_operator.js
1414
/watcher.js
15+
/workq.js
1516
/*.js.map

csi/moac/csi.ts

+94-8
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,36 @@ function createK8sVolumeObject (volume: Volume): K8sVolume {
114114
return obj;
115115
}
116116

117+
// Duplicate request cache entry helps to detect retransmits of the same request
118+
//
119+
// This may seem like a useless thing but k8s is aggressive about retransmitting
120+
// requests. The first retransmit happens just tens of ms after the original
121+
// request. Having many requests that are the same in progress creates havoc
122+
// and forces mayastor to execute repeating code.
123+
//
124+
// NOTE: The assumption is that k8s doesn't submit a duplicate request for the same
125+
// volume (the same uuid) with different parameters.
126+
//
127+
// A single in-flight CSI operation together with everybody waiting for it.
//
// The first caller creates the entry; retransmits of the same request register
// their callback with wait() and all of them are answered by one done() call.
class Request {
  uuid: string; // ID of the object in the operation
  op: string; // name of the operation
  callbacks: CsiDoneCb[]; // callbacks to call when done

  // @param uuid   ID of the object the operation works on.
  // @param op     Name of the operation.
  // @param cb     Callback of the caller that started the operation.
  constructor (uuid: string, op: string, cb: CsiDoneCb) {
    this.uuid = uuid;
    this.op = op;
    this.callbacks = [cb];
  }

  // Register another callback to be called when the operation finishes.
  wait (cb: CsiDoneCb) {
    this.callbacks.push(cb);
  }

  // Deliver the result of the operation to all registered callbacks.
  //
  // Every callback is invoked even if one of them throws, otherwise the
  // remaining waiters would never get a reply. The first exception is
  // rethrown after all callbacks have run. The callback list is emptied
  // before delivery, so calling done() again does not answer the same
  // callers twice.
  //
  // @param err    Error passed to the callbacks (null-ish on success).
  // @param resp   Response passed to the callbacks.
  done (err: any, resp?: any) {
    // Detach the waiters first; splice keeps the same array object in place.
    const pending = this.callbacks.splice(0, this.callbacks.length);
    let failure: { error: unknown } | undefined;

    for (const cb of pending) {
      try {
        cb(err, resp);
      } catch (e) {
        // Remember only the first failure and keep notifying the others.
        if (!failure) {
          failure = { error: e };
        }
      }
    }

    if (failure) {
      throw failure.error;
    }
  }
}
146+
117147
// CSI Controller implementation.
118148
//
119149
// It implements Identity and Controller grpc services from csi proto file.
@@ -127,6 +157,7 @@ class CsiServer {
127157
private sockPath: string;
128158
private nextListContextId: number;
129159
private listContexts: Record<string, ListContext>;
160+
private duplicateRequestCache: Request[];
130161

131162
// Creates new csi server
132163
//
@@ -139,6 +170,7 @@ class CsiServer {
139170
this.sockPath = sockPath;
140171
this.nextListContextId = 1;
141172
this.listContexts = {};
173+
this.duplicateRequestCache = [];
142174

143175
// The data returned by identity service should be kept in sync with
144176
// responses for the same methods on storage node.
@@ -255,6 +287,32 @@ class CsiServer {
255287
this.ready = false;
256288
}
257289

290+
// Look up an in-flight request in the duplicate request cache.
//
// @param uuid   ID of the object the operation works on.
// @param op     Name of the operation.
// @returns The first cached request matching both uuid and op, or undefined.
_findRequest (uuid: string, op: string): Request | undefined {
  for (const entry of this.duplicateRequestCache) {
    if (entry.uuid === uuid && entry.op === op) {
      return entry;
    }
  }
  return undefined;
}
294+
295+
// Register the start of an operation in the duplicate request cache.
//
// @param uuid   ID of the object the operation works on.
// @param op     Name of the operation.
// @param cb     Callback to call when the operation is done.
// @returns New request entry if the caller should carry out the operation,
//          or undefined if the same operation is already in progress (the
//          callback has been queued on the existing entry in that case).
_beginRequest (uuid: string, op: string, cb: CsiDoneCb): Request | undefined {
  const inFlight = this._findRequest(uuid, op);

  if (!inFlight) {
    // First request of its kind: remember it so that retransmits can join.
    const fresh = new Request(uuid, op, cb);
    this.duplicateRequestCache.push(fresh);
    return fresh;
  }

  log.debug(`Duplicate ${op} volume request detected`);
  inFlight.wait(cb);
  return undefined;
}
306+
307+
// Finish an operation: drop its entry from the duplicate request cache (if
// it is there) and then hand the result over to all callbacks waiting on it.
//
// @param request   Entry previously returned by _beginRequest.
// @param err       Error passed to the callbacks.
// @param resp      Response passed to the callbacks.
_endRequest (request: Request, err: any, resp?: any) {
  const position = this.duplicateRequestCache.indexOf(request);
  if (position !== -1) {
    this.duplicateRequestCache.splice(position, 1);
  }
  request.done(err, resp);
}
315+
258316
//
259317
// Implementation of CSI identity methods
260318
//
@@ -400,6 +458,12 @@ class CsiServer {
400458
count = 1;
401459
}
402460

461+
// If this is a duplicate request then ensure it is executed just once.
462+
let request = this._beginRequest(uuid, 'create', cb);
463+
if (!request) {
464+
return;
465+
}
466+
403467
// create the volume
404468
let volume;
405469
try {
@@ -412,12 +476,14 @@ class CsiServer {
412476
protocol: protocol
413477
});
414478
} catch (err) {
415-
return cb(err);
479+
this._endRequest(request, err);
480+
return;
416481
}
417482

418483
// This was used in the old days for NBD protocol
419484
const accessibleTopology: TopologyKeys[] = [];
420-
cb(null, {
485+
486+
this._endRequest(request, null, {
421487
volume: {
422488
capacityBytes: volume.getSize(),
423489
volumeId: uuid,
@@ -437,13 +503,19 @@ class CsiServer {
437503

438504
log.debug(`Request to destroy volume "${args.volumeId}"`);
439505

506+
// If this is a duplicate request then ensure it is executed just once.
507+
let request = this._beginRequest(args.volumeId, 'delete', cb);
508+
if (!request) {
509+
return;
510+
}
511+
440512
try {
441513
await this.volumes.destroyVolume(args.volumeId);
442514
} catch (err) {
443-
return cb(err);
515+
return this._endRequest(request, err);
444516
}
445517
log.info(`Volume "${args.volumeId}" destroyed`);
446-
cb(null);
518+
this._endRequest(request, null);
447519
}
448520

449521
async listVolumes (call: any, cb: CsiDoneCb) {
@@ -542,6 +614,12 @@ class CsiServer {
542614
return cb(err);
543615
}
544616

617+
// If this is a duplicate request then ensure it is executed just once.
618+
let request = this._beginRequest(args.volumeId, 'publish', cb);
619+
if (!request) {
620+
return;
621+
}
622+
545623
const publishContext: any = {};
546624
try {
547625
publishContext.uri = await volume.publish(protocol);
@@ -551,15 +629,16 @@ class CsiServer {
551629
} catch (err) {
552630
if (err.code === grpc.status.ALREADY_EXISTS) {
553631
log.debug(`Volume "${args.volumeId}" already published on this node`);
554-
cb(null, { publishContext });
632+
this._endRequest(request, null, { publishContext });
555633
} else {
556634
cb(err);
635+
this._endRequest(request, err);
557636
}
558637
return;
559638
}
560639

561640
log.info(`Published volume "${args.volumeId}" over ${protocol}`);
562-
cb(null, { publishContext });
641+
this._endRequest(request, null, { publishContext });
563642
}
564643

565644
async controllerUnpublishVolume (call: any, cb: CsiDoneCb) {
@@ -580,13 +659,20 @@ class CsiServer {
580659
} catch (err) {
581660
return cb(err);
582661
}
662+
663+
// If this is a duplicate request then ensure it is executed just once.
664+
let request = this._beginRequest(args.volumeId, 'unpublish', cb);
665+
if (!request) {
666+
return;
667+
}
668+
583669
try {
584670
await volume.unpublish();
585671
} catch (err) {
586-
return cb(err);
672+
return this._endRequest(request, err);
587673
}
588674
log.info(`Unpublished volume "${args.volumeId}"`);
589-
cb(null, {});
675+
this._endRequest(request, null, {});
590676
}
591677

592678
async validateVolumeCapabilities (call: any, cb: CsiDoneCb) {

csi/moac/node.ts

+20-19
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@ import assert from 'assert';
66
import { Pool } from './pool';
77
import { Nexus } from './nexus';
88
import { Replica } from './replica';
9+
import { Workq } from './workq';
910

1011
const EventEmitter = require('events');
11-
const Workq = require('./workq');
1212
const log = require('./logger').Logger('node');
1313
const { GrpcClient, GrpcCode, GrpcError } = require('./grpc_client');
1414

15+
// Type used in workq for calling grpc
16+
type GrpcCallArgs = {
17+
method: string;
18+
args: any;
19+
}
20+
1521
// Object represents mayastor storage node.
1622
//
1723
// Node emits following events:
@@ -25,7 +31,7 @@ export class Node extends EventEmitter {
2531
syncBadLimit: number;
2632
endpoint: string | null;
2733
client: any;
28-
workq: any;
34+
workq: Workq;
2935
syncFailed: number;
3036
syncTimer: NodeJS.Timeout | null;
3137
nexus: Nexus[];
@@ -49,7 +55,7 @@ export class Node extends EventEmitter {
4955

5056
this.endpoint = null;
5157
this.client = null; // grpc client handle
52-
this.workq = new Workq(); // work queue for serializing grpc calls
58+
this.workq = new Workq('grpc call'); // work queue for serializing grpc calls
5359
// We don't want to switch all objects to offline state when moac starts
5460
// just because a node is not reachable from the beginning. That's why we
5561
// set syncFailed to syncBadLimit + 1.
@@ -129,17 +135,19 @@ export class Node extends EventEmitter {
129135
// @returns A promise that evals to return value of gRPC method.
130136
//
131137
async call(method: string, args: any): Promise<any> {
132-
return this.workq.push({ method, args }, this._call.bind(this));
138+
return this.workq.push({ method, args }, (args: GrpcCallArgs) => {
139+
return this._call(args.method, args.args);
140+
});
133141
}
134142

135-
async _call(ctx: any) {
143+
async _call(method: string, args: any): Promise<any> {
136144
if (!this.client) {
137145
throw new GrpcError(
138146
GrpcCode.INTERNAL,
139147
`Broken connection to mayastor on node "${this.name}"`
140148
);
141149
}
142-
return this.client.call(ctx.method, ctx.args);
150+
return this.client.call(method, args);
143151
}
144152

145153
// Sync triggered by the timer. It ensures that the sync does run in
@@ -149,7 +157,9 @@ export class Node extends EventEmitter {
149157
this.syncTimer = null;
150158

151159
try {
152-
await this.workq.push({}, this._sync.bind(this));
160+
await this.workq.push(null, () => {
161+
return this._sync();
162+
});
153163
nextSync = this.syncPeriod;
154164
} catch (err) {
155165
// We don't want to cover up unexpected errors. But it's hard to
@@ -180,20 +190,11 @@ export class Node extends EventEmitter {
180190
log.debug(`Syncing the node "${this.name}"`);
181191

182192
// TODO: Harden checking of outputs of the methods below
183-
let reply = await this._call({
184-
method: 'listNexus',
185-
args: {}
186-
});
193+
let reply = await this._call('listNexus', {});
187194
const nexus = reply.nexusList;
188-
reply = await this._call({
189-
method: 'listPools',
190-
args: {}
191-
});
195+
reply = await this._call('listPools', {});
192196
const pools = reply.pools;
193-
reply = await this._call({
194-
method: 'listReplicas',
195-
args: {}
196-
});
197+
reply = await this._call('listReplicas', {});
197198
const replicas = reply.replicas;
198199

199200
// Move the node to online state before we attempt to merge objects

csi/moac/node_operator.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ import {
1717
CustomResourceCache,
1818
CustomResourceMeta,
1919
} from './watcher';
20+
import { Workq } from './workq';
2021

2122
const yaml = require('js-yaml');
2223
const EventStream = require('./event_stream');
2324
const log = require('./logger').Logger('node-operator');
24-
const Workq = require('./workq');
2525

2626
const RESOURCE_NAME: string = 'mayastornode';
2727
const crdNode = yaml.safeLoad(
@@ -75,7 +75,7 @@ export class NodeOperator {
7575
watcher: CustomResourceCache<NodeResource>; // k8s resource watcher for nodes
7676
registry: any;
7777
namespace: string;
78-
workq: any; // for serializing node operations
78+
workq: Workq; // for serializing node operations
7979
eventStream: any; // events from the registry
8080

8181
// Create node operator object.
@@ -92,7 +92,7 @@ export class NodeOperator {
9292
) {
9393
assert(registry);
9494
this.namespace = namespace;
95-
this.workq = new Workq();
95+
this.workq = new Workq('mayastornode');
9696
this.registry = registry;
9797
this.watcher = new CustomResourceCache(
9898
this.namespace,

csi/moac/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
},
1515
"scripts": {
1616
"prepare": "./bundle_protos.sh",
17-
"clean": "rm -f csi.js node.js replica.js pool.js nexus.js watcher.js node_operator.js pool_operator.js volume.js volumes.js volume_operator.js *.js.map",
18-
"purge": "rm -rf node_modules proto csi.js node.js replica.js pool.js nexus.js watcher.js node_operator.js pool_operator.js volume.js volumes.js volume_operator.js *.js.map",
17+
"clean": "rm -f csi.js node.js replica.js pool.js nexus.js watcher.js node_operator.js pool_operator.js volume.js volumes.js volume_operator.js workq.js *.js.map",
18+
"purge": "rm -rf node_modules proto csi.js node.js replica.js pool.js nexus.js watcher.js node_operator.js pool_operator.js volume.js volumes.js volume_operator.js workq.js *.js.map",
1919
"compile": "tsc --pretty",
2020
"start": "./index.js",
2121
"test": "mocha test/index.js",

csi/moac/pool_operator.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ import {
1313
CustomResourceCache,
1414
CustomResourceMeta,
1515
} from './watcher';
16+
import { Workq } from './workq';
1617

1718
const yaml = require('js-yaml');
1819
const log = require('./logger').Logger('pool-operator');
1920
const EventStream = require('./event_stream');
20-
const Workq = require('./workq');
2121

2222
const RESOURCE_NAME: string = 'mayastorpool';
2323
const POOL_FINALIZER = 'finalizer.mayastor.openebs.io';
@@ -125,7 +125,7 @@ export class PoolOperator {
125125
watcher: CustomResourceCache<PoolResource>; // k8s resource watcher for pools
126126
registry: any; // registry containing info about mayastor nodes
127127
eventStream: any; // A stream of node and pool events.
128-
workq: any; // for serializing pool operations
128+
workq: Workq; // for serializing pool operations
129129

130130
// Create pool operator.
131131
//
@@ -142,7 +142,7 @@ export class PoolOperator {
142142
this.namespace = namespace;
143143
this.registry = registry; // registry containing info about mayastor nodes
144144
this.eventStream = null; // A stream of node and pool events.
145-
this.workq = new Workq(); // for serializing pool operations
145+
this.workq = new Workq('mayastorpool'); // for serializing pool operations
146146
this.watcher = new CustomResourceCache(
147147
this.namespace,
148148
RESOURCE_NAME,

0 commit comments

Comments
 (0)