Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions lib/Connection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var Sender = require('./Sender');
var Receiver = require('./Receiver');
var EventEmitter = require('events').EventEmitter;
var Duplex = require('stream').Duplex;
var util = require('util');

// TODO: have connections refuse packets when closed.
Expand All @@ -9,14 +9,15 @@ module.exports = Connection;
function Connection(packetSender) {
this._sender = new Sender(packetSender);
this._receiver = new Receiver(packetSender);
Duplex.call(this)

var self = this;
this._receiver.on('data', function (data) {
self.emit('data', data)
});
};

util.inherits(Connection, EventEmitter);
util.inherits(Connection, Duplex);

/**
* Sends the given buffer to the end host.
Expand All @@ -38,3 +39,16 @@ Connection.prototype.receive = function (packet) {
this._receiver.receive(packet);
}
};

Connection.prototype._write = function (chunk, encoding, callback) {
this.send(chunk)
callback()
}

Connection.prototype._read = function (n) {

}

Connection.prototype._final = function (callback) {
callback()
}
3 changes: 3 additions & 0 deletions lib/LinkedList.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ LinkedList.prototype._insert = function (parentNode, object) {

if (order <= -1) {
var node = new Node(object);
if (this._currentNode.value > node.value) {
this._currentNode = node
}
node._childNode = parentNode._childNode;
parentNode._childNode = node;
return InsertionResult.INSERTED;
Expand Down
12 changes: 6 additions & 6 deletions lib/Packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ function Packet(sequenceNumber, payload, synchronize, reset) {

var offset = 0;

bools = segment.readUInt8(offset); offset++;
var bools = segment.readUInt8(offset); offset++;
this._acknowledgement = !!(bools & 0x80);
this._synchronize = !!(bools & 0x40);
this._finish = !!(bools & 0x20);
this._reset = !!(bools & 0x10);

this._sequenceNumber = segment.readUInt8(offset); offset++;
this._payload = new Buffer(segment.length - offset);
this._payload = Buffer.alloc(segment.length - offset);
segment.copy(this._payload, 0, offset);
} else {
this._acknowledgement = false;
Expand All @@ -33,13 +33,13 @@ function Packet(sequenceNumber, payload, synchronize, reset) {
};

Packet.createAcknowledgementPacket = function (sequenceNumber) {
var packet = new Packet(sequenceNumber, new Buffer(0), false);
var packet = new Packet(sequenceNumber, Buffer.alloc(0), false);
packet._acknowledgement = true;
return packet;
};

Packet.createFinishPacket = function () {
var packet = new Packet(0, new Buffer(0), false, false);
var packet = new Packet(0, Buffer.alloc(0), false, false);
packet._finish = true;
return packet;
};
Expand Down Expand Up @@ -75,7 +75,7 @@ Packet.prototype.getIsReset = function () {
*/
Packet.prototype.toBuffer = function () {
var offset = 0;
var retval = new Buffer(2 + this._payload.length);
var retval = Buffer.alloc(2 + this._payload.length);

var bools = 0 + (
(this._acknowledgement && 0x80) |
Expand Down Expand Up @@ -116,4 +116,4 @@ Packet.prototype.equals = function (packet) {
this.getSequenceNumber() === packet.getSequenceNumber() &&
bufferEqual(this.getPayload(), packet.getPayload())
)
};
};
16 changes: 8 additions & 8 deletions lib/Receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,15 @@ Receiver.prototype.receive = function (packet) {
}

// Ignores packets that have a sequence number less than the next sequence
// number
if (!packet.getIsSynchronize() && packet.getSequenceNumber() < this._syncSequenceNumber) {
// number. sending the ack again
if (!packet.getIsSynchronize() && packet.getSequenceNumber() < this._nextSequenceNumber) {
this._packetSender.send(Packet.createAcknowledgementPacket(packet.getSequenceNumber()));
return;
}

if (packet.getIsSynchronize() && !this._synced) {
// This is the beginning of the stream.

if (packet.getSequenceNumber() === this._syncSequenceNumber) {
this._packetSender.send(Packet.createAcknowledgementPacket(packet.getSequenceNumber()));
return;
}

// Send the packet upstream, send acknowledgement packet to end host, and
// increment the next expected packet.
this._packets.clear();
Expand All @@ -59,6 +55,11 @@ Receiver.prototype.receive = function (packet) {
// We're done.
return;

} else if (packet.getIsSynchronize() && this._synced && this._syncSequenceNumber === packet.getSequenceNumber()) {
// this will happen if the ack for sync message get lost so we will send it again
this._packetSender.send(Packet.createAcknowledgementPacket(packet.getSequenceNumber()));
// and we are done!
return;
} else if (packet.getIsReset()) {
this.emit('_reset');
this._synced = false;
Expand Down Expand Up @@ -90,7 +91,6 @@ Receiver.prototype.receive = function (packet) {
// after the current packet. If there are, then check to see if the next
// packet is the expected packet number. If it is, then start the
// acknowledgement process anew.

var result = this._packets.insert(packet);
if (result === LinkedList.InsertionResult.INSERTED) {
this._pushIfExpectedSequence(packet);
Expand Down
1 change: 1 addition & 0 deletions lib/Sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Window.prototype.send = function () {
}
}


pkts.forEach(function (packet) {
packet.on('acknowledge', onAcknowledge);
packet.send();
Expand Down
3 changes: 0 additions & 3 deletions lib/Server.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ function Server(socket) {
this._connections = {};

var self = this;

socket.on('message', function (message, rinfo) {
var addressKey = rinfo.address + rinfo.port;
var connection;
Expand All @@ -29,7 +28,6 @@ function Server(socket) {
self.emit('connection', connection);
} else {
// Just get the existing connection.

connection = self._connections[addressKey];
}

Expand All @@ -38,7 +36,6 @@ function Server(socket) {

if (packet.getIsFinish()) {
// The client requested that the connection be closed.

delete self._connections[addressKey];
} else {
// Capture the packet, and place it into the window of packets.
Expand Down
4 changes: 1 addition & 3 deletions lib/bufferEqual.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@

var buffertools = require('buffertools')
module.exports = function bufferEqual(a, b) {
return buffertools.compare(a, b) === 0
return Buffer.compare(a, b) === 0
}