Skip to content

Commit 5f621ad

Browse files
committed
Merge branch 'mohd-akram-native-promise'
2 parents 8f5f328 + d090310 commit 5f621ad

File tree

11 files changed

+2986
-101
lines changed

11 files changed

+2986
-101
lines changed

channel_api.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
var raw_connect = require('./lib/connect').connect;
22
var ChannelModel = require('./lib/channel_model').ChannelModel;
3-
var Promise = require('bluebird');
3+
var promisify = require('util').promisify;
44

55
function connect(url, connOptions) {
6-
return Promise.fromCallback(function(cb) {
6+
return promisify(function(cb) {
77
return raw_connect(url, connOptions, cb);
8-
})
8+
})()
99
.then(function(conn) {
1010
return new ChannelModel(conn);
1111
});

examples/tutorials/receive_logs_direct.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#!/usr/bin/env node
22

33
var amqp = require('amqplib');
4-
var all = require('bluebird').all;
54
var basename = require('path').basename;
65

76
var severities = process.argv.slice(2);
@@ -24,7 +23,7 @@ amqp.connect('amqp://localhost').then(function(conn) {
2423

2524
ok = ok.then(function(qok) {
2625
var queue = qok.queue;
27-
return all(severities.map(function(sev) {
26+
return Promise.all(severities.map(function(sev) {
2827
ch.bindQueue(queue, ex, sev);
2928
})).then(function() { return queue; });
3029
});

examples/tutorials/receive_logs_topic.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
var amqp = require('amqplib');
44
var basename = require('path').basename;
5-
var all = require('bluebird').all;
65

76
var keys = process.argv.slice(2);
87
if (keys.length < 1) {
@@ -23,7 +22,7 @@ amqp.connect('amqp://localhost').then(function(conn) {
2322

2423
ok = ok.then(function(qok) {
2524
var queue = qok.queue;
26-
return all(keys.map(function(rk) {
25+
return Promise.all(keys.map(function(rk) {
2726
ch.bindQueue(queue, ex, rk);
2827
})).then(function() { return queue; });
2928
});

examples/tutorials/rpc_client.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
var amqp = require('amqplib');
44
var basename = require('path').basename;
5-
var Promise = require('bluebird');
65
var uuid = require('node-uuid');
76

87
// I've departed from the form of the original RPC tutorial, which

lib/callback_model.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
'use strict';
66

77
var defs = require('./defs');
8-
var Promise = require('bluebird');
98
var inherits = require('util').inherits;
109
var EventEmitter = require('events').EventEmitter;
1110
var BaseChannel = require('./channel').BaseChannel;

lib/channel_model.js

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
'use strict';
66

77
const EventEmitter = require('events');
8-
const Promise = require('bluebird');
8+
const promisify = require('util').promisify;
99
const defs = require('./defs');
1010
const {BaseChannel} = require('./channel');
1111
const {acceptMessage} = require('./channel');
@@ -23,7 +23,7 @@ class ChannelModel extends EventEmitter {
2323
}
2424

2525
close() {
26-
return Promise.fromCallback(this.connection.close.bind(this.connection));
26+
return promisify(this.connection.close.bind(this.connection))();
2727
}
2828

2929
async createChannel() {
@@ -53,27 +53,25 @@ class Channel extends BaseChannel {
5353
// response's fields; this is intended to be suitable for implementing
5454
// API procedures.
5555
async rpc(method, fields, expect) {
56-
const f = await Promise.fromCallback(cb => {
56+
const f = await promisify(cb => {
5757
return this._rpc(method, fields, expect, cb);
58-
})
58+
})();
5959

6060
return f.fields;
6161
}
6262

6363
// Do the remarkably simple channel open handshake
64-
open() {
65-
return Promise.try(this.allocate.bind(this)).then(
66-
ch => {
67-
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
68-
defs.ChannelOpenOk);
69-
});
64+
async open() {
65+
const ch = await this.allocate.bind(this)();
66+
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
67+
defs.ChannelOpenOk);
7068
}
7169

7270
close() {
73-
return Promise.fromCallback(cb => {
71+
return promisify(cb => {
7472
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
7573
cb);
76-
});
74+
})();
7775
}
7876

7977
// === Public API, declaring queues and stuff ===
@@ -162,21 +160,21 @@ class Channel extends BaseChannel {
162160
// NB we want the callback to be run synchronously, so that we've
163161
// registered the consumerTag before any messages can arrive.
164162
const fields = Args.consume(queue, options);
165-
return Promise.fromCallback(cb => {
166-
this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
167-
})
168-
.then(ok => {
169-
this.registerConsumer(ok.fields.consumerTag, callback);
170-
return ok.fields;
163+
return new Promise((resolve, reject) => {
164+
this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, (err, ok) => {
165+
if (err) return reject(err);
166+
this.registerConsumer(ok.fields.consumerTag, callback);
167+
resolve(ok.fields);
168+
});
171169
});
172170
}
173171

174172
async cancel(consumerTag) {
175-
const ok = await Promise.fromCallback(cb => {
173+
const ok = await promisify(cb => {
176174
this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
177175
defs.BasicCancelOk,
178176
cb);
179-
})
177+
})()
180178
.then(ok => {
181179
this.unregisterConsumer(consumerTag);
182180
return ok.fields;
@@ -185,25 +183,23 @@ class Channel extends BaseChannel {
185183

186184
get(queue, options) {
187185
const fields = Args.get(queue, options);
188-
return Promise.fromCallback(cb => {
189-
return this.sendOrEnqueue(defs.BasicGet, fields, cb);
190-
})
191-
.then(f => {
192-
if (f.id === defs.BasicGetEmpty) {
193-
return false;
194-
}
195-
else if (f.id === defs.BasicGetOk) {
196-
const fields = f.fields;
197-
return new Promise(resolve => {
186+
return new Promise((resolve, reject) => {
187+
this.sendOrEnqueue(defs.BasicGet, fields, (err, f) => {
188+
if (err) return reject(err);
189+
if (f.id === defs.BasicGetEmpty) {
190+
return resolve(false);
191+
}
192+
else if (f.id === defs.BasicGetOk) {
193+
const fields = f.fields;
198194
this.handleMessage = acceptMessage(m => {
199195
m.fields = fields;
200196
resolve(m);
201197
});
202-
});
203-
}
204-
else {
205-
throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`);
206-
}
198+
}
199+
else {
200+
reject(new Error(`Unexpected response to BasicGet: ${inspect(f)}`));
201+
}
202+
});
207203
});
208204
}
209205

@@ -290,6 +286,13 @@ class ConfirmChannel extends Channel {
290286
awaiting.push(confirmed);
291287
}
292288
});
289+
// Channel closed
290+
if (!this.pending) {
291+
var cb;
292+
while (cb = this.unconfirmed.shift()) {
293+
if (cb) cb(new Error('channel closed'));
294+
}
295+
}
293296
return Promise.all(awaiting);
294297
}
295298
}

0 commit comments

Comments
 (0)