Skip to content

Commit a9fbc51

Browse files
committed
feat: new grpc call for subscribing alerts and low balance alert (#864)
1 parent 405f50e commit a9fbc51

29 files changed

+2173
-378
lines changed

docs/api.md

Lines changed: 75 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/Logger.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export enum Context {
4545
Http = 'HTTP',
4646
Backup = 'BACKUP',
4747
Service = 'SERVICE',
48+
Alerts = 'ALERTS',
4849
}
4950

5051
type Loggers = {
@@ -58,6 +59,7 @@ type Loggers = {
5859
swaps: Logger,
5960
http: Logger,
6061
service: Logger,
62+
alerts: Logger,
6163
};
6264

6365
class Logger {
@@ -133,6 +135,7 @@ class Logger {
133135
swaps: new Logger({ ...object, context: Context.Swaps }),
134136
http: new Logger({ ...object, context: Context.Http }),
135137
service: new Logger({ ...object, context: Context.Service }),
138+
alerts: new Logger({ ...object, context: Context.Alerts }),
136139
};
137140
}
138141

lib/Xud.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import SwapClientManager from './swaps/SwapClientManager';
2020
import Swaps from './swaps/Swaps';
2121
import { createSimnetChannels } from './utils/simnet-connext-channels';
2222
import { UnitConverter } from './utils/UnitConverter';
23+
import Alerts from './alerts/Alerts';
2324

2425
const version: string = require('../package.json').version;
2526

@@ -46,6 +47,7 @@ class Xud extends EventEmitter {
4647
private swapClientManager?: SwapClientManager;
4748
private unitConverter?: UnitConverter;
4849
private simnetChannels$?: Subscription;
50+
private alerts!: Alerts;
4951

5052
/**
5153
* Create an Exchange Union daemon.
@@ -203,6 +205,8 @@ class Xud extends EventEmitter {
203205
// initialize pool and start listening/connecting only once other components are initialized
204206
await this.pool.init();
205207

208+
this.alerts = new Alerts({ swapClientManager: this.swapClientManager, logger: loggers.alerts });
209+
206210
this.service = new Service({
207211
version,
208212
nodeKey,
@@ -212,6 +216,7 @@ class Xud extends EventEmitter {
212216
swaps: this.swaps,
213217
logger: loggers.service,
214218
shutdown: this.beginShutdown,
219+
alerts: this.alerts,
215220
});
216221

217222
this.service.on('logLevel', (level) => {

lib/alerts/Alerts.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { EventEmitter } from 'events';
2+
import { BalanceAlert } from './types';
3+
import SwapClientManager from '../swaps/SwapClientManager';
4+
import { MIN_BALANCE_ALERT_THRESHOLD_IN_MS } from './consts';
5+
import Logger from '../Logger';
6+
import { AlertType, ChannelSide } from '../constants/enums';
7+
import { satsToCoinsStr } from '../cli/utils';
8+
9+
interface Alerts {
10+
on(event: 'alert', listener: (alert: any) => void): this;
11+
emit(event: 'alert', alert: any): boolean;
12+
}
13+
14+
// TODO this class still requires a cleanup if alert is not being thrown anymore after a while
15+
/**
16+
* This class works as a middleware for thrown alerts from xud's main flow. Each alert will be caught here
17+
* and re-thrown if last thrown time was before the minimum threshold that set in consts.ts
18+
*/
19+
class Alerts extends EventEmitter {
20+
private alerts = new Map<string, number>();
21+
private logger: Logger;
22+
23+
constructor({ swapClientManager, logger }: {swapClientManager: SwapClientManager, logger: Logger}) {
24+
super();
25+
this.logger = logger;
26+
this.listenLowTradingBalanceAlerts(swapClientManager);
27+
}
28+
29+
private listenLowTradingBalanceAlerts(swapClientManager: SwapClientManager) {
30+
const lndClients = swapClientManager.getLndClientsMap().values();
31+
for (const lndClient of lndClients) {
32+
lndClient.on('lowTradingBalance', this.onLowTradingBalance);
33+
}
34+
swapClientManager.connextClient?.on('lowTradingBalance', this.onLowTradingBalance);
35+
}
36+
37+
private onLowTradingBalance = (balanceAlert: BalanceAlert) => {
38+
const stringRepresentation = JSON.stringify(balanceAlert);
39+
this.logger.trace(`received low trading balance alert ${stringRepresentation}`);
40+
if (this.alerts.get(stringRepresentation) === undefined || this.checkAlertThreshold(stringRepresentation)) {
41+
this.logger.trace(`triggering low balance alert ${stringRepresentation}`);
42+
43+
balanceAlert.message = `${ChannelSide[balanceAlert.side || 0]} trading balance (${satsToCoinsStr(balanceAlert.sideBalance || 0)} ${balanceAlert.currency}) is lower than 10% of trading capacity (${satsToCoinsStr(balanceAlert.totalBalance || 0)} ${balanceAlert.currency})`;
44+
balanceAlert.type = AlertType.LowTradingBalance;
45+
46+
this.alerts.set(stringRepresentation, Date.now());
47+
this.emit('alert', balanceAlert);
48+
}
49+
}
50+
51+
private checkAlertThreshold(stringRepresentation: string) {
52+
const lastThrownTime = this.alerts.get(stringRepresentation) || 0;
53+
const passedTime = Date.now() - lastThrownTime;
54+
return passedTime > MIN_BALANCE_ALERT_THRESHOLD_IN_MS;
55+
}
56+
}
57+
58+
export default Alerts;

lib/alerts/consts.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/** The minimum time in miliseconds to be passed to rethrow a balance alert. */
2+
export const MIN_BALANCE_ALERT_THRESHOLD_IN_MS = 10000;

lib/alerts/types.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { AlertType, ChannelSide } from '../constants/enums';
2+
3+
export type BalanceAlert = Alert & {
4+
/** The total balance of the channel when the alert is triggered. */
5+
totalBalance: number;
6+
/** The side of the balance either local or remote. */
7+
side: ChannelSide;
8+
/** The balance that triggered the alert. */
9+
sideBalance: number;
10+
/** The alert threshold in percentage, e.g. 10 means %10. */
11+
bound: number;
12+
/** The currency of the channel. */
13+
currency: string;
14+
};
15+
16+
export type Alert = {
17+
type: AlertType;
18+
message: string;
19+
};

lib/cli/commands/streamalerts.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { Arguments, Argv } from 'yargs';
2+
import { XudClient } from '../../proto/xudrpc_grpc_pb';
3+
import * as xudrpc from '../../proto/xudrpc_pb';
4+
import { loadXudClient } from '../command';
5+
import { AlertType, ChannelSide } from '../../constants/enums';
6+
import { onStreamError, waitForClient } from '../utils';
7+
import moment from 'moment';
8+
9+
export const command = 'streamalerts';
10+
11+
export const describe = 'stream alert notifications from xud';
12+
13+
export const builder = (argv: Argv) => argv
14+
.option('pretty', {
15+
type: 'boolean',
16+
})
17+
.example('$0 streamalerts -j', 'prints alert payload in a JSON structure')
18+
.example('$0 streamalerts', 'prints alert message only');
19+
20+
export const handler = async (argv: Arguments) => {
21+
await ensureConnection(argv, true);
22+
};
23+
24+
let client: XudClient;
25+
26+
const ensureConnection = async (argv: Arguments, printError?: boolean) => {
27+
if (!client) {
28+
client = await loadXudClient(argv);
29+
}
30+
31+
waitForClient(client, argv, ensureConnection, streamalerts, printError);
32+
};
33+
34+
const structAlertJson = (alertObject: xudrpc.Alert.AsObject) => {
35+
const result: {type: string, payload: {
36+
totalBalance?: number,
37+
side?: string,
38+
bound?: number,
39+
sideBalance?: number,
40+
channelPoint?: string,
41+
currency?: string,
42+
} | undefined } = {
43+
type: AlertType[alertObject.type],
44+
payload: undefined,
45+
};
46+
47+
if (alertObject.type === xudrpc.Alert.AlertType.LOW_TRADING_BALANCE) {
48+
result.payload = {
49+
totalBalance: alertObject.balanceAlert?.totalBalance,
50+
side: ChannelSide[alertObject.balanceAlert?.side || 0],
51+
sideBalance: alertObject.balanceAlert?.sideBalance,
52+
bound: alertObject.balanceAlert?.bound,
53+
currency: alertObject.balanceAlert?.currency,
54+
};
55+
}
56+
57+
return result;
58+
};
59+
60+
const streamalerts = (argv: Arguments<any>) => {
61+
const request = new xudrpc.SubscribeAlertsRequest();
62+
const alertsSubscription = client.subscribeAlerts(request);
63+
64+
alertsSubscription.on('data', (alert: xudrpc.Alert) => {
65+
if (argv.json) {
66+
console.log(JSON.stringify(structAlertJson(alert.toObject()), undefined, 2));
67+
} else {
68+
console.log(`(${moment()}) ${AlertType[alert.getType()]}: ${alert.getMessage()}`);
69+
}
70+
});
71+
alertsSubscription.on('end', reconnect.bind(undefined, argv));
72+
alertsSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));
73+
};
74+
75+
const reconnect = async (argv: Arguments) => {
76+
console.log('Stream has closed, trying to reconnect');
77+
await ensureConnection(argv, false);
78+
};

lib/cli/commands/streamorders.ts

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
import { ServiceError, status } from 'grpc';
21
import { Arguments, Argv } from 'yargs';
32
import { XudClient } from '../../proto/xudrpc_grpc_pb';
43
import * as xudrpc from '../../proto/xudrpc_pb';
5-
import { setTimeoutPromise } from '../../utils/utils';
64
import { loadXudClient } from '../command';
5+
import { onStreamError, waitForClient } from '../utils';
76

87
export const command = 'streamorders [existing]';
98

@@ -26,20 +25,8 @@ const ensureConnection = async (argv: Arguments, printError?: boolean) => {
2625
if (!client) {
2726
client = await loadXudClient(argv);
2827
}
29-
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
30-
if (error) {
31-
if (error.message === 'Failed to connect before the deadline') {
32-
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
33-
process.exit(1);
34-
}
3528

36-
if (printError) console.error(`${error.name}: ${error.message}`);
37-
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
38-
} else {
39-
console.log('Successfully connected, subscribing for orders');
40-
streamOrders(argv);
41-
}
42-
});
29+
waitForClient(client, argv, ensureConnection, streamOrders, printError);
4330
};
4431

4532
const streamOrders = (argv: Arguments<any>) => {
@@ -57,15 +44,7 @@ const streamOrders = (argv: Arguments<any>) => {
5744
// adding end, close, error events only once,
5845
// since they'll be thrown for three of subscriptions in the corresponding cases, catching once is enough.
5946
ordersSubscription.on('end', reconnect.bind(undefined, argv));
60-
ordersSubscription.on('error', async (err: ServiceError) => {
61-
if (err.code === status.UNIMPLEMENTED) {
62-
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
63-
process.exit(1);
64-
}
65-
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
66-
await setTimeoutPromise(1000);
67-
await ensureConnection(argv);
68-
});
47+
ordersSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));
6948

7049
const swapsRequest = new xudrpc.SubscribeSwapsRequest();
7150
swapsRequest.setIncludeTaker(true);

0 commit comments

Comments
 (0)