From 8b5829b965b72cd5ca7b421e4aa4867d3f567418 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:25:05 +0200 Subject: [PATCH 01/11] auto format code --- lib/channel_model.js | 554 +++++++++++++++++++++---------------------- 1 file changed, 274 insertions(+), 280 deletions(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index 2097fd14..a66e4fd2 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -4,265 +4,270 @@ 'use strict'; -var defs = require('./defs'); -var Promise = require('bluebird'); -var inherits = require('util').inherits; -var EventEmitter = require('events').EventEmitter; -var BaseChannel = require('./channel').BaseChannel; -var acceptMessage = require('./channel').acceptMessage; -var Args = require('./api_args'); - -function ChannelModel(connection) { - if (!(this instanceof ChannelModel)) - return new ChannelModel(connection); - EventEmitter.call( this ); - this.connection = connection; - var self = this; - ['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) { - connection.on(ev, self.emit.bind(self, ev)); - }); +const defs = require('./defs'); +const Promise = require('bluebird'); +const inherits = require('util').inherits; +const EventEmitter = require('events').EventEmitter; +const BaseChannel = require('./channel').BaseChannel; +const acceptMessage = require('./channel').acceptMessage; +const Args = require('./api_args'); + +class ChannelModel extends EventEmitter { + constructor(connection) { + super(); + this.connection = connection; + const self = this; + ['error', 'close', 'blocked', 'unblocked'].forEach(ev => { + connection.on(ev, self.emit.bind(self, ev)); + }); + } + + close() { + return Promise.fromCallback(this.connection.close.bind(this.connection)); + } + + createChannel() { + const c = new Channel(this.connection); + return c.open().then(openOk => { return c; }); + } + + createConfirmChannel() { + const c = new ConfirmChannel(this.connection); + return c.open() + .then(openOk => { + return c.rpc(defs.ConfirmSelect, {nowait: false}, + defs.ConfirmSelectOk) + }) + .then(() => { return c; }); + } } -inherits(ChannelModel, EventEmitter); module.exports.ChannelModel = ChannelModel; -var CM = ChannelModel.prototype; - -CM.close = function() { - return Promise.fromCallback(this.connection.close.bind(this.connection)); -}; - // Channels -function Channel(connection) { - BaseChannel.call(this, connection); - this.on('delivery', this.handleDelivery.bind(this)); - this.on('cancel', this.handleCancel.bind(this)); -} -inherits(Channel, BaseChannel); - -module.exports.Channel = Channel; - -CM.createChannel = function() { - var c = new Channel(this.connection); - return c.open().then(function(openOk) { return c; }); -}; - -var C = Channel.prototype; - -// An RPC that returns a 'proper' promise, which resolves to just the -// response's fields; this is intended to be suitable for implementing -// API procedures. -C.rpc = function(method, fields, expect) { - var self = this; - return Promise.fromCallback(function(cb) { - return self._rpc(method, fields, expect, cb); - }) - .then(function(f) { - return f.fields; - }); -}; - -// Do the remarkably simple channel open handshake -C.open = function() { - return Promise.try(this.allocate.bind(this)).then( - function(ch) { - return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, - defs.ChannelOpenOk); +class Channel extends BaseChannel { + constructor(connection) { + super(connection); + this.on('delivery', this.handleDelivery.bind(this)); + this.on('cancel', this.handleCancel.bind(this)); + } + + // An RPC that returns a 'proper' promise, which resolves to just the + // response's fields; this is intended to be suitable for implementing + // API procedures. + rpc(method, fields, expect) { + const self = this; + return Promise.fromCallback(cb => { + return self._rpc(method, fields, expect, cb); + }) + .then(f => { + return f.fields; }); -}; - -C.close = function() { - var self = this; - return Promise.fromCallback(function(cb) { - return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, - cb); - }); -}; - -// === Public API, declaring queues and stuff === - -C.assertQueue = function(queue, options) { - return this.rpc(defs.QueueDeclare, - Args.assertQueue(queue, options), - defs.QueueDeclareOk); -}; - -C.checkQueue = function(queue) { - return this.rpc(defs.QueueDeclare, - Args.checkQueue(queue), - defs.QueueDeclareOk); -}; - -C.deleteQueue = function(queue, options) { - return this.rpc(defs.QueueDelete, - Args.deleteQueue(queue, options), - defs.QueueDeleteOk); -}; - -C.purgeQueue = function(queue) { - return this.rpc(defs.QueuePurge, - Args.purgeQueue(queue), - defs.QueuePurgeOk); -}; - -C.bindQueue = function(queue, source, pattern, argt) { - return this.rpc(defs.QueueBind, - Args.bindQueue(queue, source, pattern, argt), - defs.QueueBindOk); -}; - -C.unbindQueue = function(queue, source, pattern, argt) { - return this.rpc(defs.QueueUnbind, - Args.unbindQueue(queue, source, pattern, argt), - defs.QueueUnbindOk); -}; - -C.assertExchange = function(exchange, type, options) { - // The server reply is an empty set of fields, but it's convenient - // to have the exchange name handed to the continuation. - return this.rpc(defs.ExchangeDeclare, - Args.assertExchange(exchange, type, options), - defs.ExchangeDeclareOk) - .then(function(_ok) { return { exchange: exchange }; }); -}; - -C.checkExchange = function(exchange) { - return this.rpc(defs.ExchangeDeclare, - Args.checkExchange(exchange), - defs.ExchangeDeclareOk); -}; - -C.deleteExchange = function(name, options) { - return this.rpc(defs.ExchangeDelete, - Args.deleteExchange(name, options), - defs.ExchangeDeleteOk); -}; - -C.bindExchange = function(dest, source, pattern, argt) { - return this.rpc(defs.ExchangeBind, - Args.bindExchange(dest, source, pattern, argt), - defs.ExchangeBindOk); -}; - -C.unbindExchange = function(dest, source, pattern, argt) { - return this.rpc(defs.ExchangeUnbind, - Args.unbindExchange(dest, source, pattern, argt), - defs.ExchangeUnbindOk); -}; - -// Working with messages - -C.publish = function(exchange, routingKey, content, options) { - var fieldsAndProps = Args.publish(exchange, routingKey, options); - return this.sendMessage(fieldsAndProps, fieldsAndProps, content); -}; - -C.sendToQueue = function(queue, content, options) { - return this.publish('', queue, content, options); -}; - -C.consume = function(queue, callback, options) { - var self = this; - // NB we want the callback to be run synchronously, so that we've - // registered the consumerTag before any messages can arrive. - var fields = Args.consume(queue, options); - return Promise.fromCallback(function(cb) { - self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); - }) - .then(function(ok) { - self.registerConsumer(ok.fields.consumerTag, callback); - return ok.fields; - }); -}; - -C.cancel = function(consumerTag) { - var self = this; - return Promise.fromCallback(function(cb) { - self._rpc(defs.BasicCancel, Args.cancel(consumerTag), - defs.BasicCancelOk, - cb); - }) - .then(function(ok) { - self.unregisterConsumer(consumerTag); - return ok.fields; - }); -}; - -C.get = function(queue, options) { - var self = this; - var fields = Args.get(queue, options); - return Promise.fromCallback(function(cb) { - return self.sendOrEnqueue(defs.BasicGet, fields, cb); - }) - .then(function(f) { - if (f.id === defs.BasicGetEmpty) { - return false; - } - else if (f.id === defs.BasicGetOk) { - var fields = f.fields; - return new Promise(function(resolve) { - self.handleMessage = acceptMessage(function(m) { - m.fields = fields; - resolve(m); - }); + } + + // Do the remarkably simple channel open handshake + open() { + return Promise.try(this.allocate.bind(this)).then( + ch => { + return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, + defs.ChannelOpenOk); }); - } - else { - throw new Error("Unexpected response to BasicGet: " + - inspect(f)); - } - }) -}; - -C.ack = function(message, allUpTo) { - this.sendImmediately( - defs.BasicAck, - Args.ack(message.fields.deliveryTag, allUpTo)); -}; + } -C.ackAll = function() { - this.sendImmediately(defs.BasicAck, Args.ack(0, true)); -}; - -C.nack = function(message, allUpTo, requeue) { - this.sendImmediately( - defs.BasicNack, - Args.nack(message.fields.deliveryTag, allUpTo, requeue)); -}; + close() { + const self = this; + return Promise.fromCallback(cb => { + return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, + cb); + }); + } + + // === Public API, declaring queues and stuff === + + assertQueue(queue, options) { + return this.rpc(defs.QueueDeclare, + Args.assertQueue(queue, options), + defs.QueueDeclareOk); + } + + checkQueue(queue) { + return this.rpc(defs.QueueDeclare, + Args.checkQueue(queue), + defs.QueueDeclareOk); + } + + deleteQueue(queue, options) { + return this.rpc(defs.QueueDelete, + Args.deleteQueue(queue, options), + defs.QueueDeleteOk); + } + + purgeQueue(queue) { + return this.rpc(defs.QueuePurge, + Args.purgeQueue(queue), + defs.QueuePurgeOk); + } + + bindQueue(queue, source, pattern, argt) { + return this.rpc(defs.QueueBind, + Args.bindQueue(queue, source, pattern, argt), + defs.QueueBindOk); + } + + unbindQueue(queue, source, pattern, argt) { + return this.rpc(defs.QueueUnbind, + Args.unbindQueue(queue, source, pattern, argt), + defs.QueueUnbindOk); + } + + assertExchange(exchange, type, options) { + // The server reply is an empty set of fields, but it's convenient + // to have the exchange name handed to the continuation. + return this.rpc(defs.ExchangeDeclare, + Args.assertExchange(exchange, type, options), + defs.ExchangeDeclareOk) + .then(_ok => { return { exchange }; }); + } + + checkExchange(exchange) { + return this.rpc(defs.ExchangeDeclare, + Args.checkExchange(exchange), + defs.ExchangeDeclareOk); + } + + deleteExchange(name, options) { + return this.rpc(defs.ExchangeDelete, + Args.deleteExchange(name, options), + defs.ExchangeDeleteOk); + } + + bindExchange(dest, source, pattern, argt) { + return this.rpc(defs.ExchangeBind, + Args.bindExchange(dest, source, pattern, argt), + defs.ExchangeBindOk); + } + + unbindExchange(dest, source, pattern, argt) { + return this.rpc(defs.ExchangeUnbind, + Args.unbindExchange(dest, source, pattern, argt), + defs.ExchangeUnbindOk); + } + + // Working with messages + + publish(exchange, routingKey, content, options) { + const fieldsAndProps = Args.publish(exchange, routingKey, options); + return this.sendMessage(fieldsAndProps, fieldsAndProps, content); + } + + sendToQueue(queue, content, options) { + return this.publish('', queue, content, options); + } + + consume(queue, callback, options) { + const self = this; + // NB we want the callback to be run synchronously, so that we've + // registered the consumerTag before any messages can arrive. + const fields = Args.consume(queue, options); + return Promise.fromCallback(cb => { + self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); + }) + .then(ok => { + self.registerConsumer(ok.fields.consumerTag, callback); + return ok.fields; + }); + } + + cancel(consumerTag) { + const self = this; + return Promise.fromCallback(cb => { + self._rpc(defs.BasicCancel, Args.cancel(consumerTag), + defs.BasicCancelOk, + cb); + }) + .then(ok => { + self.unregisterConsumer(consumerTag); + return ok.fields; + }); + } -C.nackAll = function(requeue) { - this.sendImmediately(defs.BasicNack, - Args.nack(0, true, requeue)); -}; + get(queue, options) { + const self = this; + const fields = Args.get(queue, options); + return Promise.fromCallback(cb => { + return self.sendOrEnqueue(defs.BasicGet, fields, cb); + }) + .then(f => { + if (f.id === defs.BasicGetEmpty) { + return false; + } + else if (f.id === defs.BasicGetOk) { + const fields = f.fields; + return new Promise(resolve => { + self.handleMessage = acceptMessage(m => { + m.fields = fields; + resolve(m); + }); + }); + } + else { + throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`); + } + }); + } + + ack(message, allUpTo) { + this.sendImmediately( + defs.BasicAck, + Args.ack(message.fields.deliveryTag, allUpTo)); + } + + ackAll() { + this.sendImmediately(defs.BasicAck, Args.ack(0, true)); + } + + nack(message, allUpTo, requeue) { + this.sendImmediately( + defs.BasicNack, + Args.nack(message.fields.deliveryTag, allUpTo, requeue)); + } + + nackAll(requeue) { + this.sendImmediately(defs.BasicNack, + Args.nack(0, true, requeue)); + } + + // `Basic.Nack` is not available in older RabbitMQ versions (or in the + // AMQP specification), so you have to use the one-at-a-time + // `Basic.Reject`. This is otherwise synonymous with + // `#nack(message, false, requeue)`. + reject(message, requeue) { + this.sendImmediately( + defs.BasicReject, + Args.reject(message.fields.deliveryTag, requeue)); + } + + recover() { + return this.rpc(defs.BasicRecover, + Args.recover(), + defs.BasicRecoverOk); + } +} -// `Basic.Nack` is not available in older RabbitMQ versions (or in the -// AMQP specification), so you have to use the one-at-a-time -// `Basic.Reject`. This is otherwise synonymous with -// `#nack(message, false, requeue)`. -C.reject = function(message, requeue) { - this.sendImmediately( - defs.BasicReject, - Args.reject(message.fields.deliveryTag, requeue)); -}; +module.exports.Channel = Channel; // There are more options in AMQP than exposed here; RabbitMQ only // implements prefetch based on message count, and only for individual // channels or consumers. RabbitMQ v3.3.0 and after treat prefetch // (without `global` set) as per-consumer (for consumers following), // and prefetch with `global` set as per-channel. -C.prefetch = C.qos = function(count, global) { +Channel.prototype.prefetch = Channel.prototype.qos = function(count, global) { return this.rpc(defs.BasicQos, Args.prefetch(count, global), defs.BasicQosOk); }; -C.recover = function() { - return this.rpc(defs.BasicRecover, - Args.recover(), - defs.BasicRecoverOk); -}; - // Confirm channel. This is a channel with confirms 'switched on', // meaning sent messages will provoke a responding 'ack' or 'nack' // from the server. The upshot of this is that `publish` and @@ -270,49 +275,38 @@ C.recover = function() { // with `null` as its argument to signify 'ack', or an exception as // its argument to signify 'nack'. -function ConfirmChannel(connection) { - Channel.call(this, connection); +class ConfirmChannel extends Channel { + constructor(connection) { + super(connection); + } + + publish(exchange, routingKey, content, options, cb) { + this.pushConfirmCallback(cb); + return Channel.prototype.publish.call(this, exchange, routingKey, content, options); + } + + sendToQueue(queue, content, options, cb) { + return this.publish('', queue, content, options, cb); + } + + waitForConfirms() { + const awaiting = []; + const unconfirmed = this.unconfirmed; + unconfirmed.forEach((val, index) => { + if (val === null); // already confirmed + else { + const confirmed = new Promise((resolve, reject) => { + unconfirmed[index] = err => { + if (val) val(err); + if (err === null) resolve(); + else reject(err); + }; + }); + awaiting.push(confirmed); + } + }); + return Promise.all(awaiting); + } } -inherits(ConfirmChannel, Channel); module.exports.ConfirmChannel = ConfirmChannel; - -CM.createConfirmChannel = function() { - var c = new ConfirmChannel(this.connection); - return c.open() - .then(function(openOk) { - return c.rpc(defs.ConfirmSelect, {nowait: false}, - defs.ConfirmSelectOk) - }) - .then(function() { return c; }); -}; - -var CC = ConfirmChannel.prototype; - -CC.publish = function(exchange, routingKey, content, options, cb) { - this.pushConfirmCallback(cb); - return C.publish.call(this, exchange, routingKey, content, options); -}; - -CC.sendToQueue = function(queue, content, options, cb) { - return this.publish('', queue, content, options, cb); -}; - -CC.waitForConfirms = function() { - var awaiting = []; - var unconfirmed = this.unconfirmed; - unconfirmed.forEach(function(val, index) { - if (val === null); // already confirmed - else { - var confirmed = new Promise(function(resolve, reject) { - unconfirmed[index] = function(err) { - if (val) val(err); - if (err === null) resolve(); - else reject(err); - }; - }); - awaiting.push(confirmed); - } - }); - return Promise.all(awaiting); -}; From 9c6fdc78c6536afc6777f91a9211e3e44e5ff60b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:26:03 +0200 Subject: [PATCH 02/11] rm inherit --- lib/channel_model.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index a66e4fd2..9803e80d 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -6,7 +6,6 @@ const defs = require('./defs'); const Promise = require('bluebird'); -const inherits = require('util').inherits; const EventEmitter = require('events').EventEmitter; const BaseChannel = require('./channel').BaseChannel; const acceptMessage = require('./channel').acceptMessage; From 320964fc97ae35f093a679841ba2cd5a6d8f4ccc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:26:23 +0200 Subject: [PATCH 03/11] rm circular ref --- lib/channel_model.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index 9803e80d..615ffe8d 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -6,7 +6,7 @@ const defs = require('./defs'); const Promise = require('bluebird'); -const EventEmitter = require('events').EventEmitter; +const EventEmitter = require('events'); const BaseChannel = require('./channel').BaseChannel; const acceptMessage = require('./channel').acceptMessage; const Args = require('./api_args'); From d72530d04bcf45a1a3f23f255ac70a8fddd296d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:27:11 +0200 Subject: [PATCH 04/11] destruct imports --- lib/channel_model.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index 615ffe8d..ad51fd98 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -7,8 +7,8 @@ const defs = require('./defs'); const Promise = require('bluebird'); const EventEmitter = require('events'); -const BaseChannel = require('./channel').BaseChannel; -const acceptMessage = require('./channel').acceptMessage; +const {BaseChannel} = require('./channel'); +const {acceptMessage} = require('./channel'); const Args = require('./api_args'); class ChannelModel extends EventEmitter { From 44e62fd8a63fecf975f0ddf41cae94c775549278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:28:16 +0200 Subject: [PATCH 05/11] sort on node, npm own packages --- lib/channel_model.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index ad51fd98..0d1e888b 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -4,9 +4,9 @@ 'use strict'; -const defs = require('./defs'); -const Promise = require('bluebird'); const EventEmitter = require('events'); +const Promise = require('bluebird'); +const defs = require('./defs'); const {BaseChannel} = require('./channel'); const {acceptMessage} = require('./channel'); const Args = require('./api_args'); From 52fc84d978226777e8c6eef5f39a63a9e29da2fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:30:58 +0200 Subject: [PATCH 06/11] rm some self variable --- lib/channel_model.js | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index 0d1e888b..104c81f0 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -15,9 +15,9 @@ class ChannelModel extends EventEmitter { constructor(connection) { super(); this.connection = connection; - const self = this; + ['error', 'close', 'blocked', 'unblocked'].forEach(ev => { - connection.on(ev, self.emit.bind(self, ev)); + connection.on(ev, this.emit.bind(this, ev)); }); } @@ -56,9 +56,8 @@ class Channel extends BaseChannel { // response's fields; this is intended to be suitable for implementing // API procedures. rpc(method, fields, expect) { - const self = this; return Promise.fromCallback(cb => { - return self._rpc(method, fields, expect, cb); + return this._rpc(method, fields, expect, cb); }) .then(f => { return f.fields; @@ -75,9 +74,8 @@ class Channel extends BaseChannel { } close() { - const self = this; return Promise.fromCallback(cb => { - return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, + return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, cb); }); } @@ -165,37 +163,34 @@ class Channel extends BaseChannel { } consume(queue, callback, options) { - const self = this; // NB we want the callback to be run synchronously, so that we've // registered the consumerTag before any messages can arrive. const fields = Args.consume(queue, options); return Promise.fromCallback(cb => { - self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); + this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); }) .then(ok => { - self.registerConsumer(ok.fields.consumerTag, callback); + this.registerConsumer(ok.fields.consumerTag, callback); return ok.fields; }); } cancel(consumerTag) { - const self = this; return Promise.fromCallback(cb => { - self._rpc(defs.BasicCancel, Args.cancel(consumerTag), + this._rpc(defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk, cb); }) .then(ok => { - self.unregisterConsumer(consumerTag); + this.unregisterConsumer(consumerTag); return ok.fields; }); } get(queue, options) { - const self = this; const fields = Args.get(queue, options); return Promise.fromCallback(cb => { - return self.sendOrEnqueue(defs.BasicGet, fields, cb); + return this.sendOrEnqueue(defs.BasicGet, fields, cb); }) .then(f => { if (f.id === defs.BasicGetEmpty) { @@ -204,7 +199,7 @@ class Channel extends BaseChannel { else if (f.id === defs.BasicGetOk) { const fields = f.fields; return new Promise(resolve => { - self.handleMessage = acceptMessage(m => { + this.handleMessage = acceptMessage(m => { m.fields = fields; resolve(m); }); From 318f0ac21bc7e7bfb997d9301f19544710db58d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:31:57 +0200 Subject: [PATCH 07/11] rm unnecessary constructor --- lib/channel_model.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index 104c81f0..63aceaf5 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -270,10 +270,6 @@ Channel.prototype.prefetch = Channel.prototype.qos = function(count, global) { // its argument to signify 'nack'. class ConfirmChannel extends Channel { - constructor(connection) { - super(connection); - } - publish(exchange, routingKey, content, options, cb) { this.pushConfirmCallback(cb); return Channel.prototype.publish.call(this, exchange, routingKey, content, options); From 3a6409cb813d0881d36f16c3162587dbf530fe4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:33:21 +0200 Subject: [PATCH 08/11] mv exports to bottom --- lib/channel_model.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index 63aceaf5..df57578d 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -41,8 +41,6 @@ class ChannelModel extends EventEmitter { } } -module.exports.ChannelModel = ChannelModel; - // Channels class Channel extends BaseChannel { @@ -249,8 +247,6 @@ class Channel extends BaseChannel { } } -module.exports.Channel = Channel; - // There are more options in AMQP than exposed here; RabbitMQ only // implements prefetch based on message count, and only for individual // channels or consumers. RabbitMQ v3.3.0 and after treat prefetch @@ -300,3 +296,5 @@ class ConfirmChannel extends Channel { } module.exports.ConfirmChannel = ConfirmChannel; +module.exports.Channel = Channel; +module.exports.ChannelModel = ChannelModel; From 5f130f2d4eaf007d041d2c4d97b2480bf234a513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:35:26 +0200 Subject: [PATCH 09/11] inspect was undefined and used --- lib/channel_model.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/channel_model.js b/lib/channel_model.js index df57578d..0dfb5bc5 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -10,6 +10,7 @@ const defs = require('./defs'); const {BaseChannel} = require('./channel'); const {acceptMessage} = require('./channel'); const Args = require('./api_args'); +const {inspect} = require('./format'); class ChannelModel extends EventEmitter { constructor(connection) { From d72af3943ebf345d71bdc033d9136ce313df85c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Tue, 29 Jun 2021 21:56:25 +0200 Subject: [PATCH 10/11] start using async where it may fit --- lib/channel_model.js | 103 ++++++++++++++++++++----------------------- 1 file changed, 49 insertions(+), 54 deletions(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index 0dfb5bc5..e13b1bdc 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -26,19 +26,17 @@ class ChannelModel extends EventEmitter { return Promise.fromCallback(this.connection.close.bind(this.connection)); } - createChannel() { - const c = new Channel(this.connection); - return c.open().then(openOk => { return c; }); + async createChannel() { + const channel = new Channel(this.connection); + await channel.open(); + return channel; } - createConfirmChannel() { - const c = new ConfirmChannel(this.connection); - return c.open() - .then(openOk => { - return c.rpc(defs.ConfirmSelect, {nowait: false}, - defs.ConfirmSelectOk) - }) - .then(() => { return c; }); + async createConfirmChannel() { + const channel = new ConfirmChannel(this.connection); + await channel.open(); + await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk); + return channel; } } @@ -54,13 +52,12 @@ class Channel extends BaseChannel { // An RPC that returns a 'proper' promise, which resolves to just the // response's fields; this is intended to be suitable for implementing // API procedures. - rpc(method, fields, expect) { - return Promise.fromCallback(cb => { + async rpc(method, fields, expect) { + const f = await Promise.fromCallback(cb => { return this._rpc(method, fields, expect, cb); }) - .then(f => { - return f.fields; - }); + + return f.fields; } // Do the remarkably simple channel open handshake @@ -161,53 +158,50 @@ class Channel extends BaseChannel { return this.publish('', queue, content, options); } - consume(queue, callback, options) { + async consume(queue, callback, options) { // NB we want the callback to be run synchronously, so that we've // registered the consumerTag before any messages can arrive. const fields = Args.consume(queue, options); - return Promise.fromCallback(cb => { + const ok = await Promise.fromCallback(cb => { this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); }) - .then(ok => { - this.registerConsumer(ok.fields.consumerTag, callback); - return ok.fields; - }); + + this.registerConsumer(ok.fields.consumerTag, callback); + return ok.fields; } - cancel(consumerTag) { - return Promise.fromCallback(cb => { + async cancel(consumerTag) { + const ok = await Promise.fromCallback(cb => { this._rpc(defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk, cb); }) - .then(ok => { - this.unregisterConsumer(consumerTag); - return ok.fields; - }); + + this.unregisterConsumer(consumerTag); + return ok.fields; } - get(queue, options) { + async get(queue, options) { const fields = Args.get(queue, options); - return Promise.fromCallback(cb => { - return this.sendOrEnqueue(defs.BasicGet, fields, cb); + const f = await Promise.fromCallback(cb => { + this.sendOrEnqueue(defs.BasicGet, fields, cb); }) - .then(f => { - if (f.id === defs.BasicGetEmpty) { - return false; - } - else if (f.id === defs.BasicGetOk) { - const fields = f.fields; - return new Promise(resolve => { - this.handleMessage = acceptMessage(m => { - m.fields = fields; - resolve(m); - }); + + if (f.id === defs.BasicGetEmpty) { + return false; + } + else if (f.id === defs.BasicGetOk) { + const fields = f.fields; + return new Promise(resolve => { + this.handleMessage = acceptMessage(m => { + m.fields = fields; + resolve(m); }); - } - else { - throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`); - } - }); + }); + } + else { + throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`); + } } ack(message, allUpTo) { @@ -246,6 +240,12 @@ class Channel extends BaseChannel { Args.recover(), defs.BasicRecoverOk); } + + qos(count, global) { + return this.rpc(defs.BasicQos, + Args.prefetch(count, global), + defs.BasicQosOk); + } } // There are more options in AMQP than exposed here; RabbitMQ only @@ -253,11 +253,7 @@ class Channel extends BaseChannel { // channels or consumers. RabbitMQ v3.3.0 and after treat prefetch // (without `global` set) as per-consumer (for consumers following), // and prefetch with `global` set as per-channel. -Channel.prototype.prefetch = Channel.prototype.qos = function(count, global) { - return this.rpc(defs.BasicQos, - Args.prefetch(count, global), - defs.BasicQosOk); -}; +Channel.prototype.prefetch = Channel.prototype.qos // Confirm channel. This is a channel with confirms 'switched on', // meaning sent messages will provoke a responding 'ack' or 'nack' @@ -280,8 +276,7 @@ class ConfirmChannel extends Channel { const awaiting = []; const unconfirmed = this.unconfirmed; unconfirmed.forEach((val, index) => { - if (val === null); // already confirmed - else { + if (val !== null) { const confirmed = new Promise((resolve, reject) => { unconfirmed[index] = err => { if (val) val(err); From c25440e57c68a189734410405b87ea2706eb68b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20Wa=CC=88rting?= Date: Wed, 7 Jul 2021 00:20:29 +0200 Subject: [PATCH 11/11] revert some failing test --- lib/channel_model.js | 55 +++++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/lib/channel_model.js b/lib/channel_model.js index e13b1bdc..ea652e2f 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -158,16 +158,17 @@ class Channel extends BaseChannel { return this.publish('', queue, content, options); } - async consume(queue, callback, options) { + consume(queue, callback, options) { // NB we want the callback to be run synchronously, so that we've // registered the consumerTag before any messages can arrive. const fields = Args.consume(queue, options); - const ok = await Promise.fromCallback(cb => { + return Promise.fromCallback(cb => { this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); }) - - this.registerConsumer(ok.fields.consumerTag, callback); - return ok.fields; + .then(ok => { + this.registerConsumer(ok.fields.consumerTag, callback); + return ok.fields; + }); } async cancel(consumerTag) { @@ -176,32 +177,34 @@ class Channel extends BaseChannel { defs.BasicCancelOk, cb); }) - - this.unregisterConsumer(consumerTag); - return ok.fields; + .then(ok => { + this.unregisterConsumer(consumerTag); + return ok.fields; + }); } - async get(queue, options) { + get(queue, options) { const fields = Args.get(queue, options); - const f = await Promise.fromCallback(cb => { - this.sendOrEnqueue(defs.BasicGet, fields, cb); + return Promise.fromCallback(cb => { + return this.sendOrEnqueue(defs.BasicGet, fields, cb); }) - - if (f.id === defs.BasicGetEmpty) { - return false; - } - else if (f.id === defs.BasicGetOk) { - const fields = f.fields; - return new Promise(resolve => { - this.handleMessage = acceptMessage(m => { - m.fields = fields; - resolve(m); + .then(f => { + if (f.id === defs.BasicGetEmpty) { + return false; + } + else if (f.id === defs.BasicGetOk) { + const fields = f.fields; + return new Promise(resolve => { + this.handleMessage = acceptMessage(m => { + m.fields = fields; + resolve(m); + }); }); - }); - } - else { - throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`); - } + } + else { + throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`); + } + }); } ack(message, allUpTo) {