Skip to content

Commit 812994f

Browse files
authored
Merge pull request #581 from share/server-fixup
✨ Allow middleware to mutate submitted ops
2 parents 20f1fa7 + 3db7070 commit 812994f

File tree

9 files changed

+487
-8
lines changed

9 files changed

+487
-8
lines changed

docs/middleware/op-submission.md

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,20 @@ Since `'submitRequestEnd'` is an event -- not a middleware hook -- it provides n
140140

141141
## Mutating ops
142142

143-
{: .warn :}
144-
Mutating ops in middleware is generally a **bad idea**, and should be avoided.
143+
Ops may be amended in the `apply` middleware using the special `request.$fixup()` method:
145144

146-
The main reason for avoiding op mutation is that the client who submitted the op will not be informed of the mutation, so the client's doc will never receive the mutation.
145+
```js
146+
backend.use('apply', (request, next) => {
147+
let error;
148+
try {
149+
request.$fixup([{p: ['meta'], oi: {timestamp: Date.now()}}]);
150+
} catch (e) {
151+
error = e;
152+
}
147153

148-
The general workaround is to trigger a second op submission, rather than mutate the provided op. This obviously has the downside of op submissions being unatomic, but is the safest way to get the original client to receive the update.
154+
next(error);
155+
});
156+
```
149157

150158
{: .warn :}
151-
When submitting ops from the middleware, set careful conditions under which you submit ops in order to avoid infinite op submission loops, where submitting an op recursively triggers infinite op submissions.
159+
The `request.$fixup()` method may throw an error, which should be handled appropriately, usually by passing directly to the `next()` callback.

lib/agent.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,9 +718,10 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) {
718718

719719
Agent.prototype._submit = function(collection, id, op, callback) {
720720
var agent = this;
721-
this.backend.submit(this, collection, id, op, null, function(err, ops) {
721+
this.backend.submit(this, collection, id, op, null, function(err, ops, request) {
722722
// Message to acknowledge the op was successfully submitted
723723
var ack = {src: op.src, seq: op.seq, v: op.v};
724+
if (request._fixupOps.length) ack[ACTIONS.fixup] = request._fixupOps;
724725
if (err) {
725726
// Occasional 'Op already submitted' errors are expected to happen as
726727
// part of normal operation, since inflight ops need to be resent after

lib/backend.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ Backend.prototype.submit = function(agent, index, id, op, options, originalCallb
229229

230230
var callback = function(error, ops) {
231231
backend.emit('submitRequestEnd', error, request);
232-
originalCallback(error, ops);
232+
originalCallback(error, ops, request);
233233
};
234234

235235
var err = ot.checkOp(op);

lib/client/doc.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ var types = require('../types');
55
var util = require('../util');
66
var clone = util.clone;
77
var deepEqual = require('fast-deep-equal');
8+
var ACTIONS = require('../message-actions').ACTIONS;
89

910
var ERROR_CODE = ShareDBError.CODES;
1011

@@ -960,6 +961,23 @@ Doc.prototype._opAcknowledged = function(message) {
960961
return this.fetch();
961962
}
962963

964+
if (message[ACTIONS.fixup]) {
965+
for (var i = 0; i < message[ACTIONS.fixup].length; i++) {
966+
var fixupOp = message[ACTIONS.fixup][i];
967+
968+
for (var j = 0; j < this.pendingOps.length; j++) {
969+
var transformErr = transformX(this.pendingOps[i], fixupOp);
970+
if (transformErr) return this._hardRollback(transformErr);
971+
}
972+
973+
try {
974+
this._otApply(fixupOp, false);
975+
} catch (error) {
976+
return this._hardRollback(error);
977+
}
978+
}
979+
}
980+
963981
// The op was committed successfully. Increment the version number
964982
this.version++;
965983

lib/error.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ ShareDBError.prototype.name = 'ShareDBError';
1515
ShareDBError.CODES = {
1616
ERR_APPLY_OP_VERSION_DOES_NOT_MATCH_SNAPSHOT: 'ERR_APPLY_OP_VERSION_DOES_NOT_MATCH_SNAPSHOT',
1717
ERR_APPLY_SNAPSHOT_NOT_PROVIDED: 'ERR_APPLY_SNAPSHOT_NOT_PROVIDED',
18+
ERR_FIXUP_IS_ONLY_VALID_ON_APPLY: 'ERR_FIXUP_IS_ONLY_VALID_ON_APPLY',
19+
ERR_CANNOT_FIXUP_DELETION: 'ERR_CANNOT_FIXUP_DELETION',
1820
ERR_CLIENT_ID_BADLY_FORMED: 'ERR_CLIENT_ID_BADLY_FORMED',
1921
ERR_CANNOT_PING_OFFLINE: 'ERR_CANNOT_PING_OFFLINE',
2022
ERR_CONNECTION_SEQ_INTEGER_OVERFLOW: 'ERR_CONNECTION_SEQ_INTEGER_OVERFLOW',
@@ -68,6 +70,7 @@ ShareDBError.CODES = {
6870
ERR_SNAPSHOT_READS_REJECTED: 'ERR_SNAPSHOT_READS_REJECTED',
6971
ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND',
7072
ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED',
73+
ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE: 'ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE',
7174
ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE: 'ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE',
7275
ERR_UNKNOWN_ERROR: 'ERR_UNKNOWN_ERROR'
7376
};

lib/message-actions.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ exports.ACTIONS = {
99
bulkSubscribe: 'bs',
1010
bulkUnsubscribe: 'bu',
1111
fetch: 'f',
12+
fixup: 'fixup',
1213
subscribe: 's',
1314
unsubscribe: 'u',
1415
op: 'op',

lib/submit-request.js

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
var ot = require('./ot');
22
var projections = require('./projections');
33
var ShareDBError = require('./error');
4+
var types = require('./types');
45

56
var ERROR_CODE = ShareDBError.CODES;
67

@@ -40,9 +41,47 @@ function SubmitRequest(backend, agent, index, id, op, options) {
4041
this.snapshot = null;
4142
this.ops = [];
4243
this.channels = null;
44+
this._fixupOps = [];
4345
}
4446
module.exports = SubmitRequest;
4547

48+
SubmitRequest.prototype.$fixup = function(op) {
49+
if (this.action !== this.backend.MIDDLEWARE_ACTIONS.apply) {
50+
throw new ShareDBError(
51+
ERROR_CODE.ERR_FIXUP_IS_ONLY_VALID_ON_APPLY,
52+
'fixup can only be called during the apply middleware'
53+
);
54+
}
55+
56+
if (this.op.del) {
57+
throw new ShareDBError(
58+
ERROR_CODE.ERR_CANNOT_FIXUP_DELETION,
59+
'fixup cannot be applied on deletion ops'
60+
);
61+
}
62+
63+
var typeId = this.op.create ? this.op.create.type : this.snapshot.type;
64+
var type = types.map[typeId];
65+
if (typeof type.compose !== 'function') {
66+
throw new ShareDBError(
67+
ERROR_CODE.ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE,
68+
typeId + ' does not support compose'
69+
);
70+
}
71+
72+
if (this.op.create) this.op.create.data = type.apply(this.op.create.data, op);
73+
else this.op.op = type.compose(this.op.op, op);
74+
75+
var fixupOp = {
76+
src: this.op.src,
77+
seq: this.op.seq,
78+
v: this.op.v,
79+
op: op
80+
};
81+
82+
this._fixupOps.push(fixupOp);
83+
};
84+
4685
SubmitRequest.prototype.submit = function(callback) {
4786
var request = this;
4887
var backend = this.backend;
@@ -103,7 +142,7 @@ SubmitRequest.prototype.submit = function(callback) {
103142

104143
// Transform the op up to the current snapshot version, then apply
105144
var from = op.v;
106-
backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) {
145+
backend.db.getOpsToSnapshot(collection, id, from, snapshot, {metadata: true}, function(err, ops) {
107146
if (err) return callback(err);
108147

109148
if (ops.length !== snapshot.v - from) {
@@ -134,6 +173,8 @@ SubmitRequest.prototype.apply = function(callback) {
134173
// Always set the channels before each attempt to apply. If the channels are
135174
// modified in a middleware and we retry, we want to reset to a new array
136175
this.channels = this.backend.getChannels(this.collection, this.id);
176+
this._fixupOps = [];
177+
delete this.op.m.fixup;
137178

138179
var request = this;
139180
this.backend.trigger(this.backend.MIDDLEWARE_ACTIONS.apply, this.agent, this, function(err) {
@@ -152,6 +193,7 @@ SubmitRequest.prototype.commit = function(callback) {
152193
var backend = this.backend;
153194
backend.trigger(backend.MIDDLEWARE_ACTIONS.commit, this.agent, this, function(err) {
154195
if (err) return callback(err);
196+
if (request._fixupOps.length) request.op.m.fixup = request._fixupOps;
155197

156198
// Try committing the operation and snapshot to the database atomically
157199
backend.db.commit(
@@ -204,6 +246,7 @@ SubmitRequest.prototype._transformOp = function(ops) {
204246
// can happen in normal operation, such as a client resending an
205247
// unacknowledged operation at reconnect. It's important we don't apply
206248
// the same op twice
249+
if (op.m.fixup) this._fixupOps = op.m.fixup;
207250
return this.alreadySubmittedError();
208251
}
209252

@@ -213,6 +256,7 @@ SubmitRequest.prototype._transformOp = function(ops) {
213256

214257
var err = ot.transform(type, this.op, op);
215258
if (err) return err;
259+
delete op.m;
216260
this.ops.push(op);
217261
}
218262
};

test/client/submit.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ var sinon = require('sinon');
44
var types = require('../../lib/types');
55
var deserializedType = require('./deserialized-type');
66
var numberType = require('./number-type');
7+
var errorHandler = require('../util').errorHandler;
78
types.register(deserializedType.type);
89
types.register(deserializedType.type2);
910
types.register(numberType.type);
@@ -1210,5 +1211,52 @@ module.exports = function() {
12101211
});
12111212
});
12121213
});
1214+
1215+
describe('submitting when behind the server', function() {
1216+
var doc;
1217+
var remoteDoc;
1218+
1219+
beforeEach(function(done) {
1220+
var connection = this.backend.connect();
1221+
doc = connection.get('dogs', 'fido');
1222+
var remoteConnection = this.backend.connect();
1223+
remoteDoc = remoteConnection.get('dogs', 'fido');
1224+
1225+
async.series([
1226+
doc.create.bind(doc, {name: 'fido'}),
1227+
remoteDoc.fetch.bind(remoteDoc),
1228+
remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks'], oi: ['fetch']}]),
1229+
function(next) {
1230+
expect(doc.data).to.eql({name: 'fido'});
1231+
expect(remoteDoc.data).to.eql({name: 'fido', tricks: ['fetch']});
1232+
next();
1233+
}
1234+
], done);
1235+
});
1236+
1237+
it('is sent ops it has missed when submitting, without calling fetch', function(done) {
1238+
sinon.spy(doc, 'fetch');
1239+
1240+
doc.submitOp([{p: ['age'], oi: 2}], errorHandler(done));
1241+
1242+
doc.once('op', function() {
1243+
expect(doc.data).to.eql({name: 'fido', tricks: ['fetch'], age: 2});
1244+
expect(doc.fetch.called).to.be.false;
1245+
done();
1246+
});
1247+
});
1248+
1249+
it('does not expose op metadata in the middleware when sending missing ops', function(done) {
1250+
this.backend.use('apply', function(request) {
1251+
expect(request.ops).to.have.length(1);
1252+
var op = request.ops[0];
1253+
expect(op.op).to.eql([{p: ['tricks'], oi: ['fetch']}]);
1254+
expect(op.m).to.be.undefined;
1255+
done();
1256+
});
1257+
1258+
doc.submitOp([{p: ['age'], oi: 2}], errorHandler(done));
1259+
});
1260+
});
12131261
});
12141262
};

0 commit comments

Comments
 (0)