-
Notifications
You must be signed in to change notification settings - Fork 41
feat: new grpc call for subscribing alerts and low balance alert (#864) #2023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
b7a77f6
a58c1c6
3a890e0
0f4a1c5
032482f
6458e81
947ab1a
d2cff5c
6f3ee1c
9ccda04
8dcf5cf
f7b7aab
0f95df4
3469df2
25b3954
510ebd4
abc763f
18286f6
9705f1a
1965731
9d380b9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| import { EventEmitter } from 'events'; | ||
| import { BalanceAlert } from './types'; | ||
| import SwapClientManager from '../swaps/SwapClientManager'; | ||
| import { MIN_BALANCE_ALERT_THRESHOLD_IN_MS } from './consts'; | ||
| import Logger from '../Logger'; | ||
| import { AlertType, ChannelSide } from '../constants/enums'; | ||
| import { satsToCoinsStr } from '../cli/utils'; | ||
|
|
||
| interface Alerts { | ||
| on(event: 'alert', listener: (alert: any) => void): this; | ||
| emit(event: 'alert', alert: any): boolean; | ||
| } | ||
|
|
||
| // TODO this class still requires a cleanup if alert is not being thrown anymore after a while | ||
| /** | ||
| * This class works as a middleware for thrown alerts from xud's main flow. Each alert will be caught here | ||
| * and re-thrown if last thrown time was before the minimum threshold that set in consts.ts | ||
| */ | ||
| class Alerts extends EventEmitter { | ||
| private alerts = new Map<string, number>(); | ||
| private logger: Logger; | ||
|
|
||
| constructor({ swapClientManager, logger }: {swapClientManager: SwapClientManager, logger: Logger}) { | ||
| super(); | ||
| this.logger = logger; | ||
| this.listenLowTradingBalanceAlerts(swapClientManager); | ||
| } | ||
|
|
||
| private listenLowTradingBalanceAlerts(swapClientManager: SwapClientManager) { | ||
| const lndClients = swapClientManager.getLndClientsMap().values(); | ||
| for (const lndClient of lndClients) { | ||
| lndClient.on('lowTradingBalance', this.onLowTradingBalance); | ||
| } | ||
| swapClientManager.connextClient?.on('lowTradingBalance', this.onLowTradingBalance); | ||
| } | ||
|
|
||
| private onLowTradingBalance = (balanceAlert: BalanceAlert) => { | ||
| const stringRepresentation = JSON.stringify(balanceAlert); | ||
|
||
| this.logger.trace(`received low trading balance alert ${stringRepresentation}`); | ||
| if (this.alerts.get(stringRepresentation) === undefined || this.checkAlertThreshold(stringRepresentation)) { | ||
| this.logger.trace(`triggering low balance alert ${stringRepresentation}`); | ||
|
|
||
| balanceAlert.message = `${ChannelSide[balanceAlert.side || 0]} trading balance (${satsToCoinsStr(balanceAlert.sideBalance || 0)} ${balanceAlert.currency}) is lower than 10% of trading capacity (${satsToCoinsStr(balanceAlert.totalBalance || 0)} ${balanceAlert.currency})`; | ||
| balanceAlert.type = AlertType.LowTradingBalance; | ||
|
|
||
| this.alerts.set(stringRepresentation, Date.now()); | ||
| this.emit('alert', balanceAlert); | ||
| } | ||
| } | ||
|
|
||
| private checkAlertThreshold(stringRepresentation: string) { | ||
| const lastThrownTime = this.alerts.get(stringRepresentation) || 0; | ||
| const passedTime = Date.now() - lastThrownTime; | ||
| return passedTime > MIN_BALANCE_ALERT_THRESHOLD_IN_MS; | ||
| } | ||
| } | ||
|
|
||
| export default Alerts; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| /** The minimum time in miliseconds to be passed to rethrow a balance alert. */ | ||
| export const MIN_BALANCE_ALERT_THRESHOLD_IN_MS = 10000; | ||
rsercano marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| import { AlertType, ChannelSide } from '../constants/enums'; | ||
|
|
||
| export type BalanceAlert = Alert & { | ||
| /** The total balance of the channel when the alert is triggered. */ | ||
| totalBalance: number; | ||
| /** The side of the balance either local or remote. */ | ||
| side: ChannelSide; | ||
| /** The balance that triggered the alert. */ | ||
| sideBalance: number; | ||
| /** The alert threshold in percentage, e.g. 10 means %10. */ | ||
| bound: number; | ||
| /** The currency of the channel. */ | ||
| currency: string; | ||
| }; | ||
|
|
||
| export type Alert = { | ||
| type: AlertType; | ||
| message: string; | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| import { Arguments, Argv } from 'yargs'; | ||
| import { XudClient } from '../../proto/xudrpc_grpc_pb'; | ||
| import * as xudrpc from '../../proto/xudrpc_pb'; | ||
| import { loadXudClient } from '../command'; | ||
| import { AlertType, ChannelSide } from '../../constants/enums'; | ||
| import { onStreamError, waitForClient } from '../utils'; | ||
| import moment from 'moment'; | ||
|
|
||
| export const command = 'streamalerts'; | ||
|
|
||
| export const describe = 'stream alert notifications from xud'; | ||
|
|
||
| export const builder = (argv: Argv) => argv | ||
| .option('pretty', { | ||
| type: 'boolean', | ||
| }) | ||
| .example('$0 streamalerts -j', 'prints alert payload in a JSON structure') | ||
| .example('$0 streamalerts', 'prints alert message only'); | ||
|
|
||
| export const handler = async (argv: Arguments) => { | ||
| await ensureConnection(argv, true); | ||
| }; | ||
|
|
||
| let client: XudClient; | ||
|
|
||
| const ensureConnection = async (argv: Arguments, printError?: boolean) => { | ||
| if (!client) { | ||
| client = await loadXudClient(argv); | ||
| } | ||
|
|
||
| waitForClient(client, argv, ensureConnection, streamalerts, printError); | ||
| }; | ||
|
|
||
| const structAlertJson = (alertObject: xudrpc.Alert.AsObject) => { | ||
| const result: {type: string, payload: { | ||
| totalBalance?: number, | ||
| side?: string, | ||
| bound?: number, | ||
| sideBalance?: number, | ||
| channelPoint?: string, | ||
| currency?: string, | ||
| } | undefined } = { | ||
| type: AlertType[alertObject.type], | ||
| payload: undefined, | ||
| }; | ||
|
|
||
| if (alertObject.type === xudrpc.Alert.AlertType.LOW_TRADING_BALANCE) { | ||
| result.payload = { | ||
| totalBalance: alertObject.balanceAlert?.totalBalance, | ||
| side: ChannelSide[alertObject.balanceAlert?.side || 0], | ||
| sideBalance: alertObject.balanceAlert?.sideBalance, | ||
| bound: alertObject.balanceAlert?.bound, | ||
| currency: alertObject.balanceAlert?.currency, | ||
| }; | ||
| } | ||
|
|
||
| return result; | ||
| }; | ||
|
|
||
| const streamalerts = (argv: Arguments<any>) => { | ||
| const request = new xudrpc.SubscribeAlertsRequest(); | ||
| const alertsSubscription = client.subscribeAlerts(request); | ||
|
|
||
| alertsSubscription.on('data', (alert: xudrpc.Alert) => { | ||
| if (argv.json) { | ||
| console.log(JSON.stringify(structAlertJson(alert.toObject()), undefined, 2)); | ||
| } else { | ||
| console.log(`(${moment()}) ${AlertType[alert.getType()]}: ${alert.getMessage()}`); | ||
| } | ||
| }); | ||
| alertsSubscription.on('end', reconnect.bind(undefined, argv)); | ||
| alertsSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv))); | ||
| }; | ||
|
|
||
| const reconnect = async (argv: Arguments) => { | ||
| console.log('Stream has closed, trying to reconnect'); | ||
| await ensureConnection(argv, false); | ||
| }; |
Uh oh!
There was an error while loading. Please reload this page.