Leverage event messaging with SAP Event Mesh and SAP CAP
In a previous post I wrote about Asychronous Queueing architecture pattern and its pros and cons. Now it is time for implementation.
In this post I am going to describe how to implement it using SAP CAP and the SAP Event Mesh service, available in SAP BTP.
This post is part of a series of posts about this topic.
- Understanding Asynchronous Queueing pattern
- Leverage event messaging with SAP Event Mesh and SAP CAP (this post)
- Consuming CAP services with UI5 via WebSockets (in progress...)
- Autoscaling based on queue size in SAP BTP (in progress...)
What are we building?
We will create 3 CAP instances, 1 emitter and 2 receivers.
The emitter will be in charge of emitting events to specific topics. Each receiver will have its own dedicated queue, where they will be listening for messages to come. Each queue will be subscribed to 2 topics. One for a dedicated event for each of the queues, and one common to both queues.
Prerequisites
To build this you need a subscription to the Event Mesh service in BTP with the default
plan. Follow this tutorial to subscribe to it.
It is important to understand the general concepts of an event messaging service, and also this great post about the advance concepts.
Furthermore you need Node.js and a basic knowledge of SAP CAP, the Cloud Foundry CLI, and the CAP CLI.
Step 1: Create an Event Mesh instance
Yo can follow this tutorial to create an instance of Event Mesh, or make it with CF CLI.
Creating an instance with CF CLI
Save a JSON file in your folder with the following content:
{
"emname": "<emname>",
"namespace": "<namespace e.g. a/b/c>",
"version": "1.1.0",
"options": {
"management": true,
"messagingrest": true,
"messaging": true
},
"rules": {
"queueRules": {
"publishFilter": ["${namespace}/*"],
"subscribeFilter": ["${namespace}/*"]
},
"topicRules": {
"publishFilter": ["${namespace}/*"],
"subscribeFilter": ["${namespace}/*"]
}
},
"xs-security": {
//optional//
"oauth2-configuration": {
"credential-types": ["binding-secret", "x509"]
}
}
}
It is important that you se your own emname
and namespace
.
Then run the command
cf create-service enterprise-messaging default <your_instance_name> -c <your_json_file.json>
Step 2: Create a queue and subscribe it to a topic in the Event Mesh Management UI
Queues can be created manually in the Event Mesh Management UI
Just click on the "queue" tab and then "create queue". Remember to name it with the chose namespace
as prefix
Queues will be automatically created when subscribing a listener to them. We will see this when conficuring the receivers
Step 3: Create the emitter CAP app
3.1 Initiate CAP app
Create a new folder and give it a name. For example "/emitter".
Go into that folder and run:
cds init
3.2 Binding the Event Mesh instance to run the CAP app in hybrid mode
First create a service key
cf create-service-key <event_mesh_instance_name> <event_mesh_key_name>
Then bind the CAP app to it
cds bind --to <event_mesh_instance_name>:<event_mesh_key_name>
This binding allow us to execute the CAP app in localhost in hybrid mode. Connecting to the Cloud instance of Event Mesh.
3.3 Configure the messaging service in the CAP app
Add the following to the package.json
file
"cds": {
"requires": {
"messaging": {
"publishPrefix": "$namespace/",
"subscribePrefix": "$namespace/",
"queue": {
"name": "$namespace/emitter"
},
"[development]": {
"kind": "file-based-messaging"
},
"[hybrid]": {
"kind": "enterprise-messaging-shared"
},
"[production]": {
"kind": "enterprise-messaging"
}
}
}
}
Step 4: Emit the first events
Create a file in the /srv folder and call it emitter-servce.cds
with the following code.
service EmitterService @(path: '/emitter') {}
Then create another file in the /srv folder and call it emitter-service.js
with the following code
const cds = require('@sap/cds');
module.exports = cds.service.impl(async function () {
// "messaging" is described in the package.json and describes the Event Mesh service instance
const messaging = await cds.connect.to('messaging');
await messaging.emit('processOnReceiver1', { itemId: 111 });
await messaging.emit('removeAllItems');
});
Note that we are connecting to messaging
which is how we names the enterprise messaging instance in our package.json
.
Furthermore we are emitting events to 2 Topics "testOnReceiver1" and "removeAllItems", both will be prefixed with the namespace you chose.
STEP 5: Create the receiver CAP application
Create a new folder /receiver1
in the root level. Navigate into it and repeat steps 3.1, 3.2, and 3.3.
Change the queue name in the package.json file
...
"queue": {
"name": "$namespace/receiver1"
},
...
Step 6: Setup the event handlers in the receiver app
Create a file in the /srv folder called receiver-service.cds
with the code
service ReceiverService @(path: '/receiver') {}
and another file called receiver-service.js
with the code
const cds = require('@sap/cds');
module.exports = cds.service.impl(async function () {
// "messagingQueue" is described in the package.json
const messaging = await cds.connect.to('messaging');
// Queue Subscriptions
/***********************/
// Subscribe to a topic. The messages will be stored in the queue specified in package.json
// listens for events in "$namespace/removeAllItems" topic stored in the queue specified in the package.json
await messaging.on(`removeAllItems`, (msg) =>
console.log('> event on topic "$namespace/removeAllItems" stored in queue "receiver1": ', msg)
);
// listens for events in "$namespace/processOnReceiver1" topic stored in the queue specified in the package.json
await messaging.on(`processOnReceiver1`, (msg) =>
console.log('> event on topic "$namespace/processOnReceiver1" stored in queue "receiver1": ', msg)
);
});
Step 7: Run
Now open a terminal and run the receiver instance executing in the /receiver
folder the command
cds watch --profile hybrid --port 4004
It will run the server in port 4004, create the queue with the name you set in the package.json file if it does not exist yet, and it will also subscribe that queue to the topics removeAllItems
and processOnReceiver1
.
Then navigate to the /emitter
folder and run run it with the command
cds watch --profile hybrid --port 4005
It will start the service in port 4005, and as soon as it start, it will emit the 2 events.
Check the console logs in both terminals
In the emitter terminal you will find
[enterprise-messaging-amqp] - Emit { topic: '<your_namespace>/processOnReceiver1' }
[enterprise-messaging-amqp] - Emit { topic: '<your_namespace>/removeAllItems' }
Whicha are the 2 events sent to the 2 topics.
In the receiver you will see
> event on topic "$namespace/processOnReceiver1" stored in queue "receiver1": EventMessage {
data: { itemId: 111 },
headers: {},
inbound: true,
event: 'processOnReceiver1'
}
> event on topic "$namespace/removeAllItems" stored in queue "receiver1": EventMessage {
data: {},
headers: {},
inbound: true,
event: 'runEngine'
}
which are the console.log()
statements we set in event listeners.
Try out next
Simulate a busy server
Now you can stop the receiver machine, simulating that it is not available for processing messages (like a busy machine for example), and restart the emitter again. This will emit the 2 events again, and they will be stored in the queue waiting to be processed.
You can see those 2 messages stored in the queue in the Event Mesh Management UI.
As soon you run the service again the receiver will pull those messages and they will disappear from the queue.
Connect a second receiver
A nice exercise would be to connect a second receiver with its own queue and subscribing to one of the topics. For example /removeAllItems
one.
This will make that both receivers will get the message on that event, every time the emitter emits it. This means, it will work as a broadcast message.
Recap
With this project we have created an async communication between 2 CAP instances using Event Mesh service. We have emit 2 events to 2 different topics, which have been pulled from the receiver for processing.
This ensures the decoupling between Emitter and Receiver and the benefits of Asynchronous Queueing Architecture