From 1e0f9efb4eb660538efb5f175e30594a6b175143 Mon Sep 17 00:00:00 2001 From: Andras Toth Date: Wed, 8 Feb 2017 17:18:29 +0100 Subject: [PATCH] Feat: Receive message attributes --- package.json | 3 ++- src/Message.js | 36 ++++++++++++++++++++++++++++++++++++ src/index.js | 3 ++- test/src/Message.spec.js | 26 +++++++++++++++++++++++++- test/src/index.spec.js | 3 ++- 5 files changed, 67 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index f1af91d..a3abf0d 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,8 @@ }, "homepage": "https://github.com/TechnologyAdvice/Squiss#readme", "dependencies": { - "aws-sdk": "^2.10.0" + "aws-sdk": "^2.10.0", + "lodash.reduce": "4.6.0" }, "devDependencies": { "chai": "^3.5.0", diff --git a/src/Message.js b/src/Message.js index e2e8348..d9fe98e 100644 --- a/src/Message.js +++ b/src/Message.js @@ -4,6 +4,8 @@ 'use strict' +const reduce = require('lodash.reduce') + /** * The message class is a wrapper for Amazon SQS messages that provides the raw and parsed message body, * optionally removed SNS wrappers, and provides convenience functions to delete or keep a given message. @@ -33,6 +35,7 @@ class Message { this.topicName = unwrapped.TopicArn.substr(unwrapped.TopicArn.lastIndexOf(':') + 1) } this.body = Message._formatMessage(this.body, opts.bodyFormat) + this.attributes = Message._parseMessageAttributes(opts.msg.MessageAttributes) this._squiss = opts.squiss this._handled = false } @@ -86,4 +89,37 @@ Message._formatMessage = (msg, format) => { } } +/** + * Parses the MessageAttributes + * @param {Object} messageAttributes + * @returns {Object} Key - value pairs + * @private + */ +Message._parseMessageAttributes = (messageAttributes) => { + return reduce(messageAttributes, (parsedAttributes, unparsedAttribute, name) => Object.assign(parsedAttributes, { + [name]: Message._parseAttributeValue(unparsedAttribute) + }), {}) +} + +/** + * Parses a value of a MessageAttribute + * @param {Object} unparsedAttribute + * @returns {number|string|Buffer} + * @private + */ +Message._parseAttributeValue = (unparsedAttribute) => { + const type = unparsedAttribute.DataType + const stringValue = unparsedAttribute.StringValue + const binaryValue = unparsedAttribute.BinaryValue + + switch (type) { + case 'Number': + return Number(stringValue) + case 'Binary': + return binaryValue + default: + return stringValue || binaryValue + } +} + module.exports = Message diff --git a/src/index.js b/src/index.js index e2788a3..cc3c369 100644 --- a/src/index.js +++ b/src/index.js @@ -407,7 +407,8 @@ class Squiss extends EventEmitter { const params = { QueueUrl: queueUrl, MaxNumberOfMessages: this._opts.receiveBatchSize, - WaitTimeSeconds: this._opts.receiveWaitTimeSecs + WaitTimeSeconds: this._opts.receiveWaitTimeSecs, + MessageAttributeNames: ['All'] } if (this._opts.visibilityTimeoutSecs !== undefined) { params.VisibilityTimeout = this._opts.visibilityTimeoutSecs diff --git a/test/src/Message.spec.js b/test/src/Message.spec.js index 225c399..8ab3766 100644 --- a/test/src/Message.spec.js +++ b/test/src/Message.spec.js @@ -11,7 +11,25 @@ function getSQSMsg(body) { MessageId: 'msgId', ReceiptHandle: 'handle', MD5OfBody: 'abcdeabcdeabcdeabcdeabcdeabcde12', - Body: body + Body: body, + MessageAttributes: { + SomeNumber: { + DataType: 'Number', + StringValue: '1' + }, + SomeString: { + DataType: 'String', + StringValue: 's' + }, + SomeBinary: { + DataType: 'Binary', + BinaryValue: new Buffer(['s']) + }, + SomeCustomBinary: { + DataType: 'CustomBinary', + BinaryValue: new Buffer(['c']) + } + } } } @@ -49,6 +67,12 @@ describe('Message', () => { msg.body.should.be.an('object') msg.body.should.have.property('Message').equal('foo') msg.body.should.have.property('bar').equal('baz') + msg.attributes.should.be.eql({ + SomeNumber: 1, + SomeString: 's', + SomeBinary: new Buffer(['s']), + SomeCustomBinary: new Buffer(['c']) + }) }) it('calls Squiss.deleteMessage on delete', (done) => { const msg = new Message({ diff --git a/test/src/index.spec.js b/test/src/index.spec.js index 6f84265..571f978 100644 --- a/test/src/index.spec.js +++ b/test/src/index.spec.js @@ -226,7 +226,8 @@ describe('index', () => { QueueUrl: 'foo', MaxNumberOfMessages: 10, WaitTimeSeconds: 20, - VisibilityTimeout: 10 + VisibilityTimeout: 10, + MessageAttributeNames: ['All'] }) }) })