Leverage event messaging with SAP Event Mesh and SAP CAP

In a previous post I wrote about the Asynchronous Queueing architecture pattern and its pros and cons. Now it is time to implement it.

In this post I am going to describe how to implement it using SAP CAP and the SAP Event Mesh service, available in SAP BTP.

This post is part of a series on this topic:

  1. Understanding Asynchronous Queueing pattern
  2. Leverage event messaging with SAP Event Mesh and SAP CAP (this post)
  3. Consuming CAP services with UI5 via WebSockets (in progress...)
  4. Autoscaling based on queue size in SAP BTP (in progress...)

What are we building?

We will create 3 CAP instances, 1 emitter and 2 receivers.

The emitter will be in charge of emitting events to specific topics. Each receiver will have its own dedicated queue, where it will listen for incoming messages. Each queue will be subscribed to 2 topics: one dedicated to that queue, and one common to both queues.
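To make the routing concrete, here is a minimal in-memory sketch of the topology described above. It is not the Event Mesh API, just a toy model of topics fanning out to subscribed queues. The class `Broker` and the topic name `processOnReceiver2` are my own assumptions for illustration.

```javascript
// In-memory model of the topology above; not the Event Mesh API.
// Broker and processOnReceiver2 are hypothetical names.
class Broker {
  constructor() {
    this.queues = new Map(); // queue name -> { topics: Set, messages: [] }
  }

  // Create a queue and subscribe it to a list of topics.
  createQueue(name, topics) {
    this.queues.set(name, { topics: new Set(topics), messages: [] });
  }

  // Copy the message into every queue subscribed to the topic.
  publish(topic, data) {
    for (const queue of this.queues.values()) {
      if (queue.topics.has(topic)) queue.messages.push({ topic, data });
    }
  }

  // Return the topics of the messages currently stored in a queue.
  topicsIn(name) {
    return this.queues.get(name).messages.map((m) => m.topic);
  }
}

const broker = new Broker();
broker.createQueue('receiver1', ['processOnReceiver1', 'removeAllItems']);
broker.createQueue('receiver2', ['processOnReceiver2', 'removeAllItems']);

broker.publish('processOnReceiver1', { itemId: 111 }); // dedicated topic
broker.publish('removeAllItems', {});                  // common topic

console.log(broker.topicsIn('receiver1')); // [ 'processOnReceiver1', 'removeAllItems' ]
console.log(broker.topicsIn('receiver2')); // [ 'removeAllItems' ]
```

The dedicated event lands in one queue only, while the common event is copied into both.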

event mesh architecture diagram

Prerequisites

To build this you need a subscription to the Event Mesh service in BTP with the default plan. Follow this tutorial to subscribe to it.

It is important to understand the general concepts of an event messaging service. This great post about the advanced concepts is also worth reading.

You also need Node.js and basic knowledge of SAP CAP, the Cloud Foundry CLI, and the CAP CLI.

Step 1: Create an Event Mesh instance

You can follow this tutorial to create an instance of Event Mesh, or create it with the CF CLI.

Creating an instance with CF CLI

Save a JSON file in your folder with the following content:

{
  "emname": "<emname>",
  "namespace": "<namespace e.g. a/b/c>",
  "version": "1.1.0",
  "options": {
    "management": true,
    "messagingrest": true,
    "messaging": true
  },
  "rules": {
    "queueRules": {
      "publishFilter": ["${namespace}/*"],
      "subscribeFilter": ["${namespace}/*"]
    },
    "topicRules": {
      "publishFilter": ["${namespace}/*"],
      "subscribeFilter": ["${namespace}/*"]
    }
  },
  "xs-security": {
    "oauth2-configuration": {
      "credential-types": ["binding-secret", "x509"]
    }
  }
}

It is important that you set your own emname and namespace. The "xs-security" block is optional. The file must be plain JSON, so do not leave comments in it.

Then run the command:

cf create-service enterprise-messaging default <your_instance_name> -c <your_json_file.json>
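Before running the command, it can help to check that the descriptor is valid JSON and that the placeholders have been replaced. The following is a small sketch of such a check. The function `checkDescriptor` and the sample values `demo` and `a/b/c` are hypothetical, and this is not part of the CF CLI.

```javascript
// Hypothetical pre-flight check for the descriptor; not part of the cf CLI.
function checkDescriptor(text) {
  // JSON.parse throws on comments, trailing commas, and other invalid JSON.
  const descriptor = JSON.parse(text);
  for (const key of ['emname', 'namespace']) {
    const value = descriptor[key];
    // Reject missing values and leftover "<...>" placeholders.
    if (typeof value !== 'string' || value === '' || value.includes('<')) {
      throw new Error(`"${key}" still needs a real value`);
    }
  }
  return descriptor;
}

// Sample values are made up for this example.
const ok = checkDescriptor('{ "emname": "demo", "namespace": "a/b/c" }');
console.log(ok.namespace); // a/b/c
```

To check your real file, you could pass the result of `require('fs').readFileSync('<your_json_file.json>', 'utf8')` to the function.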

Step 2: Create a queue and subscribe it to a topic in the Event Mesh Management UI

Queues can be created manually in the Event Mesh Management UI.

Just click on the "queue" tab and then "create queue". Remember to name it with the chosen namespace as a prefix.

Alternatively, queues are created automatically when a listener subscribes to them. We will see this when configuring the receivers.

Step 3: Create the emitter CAP app

3.1 Initiate CAP app

Create a new folder and give it a name. For example "/emitter".

Go into that folder and run:

cds init

3.2 Binding the Event Mesh instance to run the CAP app in hybrid mode

First create a service key:

cf create-service-key <event_mesh_instance_name> <event_mesh_key_name>

Then bind the CAP app to it:

cds bind --to <event_mesh_instance_name>:<event_mesh_key_name>

This binding allows us to run the CAP app on localhost in hybrid mode, connected to the cloud instance of Event Mesh.

3.3 Configure the messaging service in the CAP app

Add the following to the package.json file:

  "cds": {
    "requires": {
      "messaging": {
        "publishPrefix": "$namespace/",
        "subscribePrefix": "$namespace/",
        "queue": {
          "name": "$namespace/emitter"
        },
        "[development]": {
          "kind": "file-based-messaging"
        },
        "[hybrid]": {
          "kind": "enterprise-messaging-shared"
        },
        "[production]": {
          "kind": "enterprise-messaging"
        }
      }
    }
  }
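As I understand the configuration, the prefixes are prepended to the event name, with $namespace standing for the namespace of your Event Mesh instance. The sketch below only illustrates the resulting topic name. The helper `resolveTopic` and the namespace `a/b/c` are hypothetical, and CAP does this internally.

```javascript
// Hypothetical helper, for illustration only; CAP resolves topics internally.
function resolveTopic(prefix, event, namespace) {
  // Replace the $namespace placeholder with the instance namespace,
  // then append the event name used in emit() or on().
  return prefix.replace('$namespace', () => namespace) + event;
}

// 'a/b/c' is a made-up namespace.
const topic = resolveTopic('$namespace/', 'removeAllItems', 'a/b/c');
console.log(topic); // a/b/c/removeAllItems
```

This is why the code in the following steps can use short event names while the topics in Event Mesh carry the full namespace.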

Step 4: Emit the first events

Create a file in the /srv folder and call it emitter-service.cds with the following code:

service EmitterService @(path: '/emitter') {}

Then create another file in the /srv folder and call it emitter-service.js with the following code:

const cds = require('@sap/cds');

module.exports = cds.service.impl(async function () {
  // "messaging" is described in the package.json and describes the Event Mesh service instance
  const messaging = await cds.connect.to('messaging');

  await messaging.emit('processOnReceiver1', { itemId: 111 });
  await messaging.emit('removeAllItems');
});

Note that we are connecting to "messaging", which is how we named the enterprise messaging instance in our package.json.

Furthermore, we are emitting events to 2 topics, "processOnReceiver1" and "removeAllItems". Both will be prefixed with the namespace you chose.
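If you want to try the emitting logic without a connection to Event Mesh, one option is to move it into a function that accepts any object with an async emit(event, data) method. This is only a sketch. The function `emitAll` and the stub `stubMessaging` are hypothetical names, and the stub stands in for the service returned by cds.connect.to('messaging').

```javascript
// Emit a list of events one after another through any object that has
// an async emit(event, data) method, such as the CAP messaging service.
async function emitAll(messaging, events) {
  const emitted = [];
  for (const { event, data } of events) {
    await messaging.emit(event, data);
    emitted.push(event);
  }
  return emitted;
}

// Stub standing in for the real messaging service (assumption: only
// emit() is needed here). It records what would have been sent.
const sent = [];
const stubMessaging = {
  async emit(event, data) {
    sent.push({ event, data });
  },
};

// Same two events as in emitter-service.js.
const done = emitAll(stubMessaging, [
  { event: 'processOnReceiver1', data: { itemId: 111 } },
  { event: 'removeAllItems' },
]);

done.then((emitted) => console.log(emitted));
```

In the real service you would pass the connected messaging service instead of the stub.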

Step 5: Create the receiver CAP application

Create a new folder /receiver1 in the root level. Navigate into it and repeat steps 3.1, 3.2, and 3.3.

Change the queue name in the package.json file:

  ...
  "queue": {
    "name": "$namespace/receiver1"
  },
  ...

Step 6: Set up the event handlers in the receiver app

Create a file in the /srv folder called receiver-service.cds with the code

service ReceiverService @(path: '/receiver') {}

and another file called receiver-service.js with the code

const cds = require('@sap/cds');

module.exports = cds.service.impl(async function () {
  // "messaging" is the service name configured in the package.json
  const messaging = await cds.connect.to('messaging');

  // Queue Subscriptions
  /***********************/
  // Subscribe to a topic. The messages will be stored in the queue specified in package.json
  // listens for events in "$namespace/removeAllItems" topic stored in the queue specified in the package.json
  await messaging.on(`removeAllItems`, (msg) =>
    console.log('> event on topic "$namespace/removeAllItems" stored in queue "receiver1": ', msg)
  );

  // listens for events in "$namespace/processOnReceiver1" topic stored in the queue specified in the package.json
  await messaging.on(`processOnReceiver1`, (msg) =>
    console.log('> event on topic "$namespace/processOnReceiver1" stored in queue "receiver1": ', msg)
  );
});
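The handlers above only log the message. As a sketch of what real processing could look like, the handler can be a named function that reads the payload from msg.data. The function `handleProcessOnReceiver1` and its return strings are hypothetical. I only assume that the message carries the emitted payload in its `data` property.

```javascript
// Hypothetical handler for the processOnReceiver1 event. It only relies on
// the `data` property of the incoming message.
function handleProcessOnReceiver1(msg) {
  const { itemId } = msg.data || {};
  if (itemId === undefined) {
    // Payload without itemId: nothing to process.
    return 'ignored: no itemId in payload';
  }
  return `processing item ${itemId}`;
}

// Fake messages with the same shape as the payload emitted in Step 4.
console.log(handleProcessOnReceiver1({ data: { itemId: 111 } })); // processing item 111
console.log(handleProcessOnReceiver1({ data: {} }));              // ignored: no itemId in payload
```

Such a function could be registered with messaging.on('processOnReceiver1', handleProcessOnReceiver1) in place of the inline arrow function.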

Step 7: Run

Now open a terminal, go to the /receiver1 folder, and start the receiver with the command:

cds watch --profile hybrid --port 4004

This starts the server on port 4004, creates the queue with the name you set in the package.json file if it does not exist yet, and subscribes that queue to the topics removeAllItems and processOnReceiver1.

Then navigate to the /emitter folder and run it with the command:

cds watch --profile hybrid --port 4005

It will start the service on port 4005, and as soon as it starts, it will emit the 2 events.

Check the console logs in both terminals

In the emitter terminal you will find

[enterprise-messaging-amqp] - Emit { topic: '<your_namespace>/processOnReceiver1' }
[enterprise-messaging-amqp] - Emit { topic: '<your_namespace>/removeAllItems' }

These are the 2 events sent to the 2 topics.

In the receiver you will see

> event on topic "$namespace/processOnReceiver1" stored in queue "receiver1":  EventMessage {
  data: { itemId: 111 },
  headers: {},
  inbound: true,
  event: 'processOnReceiver1'
}
> event on topic "$namespace/removeAllItems" stored in queue "receiver1":  EventMessage {
  data: {},
  headers: {},
  inbound: true,
  event: 'removeAllItems'
}

These are the console.log() statements we set in the event listeners.

Try out next

Simulate a busy server

Now you can stop the receiver, simulating a machine that is not available to process messages (a busy machine, for example), and restart the emitter. This will emit the 2 events again, and they will be stored in the queue, waiting to be processed.

You can see those 2 messages stored in the queue in the Event Mesh Management UI.

As soon as you run the receiver again, it will pull those messages and they will disappear from the queue.
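The behaviour in this experiment can be mimicked with a toy queue that stores messages while no consumer is attached and drains them when one comes back. This is only a mental model, assuming in-order delivery. The class `BufferedQueue` is hypothetical, and the real buffering is done by Event Mesh.

```javascript
// Toy model of a queue that buffers while the receiver is down.
class BufferedQueue {
  constructor() {
    this.pending = [];    // messages waiting for a consumer
    this.consumer = null; // callback of the attached receiver, if any
  }

  // Deliver immediately if a consumer is attached, otherwise store.
  push(message) {
    if (this.consumer) this.consumer(message);
    else this.pending.push(message);
  }

  // Attach a consumer and drain everything stored so far, in order.
  attach(consumer) {
    this.consumer = consumer;
    while (this.pending.length > 0) consumer(this.pending.shift());
  }

  // Detach the consumer, for example when the receiver is stopped.
  detach() {
    this.consumer = null;
  }
}

const queue = new BufferedQueue();
const received = [];

// Receiver is down: both events are stored in the queue.
queue.push('processOnReceiver1');
queue.push('removeAllItems');
const storedWhileDown = queue.pending.length;

// Receiver comes back: stored events are delivered and the queue empties.
queue.attach((message) => received.push(message));
console.log(storedWhileDown, received, queue.pending.length);
```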

Connect a second receiver

A nice exercise is to connect a second receiver with its own queue and subscribe it to one of the topics, for example removeAllItems.

Both receivers will then get the message every time the emitter emits that event. In other words, it works as a broadcast message.

Recap

With this project we have created asynchronous communication between 2 CAP instances using the Event Mesh service. We have emitted 2 events to 2 different topics, which have been pulled by the receiver for processing.

This ensures the decoupling between emitter and receiver and brings the benefits of the Asynchronous Queueing architecture.

Published on
  • September 30, 2023
About Me

Hi! I'm Rafa, a fullstack software developer living in Madrid (Spain).

I love to code and the freedom it gives me to create things out of ideas. When I'm not coding, I'm usually practicing sports (kitesurfing, team handball, padel tennis, golf, ...), reading or getting some beers while enjoying the Spanish sun with some friends.

Wanna get some? Check out the About page to know how to reach out to me.