@@ -1358,12 +1358,20 @@ bool DcpProducer::setStreamDeadStatus(Vbid vbid,
1358
1358
end_stream_status_t status) {
1359
1359
auto rv = streams.find (vbid.get ());
1360
1360
if (rv != streams.end ()) {
1361
- for (auto handle = rv->second ->rlock (); !handle.end (); handle.next ()) {
1362
- if (handle.get ()->compareStreamId (sid)) {
1363
- handle.get ()->setDead (status);
1364
- return true ;
1361
+ std::shared_ptr<Stream> streamPtr;
1362
+ // MB-35073: holding StreamContainer rlock while calling setDead
1363
+ // has been seen to cause lock inversion elsewhere.
1364
+ // Collect sharedptr then setDead once lock is released (itr out of
1365
+ // scope).
1366
+ for (auto itr = rv->second ->rlock (); !itr.end (); itr.next ()) {
1367
+ if (itr.get ()->compareStreamId (sid)) {
1368
+ streamPtr = itr.get ();
1369
+ break ;
1365
1370
}
1366
1371
}
1372
+ if (streamPtr) {
1373
+ streamPtr->setDead (status);
1374
+ }
1367
1375
return true ;
1368
1376
}
1369
1377
@@ -1378,12 +1386,22 @@ void DcpProducer::closeAllStreams() {
1378
1386
streams.end (),
1379
1387
[&vbvector](StreamsMap::value_type& vt) {
1380
1388
vbvector.push_back ((Vbid)vt.first );
1381
- auto handle = vt.second ->wlock ();
1382
- while (!handle.end ()) {
1383
- handle.get ()->setDead (END_STREAM_DISCONNECTED);
1384
- handle.next ();
1389
+ std::vector<std::shared_ptr<Stream>> streamPtrs;
1390
+ // MB-35073: holding StreamContainer lock while
1391
+ // calling setDead leads to lock inversion - so
1392
+ // collect sharedptrs in one pass then setDead once
1393
+ // lock is released (itr out of scope).
1394
+ {
1395
+ auto handle = vt.second ->wlock ();
1396
+ for (; !handle.end (); handle.next ()) {
1397
+ streamPtrs.push_back (handle.get ());
1398
+ }
1399
+ handle.clear ();
1400
+ }
1401
+
1402
+ for (auto streamPtr : streamPtrs) {
1403
+ streamPtr->setDead (END_STREAM_DISCONNECTED);
1385
1404
}
1386
- handle.clear ();
1387
1405
});
1388
1406
}
1389
1407
for (const auto vbid: vbvector) {
0 commit comments