Skip to content

Commit 835a81f

Browse files
committed
feat: expose AmqpConnectionManagerClass
The naming is strange because I don't want to introduce any breaking change. Note that jest complains about leaking resources due to: amqp-node/amqplib#584
1 parent 36f234b commit 835a81f

File tree

6 files changed

+113
-16
lines changed

6 files changed

+113
-16
lines changed

src/AmqpConnectionManager.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import amqp, { Connection } from 'amqplib';
2-
import { EventEmitter } from 'events';
2+
import { EventEmitter, once } from 'events';
33
import { TcpSocketConnectOpts } from 'net';
44
import pb from 'promise-breaker';
55
import { ConnectionOptions } from 'tls';
@@ -82,6 +82,8 @@ export interface IAmqpConnectionManager {
8282
addListener(event: 'unblocked', listener: () => void): this;
8383
addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;
8484

85+
listeners(eventName: string | symbol): any;
86+
8587
on(event: string, listener: (...args: any[]) => void): this;
8688
on(event: 'connect', listener: ConnectListener): this;
8789
on(event: 'blocked', listener: (arg: { reason: string }) => void): this;
@@ -108,6 +110,8 @@ export interface IAmqpConnectionManager {
108110

109111
removeListener(event: string, listener: (...args: any[]) => void): this;
110112

113+
connect(options?: { timeout?: number }): Promise<void>;
114+
reconnect(): void;
111115
createChannel(options?: CreateChannelOpts): ChannelWrapper;
112116
close(): Promise<void>;
113117
isConnected(): boolean;
@@ -196,8 +200,43 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
196200
this.setMaxListeners(0);
197201

198202
this._findServers = options.findServers || (() => Promise.resolve(urls));
203+
}
199204

205+
/**
206+
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the
207+
* reconnect attempts will continue in the background.
208+
* @param [options={}] -
209+
* @param [options.timeout] - Time to wait for initial connect
210+
*/
211+
async connect({ timeout }: { timeout?: number } = {}): Promise<void> {
200212
this._connect();
213+
214+
let reject: (reason?: any) => void;
215+
const onDisconnect = ({ err }: { err: any }) => {
216+
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials.
217+
if (err.isOperational) {
218+
reject(err);
219+
}
220+
};
221+
222+
try {
223+
await Promise.race([
224+
once(this, 'connect'),
225+
new Promise((_resolve, innerReject) => {
226+
reject = innerReject;
227+
this.on('disconnect', onDisconnect);
228+
}),
229+
...(timeout
230+
? [
231+
wait(timeout).promise.then(() => {
232+
throw new Error('amqp-connection-manager: connect timeout');
233+
}),
234+
]
235+
: []),
236+
]);
237+
} finally {
238+
this.removeListener('disconnect', onDisconnect);
239+
}
201240
}
202241

203242
// `options` here are any options that can be passed to ChannelWrapper.

src/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,15 @@ export function connect(
1515
urls: ConnectionUrl | ConnectionUrl[] | undefined | null,
1616
options?: AmqpConnectionManagerOptions
1717
): IAmqpConnectionManager {
18-
return new AmqpConnectionManager(urls, options);
18+
const conn = new AmqpConnectionManager(urls, options);
19+
conn.connect().catch(() => {
20+
/* noop */
21+
});
22+
return conn;
1923
}
2024

25+
export { AmqpConnectionManager as AmqpConnectionManagerClass };
26+
2127
const amqp = { connect };
2228

2329
export default amqp;

test/AmqpConnectionManagerTest.ts

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ describe('AmqpConnectionManager', function () {
2727

2828
it('should establish a connection to a broker', async () => {
2929
amqp = new AmqpConnectionManager('amqp://localhost');
30+
amqp.connect();
3031
const [{ connection, url }] = await once(amqp, 'connect');
3132
expect(url, 'url').to.equal('amqp://localhost');
3233
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -37,6 +38,7 @@ describe('AmqpConnectionManager', function () {
3738
protocol: 'amqp',
3839
hostname: 'localhost',
3940
});
41+
amqp.connect();
4042
const [{ connection, url }] = await once(amqp, 'connect');
4143
expect(url, 'url').to.eql({
4244
protocol: 'amqp',
@@ -51,14 +53,15 @@ describe('AmqpConnectionManager', function () {
5153

5254
it('should establish a url object based connection to a broker', async () => {
5355
amqp = new AmqpConnectionManager({ url: 'amqp://localhost' });
54-
56+
amqp.connect();
5557
const [{ connection, url }] = await once(amqp, 'connect');
5658
expect(url, 'url').to.equal('amqp://localhost');
5759
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
5860
});
5961

6062
it('should close connection to a broker', async () => {
6163
amqp = new AmqpConnectionManager('amqp://localhost');
64+
amqp.connect();
6265
const [{ connection, url }] = await once(amqp, 'connect');
6366
expect(url, 'url').to.equal('amqp://localhost');
6467
expect((connection as any).url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -77,6 +80,7 @@ describe('AmqpConnectionManager', function () {
7780
let connected = false;
7881

7982
amqp = new AmqpConnectionManager('amqp://localhost');
83+
amqp.connect();
8084
// Connection should not yet be established
8185
expect(amqp.connection, 'current connection').to.equal(undefined);
8286
// Connection should be pending though
@@ -123,6 +127,7 @@ describe('AmqpConnectionManager', function () {
123127
return Promise.resolve('amqp://localhost');
124128
},
125129
});
130+
amqp.connect();
126131
const [{ connection, url }] = await once(amqp, 'connect');
127132
expect(url, 'url').to.equal('amqp://localhost');
128133
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -134,6 +139,7 @@ describe('AmqpConnectionManager', function () {
134139
return Promise.resolve({ url: 'amqp://localhost' });
135140
},
136141
});
142+
amqp.connect();
137143
const [{ connection, url }] = await once(amqp, 'connect');
138144
expect(url, 'url').to.equal('amqp://localhost');
139145
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -145,13 +151,29 @@ describe('AmqpConnectionManager', function () {
145151
return Promise.resolve(null);
146152
},
147153
});
154+
amqp.connect();
148155
const [{ err }] = await once(amqp, 'disconnect');
149156
expect(err.message).to.contain('No servers found');
150157
return amqp?.close();
151158
});
152159

160+
it('should timeout connect', async () => {
161+
jest.spyOn(origAmpq, 'connect').mockImplementation((): any => {
162+
return promiseTools.delay(200);
163+
});
164+
amqp = new AmqpConnectionManager('amqp://localhost');
165+
let err;
166+
try {
167+
await amqp.connect({ timeout: 0.1 });
168+
} catch (error) {
169+
err = error;
170+
}
171+
expect(err.message).to.equal('amqp-connection-manager: connect timeout');
172+
});
173+
153174
it('should work with a URL with a query', async () => {
154175
amqp = new AmqpConnectionManager('amqp://localhost?frameMax=0x1000');
176+
amqp.connect();
155177
const [{ connection }] = await once(amqp, 'connect');
156178
expect(connection.url, 'connection.url').to.equal(
157179
'amqp://localhost?frameMax=0x1000&heartbeat=5'
@@ -171,6 +193,7 @@ describe('AmqpConnectionManager', function () {
171193
amqp = new AmqpConnectionManager(['amqp://rabbit1', 'amqp://rabbit2'], {
172194
heartbeatIntervalInSeconds: 0.01,
173195
});
196+
amqp.connect();
174197

175198
let disconnectEventsSeen = 0;
176199
amqp.on('disconnect', function () {
@@ -196,10 +219,10 @@ describe('AmqpConnectionManager', function () {
196219
let disconnectsSeen = 0;
197220
amqp.on('disconnect', () => disconnectsSeen++);
198221

199-
await once(amqp, 'connect');
222+
await amqp.connect();
200223
amqplib.kill();
201224

202-
await once(amqp, 'connect');
225+
await amqp.connect();
203226
expect(disconnectsSeen).to.equal(1);
204227
});
205228

@@ -211,7 +234,7 @@ describe('AmqpConnectionManager', function () {
211234
let disconnectsSeen = 0;
212235
amqp.on('disconnect', () => disconnectsSeen++);
213236

214-
await once(amqp, 'connect');
237+
await amqp.connect();
215238

216239
// Close the connection nicely
217240
amqplib.simulateRemoteClose();
@@ -222,6 +245,7 @@ describe('AmqpConnectionManager', function () {
222245

223246
it('should know if it is connected or not', async () => {
224247
amqp = new AmqpConnectionManager('amqp://localhost');
248+
amqp.connect();
225249

226250
expect(amqp.isConnected()).to.be.false;
227251

@@ -231,7 +255,7 @@ describe('AmqpConnectionManager', function () {
231255

232256
it('should be able to manually reconnect', async () => {
233257
amqp = new AmqpConnectionManager('amqp://localhost');
234-
await once(amqp, 'connect');
258+
await amqp.connect();
235259

236260
amqp.reconnect();
237261
await once(amqp, 'disconnect');
@@ -240,13 +264,14 @@ describe('AmqpConnectionManager', function () {
240264

241265
it('should throw on manual reconnect after close', async () => {
242266
amqp = new AmqpConnectionManager('amqp://localhost');
243-
await once(amqp, 'connect');
244-
await amqp.close()
245-
expect(amqp.reconnect).to.throw()
246-
})
267+
await amqp.connect();
268+
await amqp.close();
269+
expect(amqp.reconnect).to.throw();
270+
});
247271

248272
it('should create and clean up channel wrappers', async function () {
249273
amqp = new AmqpConnectionManager('amqp://localhost');
274+
await amqp.connect();
250275
const channel = amqp.createChannel({ name: 'test-chan' });
251276

252277
// Channel should register with connection manager
@@ -264,6 +289,7 @@ describe('AmqpConnectionManager', function () {
264289

265290
it('should clean up channels on close', async function () {
266291
amqp = new AmqpConnectionManager('amqp://localhost');
292+
await amqp.connect();
267293
amqp.createChannel({ name: 'test-chan' });
268294

269295
// Channel should register with connection manager
@@ -286,7 +312,7 @@ describe('AmqpConnectionManager', function () {
286312

287313
let connectsSeen = 0;
288314
amqp.on('connect', () => connectsSeen++);
289-
await once(amqp, 'connect');
315+
await amqp.connect();
290316

291317
// Close the manager
292318
await amqp?.close();
@@ -308,7 +334,7 @@ describe('AmqpConnectionManager', function () {
308334

309335
amqp.on('unblocked', () => unblockSeen++);
310336

311-
await once(amqp, 'connect');
337+
await amqp.connect();
312338
// Close the connection nicely
313339
amqplib.simulateRemoteBlock();
314340
amqplib.simulateRemoteUnblock();

test/fixtures.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
33

44
import { Connection, Message, Options, Replies } from 'amqplib';
5-
import { EventEmitter } from 'events';
5+
import { EventEmitter, once } from 'events';
66
import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';
77
import ChannelWrapper, { CreateChannelOpts } from '../src/ChannelWrapper';
88

@@ -194,6 +194,15 @@ export class FakeAmqpConnectionManager extends EventEmitter implements IAmqpConn
194194
return 0;
195195
}
196196

197+
async connect(): Promise<void> {
198+
await Promise.all([once(this, 'connect'), this.simulateConnect()]);
199+
}
200+
201+
reconnect(): void {
202+
this.simulateDisconnect();
203+
this.simulateConnect();
204+
}
205+
197206
isConnected() {
198207
return this.connected;
199208
}

test/importTest.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
import { expect } from 'chai';
2-
import amqp from '../src';
2+
import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src';
33

44
describe('import test', function () {
55
it('should let you import as default (#51)', function () {
66
expect(amqp).to.exist;
77
expect(amqp.connect).to.exist;
88
});
9+
10+
it('should let you import class', function () {
11+
new AmqpConnectionManager('url');
12+
});
913
});

test/integrationTest.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import chai from 'chai';
33
import chaiJest from 'chai-jest';
44
import pEvent from 'p-event';
55
import { defer, timeout } from 'promise-tools';
6-
import amqp from '../src';
6+
import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src';
77
import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';
88

99
chai.use(chaiJest);
@@ -69,6 +69,19 @@ describe('Integration tests', () => {
6969
await timeout(pEvent(connection, 'connect'), 3000);
7070
});
7171

72+
// This test might cause jest to complain about leaked resources due to the bug described and fixed by:
73+
// https://github.com/squaremo/amqp.node/pull/584
74+
it('should throw on awaited connect with wrong password', async () => {
75+
connection = new AmqpConnectionManager('amqp://guest:wrong@localhost');
76+
let err;
77+
try {
78+
await connection.connect();
79+
} catch (error) {
80+
err = error;
81+
}
82+
expect(err.message).to.contain('ACCESS-REFUSED');
83+
});
84+
7285
it('send and receive messages', async () => {
7386
const queueName = 'testQueue1';
7487
const content = `hello world - ${Date.now()}`;

0 commit comments

Comments
 (0)