最近在研读《C++ Concurrency in Action - 2rd》,在讲到死锁的问题描述和解决方案时,给出了std::lock能够实现同时锁住多个mutex而不出现死锁,但是没有讲为啥用std::lock就能避免死锁。google搜了一下全网也没有人回答这个问题,于是打算翻下STL代码自己看看,参考的VS2019 MSVC 14.29.30037 <mutex>文件的代码。
先来看一下std::lock函数,std::lock函数能接收多个(至少需要两个,_Lock0和_Lock1是必须的参数)互斥量,而且允许不同类型的互斥量(每个都是独立的互斥量模板,只要实现了lock、unlock、try_lock函数的类都能当做这里的互斥量,用户可以自定义互斥量类型)。std::lock函数调的内部的_Lock_nonmember1函数。
template <class _Lock0, class _Lock1, class... _LockN>
void lock(_Lock0& _Lk0, _Lock1& _Lk1, _LockN&... _LkN) { // lock multiple locks, without deadlock
_Lock_nonmember1(_Lk0, _Lk1, _LkN...);
}
接下来看下_Lock_nonmember1函数,这个模板函数有一个针对两个以上互斥量的泛化实现,并实现了一个针对两个互斥量的偏特化。两个以上互斥量的泛化实现和针对两个互斥量的偏特化实现,从原理上看是完全一致的。因此我们在这里用针对两个互斥量的偏特化实现来研究“为什么std::lock能避免死锁”这个问题,聚焦问题的本身。
_Lock_nonmember1函数内循环执行两个_Lock_attempt_small函数,_Lk0和_Lk1两个互斥量在这两个函数中交换了顺序,可以知道只要有一个_Lock_attempt_small函数返回false时就会退出循环,即加锁成功了。
template <class _Lock0, class _Lock1>
void _Lock_nonmember1(_Lock0& _Lk0, _Lock1& _Lk1) {
// lock 2 locks, without deadlock, special case for better codegen and reduced metaprogramming for common case
while (_Lock_attempt_small(_Lk0, _Lk1) && _Lock_attempt_small(_Lk1, _Lk0)) { // keep trying
}
}
再进去看_Lock_attempt_small的实现。_Lock_attempt_small先对入参中的_Lk0加锁[1],然后对_Lk1尝试加锁[2]:如果try_lock返回true说明_Lk1也加锁成功,_Lock_attempt_small返回false让_Lock_nonmember1的while循环退出,_Lk0和_Lk1都成功锁住,std::lock执行结束;如果出现异常会对_Lk0解锁并将异常往上抛出[3],这一步异常保护是保证了异常安全的,因为异常处理中恢复了之前_Lk0没加锁的状态;如果try_lock返回false说明尝试对_Lk1加锁失败,其他线程占有着_Lk1,此时会对之前已经上锁的_Lk0解锁[4],然后让渡当前的CPU时间片给其他线程,最后返回true,_Lock_nonmember1的while循环继续。
template <class _Lock0, class _Lock1>
bool _Lock_attempt_small(_Lock0& _Lk0, _Lock1& _Lk1) {
// attempt to lock 2 locks, by first locking _Lk0, and then trying to lock _Lk1 returns whether to try again
_Lk0.lock(); // [1]
_TRY_BEGIN
if (_Lk1.try_lock()) { // [2]
return false;
}
_CATCH_ALL
_Lk0.unlock(); // [3]
_RERAISE;
_CATCH_END
_Lk0.unlock(); // [4]
_STD this_thread::yield();
return true;
}
我们再回到_Lock_nonmember1(_Lock0& _Lk0, _Lock1& _Lk1)函数中去看(此时下文中描述的_Lk0和_Lk1是基于_Lock_nonmember1的_Lk0和_Lk1,不是_Lock_attempt_small内的_Lk0和_Lk1),_Lock_nonmember1在调用_Lock_attempt_small时是交替地给进去_Lk0和_Lk1的,即这次先lock住_Lk0再对_Lk1做try_lock,下一次则会交换,先lock住_Lk1再对_Lk0做try_lock,同时当lock住一个锁而try_lock另一个锁失败的时候,会解掉已经lock住的锁,让渡时间片后再重新进入下一次lock和try_lock。这样,使用std::lock的开发者就不必自己去确保两个互斥量始终以相同的顺序去上锁(事实上有些情况下开发者也没法保证,比如开发者在实现一个线程安全的容器的swap函数的时候),而lock住一个锁并try_lock另一个锁失败时解锁并让渡的操作保证了std::lock不会有死锁风险。
搞懂两个互斥量时std::lock的实现了,扩展一下,再来看看两个以上互斥量时_Lock_nonmember1的实现:
// FUNCTION TEMPLATE lock
template <class _Lock0, class _Lock1, class _Lock2, class... _LockN>
void _Lock_nonmember1(_Lock0& _Lk0, _Lock1& _Lk1, _Lock2& _Lk2, _LockN&... _LkN) {
// lock 3 or more locks, without deadlock
int _Hard_lock = 0;
while (_Hard_lock != -1) {
_Hard_lock = _Lock_attempt(_Hard_lock, _Lk0, _Lk1, _Lk2, _LkN...);
}
}
template <class… _LockN>
int _Lock_attempt(const int _Hard_lock, _LockN&… _LkN) {
// attempt to lock 3 or more locks, starting by locking _LkN[_Hard_lock] and trying to lock the rest
using _Indices = index_sequence_for<_LockN…>;
_Lock_from_locks(_Hard_lock, _Indices{}, _LkN…);
int _Failed = -1;
int _Backout_start = _Hard_lock; // that is, unlock _Hard_lock
_TRY_BEGIN
_Failed = _Try_lock_range(0, _Hard_lock, _LkN...);
if (_Failed == -1) {
_Backout_start = 0; // that is, unlock [0, _Hard_lock] if the next throws
_Failed = _Try_lock_range(_Hard_lock + 1, sizeof...(_LockN), _LkN...);
if (_Failed == -1) { // we got all the locks
return -1;
}
}
_CATCH_ALL
_Unlock_locks(_Backout_start, _Hard_lock + 1, _Indices{}, _LkN...);
_RERAISE;
_CATCH_END
// we didn't get all the locks, backout
_Unlock_locks(_Backout_start, _Hard_lock + 1, _Indices{}, _LkN...);
_STD this_thread::yield();
return _Failed;
}
实现上其实还是类似的,_Lock_attempt_small变成了_Lock_attempt,而_Lock_attempt函数实际上也是先将_Hard_lock锁住(_Lock_from_locks),然后依次对其他的互斥量做try_lock操作(_Try_lock_range),如果所有的锁都锁定了,_Lock_attempt返回-1结束掉_Lock_nonmember1里的while循环,如果失败了,所有已加的锁全部解掉,然后让渡时间片,_Failed是当前的失败位置,返回回去给_Hard_lock,下一次循环时会在这个位置开始lock,再依次try_lock其他的锁。如果出现异常,处理的方式也是类似的,先解掉所有已经加了锁的互斥量上的锁,再往上throw出去。