diff --git a/lib/Connection.js b/lib/Connection.js index 34d6d36..fe8bd4b 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -1,6 +1,6 @@ var Sender = require('./Sender'); var Receiver = require('./Receiver'); -var EventEmitter = require('events').EventEmitter; +var Duplex = require('stream').Duplex; var util = require('util'); // TODO: have connections refuse packets when closed. @@ -9,6 +9,7 @@ module.exports = Connection; function Connection(packetSender) { this._sender = new Sender(packetSender); this._receiver = new Receiver(packetSender); + Duplex.call(this) var self = this; this._receiver.on('data', function (data) { @@ -16,7 +17,7 @@ function Connection(packetSender) { }); }; -util.inherits(Connection, EventEmitter); +util.inherits(Connection, Duplex); /** * Sends the given buffer to the end host. @@ -38,3 +39,16 @@ Connection.prototype.receive = function (packet) { this._receiver.receive(packet); } }; + +Connection.prototype._write = function (chunk, encoding, callback) { + this.send(chunk) + callback() +} + +Connection.prototype._read = function (n) { + +} + +Connection.prototype._final = function (callback) { + callback() +} \ No newline at end of file diff --git a/lib/LinkedList.js b/lib/LinkedList.js index 3a46363..05fa30d 100644 --- a/lib/LinkedList.js +++ b/lib/LinkedList.js @@ -102,6 +102,9 @@ LinkedList.prototype._insert = function (parentNode, object) { if (order <= -1) { var node = new Node(object); + if (this._currentNode.value > node.value) { + this._currentNode = node + } node._childNode = parentNode._childNode; parentNode._childNode = node; return InsertionResult.INSERTED; diff --git a/lib/Packet.js b/lib/Packet.js index 20aaad0..17c0944 100644 --- a/lib/Packet.js +++ b/lib/Packet.js @@ -13,14 +13,14 @@ function Packet(sequenceNumber, payload, synchronize, reset) { var offset = 0; - bools = segment.readUInt8(offset); offset++; + var bools = segment.readUInt8(offset); offset++; this._acknowledgement = !!(bools & 0x80); this._synchronize = !!(bools & 0x40); this._finish = !!(bools & 0x20); this._reset = !!(bools & 0x10); this._sequenceNumber = segment.readUInt8(offset); offset++; - this._payload = new Buffer(segment.length - offset); + this._payload = Buffer.alloc(segment.length - offset); segment.copy(this._payload, 0, offset); } else { this._acknowledgement = false; @@ -33,13 +33,13 @@ function Packet(sequenceNumber, payload, synchronize, reset) { }; Packet.createAcknowledgementPacket = function (sequenceNumber) { - var packet = new Packet(sequenceNumber, new Buffer(0), false); + var packet = new Packet(sequenceNumber, Buffer.alloc(0), false); packet._acknowledgement = true; return packet; }; Packet.createFinishPacket = function () { - var packet = new Packet(0, new Buffer(0), false, false); + var packet = new Packet(0, Buffer.alloc(0), false, false); packet._finish = true; return packet; }; @@ -75,7 +75,7 @@ Packet.prototype.getIsReset = function () { */ Packet.prototype.toBuffer = function () { var offset = 0; - var retval = new Buffer(2 + this._payload.length); + var retval = Buffer.alloc(2 + this._payload.length); var bools = 0 + ( (this._acknowledgement && 0x80) | @@ -116,4 +116,4 @@ Packet.prototype.equals = function (packet) { this.getSequenceNumber() === packet.getSequenceNumber() && bufferEqual(this.getPayload(), packet.getPayload()) ) -}; \ No newline at end of file +}; diff --git a/lib/Receiver.js b/lib/Receiver.js index 4eac337..bed64f1 100644 --- a/lib/Receiver.js +++ b/lib/Receiver.js @@ -27,19 +27,15 @@ Receiver.prototype.receive = function (packet) { } // Ignores packets that have a sequence number less than the next sequence - // number - if (!packet.getIsSynchronize() && packet.getSequenceNumber() < this._syncSequenceNumber) { + // number. sending the ack again + if (!packet.getIsSynchronize() && packet.getSequenceNumber() < this._nextSequenceNumber) { + this._packetSender.send(Packet.createAcknowledgementPacket(packet.getSequenceNumber())); return; } if (packet.getIsSynchronize() && !this._synced) { // This is the beginning of the stream. - if (packet.getSequenceNumber() === this._syncSequenceNumber) { - this._packetSender.send(Packet.createAcknowledgementPacket(packet.getSequenceNumber())); - return; - } - // Send the packet upstream, send acknowledgement packet to end host, and // increment the next expected packet. this._packets.clear(); @@ -59,6 +55,11 @@ Receiver.prototype.receive = function (packet) { // We're done. return; + } else if (packet.getIsSynchronize() && this._synced && this._syncSequenceNumber === packet.getSequenceNumber()) { + // this will happen if the ack for sync message get lost so we will send it again + this._packetSender.send(Packet.createAcknowledgementPacket(packet.getSequenceNumber())); + // and we are done! + return; } else if (packet.getIsReset()) { this.emit('_reset'); this._synced = false; @@ -90,7 +91,6 @@ Receiver.prototype.receive = function (packet) { // after the current packet. If there are, then check to see if the next // packet is the expected packet number. If it is, then start the // acknowledgement process anew. - var result = this._packets.insert(packet); if (result === LinkedList.InsertionResult.INSERTED) { this._pushIfExpectedSequence(packet); diff --git a/lib/Sender.js b/lib/Sender.js index d859846..2b117d3 100644 --- a/lib/Sender.js +++ b/lib/Sender.js @@ -69,6 +69,7 @@ Window.prototype.send = function () { } } + pkts.forEach(function (packet) { packet.on('acknowledge', onAcknowledge); packet.send(); diff --git a/lib/Server.js b/lib/Server.js index 5e29db7..b6e22c9 100644 --- a/lib/Server.js +++ b/lib/Server.js @@ -15,7 +15,6 @@ function Server(socket) { this._connections = {}; var self = this; - socket.on('message', function (message, rinfo) { var addressKey = rinfo.address + rinfo.port; var connection; @@ -29,7 +28,6 @@ function Server(socket) { self.emit('connection', connection); } else { // Just get the existing connection. - connection = self._connections[addressKey]; } @@ -38,7 +36,6 @@ function Server(socket) { if (packet.getIsFinish()) { // The client requested that the connection be closed. - delete self._connections[addressKey]; } else { // Capture the packet, and place it into the window of packets. diff --git a/lib/bufferEqual.js b/lib/bufferEqual.js index 59fb9fc..120ad1d 100644 --- a/lib/bufferEqual.js +++ b/lib/bufferEqual.js @@ -1,5 +1,3 @@ - -var buffertools = require('buffertools') module.exports = function bufferEqual(a, b) { - return buffertools.compare(a, b) === 0 + return Buffer.compare(a, b) === 0 } \ No newline at end of file