-
-
Notifications
You must be signed in to change notification settings - Fork 545
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1372 from markgaylard/pluggable-auth
Ability to use custom authenticators
- Loading branch information
Showing
21 changed files
with
471 additions
and
213 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
--- | ||
id: custom-authentication-mechanism | ||
title: Custom Authentication Mechanisms | ||
--- | ||
|
||
To use an authentication mechanism that is not supported out of the box by KafkaJS, | ||
custom authentication mechanisms can be introduced: | ||
|
||
```js | ||
{ | ||
sasl: { | ||
mechanism: <mechanism name>, | ||
authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise<void> } | ||
} | ||
} | ||
``` | ||
|
||
`<mechanism name>` needs to match the SASL mechanism configured in the `sasl.enabled.mechanisms` | ||
property in `server.properties`. See the Kafka documentation for information on how to | ||
configure your brokers. | ||
|
||
## Writing a custom authentication mechanism | ||
|
||
A custom authentication mechanism needs to fulfill the following interface: | ||
|
||
```ts | ||
type AuthenticationProviderArgs = { | ||
host: string | ||
port: number | ||
logger: Logger | ||
saslAuthenticate: <ParseResult>( | ||
request: SaslAuthenticationRequest, | ||
response?: SaslAuthenticationResponse<ParseResult> | ||
) => Promise<ParseResult | void> | ||
} | ||
|
||
type Mechanism = { | ||
mechanism: string | ||
authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator | ||
} | ||
|
||
type Authenticator = { | ||
authenticate(): Promise<void> | ||
} | ||
|
||
type SaslAuthenticationRequest = { | ||
encode: () => Buffer | Promise<Buffer> | ||
} | ||
|
||
type SaslAuthenticationResponse<ParseResult> = { | ||
decode: (rawResponse: Buffer) => Buffer | Promise<Buffer> | ||
parse: (data: Buffer) => ParseResult | ||
} | ||
``` | ||
* `host` - Hostname of the specific broker to connect to | ||
* `port` - Port of the specific broker to connect to | ||
* `logger` - A logger instance namespaced to the authentication mechanism | ||
* `saslAuthenticate` - an async function to make [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) | ||
requests towards the broker. The `request` and `response` functions are used to encode the `auth_bytes` of the request, and to optionally | ||
decode and parse the `auth_bytes` in the response. `response` can be omitted if no response `auth_bytes` are expected. | ||
### Example | ||
In this example we will create a custom authentication mechanism called `simon`. The general | ||
flow will be: | ||
1. Send a [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) | ||
request with the value of `says` as `auth_bytes`. | ||
2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes` | ||
should equal `says`, if it does not start with "Simon says", it should be an empty string. | ||
**This is a made up example!** | ||
It is a non-existent authentication mechanism just made up to show how to implement the expected interface. It is not a real authentication mechanism. | ||
```js | ||
const simonAuthenticator = says = ({ host, port, logger, saslAuthenticate }) => { | ||
const INT32_SIZE = 4 | ||
|
||
const request = { | ||
/** | ||
* Encodes the value for `auth_bytes` in SaslAuthenticate request | ||
* @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate | ||
* | ||
* In this example, we are just sending `says` as a string, | ||
* with the length of the string in bytes prepended as an int32 | ||
**/ | ||
encode: () => { | ||
const byteLength = Buffer.byteLength(says, 'utf8') | ||
const buf = Buffer.alloc(INT32_SIZE + byteLength) | ||
buf.writeUInt32BE(byteLength, 0) | ||
buf.write(says, INT32_SIZE, byteLength, 'utf8') | ||
return buf | ||
}, | ||
} | ||
const response = { | ||
/** | ||
* Decodes the `auth_bytes` in SaslAuthenticate response | ||
* @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate | ||
* | ||
* This is essentially the reverse of `request.encode`, where | ||
* we read the length of the string as an int32 and then read | ||
* that many bytes | ||
*/ | ||
decode: rawData => { | ||
const byteLength = rawData.readInt32BE(0) | ||
return rawData.slice(INT32_SIZE, INT32_SIZE + byteLength) | ||
}, | ||
/** | ||
* The return value from `response.decode` is passed into | ||
* this function, which is responsible for interpreting | ||
* the data. In this case, we just turn the buffer back | ||
* into a string | ||
*/ | ||
parse: data => { | ||
return data.toString() | ||
}, | ||
} | ||
return { | ||
/** | ||
* This function is responsible for orchestrating the authentication flow. | ||
* Essentially we will send a SaslAuthenticate request with the | ||
* value of `sasl.says` to the broker, and expect to | ||
* get the same value back. | ||
* | ||
* Other authentication methods may do any other operations they | ||
* like, but communication with the brokers goes through | ||
* the SaslAuthenticate request. | ||
*/ | ||
authenticate: async () => { | ||
if (says == null) { | ||
throw new Error('SASL Simon: Invalid "says"') | ||
} | ||
const broker = `${host}:${port}` | ||
try { | ||
logger.info('Authenticate with SASL Simon', { broker }) | ||
const authenticateResponse = await saslAuthenticate({ request, response }) | ||
|
||
const saidSimon = says.startsWith("Simon says ") | ||
const expectedResponse = saidSimon ? says : "" | ||
if (authenticateResponse !== expectedResponse) { | ||
throw new Error("Mismatching response from broker") | ||
} | ||
logger.info('SASL Simon authentication successful', { broker }) | ||
} catch (e) { | ||
const error = new Error( | ||
`SASL Simon authentication failed: ${e.message}` | ||
) | ||
logger.error(error.message, { broker }) | ||
throw error | ||
} | ||
}, | ||
} | ||
} | ||
``` | ||
|
||
The `response` argument to `saslAuthenticate` is optional, in case the authentication | ||
method does not require the `auth_bytes` in the response. | ||
|
||
In the example above, we expect the client to be configured as such: | ||
|
||
```js | ||
const config = { | ||
sasl: { | ||
mechanism: 'simon' | ||
authenticationProvider: simonAuthenticator('Simon says authenticate me') | ||
} | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,43 +1,37 @@ | ||
const awsIam = require('../../protocol/sasl/awsIam') | ||
const { request, response } = require('../../protocol/sasl/awsIam') | ||
const { KafkaJSSASLAuthenticationError } = require('../../errors') | ||
|
||
module.exports = class AWSIAMAuthenticator { | ||
constructor(connection, logger, saslAuthenticate) { | ||
this.connection = connection | ||
this.logger = logger.namespace('SASLAWSIAMAuthenticator') | ||
this.saslAuthenticate = saslAuthenticate | ||
} | ||
|
||
async authenticate() { | ||
const { sasl } = this.connection | ||
if (!sasl.authorizationIdentity) { | ||
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity') | ||
} | ||
if (!sasl.accessKeyId) { | ||
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId') | ||
} | ||
if (!sasl.secretAccessKey) { | ||
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey') | ||
} | ||
if (!sasl.sessionToken) { | ||
sasl.sessionToken = '' | ||
} | ||
const awsIAMAuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { | ||
return { | ||
authenticate: async () => { | ||
if (!sasl.authorizationIdentity) { | ||
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity') | ||
} | ||
if (!sasl.accessKeyId) { | ||
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId') | ||
} | ||
if (!sasl.secretAccessKey) { | ||
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey') | ||
} | ||
if (!sasl.sessionToken) { | ||
sasl.sessionToken = '' | ||
} | ||
|
||
const request = awsIam.request(sasl) | ||
const response = awsIam.response | ||
const { host, port } = this.connection | ||
const broker = `${host}:${port}` | ||
const broker = `${host}:${port}` | ||
|
||
try { | ||
this.logger.debug('Authenticate with SASL AWS-IAM', { broker }) | ||
await this.saslAuthenticate({ request, response }) | ||
this.logger.debug('SASL AWS-IAM authentication successful', { broker }) | ||
} catch (e) { | ||
const error = new KafkaJSSASLAuthenticationError( | ||
`SASL AWS-IAM authentication failed: ${e.message}` | ||
) | ||
this.logger.error(error.message, { broker }) | ||
throw error | ||
} | ||
try { | ||
logger.debug('Authenticate with SASL AWS-IAM', { broker }) | ||
await saslAuthenticate({ request: request(sasl), response }) | ||
logger.debug('SASL AWS-IAM authentication successful', { broker }) | ||
} catch (e) { | ||
const error = new KafkaJSSASLAuthenticationError( | ||
`SASL AWS-IAM authentication failed: ${e.message}` | ||
) | ||
logger.error(error.message, { broker }) | ||
throw error | ||
} | ||
}, | ||
} | ||
} | ||
|
||
module.exports = awsIAMAuthenticatorProvider |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,27 @@ | ||
const { newLogger } = require('testHelpers') | ||
const AWSIAM = require('./awsIam') | ||
const awsIAMAuthenticatorProvider = require('./awsIam') | ||
|
||
describe('Broker > SASL Authenticator > AWS-IAM', () => { | ||
it('throws KafkaJSSASLAuthenticationError for missing authorizationIdentity', async () => { | ||
const awsIam = new AWSIAM({ sasl: {} }, newLogger()) | ||
const awsIam = awsIAMAuthenticatorProvider({})({ host: '', port: 0, logger: newLogger() }) | ||
await expect(awsIam.authenticate()).rejects.toThrow( | ||
'SASL AWS-IAM: Missing authorizationIdentity' | ||
) | ||
}) | ||
|
||
it('throws KafkaJSSASLAuthenticationError for invalid accessKeyId', async () => { | ||
const awsIam = new AWSIAM( | ||
{ | ||
sasl: { | ||
authorizationIdentity: '<authorizationIdentity>', | ||
secretAccessKey: '<secretAccessKey>', | ||
}, | ||
}, | ||
newLogger() | ||
) | ||
const awsIam = awsIAMAuthenticatorProvider({ | ||
authorizationIdentity: '<authorizationIdentity>', | ||
secretAccessKey: '<secretAccessKey>', | ||
})({ host: '', port: 0, logger: newLogger() }) | ||
await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing accessKeyId') | ||
}) | ||
|
||
it('throws KafkaJSSASLAuthenticationError for invalid secretAccessKey', async () => { | ||
const awsIam = new AWSIAM( | ||
{ sasl: { authorizationIdentity: '<authorizationIdentity>', accessKeyId: '<accessKeyId>' } }, | ||
newLogger() | ||
) | ||
const awsIam = awsIAMAuthenticatorProvider({ | ||
authorizationIdentity: '<authorizationIdentity>', | ||
accessKeyId: '<accessKeyId>', | ||
})({ host: '', port: 0, logger: newLogger() }) | ||
await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing secretAccessKey') | ||
}) | ||
}) |
Oops, something went wrong.