57 changes: 26 additions & 31 deletions src/WsHandler.js
@@ -1,4 +1,5 @@
const JSONStream = require('JSONStream');
const path = require('path');
const { pipeline } = require('stream');
const url = require('url');
const WebSocket = require('ws');
@@ -8,8 +9,8 @@ const DemuxSink = require('./demux');
const logger = require('./logging');
const PromCollector = require('./metrics/PromCollector');
const { getStatsFormat } = require('./utils/stats-detection');
const { extractTenantDataFromUrl } = require('./utils/utils');
const { RequestType } = require('./utils/utils');
const cwd = process.cwd();

/**
*
@@ -20,13 +21,14 @@ class WsHandler {
*
*/
constructor({ tempPath, reconnectTimeout, sequenceNumberSendingInterval, workerPool, config }) {
this.sessionTimeoutId = {};
this.sessionTimeoutIds = {};
this.tempPath = tempPath;
this.reconnectTimeout = reconnectTimeout;
this.sequenceNumberSendingInterval = sequenceNumberSendingInterval;
this.processData = this.processData.bind(this);
this.workerPool = workerPool;
this.config = config;
this.dumpFolder = './temp';
}

/**
@@ -46,29 +48,15 @@ class WsHandler {
* @param {*} meta
* @param {*} connectionInfo
*/
processData(id, meta, connectionInfo, tenantInfo) {
processData(id, dumpPath, connectionInfo) {
logger.info('[WsHandler] Queue for processing id %s', id);

// Metadata associated with a dump can get large so just select the necessary fields.
const dumpData = {
app: meta.applicationName || 'Undefined',
clientId: id,
conferenceId: meta.confName,
conferenceUrl: meta.confID,
dumpPath: meta.dumpPath,
dumpPath,
endDate: Date.now(),
endpointId: meta.endpointId,
startDate: meta.startDate,
sessionId: meta.meetingUniqueId,
userId: meta.displayName,
ampSessionId: meta.sessionId,
ampUserId: meta.userId,
ampDeviceId: meta.deviceId,
statsFormat: connectionInfo.statsFormat,
isBreakoutRoom: meta.isBreakoutRoom,
breakoutRoomId: meta.roomId,
parentStatsSessionId: meta.parentStatsSessionId,
...tenantInfo
startDate: connectionInfo.startDate
};

// Don't process dumps generated by JVB & Jigasi, there should be a more formal process to


Below this line we persist connections that are JVB and JIGASI; these do not go through the FeatureExtractor and are instead saved directly, so the dumpData is still needed at this point.
Also, line 77 calls this.persistDumpData(dumpData); I don't see that function defined in this class.
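
For reference, a minimal sketch of what a persistDumpData helper on this class might look like — the `this.store` interface and the field choices here are assumptions for illustration, not the repository's actual API:

```js
/**
 * Hypothetical sketch — not currently defined on WsHandler. Persists dump
 * metadata for JVB/Jigasi connections, which are saved directly instead of
 * going through the FeatureExtractor. The `this.store` interface is assumed.
 */
async persistDumpData(dumpData) {
    try {
        // Save the metadata keyed by client id so a reconnect updates
        // the same entry rather than creating a duplicate.
        await this.store.saveEntry(dumpData.clientId, dumpData);
    } catch (error) {
        logger.error('[WsHandler] Failed to persist dump data for %s: %o', dumpData.clientId, error);
    }
}
```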

@@ -101,10 +89,11 @@ class WsHandler {
const ua = upgradeReq.headers['user-agent'];
const queryObject = url.parse(referer, true).query;
const statsSessionId = queryObject?.statsSessionId;
const dumpPath = this._getDumpPath(statsSessionId);

this._clearConnectionTimeout(statsSessionId);
const connectionInfo = this._createConnectionInfo(upgradeReq, referer, ua, client);
const demuxSink = this._createDemuxSink(connectionInfo);
const demuxSink = this._createDemuxSink(dumpPath, connectionInfo);

logger.info('[WsHandler] Client connected: %s', statsSessionId);

@@ -113,23 +102,20 @@ class WsHandler {

clientMessageHandler.sendLastSequenceNumber(true);

demuxSink.on('close-sink', ({ id, meta }) => {
demuxSink.on('close-sink', ({ id }) => {

logger.info(
'[WsHandler] Websocket disconnected waiting for processing the data %s in %d ms',
id,
this.reconnectTimeout
);

const { confID = '' } = meta;
const tenantInfo = extractTenantDataFromUrl(confID);

const timeoutId = setTimeout(this.processData,
this.reconnectTimeout,
id, meta, connectionInfo, tenantInfo
id, dumpPath, connectionInfo
);

this.sessionTimeoutId[id] = timeoutId;
this.sessionTimeoutIds[id] = timeoutId;
});

const connectionPipeline = pipeline(
@@ -175,17 +161,25 @@
/**
*
*/
_createDemuxSink(connectionInfo) {
_createDemuxSink(dumpPath, connectionInfo) {
const demuxSinkOptions = {
tempPath: this.tempPath,
connectionInfo,
dumpFolder: './temp',
dumpPath,
dumpFolder: this.dumpFolder,
log: logger
};

return new DemuxSink(demuxSinkOptions);
}

/**
*
*/
_getDumpPath(statsSessionId) {
return path.resolve(cwd, this.dumpFolder, statsSessionId);
}

/**
*
*/
@@ -212,7 +206,8 @@
origin: upgradeReq.headers.origin,
url: referer,
userAgent: ua,
clientProtocol: client.protocol
clientProtocol: client.protocol,
startDate: Date.now()
};

connectionInfo.statsFormat = getStatsFormat(connectionInfo);
@@ -226,11 +221,11 @@
* @param {*} timeoutId
*/
_clearConnectionTimeout(sessionId) {
const timeoutId = this.sessionTimeoutId[sessionId];
const timeoutId = this.sessionTimeoutIds[sessionId];

if (timeoutId) {
clearTimeout(timeoutId);
delete this.sessionTimeoutId[timeoutId];
delete this.sessionTimeoutIds[timeoutId];
logger.info('[WsHandler] Client reconnected. Clear timeout for connectionId: %s', sessionId);
PromCollector.clientReconnectedCount.inc();
}
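The reconnect handling in this file leans on Node's setTimeout forwarding any extra arguments to the callback; a standalone sketch of the pattern, where the names and the timeout value are illustrative:

```js
// Standalone sketch of the reconnect bookkeeping used in WsHandler.
// processData runs after reconnectTimeout ms unless the client
// reconnects first and the pending timeout is cleared.
const sessionTimeoutIds = {};
const reconnectTimeout = 5000; // illustrative value

function scheduleProcessing(id, dumpPath, connectionInfo, processData) {
    // Arguments after the delay are forwarded to the callback when it fires.
    sessionTimeoutIds[id] = setTimeout(processData, reconnectTimeout, id, dumpPath, connectionInfo);
}

function clearConnectionTimeout(id) {
    const timeoutId = sessionTimeoutIds[id];

    if (timeoutId) {
        clearTimeout(timeoutId);
        delete sessionTimeoutIds[id];
    }
}
```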
3 changes: 0 additions & 3 deletions src/app.js
@@ -16,9 +16,6 @@ const FeaturesPublisher = require('./database/FeaturesPublisher');
const FirehoseConnector = require('./database/FirehoseConnector');
const logger = require('./logging');
const PromCollector = require('./metrics/PromCollector');
const S3Manager = require('./store/S3Manager');
const { saveEntryAssureUnique } = require('./store/dynamo');
const { getStatsFormat } = require('./utils/stats-detection');
const { getEnvName,
getIdealWorkerCount,
ResponseType,
86 changes: 11 additions & 75 deletions src/demux.js
@@ -1,17 +1,12 @@
const fs = require('fs');
const sizeof = require('object-sizeof');
const path = require('path');
const { Writable } = require('stream');
const util = require('util');

const PromCollector = require('./metrics/PromCollector.js');
const fileStore = require('./store/file');
const utils = require('./utils/utils');
const { uuidV4 } = require('./utils/utils.js');


const cwd = process.cwd();

// we are using this because we want the regular file descriptors returned,
// not the FileHandle objects from fs.promises.open
const fsOpen = util.promisify(fs.open);
@@ -45,10 +40,11 @@ class DemuxSink extends Writable {
* @param {boolean} persistDump - Flag used for generating a complete dump of the data coming to the stream.
* Required when creating mock tests.
*/
constructor({ tempPath, dumpFolder, connectionInfo, log, persistDump = false }) {
constructor({ tempPath, dumpFolder, dumpPath, connectionInfo, log, persistDump = false }) {


I think tempPath and dumpFolder are no longer needed if we're going to rely on the newly added dumpPath, right?
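
A rough sketch of that suggestion — assuming dumpPath fully replaces the tempPath/dumpFolder pair, and with any remaining constructor fields elided:

```js
// Sketch only: constructor with tempPath and dumpFolder dropped,
// relying solely on the precomputed dumpPath.
constructor({ dumpPath, connectionInfo, log, persistDump = false }) {
    super({ objectMode: true });

    this.dumpPath = dumpPath;
    this.connectionInfo = connectionInfo;
    this.log = log;
    this.persistDump = persistDump;
    this.timeoutId = -1;
}
```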

super({ objectMode: true });

this.dumpFolder = dumpFolder;
this.dumpPath = dumpPath;
this.connectionInfo = connectionInfo;
this.log = log;
this.timeoutId = -1;
@@ -144,16 +140,15 @@
*
* @param {string} id - sink id as saved in the sinkMap
*/
_handleSinkClose(id, meta) {
_handleSinkClose(id) {
const sinkData = this.sinkMap.get(id);

// Sanity check, make sure the data is available if not log an error and just send the id such that any
// listening client has a chance to handle the sink.
if (sinkData) {
// we need to emit this on file stream finish
this.emit('close-sink', {
id: sinkData.id,
meta: this._updateMeta(sinkData.meta, meta)
id: sinkData.id
});
} else {
this.log.error('[Demux] sink on close meta should be available id:', id);
@@ -173,50 +168,29 @@
PromCollector.sessionCount.inc();

const resolvedId = id;
let fd;

const idealPath = path.resolve(cwd, this.dumpFolder, id);
const filePath = idealPath;
const isReconnect = fs.existsSync(filePath);

// If a client reconnects the same client id will be provided thus cases can occur where the previous dump
// with the same id is still present on the disk, in order to avoid conflicts and states where multiple
// handles are taken on the same file, we establish a convention appending an incremental number at the end
// of the file ${id}_${i}. Thus any client that needs to read the dumps can search for ${id} and get an
// incremental list.
// Warning. This will resolve local reconnect conflicts, when uploading the associated metadata to a store
// logic that handles conflicts at the store level also needs to be added e.g. when uploading to dynamodb
// if the entry already exists because some other instance uploaded first, the same incremental approach needs
// to be taken.
while (!fd) {
fd = await fsOpen(filePath, 'a');
}
const isReconnect = fs.existsSync(this.dumpPath);
const fd = await fsOpen(this.dumpPath, 'a');

this.log.info('[Demux] open-sink id: %s; path %s; connection: %o', id, filePath, this.connectionInfo);
this.log.info('[Demux] open-sink id: %s; path %s; connection: %o', id, this.dumpPath, this.connectionInfo);

const sink = fs.createWriteStream(idealPath, { fd });
const sink = fs.createWriteStream(this.dumpPath, { fd });

// Add the associated data to a map in order to properly direct requests to the appropriate sink.
const sinkData = {
id: resolvedId,
sink,
meta: {
startDate: Date.now(),
dumpPath: filePath
startDate: this.startDate,
dumpPath: this.dumpPath
}
};

this.sinkMap.set(id, sinkData);

sink.on('error', error => this.log.error('[Demux] sink on error id: ', id, ' error:', error));
let identity;

if (isReconnect) {
identity = await this._getIdentityFromFile(sinkData.id);
}

// The close event should be emitted both on error and happy flow.
sink.on('close', this._handleSinkClose.bind(this, id, identity));
sink.on('close', this._handleSinkClose.bind(this, id));

if (!isReconnect) {
// Initialize the dump file by adding the connection metadata at the beginning. This data is usually used
@@ -237,48 +211,10 @@
* @param {Object} data - New metadata.
*/
async _sinkUpdateMetadata(sinkData, data) {
let metadata;

// Browser clients will send identity data as an array so we need to extract the element that contains
// the actual metadata
if (Array.isArray(data)) {
metadata = data[2];
} else {
metadata = data;
}

const meta = sinkData.meta;

// A first level update of the properties will suffice.
sinkData.meta = this._updateMeta(meta, metadata);

// We expect metadata to be objects thus we need to stringify them before writing to the sink.
this._sinkWrite(sinkData.sink, JSON.stringify(data));
}

/**
*
* @param {*} meta
* @param {*} metadata
* @returns
*/
_updateMeta(meta, metadata) {
return {
...meta,
...metadata
};
}

/**
* Getting identity from file in case of reconnect.
*/
async _getIdentityFromFile(fname) {
const filePath = utils.getDumpPath(this.tempPath, fname);
const { identity = '' } = await fileStore.getObjectsByKeys(filePath, [ 'identity' ]);

return identity;
}

/**
* Self explanatory.
*
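The open-sink path above opens the dump file in append mode and hands the raw descriptor to a write stream, so a reconnecting client resumes its existing dump instead of truncating it; a condensed, self-contained sketch of that pattern:

```js
const fs = require('fs');
const util = require('util');

// Promisified fs.open returns a plain numeric file descriptor,
// unlike fs.promises.open which returns a FileHandle object.
const fsOpen = util.promisify(fs.open);

async function openAppendSink(dumpPath) {
    // If the file already exists, this is a reconnect: opening in
    // append mode continues the previous dump rather than replacing it.
    const isReconnect = fs.existsSync(dumpPath);
    const fd = await fsOpen(dumpPath, 'a');

    return { sink: fs.createWriteStream(dumpPath, { fd }), isReconnect };
}
```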