Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(camunda): implement backoff on errors for REST job worker #371

Open
wants to merge 7 commits into
base: alpha
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,12 @@ client.streamJobs({
timeout: 2000,
})
```

## Polling worker backoff in error conditions

When a polling worker encounters an error, including not being authenticated, the worker will back off subsequent polls by +2 seconds with each subsequent failure, up to a maximum of `CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS`, which is 15000 by default (15 seconds). If the failure is due to invalid credentials and occurs during the token request, then the worker backoff will be compounded with a token endpoint backoff, which is +1000ms for each subsequent failure up to a maximum of 15s.

This means that if you start a worker with invalid credentials, then the polling backoff will look like this, by default (times in seconds): 3, 6, 9, 12, 15, 18, 21, 23, 24, 25, 26, 27, 28, 29, 30, 30, 30...

If the worker is backing off for a reason other than invalid credentials - for example a backpressure signal from the gateway - it will be: 2, 4, 6, 8, 10, 12, 14, 15, 15, 15.....

10 changes: 5 additions & 5 deletions docker/docker-compose-multitenancy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ services:
# Enabling REST API security
# Without this, the REST API is open to the public
# With this, the REST API rejects a valid JWT token with 401 unauthorized
# - CAMUNDA_REST_ENABLED=true
# - SPRING_PROFILES_ACTIVE=identity-auth
# - CAMUNDA_IDENTITY_AUDIENCE=zeebe-api
# - CAMUNDA_IDENTITY_BASEURL=http://identity:8084
# - CAMUNDA_IDENTITY_ISSUER_BACKENDURL=http://keycloak:8080/auth/realms/camunda-platform
- CAMUNDA_REST_ENABLED=true
- SPRING_PROFILES_ACTIVE=identity-auth
- CAMUNDA_IDENTITY_AUDIENCE=zeebe-api
- CAMUNDA_IDENTITY_BASEURL=http://identity:8084
- CAMUNDA_IDENTITY_ISSUERBACKENDURL=http://keycloak:8080/auth/realms/camunda-platform
# End REST API security
- ZEEBE_GATEWAY_MULTITENANCY_ENABLED=true
- "JAVA_TOOL_OPTIONS=-Xms512m -Xmx512m"
Expand Down
24 changes: 23 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"@types/node-fetch": "^2.6.11",
"@types/promise-retry": "^1.1.6",
"@types/uuid": "^9.0.8",
"@types/wtfnode": "^0.7.3",
"@typescript-eslint/eslint-plugin": "^6.14.0",
"@typescript-eslint/parser": "^6.14.0",
"basic-auth": "^2.0.1",
Expand Down Expand Up @@ -136,7 +137,8 @@
"typedoc": "^0.25.9",
"typedoc-plugin-include-example": "^1.2.0",
"typedoc-plugin-missing-exports": "^2.2.0",
"typescript": "^5.3.3"
"typescript": "^5.3.3",
"wtfnode": "^0.10.0"
},
"optionalDependencies": {
"win-ca": "3.5.1"
Expand Down Expand Up @@ -164,4 +166,4 @@
"uuid": "^7.0.3",
"winston": "^3.14.2"
}
}
}
36 changes: 0 additions & 36 deletions src/__tests__/c8/rest/unauthenticated.rest.spec.ts

This file was deleted.

203 changes: 203 additions & 0 deletions src/__tests__/c8/rest/workerBackoffRetry.rest.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import { format } from 'winston'
import Transport from 'winston-transport'

import { createLogger } from '../../../c8/lib/C8Logger'
import { CamundaJobWorker } from '../../../c8/lib/CamundaJobWorker'
import { CamundaRestClient } from '../../../c8/lib/CamundaRestClient'
import { LosslessDto } from '../../../lib'

let restJobWorker: CamundaJobWorker<LosslessDto, LosslessDto>

jest.setTimeout(30000)
afterEach(() => {
if (restJobWorker) {
restJobWorker.stop()
}
})

// Custom log transport to suppress errors in the console, and allow them to be examined
class MemoryTransport extends Transport {
logs: { message: string; level: string; timestamp: string }[]
errors: Error[]
constructor(opts?: Transport.TransportStreamOptions | undefined) {
super(opts)
this.logs = []
this.errors = []
}
log(
info: Error | { message: string; level: string; timestamp: string },
callback: () => void
) {
// Immediately emit the logged event (this is required for Winston)
setImmediate(() => {
this.emit('logged', info)
})
if (info instanceof Error) {
this.errors.push(info)
} else {
this.logs.push(info)
}
callback()
}
}

test('REST worker will backoff on UNAUTHENTICATED', (done) => {
let durations = 0
let pollCountBackingOffWorker = 0
const backoffs: number[] = []

const transportBackingOffWorker = new MemoryTransport()
const logBackingOffWorker = createLogger({
transports: [transportBackingOffWorker],
})

const restClientBackingoffWorker = new CamundaRestClient({
config: { CAMUNDA_AUTH_STRATEGY: 'NONE', logger: logBackingOffWorker },
})
restJobWorker = restClientBackingoffWorker.createJobWorker({
type: 'unauthenticated-worker',
jobHandler: async () => {
throw new Error('Not Implemented') // is never called
},
worker: 'unauthenticated-test-worker',
maxJobsToActivate: 10,
timeout: 30000,
})

restJobWorker.on('backoff', (duration) => {
durations += duration
backoffs.push(duration)
})
restJobWorker.on('poll', () => {
pollCountBackingOffWorker++
})
setTimeout(() => {
restJobWorker.stop()
expect(durations).toBe(20000)
// In 25 seconds, we expect 4 or less attempts to poll the job
expect(pollCountBackingOffWorker).toBeLessThanOrEqual(4)
// Assert that each backoff is greater than the previous one; ie: the backoff is increasing
for (let i = 1; i < backoffs.length; i++) {
expect(backoffs[i]).toBeGreaterThan(backoffs[i - 1])
}
done()
}, 25000)
})

test('REST worker uses a supplied custom max backoff', (done) => {
const backoffs: number[] = []
const MAX_BACKOFF = 2000
const transport = new MemoryTransport()
const logger = createLogger({
transports: [transport],
})
const restClient = new CamundaRestClient({
config: {
CAMUNDA_AUTH_STRATEGY: 'NONE',
CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: MAX_BACKOFF,
logger,
},
})

restJobWorker = restClient.createJobWorker({
type: 'unauthenticated-worker',
jobHandler: async () => {
throw new Error('Not Implemented') // is never called
},
worker: 'unauthenticated-test-worker',
maxJobsToActivate: 10,
timeout: 30000,
})
restJobWorker.on('backoff', (duration) => {
expect(duration).toBeLessThanOrEqual(MAX_BACKOFF)
backoffs.push(duration)
})
restJobWorker.on('poll', () => {
// pollCount++
})
setTimeout(() => {
restJobWorker.stop()
expect(backoffs.length).toBe(3)
for (const backoff of backoffs) {
expect(backoff).toBeLessThanOrEqual(MAX_BACKOFF)
}
done()
}, 10000)
})

/**
* This test is deliberately commented out. The behaviour was manually verified on 5 Feb, 2025.
*
* Testing the outer bound of the token endpoint backoff when it is hardcoded to 15s takes a long time.
* Making the max token endpoint backoff configurable would make this easier to test.
*
*/
xtest('REST worker uses a supplied custom max backoff with invalid secret', (done) => {
let durations = 0
let pollCount = 0
const backoffs: number[] = []

/**
* Suppress all logging output with this custom logger. The token endpoint will emit error logs during this test.
*/
const memoryTransport = new MemoryTransport()

const logger = createLogger({
format: format.combine(format.timestamp(), format.json()),
transports: [memoryTransport],
})

const MAX_BACKOFF = 2000
const restClient = new CamundaRestClient({
config: {
CAMUNDA_TOKEN_DISK_CACHE_DISABLE: true,
ZEEBE_CLIENT_ID: 'Does-not-exist',
ZEEBE_CLIENT_SECRET: 'NONE',
CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: MAX_BACKOFF,
logger,
},
})

restJobWorker = restClient.createJobWorker({
type: 'unauthenticated-worker',
jobHandler: async () => {
throw new Error('Not Implemented') // is never called
},
worker: 'unauthenticated-test-worker',
maxJobsToActivate: 10,
timeout: 30000,
autoStart: false, // Do not autostart, so we can attach event listeners before it starts polling
})
restJobWorker.on('backoff', (duration) => {
durations += duration
backoffs.push(duration)
})
restJobWorker.on('poll', () => {
pollCount++
})
restJobWorker.start() // Start the worker now that the event listeners are attached
setTimeout(() => {
restJobWorker.stop()
const logs = memoryTransport.logs
// Convert timestamp strings to milliseconds since epoch.
const times = logs
.filter((log) =>
log.message.includes('Backing off worker poll due to failure.')
)
.map((log) => new Date(log.timestamp).getTime())
console.log('times.length', times.length)
// Calculate delays between consecutive errors.
const delays: number[] = []
for (let i = 1; i < times.length; i++) {
delays.push(times[i] - times[i - 1])
}
// Assert that each delay is less than or equal to the max backoff delay
for (let i = 1; i < delays.length; i++) {
// expect(delays[i] - delays[i - 1]).toBeLessThanOrEqual(MAX_BACKOFF)
console.log(delays[i])
}
expect(pollCount).toBe(4)
expect(durations).toBe(8000)
done()
}, 20000)
})
4 changes: 4 additions & 0 deletions src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import fs from 'fs'
import path from 'path'

import wtf from 'wtfnode'

// See: https://stackoverflow.com/a/74206721/1758461
// Without this, the paths in tsconfig.json are not resolved correctly
// eslint-disable-next-line @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -75,5 +77,7 @@ export const cleanUp = async () => {
}
}
}
await zeebe.close()
}
wtf.dump()
}
3 changes: 2 additions & 1 deletion src/__tests__/config/jest.globalTeardown.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

import { cleanUp } from './jest.cleanup'

export default async () => {
console.log('Running global teardown...')
cleanUp()
await cleanUp()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const c8 = new Camunda8({
})
const zeebe = c8.getZeebeGrpcApiClient()

afterAll(() => zeebe.close())
describe('Authenticated gRPC client (default tenant)', () => {
test('can activate jobs', async () => {
const res = await zeebe.activateJobs({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const c8 = new Camunda8({
CAMUNDA_TENANT_ID: '<default>',
})
const zeebe = c8.getZeebeGrpcApiClient()
afterAll(() => zeebe.close())

describe('Authenticated gRPC client (default tenant)', () => {
test('can deploy process', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const c8 = new Camunda8({
CAMUNDA_TENANT_ID: '<default>',
})
const zeebe = c8.getZeebeGrpcApiClient()
afterAll(() => zeebe.close())

describe('Authenticated gRPC client (default tenant)', () => {
test('can get topology', async () => {
Expand Down
Loading
Loading