Node Reference - History

07/30/2018 By Paul Rowe, Matt Vincent

Prerequisites

This article builds on the prior article: Patch.

Tracking History

The way our service is currently built, all modifications (PATCH requests) happen in-place to records in DynamoDB. It is common in most production applications that someone will want to know the history of a record. A business user may want to know who changed a record and at what time. Or, the development/operations team may be troubleshooting an issue and may want to know what the data looked like at a certain point in time prior to the issue emerging. In some industries, there may be regulatory or legal requirements to track the changes made to the system and who made those changes. In order to support these use cases, we need a way to track the “history” of an entity.

Two years ago, PwC described how tracking history may be the future norm:

In a relational database, files are mutable, which means a given cell can be overwritten when there are changes to the data relevant to that cell. 
Most [immutable databases have] an accumulate-only file system that overwrites nothing. 
Each file is immutable, and any changes are recorded as separate timestamped files. 
The method lends itself not only to faster and more capable stream processing, but also to various kinds of historical time-series analysis—that is, both ends of the data lifecycle continuum.

The traditional method—overwriting mutable files—originated during the era of smaller data when storage was expensive and all systems were transactional. 
Now disk storage is inexpensive, and enterprises have lots of data they never had before. 
They want to minimize the latency caused by overwriting data and to keep all data they write in a full-fidelity form so they can analyze and learn from it. 
In the future, mutable could become the exception rather than the rule.

There are two main considerations when designing any history tracking system:

  1. It must be exactly accurate. The history cannot show a change that never occurred, and a change can never be allowed to occur without tracking the who, what, and when of the change.
  2. It should be subtle. Tracking history should not overly impact the complexity of the system or its performance.

If the backing datastore of an application supports atomic transactions across records, as a relational database does, then we can simply insert a row in a history table in the same transaction as our modification to the main table.

However, in most “no-SQL” databases (such as DynamoDB), atomicity is not guaranteed across records. Relaxing this constraint of ACID compliance is one way DynamoDB is able to achieve its noteworthy scale.

Let us start by considering the strategy we would use in a relational database. We create a second DynamoDB table to store history. We then add a second documentClient.put call in our handler function immediately after the existing one to store the change and who performed it. We then test our design by asking the question: “Is the system accurate (according to constraint #1 above) if an exception (or system crash) were to occur at any point around our database calls?”

  • If an exception happens before either put then nothing in the database changes, so we are still accurate
  • If an exception occurs after we store the first record into the Products table, but before we store the history table record, then the client will get an error.

The problem with this second case is that the system will still show the user’s change but will lack an audit of that change. Once the next change happens, and the user listed in the Product table is overwritten, the person who performed the prior change will have been lost. We can’t simply implement a resolution in a “catch block” because the process could have completely crashed.

What if we reverse the order of the calls? We can store the change that is about to happen in the history table before we perform the change in the main table.

  • We are still fine if an exception happens before anything is saved.
  • If an exception happens after the history row is saved (but before the main table is updated), then the history will show a change that never actually occurred. There is no way to reconcile this after the fact because the next change to succeed to the main table will hide that the phantom change never happened.

So, no matter what order we try, the system is left in an inaccurate state. Another option would be to not use a separate table and use “active indicators” or “effective/term dates”. These strategies suffer from the same problem because one record has to be “termed” while another record is inserted to replace it and they both have to succeed.
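The two failure cases above can be reproduced with a small in-memory simulation. This is only a sketch: the `createStore`, `runWithCrash`, and `attemptChange` helpers are hypothetical names invented for illustration, the "tables" are plain JavaScript collections, and no DynamoDB calls are made.

```javascript
// In-memory stand-ins for the main table and the history table.
function createStore() {
    return { products: new Map(), history: [] };
}

// Runs the write steps in order and simulates a crash (a thrown error)
// immediately after step number `crashAfter`.
function runWithCrash(steps, crashAfter) {
    try {
        steps.forEach((step, index) => {
            step();
            if (index + 1 === crashAfter) {
                throw new Error('simulated crash');
            }
        });
    } catch (e) {
        // A real crash leaves no chance to clean up, so nothing happens here.
    }
}

// Attempts one change using either ordering of the two writes.
function attemptChange(store, change, order, crashAfter) {
    const writeMain = () => store.products.set(change.id, { ...change });
    const writeHistory = () => store.history.push({ ...change });
    const steps = order === 'main-first'
        ? [writeMain, writeHistory]
        : [writeHistory, writeMain];
    runWithCrash(steps, crashAfter);
}

const change = { id: 'abc', name: 'Widget', modifiedBy: 'alice' };

// Main table first, crash before the history write:
// the change is visible but has no audit record.
const mainFirst = createStore();
attemptChange(mainFirst, change, 'main-first', 1);
console.log(mainFirst.products.has('abc'), mainFirst.history.length); // true 0

// History first, crash before the main write:
// the audit record describes a change that never happened.
const historyFirst = createStore();
attemptChange(historyFirst, change, 'history-first', 1);
console.log(historyFirst.products.has('abc'), historyFirst.history.length); // false 1
```

In both runs the two collections disagree after the crash, and nothing stored in either one is enough to tell which side is wrong.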

Another strategy

We start with a couple of observations:

  • A single record that is never modified has no “changes” that need to be tracked. This is because the current state of the record is the only state it has ever had, and its “lastModified” and “modifiedBy” values are the only time and the only user that have ever touched it.
  • If a record is modified one or more times, then the current state of the record represents the result of its most recent modification. (This is probably obvious but still important to call out.)

All we need to do to hold all history is to preserve the state that would otherwise be lost when a record is modified. To do this, we store an unchanged copy of the record immediately before updating it. This “snapshot” is stored with a composite primary key that is the combination of the “id” and the “lastModified” fields. This is not the lastModified of the modification that is in-flight, but rather the lastModified that was loaded from the database.

With this in place, the ProductSnapshots (a.k.a history) table holds every state a product has ever been in, with the exception of the current state. To complete the history, we simply need to perform a Set Union between the Snapshots for a record and the current state of the record using the “id” and “lastModified” fields as the comparison criteria.
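As a rough illustration of that set union, the sketch below merges a list of snapshots with the current record, treating two entries as the same version when both “id” and “lastModified” match. The `versionKey` and `buildHistory` names are hypothetical, and the comparison assumes lastModified is always an ISO 8601 UTC string in a single format, so that string ordering matches time ordering.

```javascript
// Identifies one version of a product by id + lastModified.
function versionKey(record) {
    return `${record.id}|${record.lastModified}`;
}

// Returns every known version of a product, newest first.
// Duplicates (same id and lastModified) collapse into one entry.
function buildHistory(snapshots, current) {
    const versions = new Map();
    for (const record of [...snapshots, current]) {
        versions.set(versionKey(record), record);
    }
    return [...versions.values()].sort((a, b) => {
        if (a.lastModified < b.lastModified) return 1;
        if (a.lastModified > b.lastModified) return -1;
        return 0;
    });
}

const snapshots = [
    { id: 'abc', lastModified: '2018-01-01T00:00:00.000Z', name: 'Widget' },
    { id: 'abc', lastModified: '2018-02-01T00:00:00.000Z', name: 'Widget v2' }
];
// Here the current record duplicates the newest snapshot, which can happen
// if a snapshot was stored but the update that should follow it never ran.
const current = { id: 'abc', lastModified: '2018-02-01T00:00:00.000Z', name: 'Widget v2' };

const fullHistory = buildHistory(snapshots, current);
console.log(fullHistory.map((version) => version.name)); // two versions, newest first
```

Because the union is keyed on both fields, a snapshot that merely repeats the current state does not show up as an extra version.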

Let’s look at the state of the system when various exceptions could happen:

  • If an exception happens before any calls to DynamoDB, we are fine because nothing has changed.
  • If an exception happens after the current record is stored to the ProductSnapshots table but before we modify the main table, we are fine. This is because the record in the Snapshots table, while duplicating the state of the main table, is still accurate. It represents the state of the entity as of the previous modification.
  • If an exception happens after the ProductSnapshot is saved, and the main record is updated, then we are fine because no changes were lost.
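The same cases can be walked through with an in-memory sketch of the snapshot-first ordering. Everything here is illustrative: `createStore`, `patchProduct`, `history`, and the `crashAfterSnapshot` option are hypothetical names, and plain Maps stand in for the two DynamoDB tables.

```javascript
// In-memory stand-ins for the Products and ProductSnapshots tables.
function createStore(initialProduct) {
    return {
        products: new Map([[initialProduct.id, { ...initialProduct }]]),
        snapshots: new Map()
    };
}

// Applies a patch using the snapshot-first ordering. When
// options.crashAfterSnapshot is true, the function stops right after the
// snapshot is stored, as if the process had died at that point.
function patchProduct(store, id, patch, options = {}) {
    const loaded = store.products.get(id);
    // Step 1: store the record exactly as loaded, keyed by id + lastModified.
    store.snapshots.set(`${id}|${loaded.lastModified}`, { ...loaded });
    if (options.crashAfterSnapshot) {
        return;
    }
    // Step 2: overwrite the main record with the patched version.
    store.products.set(id, { ...loaded, ...patch });
}

// History = snapshots for this id, unioned with the current record.
function history(store, id) {
    const versions = new Map();
    for (const [key, record] of store.snapshots) {
        if (record.id === id) {
            versions.set(key, record);
        }
    }
    const current = store.products.get(id);
    versions.set(`${id}|${current.lastModified}`, current);
    return [...versions.values()];
}

const store = createStore({
    id: 'abc', name: 'Widget', modifiedBy: 'alice',
    lastModified: '2018-01-01T00:00:00.000Z'
});
const patch = { name: 'Gadget', modifiedBy: 'bob', lastModified: '2018-02-01T00:00:00.000Z' };

// Crash between the two writes: the snapshot duplicates the current record,
// so the history still shows exactly one version.
patchProduct(store, 'abc', patch, { crashAfterSnapshot: true });
console.log(history(store, 'abc').map((v) => v.name)); // [ 'Widget' ]

// Retrying rewrites the same snapshot key and then applies the change.
patchProduct(store, 'abc', patch);
console.log(history(store, 'abc').map((v) => v.name)); // [ 'Widget', 'Gadget' ]
```

Writing the snapshot is idempotent because its key comes from the record as loaded, so a retry after a crash overwrites the earlier copy rather than inventing a new version.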

Let’s implement this strategy. We will start by adding a “ProductsSnapshotTable” to cloudformation.template.yml:

  ProductsSnapshotTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
        - AttributeName: lastModified
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: "HASH"
        - AttributeName: lastModified
          KeyType: "RANGE" #note the keyType. This allows us to search for snapshots "as-of" a particular date and to order our snapshots
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1

This table will be written to at the same throughput as the Products DynamoDB table, but it may be read from at a different throughput. Add Auto Scaling with the following YAML:

  SnapshotWriteCapacityScalableTarget:
    Type: "AWS::ApplicationAutoScaling::ScalableTarget"
    Properties:
      MaxCapacity: 5
      MinCapacity: 1
      ResourceId: !Join
        - /
        - - table
          - !Ref ProductsSnapshotTable
      RoleARN: !GetAtt ScalingRole.Arn
      ScalableDimension: dynamodb:table:WriteCapacityUnits
      ServiceNamespace: dynamodb
  SnapshotReadCapacityScalableTarget:
    Type: "AWS::ApplicationAutoScaling::ScalableTarget"
    Properties:
      MaxCapacity: 5
      MinCapacity: 1
      ResourceId: !Join
        - /
        - - table
          - !Ref ProductsSnapshotTable
      RoleARN: !GetAtt ScalingRole.Arn
      ScalableDimension: dynamodb:table:ReadCapacityUnits
      ServiceNamespace: dynamodb
  SnapshotWriteScalingPolicy:
    Type: "AWS::ApplicationAutoScaling::ScalingPolicy"
    Properties:
      PolicyName: WriteAutoScalingPolicy
      PolicyType: TargetTrackingScaling
      ScalingTargetId: !Ref SnapshotWriteCapacityScalableTarget
      TargetTrackingScalingPolicyConfiguration:
        TargetValue: 50.0
        ScaleInCooldown: 60
        ScaleOutCooldown: 60
        PredefinedMetricSpecification:
          PredefinedMetricType: DynamoDBWriteCapacityUtilization
  SnapshotReadScalingPolicy:
    Type: "AWS::ApplicationAutoScaling::ScalingPolicy"
    Properties:
      PolicyName: ReadAutoScalingPolicy
      PolicyType: TargetTrackingScaling
      ScalingTargetId: !Ref SnapshotReadCapacityScalableTarget
      TargetTrackingScalingPolicyConfiguration:
        TargetValue: 50.0
        ScaleInCooldown: 60
        ScaleOutCooldown: 60
        PredefinedMetricSpecification:
          PredefinedMetricType: DynamoDBReadCapacityUtilization

We also need to inject the snapshot table’s name into our service as the PRODUCTS_SNAPSHOT_TABLE_NAME environment variable:

  ContainerDefinitions:
    - Name: ProductService
      Environment:
        - Name: PRODUCTS_TABLE_NAME
          Value: !Ref ProductsTable
        - Name: PRODUCTS_SNAPSHOT_TABLE_NAME
          Value: !Ref ProductsSnapshotTable

We need a function that stores a snapshot. Create products/snapshots/snapshotProduct.spec.js with these test cases:

const proxyquire = require('proxyquire');

describe('products', function () {
    describe('snapshotProduct', function () {
        beforeEach(function () {
            process.env.PRODUCTS_SNAPSHOT_TABLE_NAME = 'ProductSnapshots';
            this.product = {
                id: 'abc',
                lastModified: '2018-04-03T10:00:00.000Z'
            };

            this.awsResult = {
                promise: () => Promise.resolve()
            };
            const documentClient = this.documentClient = {
                put: (params) => this.awsResult
            };
            spyOn(this.documentClient, 'put').and.callThrough();

            this.snapshotProduct = proxyquire('./snapshotProduct', {
                'aws-sdk': {
                    DynamoDB: {
                        DocumentClient: function() {
                            return documentClient;
                        }
                    }
                }
            });
        });

        it('should pass the correct TableName to documentClient.put', async function () {
            await this.snapshotProduct(this.product);
            expect(this.documentClient.put.calls.argsFor(0)[0].TableName).toEqual('ProductSnapshots');
        });

        it('should pass the product to documentClient.put', async function () {
            await this.snapshotProduct(this.product);
            expect(this.documentClient.put.calls.argsFor(0)[0].Item).toBe(this.product);
        });
    });
});

Add an implementation in products/snapshots/snapshotProduct.js:

const AWS = require('aws-sdk');
const documentClient = new AWS.DynamoDB.DocumentClient();
const productsSnapshotTableName = process.env.PRODUCTS_SNAPSHOT_TABLE_NAME;

module.exports = async function snapshotProduct(product) {
    return await documentClient.put({
        TableName: productsSnapshotTableName,
        Item: product,
    }).promise();
};

Lastly, we simply need to call our “snapshotProduct” function during our patch request (there is nothing to snapshot when a record is created). Add a mock, a proxyquire entry, and two test cases to products/updateProduct.spec.js:

    beforeEach(function() {
        ...
        this.snapshotProduct = (product) => Promise.resolve();
        // callThrough keeps the stub's resolved promise as the spy's return value
        spyOn(this, 'snapshotProduct').and.callThrough();
        ...
        this.updateProduct = proxyquire('./updateProduct', {
            'aws-sdk': {
                DynamoDB: {
                    DocumentClient: function() {
                        return documentClient;
                    }
                }
            },
            './validateProduct': this.validateProduct,
            './snapshots/snapshotProduct': this.snapshotProduct
        });
    });

    it('should pass the unpatched product to snapshotProduct', async function () {
        await this.updateProduct(this.context);

        const expectedProduct = {
            lastModified: '2018-01-02T03:04:05.000Z'
        };
        expect(this.snapshotProduct.calls.argsFor(0)[0]).toEqual(expectedProduct);
    });

    it('should not save if snapshotting fails', async function () {
        this.snapshotProduct.and.callFake(() => Promise.reject());
        try {
            await this.updateProduct(this.context);
        } catch (e) {}
        
        expect(this.documentClient.put).not.toHaveBeenCalled();
    });
  ...

Make the above tests pass by calling snapshotProduct in products/updateProduct.js, which completes the storing of history.

const snapshotProduct = require('./snapshots/snapshotProduct');
...


module.exports = async function(ctx) {
    ...
    await snapshotProduct({...product});
    ... 
};

See an example of the changes made.

Now, run your tests:

npm test

Querying History

Storing history of a Product isn’t much use if there is no easy way to get access to it. Because snapshots can be thought of as a sub-resource under the product they are tracking, we can create a new endpoint that allows a client to query the snapshots for a specific product to see how that product has changed over time.

Start by creating a products/snapshots/listSnapshots.spec.js file with these tests:

const proxyquire = require('proxyquire');

describe('products', function () {
    describe('listSnapshots', function () {
        beforeEach(function () {
            process.env.PRODUCTS_SNAPSHOT_TABLE_NAME = 'ProductSnapshots';

            this.response = {
                Items: [{},{}]
            }

            this.context = {
                params: {
                    id: 'abc123'
                },
                query: {},
                response: {
                    headers: {},
                    set(field, value) {
                        this.headers[field] = value;
                    }
                }
            };

            this.awsResult = {
                promise: () => Promise.resolve(this.response)
            };
            const documentClient = this.documentClient = {
                query: (params) => this.awsResult
            };
            spyOn(this.documentClient, 'query').and.callThrough();

            this.listSnapshots = proxyquire('./listSnapshots', {
                'aws-sdk': {
                    DynamoDB: {
                        DocumentClient: function() {
                            return documentClient;
                        }
                    }
                }
            });
        });

        it('should pass the correct parameter structure to documentClient.query', async function () {
            this.context.params.id = 'abc123';
            await this.listSnapshots(this.context);
            const expectedParams = {
                ExclusiveStartKey: undefined,
                Limit: 25,
                TableName: 'ProductSnapshots',
                KeyConditionExpression: 'id = :id',
                ExpressionAttributeValues: {
                    ':id': 'abc123'
                },
                ScanIndexForward: false
            };
            expect(this.documentClient.query.calls.argsFor(0)[0]).toEqual(expectedParams);
        });

        it('should return the product list', async function() {
            await this.listSnapshots(this.context);
            expect(this.context.body).toEqual(this.response.Items);
        });

        describe('pagination', function() {
            it('should not return a Link header if the returned LastEvaluatedKey is undefined', async function() {
                delete this.response.LastEvaluatedKey;
                await this.listSnapshots(this.context);
                // The implementation sets the header under the lowercase name 'link'
                expect(this.context.response.headers.link).toBeUndefined();
            });

            it('should return a properly formatted link header when LastEvaluatedKey is returned', async function() {
                this.response.LastEvaluatedKey = {
                    id: 'id123'
                };
                await this.listSnapshots(this.context);
                expect(this.context.response.headers.link)
                    .toEqual('</products?_lek=id123>; rel="next"');
            });

            it('should pass the _lek param to DynamoDB if this is a pagination request', async function() {
                this.context.query = {
                    _lek: 'key123'
                };
                await this.listSnapshots(this.context);
                expect(this.documentClient.query.calls.argsFor(0)[0].ExclusiveStartKey)
                    .toEqual({id: 'key123'});
            });
        });
    });
});

We are setting ScanIndexForward to false as well as implementing pagination via the same Link header employed in our listing endpoint. This allows the client to logically page backwards through time to access the entity’s history.

Implement the above in products/snapshots/listSnapshots.js:

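// Require aws-sdk directly so the proxyquire stub in the spec file takes effect
const AWS = require('aws-sdk');
const documentClient = new AWS.DynamoDB.DocumentClient();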
const formatLinkHeader = require('format-link-header');
const productsSnapshotTableName = process.env.PRODUCTS_SNAPSHOT_TABLE_NAME;

function getExclusiveStartKey(ctx) {
    if (ctx.query && ctx.query._lek) {
        return {
            id: ctx.query._lek
        };
    }
}

function addLinkHeaderIfNeeded(ctx, lastEvaluatedKey) {
    if (lastEvaluatedKey) {
        const link = {
            next: {
                rel: 'next',
                url: `/products?_lek=${lastEvaluatedKey.id}`
            }
        };
        ctx.response.set('link', formatLinkHeader(link));
    }
}

module.exports = async function listSnapshots(ctx) {
    const scanOutput = await documentClient.query({
        TableName: productsSnapshotTableName,
        Limit: 25,
        ExclusiveStartKey: getExclusiveStartKey(ctx),
        KeyConditionExpression: 'id = :id',
        ExpressionAttributeValues: {
            ':id': ctx.params.id
        },
        ScanIndexForward: false
    }).promise();

    addLinkHeaderIfNeeded(ctx, scanOutput.LastEvaluatedKey);
    ctx.body = scanOutput.Items;
};
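One detail may be worth double-checking against your own table: DynamoDB expects ExclusiveStartKey to contain every attribute of the table's primary key, and the snapshot table's key is the combination of id and lastModified. The sketch below shows one possible way to carry both through pagination, by taking the id from the route and the lastModified from the `_lek` parameter. The `buildNextUrl` helper and the choice to point the link at the snapshots route are assumptions for illustration, not part of the listing above, and adopting them would mean adjusting the pagination tests to match.

```javascript
// Rebuilds the full composite key: id comes from the route,
// lastModified comes from the _lek query parameter.
function getExclusiveStartKey(ctx) {
    if (ctx.query && ctx.query._lek) {
        return {
            id: ctx.params.id,
            lastModified: ctx.query._lek
        };
    }
    return undefined;
}

// Builds the "next" URL for the snapshots sub-resource (hypothetical helper).
function buildNextUrl(ctx, lastEvaluatedKey) {
    const id = encodeURIComponent(ctx.params.id);
    const lek = encodeURIComponent(lastEvaluatedKey.lastModified);
    return `/products/${id}/snapshots?_lek=${lek}`;
}

const exampleCtx = {
    params: { id: 'abc123' },
    query: { _lek: '2018-04-03T10:00:00.000Z' }
};
console.log(getExclusiveStartKey(exampleCtx));
console.log(buildNextUrl(exampleCtx, {
    id: 'abc123',
    lastModified: '2018-04-03T10:00:00.000Z'
}));
```

Only lastModified needs to travel in the query string because the id is already part of the URL path.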

Then add a route for it to server.js:

router.get('/products/:id/snapshots', require('./products/snapshots/listSnapshots'));

Finally, exercise your tests:

npm test

See the changes we made here.
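Because lastModified is the RANGE key of the snapshot table, the same query shape can also answer “what did this product look like as of a given time?”. The following is a minimal sketch under stated assumptions: `buildAsOfQuery` and `pickAsOf` are hypothetical helpers, not part of the service, and timestamps are assumed to be ISO 8601 UTC strings in one format so that string comparison matches time order.

```javascript
// Builds parameters for documentClient.query that ask for the newest
// snapshot whose lastModified is at or before the as-of time.
function buildAsOfQuery(tableName, id, asOf) {
    return {
        TableName: tableName,
        KeyConditionExpression: 'id = :id AND lastModified <= :asOf',
        ExpressionAttributeValues: {
            ':id': id,
            ':asOf': asOf
        },
        ScanIndexForward: false, // newest matching snapshot first
        Limit: 1
    };
}

// Chooses the version in effect at `asOf`. The current record wins when it
// was last modified at or before that time; otherwise fall back to the
// snapshot returned by the query above (or null if there is none).
function pickAsOf(current, snapshot, asOf) {
    if (current && current.lastModified <= asOf) {
        return current;
    }
    return snapshot || null;
}

const params = buildAsOfQuery('ProductSnapshots', 'abc', '2018-03-01T00:00:00.000Z');
console.log(params.KeyConditionExpression);
```

The current record has to be checked as well because the snapshot table never holds the current state.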

Event sourcing is an alternative approach to maintaining model history. It has gained popularity with the rise of microservices, yet as Martin Fowler points out, it is as old as accounting ledgers or source code version control systems. With an event sourcing approach, you log events (productCreated, productUpdated, etc.) and perform the equivalent of a functional fold over those events to get the state of the model at any point in time. Some of the benefits and drawbacks of event sourcing are enumerated in this 2016 video.
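To make the “functional fold” concrete, here is a small sketch. The event shape (`type`, `timestamp`, `data`) and the `applyEvent` and `stateAsOf` functions are assumptions made for illustration; a real event-sourced system would define its own event schema.

```javascript
// Applies a single event to the previous state and returns the new state.
function applyEvent(state, event) {
    switch (event.type) {
        case 'productCreated':
            return { ...event.data };
        case 'productUpdated':
            return { ...state, ...event.data };
        default:
            return state; // ignore event types this sketch does not know about
    }
}

// Folds every event at or before `asOf` into a single state.
function stateAsOf(events, asOf) {
    return events
        .filter((event) => event.timestamp <= asOf)
        .reduce(applyEvent, null);
}

const events = [
    {
        type: 'productCreated',
        timestamp: '2018-01-01T00:00:00.000Z',
        data: { id: 'abc', name: 'Widget' }
    },
    {
        type: 'productUpdated',
        timestamp: '2018-02-01T00:00:00.000Z',
        data: { name: 'Gadget' }
    }
];

console.log(stateAsOf(events, '2018-01-15T00:00:00.000Z')); // { id: 'abc', name: 'Widget' }
console.log(stateAsOf(events, '2018-03-01T00:00:00.000Z')); // { id: 'abc', name: 'Gadget' }
```

The contrast with the snapshot approach is that here the log of events is the stored data, and every state, including the current one, is derived from it.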

If you have questions or feedback on this series, contact the authors at nodereference@sourceallies.com.
