Conversation

tamasdomokos

No description provided.

@tamasdomokos force-pushed the feat/add_reconnect_and_process_old_files branch from 81b8aad to c810e84 on November 16, 2022 12:48

@andrei-gavrilescu left a comment


I know this is meant to be a draft; I've just added a few comments to consider when refactoring.


@andrei-gavrilescu left a comment


Awesome work Tomi!!

@andrei-gavrilescu

There are a lot of awesome changes here, and we probably need to add at least some basic integration tests for the reconnect scenarios. Please check out npm run integartion and see if the existing tests still work (most likely not).

@tamasdomokos force-pushed the feat/add_reconnect_and_process_old_files branch from c14d391 to b97fbae on January 6, 2023 11:15
@tamasdomokos force-pushed the feat/add_reconnect_and_process_old_files branch 2 times, most recently from b8d3cf6 to 07667e9 on January 25, 2023 16:05
assert.deepStrictEqual(parsedBody, resultTemplate);
} else {
// this is a reconnect dumpInfo is not relevant
logger.info('[TEST] Handling DONE event after reconnect with statsSessionId %j, body %j %j',


Why is this not relevant? The reconnect test should have the same result as the dump without the disconnect.

@tamasdomokos (Author)

After the disconnect, the dump waits for the reconnect before it gets processed.

// Subsequent operations will be taken by services in the upper level, like upload to store and persist
// metadata to a db.
case 'close':
this.log.info('[Demux] sink closed');


In case a client reconnect happens, the client won't send the identity data again, which means the meta object will be missing the identity information used in the app.


The startDate that's set in _sinkCreate will also not correspond to the actual start date of the session; won't this affect the rest of the application?


// we need to wait a little bit before reconnecting.
setTimeout(() => {
connection.connect();


There is one problem with how reconnect is currently handled: if the client reconnects after the server has decided it waited long enough, the FeatureExtractor will process the resulting dump file as if it were a new session (even though identity information is missing), resulting in an overwritten s3 dump with partial information and a duplicate entry in redshift. This can also happen if the server restarts and the client lands on another server, because of how haproxy is currently set up. Ideally the server or the client would identify these cases and not process them; simply logging an error would do.
Is this case handled by the client/server protocol somehow?
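One way the server side could catch these stale reconnects is to refuse to process a dump that has no identity attached, as described above. A minimal sketch, assuming a meta object with an identity field; the function names and the meta shape are illustrative, not the actual API:

```javascript
// Hypothetical guard: a dump produced by a stale reconnect (the server
// already timed the session out, or the client landed on another instance)
// carries no identity data, so its absence can signal "do not process".
function shouldProcessDump(meta) {
    return Boolean(meta && meta.identity);
}

function handleDone(meta, logger) {
    if (!shouldProcessDump(meta)) {
        // Log and skip instead of overwriting the existing s3 dump and
        // duplicating the redshift entry.
        logger.error('[FeatureExtractor] Dump without identity, skipping: %s', meta && meta.id);

        return false;
    }

    // ...hand the dump off for upload and persistence as usual...
    return true;
}
```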

/**
*
*/
function runTest() {


A test that would somehow simulate a server restart would be cool, but I assume that would be a bit convoluted; maybe an isolated test for the OrphanFileHelper? wdyt?

this._validateSequenceNumber(statsSessionId, requestData.sequenceNumber, this.lastSequenceNumber);

this.lastTimestamp = requestData.timestamp;
this.lastSequenceNumber = requestData.sequenceNumber;


In the case of the connectionInfo stats entry there is no sequenceNumber, so it might be undefined; please check that the sequenceNumber from the request data is valid.
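A minimal sketch of the suggested guard; the helper names and the state object are illustrative, not the code under review:

```javascript
// Only treat a sequenceNumber as valid when it is a non-negative integer;
// connectionInfo stats entries arrive without one.
function isValidSequenceNumber(sequenceNumber) {
    return Number.isInteger(sequenceNumber) && sequenceNumber >= 0;
}

function updateSequenceState(state, requestData) {
    state.lastTimestamp = requestData.timestamp;

    // Skip the update for entries without a sequenceNumber so the counter
    // is never clobbered with undefined.
    if (isValidSequenceNumber(requestData.sequenceNumber)) {
        state.lastSequenceNumber = requestData.sequenceNumber;
    }

    return state;
}
```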

@tamasdomokos force-pushed the feat/add_reconnect_and_process_old_files branch from eae6ed3 to a718b4f on February 8, 2023 13:02
sequenceNumber = this.demuxSink.lastSequenceNumber;
} else {
logger.debug('[ClientMessageHandler] Last sequence number from dump ');
sequenceNumber = await this._getLastSequenceNumberFromDump();


Do we need to read the entire file in the case of a client reconnect (not the server restart case)? Technically we have that information in the previous sink.
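The shortcut could look something like the sketch below; the cache class and its method names are illustrative. A plain client reconnect is served from memory, and the dump-file read stays as the fallback for the server restart case, where the process starts with an empty map:

```javascript
// Hypothetical per-session cache of the last seen sequence number.
class SequenceNumberCache {
    constructor() {
        this.bySession = new Map();
    }

    remember(statsSessionId, sequenceNumber) {
        this.bySession.set(statsSessionId, sequenceNumber);
    }

    lookup(statsSessionId) {
        // undefined signals "not cached, fall back to reading the dump"
        return this.bySession.get(statsSessionId);
    }
}
```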

// logic that handles conflicts at the store level also needs to be added e.g. when uploading to dynamodb
// if the entry already exists because some other instance uploaded first, the same incremental approach needs
// to be taken.
while (!fd) {


If the incremental approach was removed, I assume the while loop and the comments need to be removed as well.

let identity;

if (isReconnect) {
identity = await this._getIdentityFromFile(sinkData.id);


If this is a client reconnect (not the server restart case), the file will be read once when we try to get the last sequence number and then again here; we could probably make this more efficient and read the file only once. Not insisting for this PR.

this._createSequenceNumberBody(sequenceNumber, isInitial)
));

if (this.client.readyState === 1) {


What does this mean? Are there cases where readyState is not 1, and if such a case occurs, what happens? Please add a comment explaining why this check is needed.
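For context, the readyState values come from the WebSocket spec: 0 = CONNECTING, 1 = OPEN, 2 = CLOSING, 3 = CLOSED. A sketch of a guarded send with the comment asked for above; safeSend is an illustrative name, not the code under review:

```javascript
const OPEN = 1; // WebSocket readyState for an established connection

function safeSend(client, payload, logger) {
    // Between the reconnect timer firing and the handshake completing the
    // socket can still be CONNECTING (0), and send() would then throw,
    // so drop (or queue) the message instead.
    if (client.readyState !== OPEN) {
        logger.warn('[ClientMessageHandler] Socket not open (readyState %d), dropping message', client.readyState);

        return false;
    }

    client.send(payload);

    return true;
}
```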

return 0;
});

const result = await promis;


this can be done a bit more elegantly, like:

let lastLine = 0;
try {
  const lastLineString = await storeFile.getLastLine(dumpPath, 1);
  lastLine = utils.parseLineForSequenceNumber(lastLineString);
} catch (e) {
  logger.error('[ClientMessageHandler] Error: ', e);
}

return lastLine;

wdyt

.then(
lastLine => utils.parseLineForSequenceNumber(lastLine))
.catch(() => {
logger.debug('[ClientMessageHandler] New connection. File doesn\'t exist. file: ', dumpPath);


I'm assuming the error can be any error, not necessarily that the file doesn't exist; maybe we should log the error as well.

return jsonData[4];
}

return -1;


How will the client react on receiving -1? We should add some comments here.

const readline = require('readline');
const Stream = require('stream');

exports.getLastLine = (fileName, minLength) => {


I feel a bit insecure about this function: if a server restarts, we have about 2000 files, and when clients start reconnecting we're going to read each dump (which can be up to 1 GB per file) line by line. I'm not sure how that's going to affect the server. I wonder if there are more efficient ways to read the end of a file.

@tamasdomokos changed the base branch from master to feature-reconnect on February 10, 2023 12:25