Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csi/643): add fx-notify forwarding #542

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ jobs:
curl localhost:3000/health && \

# run integration tests
npm run test:xint | tee ./test/results/test-int.log
npm run test:integration | tee ./test/results/test-int.log
environment:
ENDPOINT_URL: http://localhost:4545/notification
- store_artifacts:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ services:

central-ledger:
# image: mojaloop/central-ledger:latest
image: mojaloop/central-ledger:v17.8.0.551-snapshot.16
image: mojaloop/central-ledger:v17.8.0.643-snapshot.0
container_name: ml_central-ledger
command: sh -c "/opt/app/wait4/wait4.js central-ledger && npm run migrate && node src/api/index.js"
links:
Expand Down
381 changes: 168 additions & 213 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.7.3",
"@mojaloop/central-services-shared": "18.8.0",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/event-sdk": "14.1.1",
"@mojaloop/sdk-standard-components": "18.4.0",
"@mojaloop/sdk-standard-components": "18.4.1",
"@now-ims/hapi-now-auth": "2.1.0",
"axios": "1.7.7",
"blipp": "4.0.2",
Expand Down Expand Up @@ -126,21 +126,21 @@
"jsdoc": "4.0.3",
"leaked-handles": "^5.2.0",
"license-checker": "25.0.1",
"nodemon": "3.1.4",
"nodemon": "3.1.5",
"npm-audit-resolver": "3.0.0-RC.0",
"npm-check-updates": "17.1.1",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
"replace": "^1.2.2",
"rewire": "7.0.0",
"sinon": "18.0.0",
"standard": "17.1.0",
"sinon": "19.0.2",
"standard": "17.1.2",
"standard-version": "^9.5.0",
"supertest": "7.0.0",
"tap-spec": "^5.0.0",
"tap-xunit": "2.4.1",
"tape": "5.8.1",
"tape": "5.9.0",
"tapes": "4.1.0",
"uuid4": "2.0.3"
},
Expand Down
3 changes: 2 additions & 1 deletion src/handlers/notification/dto.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const FX_ACTIONS = [
Action.FX_FULFIL_DUPLICATE,
Action.FX_ABORT_DUPLICATE,
Action.FX_TIMEOUT_RESERVED,
Action.FX_TIMEOUT_RECEIVED
Action.FX_TIMEOUT_RECEIVED,
Action.FX_NOTIFY
]

const getCallbackPayload = (content) => {
Expand Down
33 changes: 33 additions & 0 deletions src/handlers/notification/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,39 @@ const processMessage = async (msg, span) => {
return true
}

if ([Action.FX_NOTIFY].includes(action)) {
if (!isSuccess) {
throw ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR,
'FX_NOTIFY action must be successful'
)
}

const { url: callbackURLTo } = await getEndpointFn(destination, REQUEST_TYPE.PATCH, true)
const endpointTemplate = getEndpointTemplate(REQUEST_TYPE.PATCH)
headers = createCallbackHeaders({ headers: content.headers, httpMethod: PATCH, endpointTemplate })
logger.debug(`Notification::processMessage - Callback.sendRequest({ ${callbackURLTo}, ${PATCH}, ${JSON.stringify(content.headers)}, ${payload}, ${id}, ${source}, ${destination} ${hubNameRegex} })`)
let response = { status: 'unknown' }
const histTimerEndSendRequest = Metrics.getHistogram(
'notification_event_delivery',
'notification_event_delivery - metric for sending notification requests to FSPs',
['success', 'from', 'to', 'dest', 'action', 'status']
).startTimer()

try {
response = await Callback.sendRequest({ url: callbackURLTo, headers, source, destination, method: PATCH, payload, responseType, span, protocolVersions, hubNameRegex })
} catch (err) {
logger.error(err)
histTimerEndSendRequest({ success: false, from: source, dest: destination, action, status: response.status })
histTimerEnd({ success: false, action })
throw err
}
histTimerEndSendRequest({ success: true, from: source, dest: destination, action, status: response.status })
histTimerEnd({ success: true, action })

return true
}

Logger.warn(`Unknown action received from kafka: ${action}`)
histTimerEnd({ success: false, action: 'unknown' })
return false
Expand Down
146 changes: 132 additions & 14 deletions test/integration/handlers/notification/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ const GeneralTopicTemplate = Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_T

const timeoutAttempts = 10
const callbackWaitSeconds = 2
const retryDelay = process?.env?.test_INT_RETRY_DELAY || 2
const retryCount = process?.env?.test_INT_RETRY_COUNT || 40
const retryOpts = {
retries: retryCount,
minTimeout: retryDelay,
maxTimeout: retryDelay
}
const wrapWithRetriesConf = {
remainingRetries: retryOpts?.retries || 10, // default 10
timeout: retryOpts?.maxTimeout || 2 // default 2
}

const getNotificationUrl = process.env.ENDPOINT_URL
const hubNameRegex = HeaderValidation.getHubNameRegex(Config.HUB_NAME)
Expand Down Expand Up @@ -151,9 +162,7 @@ Test('Notification Handler', notificationHandlerTest => {
Config.KAFKA_CONFIG, EventTypes.TRANSFER, EventActions.PREPARE,
GeneralTopicTemplate, EventTypes.NOTIFICATION, EventActions.EVENT
)
await new Promise(resolve => setTimeout(resolve, 5000)) // wait for RESERVED
const response = await testNotification(messageProtocol, 'post', transferId, kafkaConfig, topicConfig, undefined, undefined, 'dfsp2')
await new Promise(resolve => setTimeout(resolve, 5000)) // wait for RESERVED_FORWARDED

await db.connect({
client: centralLedgerConfig.DATABASE.DIALECT,
connection: {
Expand All @@ -164,14 +173,44 @@ Test('Notification Handler', notificationHandlerTest => {
database: centralLedgerConfig.DATABASE.SCHEMA
}
})

// wait for RESERVED
try {
const stateChange = await db.from('transferStateChange').findOne({ transferId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_FORWARDED })
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED_FORWARDED, 'Transfer state changed to RESERVED_FORWARDED')
} finally {
await db.disconnect()
await wrapWithRetries(async () => {
const stateChange = await db
.from('transferStateChange')
.findOne({ transferId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED })
if (stateChange?.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED) {
throw new Error('Transfer state not changed to RESERVED')
}
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED, 'Transfer state changed to RESERVED')
return stateChange
}, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
} catch (err) {
Logger.error(err)
test.fail(err.message)
}

const response = await testNotification(messageProtocol, 'post', transferId, kafkaConfig, topicConfig, undefined, undefined, 'dfsp2')
test.deepEqual(response.payload, messageProtocol.content.payload, 'Notification sent successfully to Payee')
// wait for RESERVED_FORWARDED
try {
await wrapWithRetries(async () => {
const stateChange = await db
.from('transferStateChange')
.findOne({ transferId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_FORWARDED })
if (stateChange?.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED_FORWARDED) {
throw new Error('Transfer state not changed to RESERVED_FORWARDED')
}
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED_FORWARDED, 'Transfer state changed to RESERVED_FORWARDED')
return stateChange
}, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
} catch (err) {
Logger.error(err)
test.fail(err.message)
}

await db.disconnect()
test.end()
})

Expand Down Expand Up @@ -241,9 +280,7 @@ Test('Notification Handler', notificationHandlerTest => {
Config.KAFKA_CONFIG, EventTypes.TRANSFER, EventActions.PREPARE,
GeneralTopicTemplate, EventTypes.NOTIFICATION, EventActions.EVENT
)
await new Promise(resolve => setTimeout(resolve, 10000)) // wait for RESERVED
const response = await testNotification(messageProtocol, 'post', commitRequestId, kafkaConfig, topicConfig, undefined, undefined, 'fxp1')
await new Promise(resolve => setTimeout(resolve, 5000)) // wait for RESERVED_FORWARDED

await db.connect({
client: centralLedgerConfig.DATABASE.DIALECT,
connection: {
Expand All @@ -254,13 +291,44 @@ Test('Notification Handler', notificationHandlerTest => {
database: centralLedgerConfig.DATABASE.SCHEMA
}
})

// wait for RESERVED
try {
const stateChange = await db.from('fxTransferStateChange').findOne({ commitRequestId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_FORWARDED })
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED_FORWARDED, 'Fx Transfer state changed to RESERVED_FORWARDED')
} finally {
await db.disconnect()
await wrapWithRetries(async () => {
const stateChange = await db
.from('fxTransferStateChange')
.findOne({ commitRequestId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED })
if (stateChange?.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED) {
throw new Error('Transfer state not changed to RESERVED')
}
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED, 'Transfer state changed to RESERVED')
return stateChange
}, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
} catch (err) {
Logger.error(err)
test.fail(err.message)
}

const response = await testNotification(messageProtocol, 'post', commitRequestId, kafkaConfig, topicConfig, undefined, undefined, 'fxp1')
test.deepEqual(response.payload, messageProtocol.content.payload, 'Notification sent successfully to FXP')

// wait for RESERVED_FORWARDED
try {
await wrapWithRetries(async () => {
const stateChange = await db
.from('fxTransferStateChange')
.findOne({ commitRequestId, transferStateId: Enum.Transfers.TransferInternalState.RESERVED_FORWARDED })
if (stateChange?.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED_FORWARDED) {
throw new Error('Transfer state not changed to RESERVED_FORWARDED')
}
test.equal(stateChange.transferStateId, Enum.Transfers.TransferInternalState.RESERVED_FORWARDED, 'Transfer state changed to RESERVED_FORWARDED')
return stateChange
}, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout)
} catch (err) {
Logger.error(err)
test.fail(err.message)
}
await db.disconnect()
test.end()
})

Expand Down Expand Up @@ -1412,8 +1480,58 @@ Test('Notification Handler', notificationHandlerTest => {
test.end()
})

notificationTest.test('consume a FX_NOTIFY message and send PATCH callback to fxp', async test => {
const commitRequestId = Uuid()
const messageProtocol = Fixtures.createMessageProtocol(
EventTypes.NOTIFICATION,
Action.FX_NOTIFY,
{
commitRequestId,
fulfilment: 'uU0nuZNNPgilLlLX2n2r-sSE7-N6U4DukIj3rOLvze1',
completedTimestamp: '2021-05-24T08:38:08.699-04:00'
},
'HUB',
'fxp1'
)
const { kafkaConfig, topicConfig } = Fixtures.createProducerConfig(
Config.KAFKA_CONFIG, EventTypes.TRANSFER, EventActions.FULFIL,
GeneralTopicTemplate, EventTypes.NOTIFICATION, EventActions.EVENT
)

const response = await testNotification(messageProtocol, 'patch', commitRequestId, kafkaConfig, topicConfig)

test.deepEqual(response.payload, messageProtocol.content.payload, 'Notification sent successfully to FXP')
test.end()
})

notificationTest.test('consume a FX_NOTIFY message and send PATCH callback to proxied fxp', async test => {
await proxy.addDfspIdToProxyMapping('nonExistentFxp', 'proxyFsp') // simulate proxy mapping
const commitRequestId = Uuid()
const messageProtocol = Fixtures.createMessageProtocol(
EventTypes.NOTIFICATION,
Action.FX_NOTIFY,
{
commitRequestId,
fulfilment: 'uU0nuZNNPgilLlLX2n2r-sSE7-N6U4DukIj3rOLvze1',
completedTimestamp: '2021-05-24T08:38:08.699-04:00'
},
'HUB',
'nonExistentFxp'
)
const { kafkaConfig, topicConfig } = Fixtures.createProducerConfig(
Config.KAFKA_CONFIG, EventTypes.TRANSFER, EventActions.FULFIL,
GeneralTopicTemplate, EventTypes.NOTIFICATION, EventActions.EVENT
)

const response = await testNotification(messageProtocol, 'patch', commitRequestId, kafkaConfig, topicConfig, undefined, undefined, 'proxyFsp')

test.deepEqual(response.payload, messageProtocol.content.payload, 'Notification sent successfully to FXP')
test.end()
})

notificationTest.test('tear down', async test => {
await proxy.disconnect()
await db.disconnect()
try {
await Kafka.Producer.disconnect()
} catch (err) { /* ignore error */ }
Expand Down
13 changes: 13 additions & 0 deletions test/integration/server/transfers/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,5 +240,18 @@ module.exports = [{
failAction: 'error'
}
}
},
{
method: 'PATCH',
path: '/proxyFsp/fxTransfers/{transferId}',
handler: Handler.receiveNotificationPatch,
options: {
id: 'proxyFsp-patch',
tags,
description: 'receive patch notification for proxyFsp',
payload: {
failAction: 'error'
}
}
}
]