
Node Reference - Change Events


The “Join” Problem

Modern applications no longer exist in a silo. They need to integrate with other systems within the organization. This is even more true in a microservice architecture, since very few business processes can be completed by a single service.

Let us suppose that our product service is the new source of truth for product data.
Also, assume there exist other applications within the organization that need access to the product data. How can those applications access the data for which our application is responsible?

Having the applications query our RESTful service’s data store (DynamoDB) directly would illustrate the Integration Database anti-pattern. Once one or more outside applications talk directly to your data store (instead of through your well-defined REST interface), your service becomes very brittle. You may not be able to change your underlying data structure without breaking dependent applications. Consequently, people will become wary of making any improvements to your service and as we’ve all seen happen, the service will stagnate as a result.

Instead, the simplest method of integration is to call the service’s exposed REST endpoints as needed in response to user behavior. With this method, the client is always getting up-to-date information, and the interaction between services is straightforward and done via their public interfaces. Because of the simplicity, this is our default practice for most use cases.

However, there are cases where the RESTful service-to-service approach breaks down. One such case is when the consuming application needs to execute aggregations on the data. Calling the listing endpoint and looping over all of the data every time we need a summary would not perform well.

In order to support these kinds of needs (i.e. reporting), the downstream application can benefit from a local cache of the data that it can refer to directly and with low latency.

The consuming application should not simply call the listing endpoint and page through the data to build (and periodically rebuild) this cache, for several reasons:

  • The paging strategy puts a performance burden on our RESTful service. We may not have a lot of product records, but some services could easily hold hundreds of thousands or millions of records. Most of these records will not change, but would still need to be scanned to detect changes.
  • One application may be able to get away with paging through our service’s records. However, imagine a dozen applications across the organization all implementing this same strategy. If every application scheduled this job to run at 2 a.m., we would effectively have built a DDoS attack against our own service.
  • Once the downstream applications realize the value of having product data, they may want that data faster. A once-a-day job becomes a twice-a-day job, then becomes an every hour job, then becomes an every fifteen-minute job.

Other teams might gravitate towards a change data capture (CDC) solution, only to find that CDC between disparate data sources of varying types is not supported (e.g. SymmetricDS does not support NoSQL databases).

Eventing

What we really need is a mechanism to alert downstream applications about when a product has changed so they can react to it without periodic upstream polling. One possible option is to have the product service call a service in the downstream application. This is sometimes called a “webhook”. However, this creates a dependency of our service on the downstream system. What do we do if the downstream service produces an error when we call it? What if the downstream system is slow? Do we slow down the saving of a product because of this webhook call?

Instead, we are going to leverage a Topic/Queue eventing pattern. The AWS implementation of a Topic is the Simple Notification Service (“SNS” for short). A message is sent to SNS, and SNS then forwards a copy of that message to any number of “Subscribers”. Our service has no knowledge of these subscribers, and they can subscribe and unsubscribe as needed. SNS can forward the message to an HTTP endpoint or, more commonly, to a Simple Queue Service (SQS) queue.

First, we will start by adding our Topic to our cloudformation.template.yml:

ProductEventsTopic: 
    Type: "AWS::SNS::Topic"
    Properties: {}

(Dead-letter queues (DLQs) are outside the scope of this article, but we highly recommend using one and notifying your team when messages enter it.)

We need to add an environment variable to our ECS task definition so it knows about the topic:

- Name: PRODUCT_EVENTS_TOPIC_ARN
  Value: !Ref ProductEventsTopic

Now grant the application the ability to publish to the topic by adding this YAML block to the TaskPolicy resource:

- Effect: "Allow"
  Action: 
    - sns:Publish
  Resource: !Ref ProductEventsTopic

Lastly, it would probably be good to export this topic so that other applications can reference it:

ProductEventsTopic:
  Value: !Ref ProductEventsTopic
  Export:
    Name: !Sub "${AWS::StackName}:ProductEventsTopic"

We now have access to a topic that can be used to publish messages. What should we publish and how?

The first inclination might be to broadcast a JSON payload of the product that is being changed (the Event-Carried State Transfer pattern). This strategy works in most cases, but it has a significant drawback that makes it less than ideal. Just as with our previous post on history tracking, probably the most important design constraint is that the information sent to the downstream system is always accurate.

Let us say that we broadcast the product as an event immediately after we save it to the database. Because the DynamoDB write and the SNS publish are not part of a single transaction, if the broadcast fails for any reason (an exception is thrown or the process crashes), the record will have been saved but downstream systems will never be notified. Downstream applications will slowly drift out of sync with the source system. This may go unnoticed for some time and, once discovered, can require a large amount of data cleanup to reconcile and correct.

We could broadcast the event immediately before we save. This ensures downstream applications always have every product that is saved. Unfortunately, because SNS is not transactional, those applications will still receive products even if the save operation fails. This causes them to have phantom records and to become out of sync with the upstream source of truth.

Another issue is the ordering of events. If the downstream application is running more than one instance (a safe assumption for a highly reliable, horizontally scaled application) and we make more than one change to a product in a short amount of time, the downstream application could process the events out of order, moving the data in its system in a “backward” direction.
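
To make the ordering hazard concrete, here is a minimal sketch. The `applyStateEvent` function and the `price` field are hypothetical and stand in for a downstream cache that naively stores whatever full-state event it processed last.

```javascript
// Hypothetical downstream cache that applies full-state events as they arrive.
function applyStateEvent(cache, event) {
    // The event carries the whole product, so the last event processed wins,
    // regardless of when the change actually happened upstream.
    cache.set(event.id, event);
    return cache;
}

// Two changes made upstream in this order: price 10, then price 12.
const first = {id: 'abc', price: 10};
const second = {id: 'abc', price: 12};

// Two consumer instances race, and the older event ends up processed last.
const cache = new Map();
applyStateEvent(cache, second);
applyStateEvent(cache, first);

// The cache now holds the older price even though upstream holds the newer one.
const cachedPrice = cache.get('abc').price;
console.log(cachedPrice);
```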

Instead of broadcasting the entire entity, what if we just broadcast an Id and forced the downstream application to issue a GET request to fetch the state? This has several benefits:

  1. If the Id is published and the save fails, the downstream application will receive a 404 response and can ignore the event.
  2. It doesn’t matter what order the events are processed in because they all contain the same data.
  3. It is impossible to save an entity without successfully broadcasting an event if we broadcast before save.
  4. Messages are smaller in size.

The main downside of the above approach is performance. Each application has to make an HTTP request for each message it processes. In practice, this isn’t as much of a burden as it first seems. The downstream applications are only making a request to the “get by id” endpoint which is generally the most efficient type of call you can make to a data store. For a lot of use cases, applications need to make other calls to look up reference data anyway so this may not make much of an impact.
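
On the consuming side, the handling described above might look roughly like the following sketch. `fetchProduct` and `cache` are hypothetical dependencies passed in by the caller, and removing the local copy on a 404 is an assumption about what the downstream application wants, not something the product service dictates.

```javascript
// Sketch of a downstream handler for an id-only event.
// Hypothetical dependencies supplied by the caller:
//   fetchProduct(id) resolves to {status, body} from the "get by id" endpoint
//   cache is any Map-like local store
async function handleProductEvent(event, {fetchProduct, cache}) {
    const response = await fetchProduct(event.id);

    if (response.status === 404) {
        // Either the save never happened or the product no longer exists.
        // In both cases there is nothing to keep locally.
        cache.delete(event.id);
        return;
    }
    if (response.status !== 200) {
        // Throw so the message is not acknowledged and can be retried later.
        throw new Error(`Unexpected status ${response.status} for product ${event.id}`);
    }

    // Always store the current upstream state, so processing order does not matter.
    cache.set(event.id, response.body);
}
```

Because the handler always stores whatever the service returns at the time of the request, processing the same event twice, or two events in the "wrong" order, still leaves the cache matching the source of truth.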

To implement this strategy, we will start with a new module that can be responsible for executing the broadcast. Create a products/broadcastProductEvent.spec.js file with these contents:

const proxyquire = require('proxyquire');

describe('broadcastProductEvent', function() {
    beforeEach(function() {
        process.env.PRODUCT_EVENTS_TOPIC_ARN = 'aws::arn::products-topic';

        const mockSNSClient = this.mockSNSClient = {
            publish() {
                return {
                    promise: () => Promise.resolve()
                };
            }
        };
        spyOn(mockSNSClient, 'publish').and.callThrough();

        const SNS = function() {
            return mockSNSClient;
        };
        this.broadcastProductEvent = proxyquire('./broadcastProductEvent', {
            'aws-sdk': {SNS}
        });
    });

    it('should send the message to the correct topic', async function() {
        await this.broadcastProductEvent('abc');
        expect(this.mockSNSClient.publish.calls.argsFor(0)[0].TopicArn).toEqual('aws::arn::products-topic');
    });

    it('should send the id as a JSON encoded object', async function () {
        await this.broadcastProductEvent('abc');
        const message = this.mockSNSClient.publish.calls.argsFor(0)[0].Message;
        const body = JSON.parse(message);
        expect(body).toEqual({id: 'abc'});
    });

    // This is so we can run the application locally without having to setup a topic
    it('should not call publish if PRODUCT_EVENTS_TOPIC_ARN is undefined', async function() {
        delete process.env.PRODUCT_EVENTS_TOPIC_ARN;
        await this.broadcastProductEvent('abc');
        expect(this.mockSNSClient.publish).not.toHaveBeenCalled();        
    });

    it('should not call publish if PRODUCT_EVENTS_TOPIC_ARN is empty', async function() {
        process.env.PRODUCT_EVENTS_TOPIC_ARN = '';
        await this.broadcastProductEvent('abc');
        expect(this.mockSNSClient.publish).not.toHaveBeenCalled();        
    });
});

And here is the straightforward implementation in products/broadcastProductEvent.js:

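const AWS = require('aws-sdk');
const snsClient = new AWS.SNS();

module.exports = async function broadcastProductEvent(id) {
    const TopicArn = process.env.PRODUCT_EVENTS_TOPIC_ARN;
    // Skip publishing when no topic is configured (e.g. when running locally).
    if(TopicArn) {
        // publish() returns a request object; calling .promise() sends it
        // and gives us something to await.
        await snsClient.publish({
            TopicArn,
            Message: JSON.stringify({id})
        }).promise();
    }
};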

Now we need to add a couple of test cases to each handler that modifies products:

products/createProduct.spec.js

    beforeEach(function () {
        ...
        this.broadcastProductEvent = () => Promise.resolve();
        spyOn(this, 'broadcastProductEvent').and.callThrough();
        ...
        this.createProduct = proxyquire('./createProduct', {
            ...
            './broadcastProductEvent': this.broadcastProductEvent
        });
    });
    ...
    it('should call broadcastProductEvent with the id', async function() {
        await this.createProduct(this.context);
        const id = this.documentClient.put.calls.argsFor(0)[0].Item.id;
        expect(this.broadcastProductEvent).toHaveBeenCalledWith(id);
    });

    it('should not save if broadcast fails', async function() {
        this.broadcastProductEvent.and.callFake(() => Promise.reject());
        try {
            await this.createProduct(this.context);
        } catch(e) {}
        expect(this.documentClient.put).not.toHaveBeenCalled();
    });
    ...

products/deleteProduct.spec.js

    beforeEach(function () {
        ...
        this.broadcastProductEvent = () => Promise.resolve();
        spyOn(this, 'broadcastProductEvent').and.callThrough();
        ...
        this.deleteProduct = proxyquire('./deleteProduct', {
            ...
            './broadcastProductEvent': this.broadcastProductEvent
        });
    });
    ...
    it('should call broadcastProductEvent with the id', async function() {
        await this.deleteProduct(this.context);
        const id = this.documentClient.update.calls.argsFor(0)[0].Key.id;
        expect(this.broadcastProductEvent).toHaveBeenCalledWith(id);
    });

    it('should not save if broadcast fails', async function() {
        this.broadcastProductEvent.and.callFake(() => Promise.reject());
        try {
            await this.deleteProduct(this.context);
        } catch(e) {}
        expect(this.documentClient.update).not.toHaveBeenCalled();
    });
    ...

products/updateProduct.spec.js

    beforeEach(function () {
        ...
        this.broadcastProductEvent = () => Promise.resolve();
        spyOn(this, 'broadcastProductEvent').and.callThrough();
        ...
        this.updateProduct = proxyquire('./updateProduct', {
            ...
            './broadcastProductEvent': this.broadcastProductEvent
        });
    });
    ...
    it('should call broadcastProductEvent with the id', async function() {
        await this.updateProduct(this.context);
        const id = this.documentClient.put.calls.argsFor(0)[0].Item.id;
        expect(this.broadcastProductEvent).toHaveBeenCalledWith(id);
    });

    it('should not save if broadcast fails', async function() {
        this.broadcastProductEvent.and.callFake(() => Promise.reject());
        try {
            await this.updateProduct(this.context);
        } catch(e) {}
        expect(this.documentClient.put).not.toHaveBeenCalled();
    });
    ...

Implement the above by importing our utility function and calling it at the appropriate time.
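
As a rough illustration of "the appropriate time", the sketch below broadcasts before saving so that a failed broadcast prevents the write. It is not the project's actual handler: `makeCreateProduct`, `generateId`, `tableName`, and the shape of `ctx` are hypothetical, and dependencies are passed in rather than loaded with `require` so the snippet stands on its own.

```javascript
// Hypothetical factory. The real handler loads its dependencies with require(),
// and its validation, id generation, and table name are not shown in this article.
function makeCreateProduct({documentClient, broadcastProductEvent, generateId, tableName}) {
    return async function createProduct(ctx) {
        // Assumes a Koa-style context with a parsed request body.
        const product = {...ctx.request.body, id: generateId()};

        // Broadcast first. If this rejects, the error propagates
        // and put() below is never reached.
        await broadcastProductEvent(product.id);

        await documentClient.put({TableName: tableName, Item: product}).promise();
        ctx.body = product;
    };
}
```

The update and delete handlers would follow the same shape: await the broadcast, then perform the write.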

Downstream applications can subscribe to this topic by including something like the following in their CloudFormation template:

ProductsQueue:
  Type: AWS::SQS::Queue
  Properties:
    DelaySeconds: 5
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt ProductsDeadLetterQueue.Arn
      maxReceiveCount: 3
ProductsDeadLetterQueue:
  Type: AWS::SQS::Queue
ProductsQueuePolicy:
  Type: AWS::SQS::QueuePolicy
  Properties:
    Queues:
     - !Ref ProductsQueue
    PolicyDocument:
      Version: "2012-10-17"
      Statement:
       - Effect: Allow
         Action:
          - "sqs:SendMessage"
         Resource: '*'
         Principal: '*'
         Condition:
           ArnEquals:
             'aws:SourceArn': !ImportValue "ProductService:ProductEventsTopic"
ProductsSubscription:
  Type: AWS::SNS::Subscription
  Properties:
    TopicArn: !ImportValue "ProductService:ProductEventsTopic"
    Protocol: sqs
    Endpoint: !GetAtt ProductsQueue.Arn
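
When a consumer reads from this queue, the body of each SQS message is the SNS notification envelope rather than our payload, because the subscription above does not enable raw message delivery. A minimal sketch of unwrapping it follows. `parseProductEvent` is a hypothetical helper name, and the example body is trimmed to the fields used here.

```javascript
// Extract the product id from the body of an SQS message delivered by SNS.
// Assumes raw message delivery is NOT enabled on the subscription, so the body
// is the SNS envelope and our JSON payload is the string in its Message field.
function parseProductEvent(sqsMessageBody) {
    const envelope = JSON.parse(sqsMessageBody);
    const payload = JSON.parse(envelope.Message);
    return payload.id;
}

// Example body, trimmed to the fields used here.
const exampleBody = JSON.stringify({
    Type: 'Notification',
    Message: JSON.stringify({id: 'abc'})
});
const exampleId = parseProductEvent(exampleBody);
```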

We now have a simple and reliable eventing system that allows other applications to listen for and react to changes in our service.

See the changes we made here.


If you have questions or feedback on this series, contact the authors at nodereference@sourceallies.com.
