feat: add reconnect and process old files #99
base: feature-reconnect
Conversation
Force-pushed from 81b8aad to c810e84.
I know this is meant to be a draft; I've just added a few comments to consider when refactoring.
Awesome work Tomi! There are a lot of great changes here, and we probably need to add at least some basic integration tests for the reconnect scenarios. Please check out
Force-pushed from c14d391 to b97fbae.
Force-pushed from b8d3cf6 to 07667e9.
src/test/client.js (outdated)
assert.deepStrictEqual(parsedBody, resultTemplate);
} else {
    // this is a reconnect, dumpInfo is not relevant
    logger.info('[TEST] Handling DONE event after reconnect with statsSessionId %j, body %j %j',
Why is this not relevant? The reconnect test should have the same result as the dump without the disconnect.
After the disconnect it waits for the reconnect before the dump gets processed.
// Subsequent operations will be taken by services in the upper level, like upload to store and persist
// metadata to a db.
case 'close':
    this.log.info('[Demux] sink closed');
In case a client reconnect happens, the client won't send the identity data again, which means the meta object will be missing the identity information used in the app.
The startDate that's set in _sinkCreate will also not correspond to the actual start date of the session; won't this affect the rest of the application?
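One way to address both points could be to reuse the metadata of the previous sink when the session reconnects, instead of rebuilding it from scratch. A rough sketch, assuming a sinks map keyed by statsSessionId (the names here are illustrative, not the PR's actual API):

// Hypothetical sketch: keep identity and startDate from the first connection.
_sinkCreate(statsSessionId, isReconnect) {
    const previous = this.sinks.get(statsSessionId);

    // On reconnect, reuse the original meta so identity and startDate
    // still describe the real session rather than the reconnect.
    const meta = isReconnect && previous
        ? previous.meta
        : { startDate: Date.now() };

    const sink = { id: statsSessionId, meta };

    this.sinks.set(statsSessionId, sink);

    return sink;
}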
// we need to wait a little bit before reconnecting.
setTimeout(() => {
    connection.connect();
There is one problem with how reconnect is currently handled: if the client reconnects after the server decided it has waited long enough, the FeatureExtractor will process the resulting dump file as if it were a new session (even though identity information is missing), resulting in an overwritten S3 dump with partial information and a duplicate entry in Redshift. This can also happen if the server restarts and the client lands on another server, because of how haproxy is currently set up. Ideally the server or the client would identify these cases and not process them; simply logging an error would do.
Is this case handled by the client/server protocol somehow?
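One possible guard, as a rough sketch (the sinks map and log message are assumptions, not the PR's actual code): if a request claims to be a reconnect but the server no longer holds state for that session, refuse to process it instead of silently starting a fresh session:

handleConnect(statsSessionId, isReconnect) {
    if (isReconnect && !this.sinks.has(statsSessionId)) {
        // The server already gave up on this session (timeout, restart, or a
        // different instance behind haproxy); processing it now would overwrite
        // the stored dump with partial data and duplicate the redshift entry.
        logger.error('[App] Stale reconnect for session %s, not processing', statsSessionId);

        return;
    }

    // ... normal connect / reconnect handling ...
}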
/**
 *
 */
function runTest() {
A test that somehow simulates a server restart would be cool, but I assume that would be a bit convoluted; maybe an isolated test for the OrphanFileHelper? wdyt?
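An isolated test could look roughly like this (the OrphanFileHelper constructor, its options, and processOldFiles are assumptions about the helper's shape, not confirmed API):

const assert = require('assert');
const fs = require('fs');
const os = require('os');
const path = require('path');

const OrphanFileHelper = require('./orphan-file-helper'); // assumed path

describe('OrphanFileHelper', () => {
    it('processes dump files left over from a previous run', async () => {
        const tempPath = fs.mkdtempSync(path.join(os.tmpdir(), 'dumps-'));

        // Simulate a dump orphaned by a server restart.
        fs.writeFileSync(path.join(tempPath, 'orphan-session-id'), '[]\n');

        const processed = [];
        const helper = new OrphanFileHelper({
            tempPath,
            processFile: id => processed.push(id)
        });

        await helper.processOldFiles();

        assert.deepStrictEqual(processed, ['orphan-session-id']);
    });
});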
this._validateSequenceNumber(statsSessionId, requestData.sequenceNumber, this.lastSequenceNumber);

this.lastTimestamp = requestData.timestamp;
this.lastSequenceNumber = requestData.sequenceNumber;
In the case of the connectionInfo stats entry there is no sequenceNumber, so it might be undefined; please check that the sequenceNumber from the request data is valid.
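For example, a minimal guard around the snippet above (sketch only):

// connectionInfo entries carry no sequenceNumber, so only validate and
// store it when the request actually contains a valid one.
if (Number.isInteger(requestData.sequenceNumber)) {
    this._validateSequenceNumber(statsSessionId, requestData.sequenceNumber, this.lastSequenceNumber);
    this.lastSequenceNumber = requestData.sequenceNumber;
}

this.lastTimestamp = requestData.timestamp;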
Force-pushed from eae6ed3 to a718b4f.
sequenceNumber = this.demuxSink.lastSequenceNumber;
} else {
    logger.debug('[ClientMessageHandler] Last sequence number from dump ');
    sequenceNumber = await this._getLastSequenceNumberFromDump();
Do we need to read the entire file in the case of a client reconnect (as opposed to the server restart case)? Technically we have that information in the previous sink.
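One way to avoid the file read for plain client reconnects, as a sketch (the sinkCache map is hypothetical): remember the last sequence number when a sink closes, and fall back to the dump only when the cache is empty, i.e. after a server restart:

// When a sink closes, keep its last sequence number in memory.
this.sinkCache.set(statsSessionId, this.demuxSink.lastSequenceNumber);

// On reconnect, prefer the cached value; only a restarted server
// has to fall back to reading the dump file.
const cached = this.sinkCache.get(statsSessionId);

sequenceNumber = cached !== undefined
    ? cached
    : await this._getLastSequenceNumberFromDump();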
// logic that handles conflicts at the store level also needs to be added e.g. when uploading to dynamodb
// if the entry already exists because some other instance uploaded first, the same incremental approach needs
// to be taken.
while (!fd) {
If the incremental approach was removed, I assume the while loop and the comments need to be removed as well.
let identity;

if (isReconnect) {
    identity = await this._getIdentityFromFile(sinkData.id);
If this is a client reconnect (not the server restart case), the file will be read once when we try to get the last sequence number and then again here; we can probably make this more efficient and only read the file once. Not insisting for this PR.
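A possible shape for that single read, as a sketch (readDumpMetadata is a hypothetical helper, and the identity detection is a placeholder for however identity entries are actually tagged in the dump):

const fs = require('fs');
const readline = require('readline');

// Scan the dump once, extracting both the identity entry and the last line,
// so a reconnect does not have to read the same file twice.
async function readDumpMetadata(dumpPath) {
    const rl = readline.createInterface({ input: fs.createReadStream(dumpPath) });

    let identity;
    let lastLine;

    for await (const line of rl) {
        if (!identity && line.includes('identity')) {
            identity = JSON.parse(line);
        }
        lastLine = line;
    }

    return { identity, lastLine };
}

The caller could then apply utils.parseLineForSequenceNumber to lastLine, as the existing code already does.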
this._createSequenceNumberBody(sequenceNumber, isInitial)
));

if (this.client.readyState === 1) {
What does this mean? Are there cases where readyState is not 1, and if such a case occurs, what happens? Add a comment explaining why this is needed.
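For context, 1 is the OPEN value of the WebSocket readyState (0 is CONNECTING, 2 CLOSING, 3 CLOSED). Using the named constant plus a comment would answer this in the code itself; a sketch, assuming the ws package and that payload is the message built above:

const WebSocket = require('ws');

// The socket may still be CONNECTING, or already CLOSING/CLOSED after a drop;
// sending in those states fails, so only send while the connection is OPEN.
if (this.client.readyState === WebSocket.OPEN) {
    this.client.send(payload);
}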
return 0;
});

const result = await promis;
This can be done a bit more elegantly, like:

let lastLine = 0;

try {
    const lastLineString = await storeFile.getLastLine(dumpPath, 1);

    lastLine = utils.parseLineForSequenceNumber(lastLineString);
} catch (e) {
    logger.error('[ClientMessageHandler] Error. ', e);
}

return lastLine;

wdyt?
.then(
    lastLine => utils.parseLineForSequenceNumber(lastLine))
.catch(() => {
    logger.debug('[ClientMessageHandler] New connection. File doesn\'t exist. file: ', dumpPath);
I'm assuming the error can be any error, not necessarily that the file doesn't exist; maybe we should log the error as well.
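For instance, the same catch block with the error included (a sketch; whatever the block currently returns would stay the same):

.catch(err => {
    // The failure is not necessarily a missing file; keep the error visible.
    logger.debug('[ClientMessageHandler] Could not read dump file %s:', dumpPath, err);
});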
    return jsonData[4];
}

return -1;
How will the client react on receiving -1? We should add some comments here.
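Something like this would do, though the exact client behaviour is an open question from this thread, not confirmed:

// -1 signals that no sequence number could be parsed from this line.
// TODO: document how the client reacts to -1 (e.g. resend from the beginning?).
return -1;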
const readline = require('readline');
const Stream = require('stream');

exports.getLastLine = (fileName, minLength) => {
I feel a bit insecure about this function. If a server restarts we have about 2000 files, and when clients start reconnecting we're going to read each dump (which can be up to 1 GB per file) line by line; I'm not sure how that's going to affect the server. I wonder if there are any more efficient ways to read the end of a file.
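One common approach, sketched below (not tied to the PR's storeFile API): open the file, stat it, and read only the last few kilobytes from the end, so the cost is independent of the dump size. This assumes the last line fits inside the tail window:

const fs = require('fs').promises;

// Read only the tail of the file and take the last non-empty line from it,
// instead of streaming the whole dump. Assumes lines are shorter than tailBytes.
async function getLastLine(fileName, tailBytes = 64 * 1024) {
    const handle = await fs.open(fileName, 'r');

    try {
        const { size } = await handle.stat();
        const start = Math.max(0, size - tailBytes);
        const buffer = Buffer.alloc(size - start);

        await handle.read(buffer, 0, buffer.length, start);

        const lines = buffer.toString('utf8').split('\n').filter(l => l.trim().length);

        return lines[lines.length - 1];
    } finally {
        await handle.close();
    }
}

With something like this, a reconnect storm after a restart costs roughly 2000 small tail reads instead of 2000 full-file scans.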