Skip to content

Commit fd1d0bc

Browse files
authored
fix: fix replication v1.14 (#2537)
1 parent 32bdcd1 commit fd1d0bc

File tree

5 files changed

+56
-3
lines changed

5 files changed

+56
-3
lines changed

src/server/main_service.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1849,6 +1849,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
18491849
cntx->transaction = stub_tx.get();
18501850

18511851
result = interpreter->RunFunction(eval_args.sha, &error);
1852+
cntx->transaction->FIX_ConcludeJournalExec(); // flush journal
18521853

18531854
cntx->transaction = tx;
18541855
return OpStatus::OK;

src/server/replica.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1203,7 +1203,7 @@ auto DflyShardReplica::TransactionReader::NextTxData(JournalReader* reader, Cont
12031203

12041204
// Otherwise, continue building multi command.
12051205
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
1206-
DCHECK(res->txid > 0);
1206+
DCHECK(res->txid > 0 || res->shard_cnt == 1);
12071207

12081208
auto txid = res->txid;
12091209
auto& txdata = current_[txid];

src/server/transaction.cc

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optio
108108
// Use squashing mechanism for inline execution of single-shard EVAL
109109
multi_->mode = LOCK_AHEAD;
110110
}
111+
111112
multi_->role = SQUASHED_STUB;
113+
multi_->shard_journal_write.resize(1);
112114

113115
time_now_ms_ = parent->time_now_ms_;
114116

@@ -972,6 +974,16 @@ void Transaction::IterateMultiLocks(ShardId sid, std::function<void(const std::s
972974
}
973975
}
974976

977+
void Transaction::FIX_ConcludeJournalExec() {
978+
if (!multi_->shard_journal_write.front())
979+
return;
980+
981+
if (auto journal = EngineShard::tlocal()->journal(); journal != nullptr) {
982+
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, 1,
983+
unique_slot_checker_.GetUniqueSlotId(), {}, false);
984+
}
985+
}
986+
975987
void Transaction::EnableShard(ShardId sid) {
976988
unique_shard_cnt_ = 1;
977989
unique_shard_id_ = sid;
@@ -1506,8 +1518,13 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&
15061518
bool allow_await) const {
15071519
auto journal = shard->journal();
15081520
CHECK(journal);
1509-
if (multi_ && multi_->role != SQUASHED_STUB)
1510-
multi_->shard_journal_write[shard->shard_id()] = true;
1521+
1522+
if (multi_) {
1523+
if (multi_->role != SQUASHED_STUB)
1524+
multi_->shard_journal_write[shard->shard_id()] = true;
1525+
else
1526+
multi_->shard_journal_write[0] = true;
1527+
}
15111528

15121529
bool is_multi = multi_commands || IsAtomicMulti();
15131530

src/server/transaction.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,9 @@ class Transaction {
353353

354354
void IterateMultiLocks(ShardId sid, std::function<void(const std::string&)> cb) const;
355355

356+
// Send journal EXEC opcode after a series of MULTI commands on the currently active shard
357+
void FIX_ConcludeJournalExec();
358+
356359
private:
357360
// Holds number of locks for each IntentLock::Mode: shared and exlusive.
358361
struct LockCnt {

tests/dragonfly/replication_test.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,38 @@ async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000):
769769
assert all(v is None for v in res)
770770

771771

772+
@dfly_args({"proactor_threads": 4})
773+
async def test_simple_scripts(df_local_factory: DflyInstanceFactory):
774+
master = df_local_factory.create()
775+
replicas = [df_local_factory.create() for _ in range(2)]
776+
df_local_factory.start_all([master] + replicas)
777+
778+
c_replicas = [replica.client() for replica in replicas]
779+
c_master = master.client()
780+
781+
# Connect replicas and wait for sync to finish
782+
for c_replica in c_replicas:
783+
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
784+
await check_all_replicas_finished([c_replica], c_master)
785+
786+
# Generate some scripts and run them
787+
keys = ["a", "b", "c", "d", "e"]
788+
for i in range(len(keys) + 1):
789+
script = ""
790+
subkeys = keys[:i]
791+
for key in subkeys:
792+
script += f"redis.call('INCR', '{key}')"
793+
script += f"redis.call('INCR', '{key}')"
794+
795+
await c_master.eval(script, len(subkeys), *subkeys)
796+
797+
# Wait for replicas
798+
await check_all_replicas_finished([c_replica], c_master)
799+
800+
for c_replica in c_replicas:
801+
assert (await c_replica.mget(keys)) == ["10", "8", "6", "4", "2"]
802+
803+
772804
"""
773805
Test script replication.
774806

0 commit comments

Comments
 (0)