Source-code analysis of the thread-safe collection classes in the .NET BCL
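volatile: every access goes to the field itself; the compiler/JIT may not keep the value in a register or reorder the access
After optimization, a read of booknum can be hoisted out of a loop and kept in a register, so each thread effectively works with its own cached copy of the value. (This is an effect of the optimizer, not of booknum being a value type or being modified in a static method.) The result is that the parent thread changes booknum and the juster thread never sees the change. Marking the field volatile tells the compiler not to apply this optimization: a volatile variable may change at any time, so every read must fetch it from memory again instead of reusing a copy held in a register. With that in place, changes made to booknum at run time become visible to the other threads, which gives the intended "shared access".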
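The visibility problem described above can be sketched as follows. This is a minimal illustration, not the original booknum program (which is not shown in these notes); the class name VolatileFlagDemo and the field s_bookNum are assumptions made for the example.

```csharp
using System.Threading;

public static class VolatileFlagDemo
{
    // Hypothetical stand-in for the notes' booknum field.
    // Without "volatile", the JIT would be allowed to read it once and spin forever.
    private static volatile int s_bookNum;

    public static int Run()
    {
        s_bookNum = 0;
        int observed = -1;

        var reader = new Thread(() =>
        {
            // Each iteration performs a fresh volatile read of the field.
            while (s_bookNum == 0) { }
            observed = s_bookNum;
        });
        reader.Start();

        Thread.Sleep(50);   // let the reader start spinning
        s_bookNum = 42;     // volatile write; the reader's next read sees it
        reader.Join();      // Join makes the reader's write to "observed" visible here
        return observed;
    }
}
```

Calling VolatileFlagDemo.Run() should return 42 once the reader thread has seen the update.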
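Interlocked: includes the effect of volatile, but is more expensive, because the operation is made atomic at the hardware level (a bus lock on old processors, a cache-line lock on current ones)
Internally the calls are forwarded to RuntimeImports.
On Windows this ends in native C++ runtime code (the Win32 Interlocked* functions and compiler intrinsics), not in managed code.
The same helper exists on Linux, for example RhpLockCmpXchg32.
cmpxchg(void* ptr, int old, int new): if the value at ptr equals old, new is written to ptr. In both cases the value that was at ptr before the operation is returned, so the caller can tell whether the swap happened. The whole operation is atomic. On Intel platforms it is implemented with lock cmpxchg. My own reading of lock here is that it locks the memory bus (on modern processors, the cache line), so another thread that wants to access the memory at ptr has to wait until the instruction completes.
The program can decide from the processor type whether to add the lock prefix to cmpxchg. On a multiprocessor machine the instruction is emitted with the prefix (lock cmpxchg). On a single processor the prefix is omitted, because a single processor maintains sequential consistency for itself and does not need the memory-barrier effect the lock prefix provides.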
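The Intel manual describes the lock prefix as follows.
1) It ensures that the read-modify-write operation on memory executes atomically. On the Pentium and earlier processors, an instruction with the lock prefix locks the bus while it executes, so other processors temporarily cannot access memory through the bus. This is obviously expensive. Starting with the Pentium 4, Intel Xeon and P6 processors, Intel uses cache locking to guarantee atomicity, which greatly reduces the cost of lock-prefixed instructions.
2) It prevents the instruction from being reordered with the reads and writes before and after it.
3) It flushes all data in the write buffer to memory.
The memory-barrier effect of points 2 and 3 is enough to provide the memory semantics of both a volatile read and a volatile write.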
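The compare-and-swap semantics above are exposed in .NET as Interlocked.CompareExchange, which returns the value that was in the location before the call. The following is a minimal sketch of the usual CAS retry loop; the class CasDemo and the helper UpdateMax are names invented for this example.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class CasDemo
{
    private static int s_max;

    // Atomically raises "target" to at least "candidate" using a CAS retry loop.
    public static void UpdateMax(ref int target, int candidate)
    {
        int seen = Volatile.Read(ref target);
        while (candidate > seen)
        {
            // Writes candidate only if target still equals seen;
            // always returns the value that was there before the call.
            int original = Interlocked.CompareExchange(ref target, candidate, seen);
            if (original == seen)
            {
                return;          // our swap happened
            }
            seen = original;     // another thread won; retry against its value
        }
    }

    public static int Run()
    {
        s_max = 0;
        Parallel.For(1, 1001, i => UpdateMax(ref s_max, i));
        return s_max;
    }
}
```

Because every value from 1 to 1000 is offered exactly once, CasDemo.Run() returns 1000 regardless of how the iterations interleave.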
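SpinWait: waits without a context switch
For very short waits, an application should let the thread spin instead of having it switched out.
1: Thread.Sleep(1000);
Thread.Sleep(): gives up the CPU time slice (the thread blocks for at least the given number of milliseconds) and then competes for the CPU again together with the other threads.
2: Thread.SpinWait(1000);
Thread.SpinWait(): just makes the CPU execute a loop of useless instructions. The argument is a number of iterations, not a duration. When the loop ends the thread continues immediately, without having to compete for the CPU again.
Sleep() makes the thread give up the CPU.
SpinWait() does not make the thread give up the CPU.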
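A short sketch of spinning on a flag, using the System.Threading.SpinWait struct rather than a raw Thread.SpinWait loop. SpinWait.SpinOnce() spins for the first few calls and then starts yielding, so it stays safe even if the wait turns out to be longer than expected. The class SpinDemo and the field s_ready are assumptions made for this example.

```csharp
using System.Threading;

public static class SpinDemo
{
    private static int s_ready;

    public static int WaitBySpinning()
    {
        s_ready = 0;

        // Another thread publishes the flag.
        var setter = new Thread(() => Volatile.Write(ref s_ready, 1));
        setter.Start();

        // Spin until the flag is visible; no Thread.Sleep(1000)-style blocking wait.
        var spinner = new SpinWait();
        while (Volatile.Read(ref s_ready) == 0)
        {
            spinner.SpinOnce();
        }

        setter.Join();
        return Volatile.Read(ref s_ready);
    }
}
```

SpinDemo.WaitBySpinning() returns 1 as soon as the setter thread has run.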
Barrier class: a multi-phase barrier
Used to make several threads meet at a given point in each phase before any of them continues.
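A minimal sketch of a multi-phase rendezvous with Barrier. The class BarrierDemo and its parameters are assumptions made for this example; the post-phase action passed to the constructor runs once per phase, after all participants have arrived.

```csharp
using System.Threading;

public static class BarrierDemo
{
    public static int Run(int workers = 3, int phases = 2)
    {
        int completedPhases = 0;

        // The post-phase action runs on one thread, once per phase,
        // while all participants are still held at the barrier.
        using var barrier = new Barrier(workers, _ => completedPhases++);

        var threads = new Thread[workers];
        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(() =>
            {
                for (int p = 0; p < phases; p++)
                {
                    // ... per-phase work would go here ...
                    barrier.SignalAndWait();   // wait for everyone else
                }
            });
            threads[i].Start();
        }

        foreach (Thread t in threads) t.Join();
        return completedPhases;
    }
}
```

With the default arguments, BarrierDemo.Run() returns 2, one count per completed phase.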
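Thread signaling: AutoResetEvent and ManualResetEvent
AutoResetEvent and ManualResetEvent are very similar. The difference is that the former resets automatically (Auto) after releasing one waiting thread, while the latter stays signaled until it is reset manually (Manual) with Reset().
Both are used to send a notification from one thread to another.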
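The difference between the two event types can be seen without any extra threads by signaling each event once and then polling it twice with a zero timeout. The class EventDemo is a name invented for this sketch.

```csharp
using System.Threading;

public static class EventDemo
{
    public static (bool autoSecondWait, bool manualSecondWait) Run()
    {
        using var auto = new AutoResetEvent(false);
        using var manual = new ManualResetEvent(false);

        auto.Set();
        manual.Set();

        auto.WaitOne(0);                 // succeeds and resets the event automatically
        bool autoSecond = auto.WaitOne(0);     // the signal was consumed by the first wait

        manual.WaitOne(0);               // succeeds, event stays signaled
        bool manualSecond = manual.WaitOne(0); // still signaled until Reset() is called

        return (autoSecond, manualSecond);
    }
}
```

EventDemo.Run() returns (false, true): the AutoResetEvent signal is consumed by the first wait, the ManualResetEvent signal is not.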
SemaphoreSlim: semaphore
This class limits the number of threads that can access a resource at the same time.
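A minimal sketch of limiting concurrency with SemaphoreSlim. The class SemaphoreDemo, its parameters, and the bookkeeping used to measure concurrency are assumptions made for this example.

```csharp
using System.Threading;

public static class SemaphoreDemo
{
    // Starts "workers" threads but lets at most "limit" of them
    // into the guarded section at once; returns the peak that was observed.
    public static int MaxObservedConcurrency(int workers = 8, int limit = 2)
    {
        int current = 0, max = 0;
        object sync = new object();
        var gate = new SemaphoreSlim(limit, limit);

        var threads = new Thread[workers];
        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(() =>
            {
                gate.Wait();                       // blocks while "limit" threads are inside
                try
                {
                    lock (sync) { current++; if (current > max) max = current; }
                    Thread.Sleep(10);              // simulated use of the resource
                    lock (sync) { current--; }
                }
                finally
                {
                    gate.Release();
                }
            });
            threads[i].Start();
        }

        foreach (Thread t in threads) t.Join();
        gate.Dispose();
        return max;
    }
}
```

The returned peak never exceeds the limit passed to the semaphore.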
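ReaderWriterLockSlim: reader-writer lock
Provides a thread-safe mechanism for reading and writing a collection from multiple threads: any number of concurrent readers, or one exclusive writer.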
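A minimal sketch of guarding an ordinary List<T> with ReaderWriterLockSlim. The wrapper class SynchronizedList<T> is hypothetical and exists only for this example.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical wrapper: readers share the lock, writers take it exclusively.
public sealed class SynchronizedList<T>
{
    private readonly List<T> _items = new List<T>();
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try { _items.Add(item); }
        finally { _lock.ExitWriteLock(); }
    }

    public int Count
    {
        get
        {
            _lock.EnterReadLock();
            try { return _items.Count; }
            finally { _lock.ExitReadLock(); }
        }
    }

    public bool Contains(T item)
    {
        _lock.EnterReadLock();
        try { return _items.Contains(item); }
        finally { _lock.ExitReadLock(); }
    }
}
```

Adding 1000 items from a Parallel.For loop and then reading Count gives 1000, since every Add runs under the write lock.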
CountdownEvent: a signaling class that waits until a given number of operations have completed
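A minimal sketch of waiting for a fixed number of operations with CountdownEvent. The class CountdownDemo and its parameter are assumptions made for this example.

```csharp
using System.Threading;

public static class CountdownDemo
{
    public static int Run(int operations = 4)
    {
        int done = 0;
        var countdown = new CountdownEvent(operations);

        var threads = new Thread[operations];
        for (int i = 0; i < operations; i++)
        {
            threads[i] = new Thread(() =>
            {
                Interlocked.Increment(ref done);  // the "operation"
                countdown.Signal();               // one fewer operation outstanding
            });
            threads[i].Start();
        }

        countdown.Wait();                         // returns once the count reaches zero
        foreach (Thread t in threads) t.Join();   // make sure no Signal is still running
        countdown.Dispose();
        return done;
    }
}
```

CountdownDemo.Run() returns 4 with the default argument.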
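ConcurrentQueue: Volatile reads fetch the latest value, which is then compared
Internally it is built from Segment objects, each of which records a head and a tail.
The head and the tail are each accessed with volatile semantics.
On every enqueue and dequeue, Volatile.Read fetches the current position and the sequence number of the slot.
The position is then claimed with the atomic Interlocked.CompareExchange, and the slot is published with Volatile.Write.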
public bool TryEnqueue(T item)
{
    Slot[] slots = _slots;

    // Loop in case of contention...
    while (true)
    {
        // Get the tail at which to try to return.
        int currentTail = Volatile.Read(ref _headAndTail.Tail);
        int slotsIndex = currentTail & _slotsMask;

        // Read the sequence number for the tail position.
        int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

        // The slot is empty and ready for us to enqueue into it if its sequence
        // number matches the slot.
        int diff = sequenceNumber - currentTail;
        if (diff == 0)
        {
            // We may be racing with other enqueuers. Try to reserve the slot by incrementing
            // the tail. Once we've done that, no one else will be able to write to this slot,
            // and no dequeuer will be able to read from this slot until we've written the new
            // sequence number. WARNING: The next few lines are not reliable on a runtime that
            // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
            // but before the Volatile.Write, other threads will spin trying to access this slot.
            // If this implementation is ever used on such a platform, this if block should be
            // wrapped in a finally / prepared region.
            if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
            {
                // Successfully reserved the slot. Note that after the above CompareExchange, other threads
                // trying to return will end up spinning until we do the subsequent Write.
                slots[slotsIndex].Item = item;
                Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1);
                return true;
            }

            // The tail was already advanced by another thread. A newer tail has already been observed and the next
            // iteration would make forward progress, so there's no need to spin-wait before trying again.
        }
        else if (diff < 0)
        {
            // The sequence number was less than what we needed, which means this slot still
            // contains a value, i.e. the segment is full. Technically it's possible that multiple
            // dequeuers could have read concurrently, with those getting later slots actually
            // finishing first, so there could be spaces after this one that are available, but
            // we need to enqueue in order.
            return false;
        }
        else
        {
            // Either the slot contains an item, or it is empty but because the slot was filled and dequeued. In either
            // case, the tail has already been updated beyond what was observed above, and the sequence number observed
            // above as a volatile load is more recent than the update to the tail. So, the next iteration of the loop
            // is guaranteed to see a new tail. Since this is an always-forward-progressing situation, there's no need
            // to spin-wait before trying again.
        }
    }
}

/// <summary>Represents a slot in the queue.</summary>
[StructLayout(LayoutKind.Auto)]
[DebuggerDisplay("Item = {Item}, SequenceNumber = {SequenceNumber}")]
internal struct Slot
{
    /// <summary>The item.</summary>
    public T? Item; // SOS's ThreadPool command depends on this being at the beginning of the struct when T is a reference type
    /// <summary>The sequence number for this slot, used to synchronize between enqueuers and dequeuers.</summary>
    public int SequenceNumber;
}
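To see how the sequence numbers coordinate both sides, the following is a simplified, self-contained sketch of a bounded ring that follows the same protocol as the excerpt and adds a matching dequeue. It is not the BCL implementation: the class BoundedRing<T> is hypothetical, it has no segment freezing or linking, and it assumes a power-of-two capacity of at least 2.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified bounded queue using per-slot sequence numbers.
public sealed class BoundedRing<T>
{
    private struct Slot { public T Item; public int SequenceNumber; }

    private readonly Slot[] _slots;
    private readonly int _mask;
    private int _head;
    private int _tail;

    public BoundedRing(int capacity)
    {
        if (capacity < 2 || (capacity & (capacity - 1)) != 0)
            throw new ArgumentException("capacity must be a power of two >= 2");
        _slots = new Slot[capacity];
        _mask = capacity - 1;
        // Slot i starts with sequence number i: "empty, ready for tail == i".
        for (int i = 0; i < capacity; i++) _slots[i].SequenceNumber = i;
    }

    public bool TryEnqueue(T item)
    {
        while (true)
        {
            int tail = Volatile.Read(ref _tail);
            int index = tail & _mask;
            int diff = Volatile.Read(ref _slots[index].SequenceNumber) - tail;
            if (diff == 0)
            {
                // Reserve the slot by advancing the tail, then publish the item.
                if (Interlocked.CompareExchange(ref _tail, tail + 1, tail) == tail)
                {
                    _slots[index].Item = item;
                    Volatile.Write(ref _slots[index].SequenceNumber, tail + 1);
                    return true;
                }
            }
            else if (diff < 0)
            {
                return false; // slot still holds an older item: the ring is full
            }
            // diff > 0: the tail moved on already; retry with the new tail.
        }
    }

    public bool TryDequeue(out T item)
    {
        while (true)
        {
            int head = Volatile.Read(ref _head);
            int index = head & _mask;
            // A filled slot for position "head" carries sequence number head + 1.
            int diff = Volatile.Read(ref _slots[index].SequenceNumber) - (head + 1);
            if (diff == 0)
            {
                if (Interlocked.CompareExchange(ref _head, head + 1, head) == head)
                {
                    item = _slots[index].Item;
                    // Mark the slot empty for the enqueuer one lap later.
                    Volatile.Write(ref _slots[index].SequenceNumber, head + _slots.Length);
                    return true;
                }
            }
            else if (diff < 0)
            {
                item = default(T);
                return false; // nothing published at this position yet: empty
            }
        }
    }
}
```

With a capacity of 4, four enqueues succeed and a fifth fails; after one dequeue the next enqueue succeeds again, and items come out in FIFO order.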
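ConcurrentStack: retries with SpinWait, spinning on the CPU without a context switch; very fast
Internally it holds Node objects in a linked list, and the head reference is swapped atomically.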
public void Push(T item)
{
    // Pushes a node onto the front of the stack thread-safely. Internally, this simply
    // swaps the current head pointer using a (thread safe) CAS operation to accomplish
    // lock freedom. If the CAS fails, we add some back off to statistically decrease
    // contention at the head, and then go back around and retry.

    Node newNode = new Node(item);
    newNode._next = _head;
    if (Interlocked.CompareExchange(ref _head, newNode, newNode._next) == newNode._next)
    {
        return;
    }

    // If we failed, go to the slow path and loop around until we succeed.
    PushCore(newNode, newNode);
}

private void PushCore(Node head, Node tail)
{
    SpinWait spin = default;

    // Keep trying to CAS the existing head with the new node until we succeed.
    do
    {
        spin.SpinOnce(sleep1Threshold: -1);
        // Reread the head and link our new node.
        tail._next = _head;
    }
    while (Interlocked.CompareExchange(
        ref _head, head, tail._next) != tail._next);

    if (CDSCollectionETWBCLProvider.Log.IsEnabled())
    {
        CDSCollectionETWBCLProvider.Log.ConcurrentStack_FastPushFailed(spin.Count);
    }
}
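The same CAS-on-head idea, reduced to a self-contained sketch that also shows the pop side. The class TreiberStack<T> is hypothetical and much simpler than ConcurrentStack<T> (no range operations, no ETW logging).

```csharp
using System.Threading;

// Hypothetical minimal lock-free stack: the head reference is swapped with CAS.
public sealed class TreiberStack<T>
{
    private sealed class Node
    {
        public readonly T Value;
        public Node Next;
        public Node(T value) { Value = value; }
    }

    private Node _head;

    public void Push(T item)
    {
        var node = new Node(item);
        var spin = new SpinWait();
        while (true)
        {
            // Link the new node in front of the head we currently see.
            node.Next = Volatile.Read(ref _head);
            if (Interlocked.CompareExchange(ref _head, node, node.Next) == node.Next)
            {
                return;
            }
            spin.SpinOnce();   // lost the race: back off briefly, then retry
        }
    }

    public bool TryPop(out T item)
    {
        var spin = new SpinWait();
        while (true)
        {
            Node head = Volatile.Read(ref _head);
            if (head == null)
            {
                item = default(T);
                return false;
            }
            // Swing the head to the next node if nobody changed it meanwhile.
            if (Interlocked.CompareExchange(ref _head, head.Next, head) == head)
            {
                item = head.Value;
                return true;
            }
            spin.SpinOnce();
        }
    }
}
```

Pushing 1000 items from parallel threads and then popping until TryPop returns false yields exactly 1000 items; single-threaded pushes of 1, 2, 3 pop back as 3, 2, 1.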
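ConcurrentDictionary: bucket table plus Monitor (a hybrid lock), with retry
tables.GetBucketAndLock: a method of the inner Tables class; the bucket index is the hash code modulo the number of buckets, and the lock index is the bucket index modulo the number of locks
Monitor.Enter: a hybrid lock (spins briefly, then blocks); the whole operation is retried if the table was resized in the meantime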
private bool TryAddInternal(TKey key, int? nullableHashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
{
    IEqualityComparer<TKey>? comparer = _comparer;

    Debug.Assert(
        nullableHashcode is null ||
        (comparer is null && key.GetHashCode() == nullableHashcode) ||
        (comparer != null && comparer.GetHashCode(key) == nullableHashcode));

    int hashcode =
        nullableHashcode ??
        (comparer is null ? key.GetHashCode() : comparer.GetHashCode(key));

    while (true)
    {
        Tables tables = _tables;
        object[] locks = tables._locks;
        ref Node? bucket = ref tables.GetBucketAndLock(hashcode, out uint lockNo);

        bool resizeDesired = false;
        bool lockTaken = false;
        try
        {
            if (acquireLock)
            {
                Monitor.Enter(locks[lockNo], ref lockTaken);
            }

            // If the table just got resized, we may not be holding the right lock, and must retry.
            // This should be a rare occurrence.
            if (tables != _tables)
            {
                continue;
            }

            // Try to find this key in the bucket
            Node? prev = null;
            for (Node? node = bucket; node != null; node = node._next)
            {
                Debug.Assert((prev is null && node == bucket) || prev!._next == node);
                if (hashcode == node._hashcode && (comparer is null ? _defaultComparer.Equals(node._key, key) : comparer.Equals(node._key, key)))
                {
                    // The key was found in the dictionary. If updates are allowed, update the value for that key.
                    // We need to create a new node for the update, in order to support TValue types that cannot
                    // be written atomically, since lock-free reads may be happening concurrently.
                    if (updateIfExists)
                    {
                        if (s_isValueWriteAtomic)
                        {
                            node._value = value;
                        }
                        else
                        {
                            var newNode = new Node(node._key, value, hashcode, node._next);
                            if (prev is null)
                            {
                                Volatile.Write(ref bucket, newNode);
                            }
                            else
                            {
                                prev._next = newNode;
                            }
                        }
                        resultingValue = value;
                    }
                    else
                    {
                        resultingValue = node._value;
                    }
                    return false;
                }
                prev = node;
            }

            // The key was not found in the bucket. Insert the key-value pair.
            var resultNode = new Node(key, value, hashcode, bucket);
            Volatile.Write(ref bucket, resultNode);
            checked
            {
                tables._countPerLock[lockNo]++;
            }

            //
            // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table.
            // It is also possible that GrowTable will increase the budget but won't resize the bucket table.
            // That happens if the bucket table is found to be poorly utilized due to a bad hash function.
            //
            if (tables._countPerLock[lockNo] > _budget)
            {
                resizeDesired = true;
            }
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(locks[lockNo]);
            }
        }

        //
        // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table.
        //
        // Concurrency notes:
        // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks.
        // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0
        //   and then verify that the table we passed to it as the argument is still the current table.
        //
        if (resizeDesired)
        {
            GrowTable(tables);
        }

        resultingValue = value;
        return true;
    }
}
private sealed class Tables
{
    /// <summary>A singly-linked list for each bucket.</summary>
    internal readonly Node?[] _buckets;
    /// <summary>A set of locks, each guarding a section of the table.</summary>
    internal readonly object[] _locks;
    /// <summary>The number of elements guarded by each lock.</summary>
    internal readonly int[] _countPerLock;
    /// <summary>Pre-computed multiplier for use on 64-bit performing faster modulo operations.</summary>
    internal readonly ulong _fastModBucketsMultiplier;

    internal Tables(Node?[] buckets, object[] locks, int[] countPerLock)
    {
        _buckets = buckets;
        _locks = locks;
        _countPerLock = countPerLock;
        if (IntPtr.Size == 8)
        {
            _fastModBucketsMultiplier = HashHelpers.GetFastModMultiplier((uint)buckets.Length);
        }
    }

    /// <summary>Computes a ref to the bucket for a particular key.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal ref Node? GetBucket(int hashcode)
    {
        Node?[] buckets = _buckets;
        if (IntPtr.Size == 8)
        {
            return ref buckets[HashHelpers.FastMod((uint)hashcode, (uint)buckets.Length, _fastModBucketsMultiplier)];
        }
        else
        {
            return ref buckets[(uint)hashcode % (uint)buckets.Length];
        }
    }

    /// <summary>Computes the bucket and lock number for a particular key.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal ref Node? GetBucketAndLock(int hashcode, out uint lockNo)
    {
        Node?[] buckets = _buckets;
        uint bucketNo;
        if (IntPtr.Size == 8)
        {
            bucketNo = HashHelpers.FastMod((uint)hashcode, (uint)buckets.Length, _fastModBucketsMultiplier);
        }
        else
        {
            bucketNo = (uint)hashcode % (uint)buckets.Length;
        }
        lockNo = bucketNo % (uint)_locks.Length; // doesn't use FastMod, as it would require maintaining a different multiplier
        return ref buckets[bucketNo];
    }
}
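The lock-striping idea (hash code selects a lock, so unrelated keys rarely contend) can be sketched in a few lines. This is a simplification under stated assumptions: the class StripedMap<TKey, TValue> is hypothetical, each stripe owns its own Dictionary instead of sharing one bucket array, reads take the lock as well, and there is no resizing.

```csharp
using System.Collections.Generic;

// Hypothetical striped map: one lock per stripe, selected by hash code.
public sealed class StripedMap<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue>[] _stripes;
    private readonly object[] _locks;

    public StripedMap(int stripeCount = 8)
    {
        _stripes = new Dictionary<TKey, TValue>[stripeCount];
        _locks = new object[stripeCount];
        for (int i = 0; i < stripeCount; i++)
        {
            _stripes[i] = new Dictionary<TKey, TValue>();
            _locks[i] = new object();
        }
    }

    // Same idea as GetBucketAndLock: unsigned hash modulo the number of locks.
    private uint StripeOf(TKey key) => (uint)key.GetHashCode() % (uint)_locks.Length;

    public bool TryAdd(TKey key, TValue value)
    {
        uint lockNo = StripeOf(key);
        lock (_locks[lockNo])
        {
            return _stripes[lockNo].TryAdd(key, value);
        }
    }

    public bool TryGetValue(TKey key, out TValue value)
    {
        uint lockNo = StripeOf(key);
        lock (_locks[lockNo])
        {
            return _stripes[lockNo].TryGetValue(key, out value);
        }
    }

    public int Count
    {
        get
        {
            int total = 0;
            for (int i = 0; i < _locks.Length; i++)
            {
                lock (_locks[i]) { total += _stripes[i].Count; }
            }
            return total;
        }
    }
}
```

Two threads adding keys that hash to different stripes never wait on each other, which is the property the _locks array gives ConcurrentDictionary.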
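BlockingCollection
SemaphoreSlim: semaphores that count the free and the occupied slots
SpinWait: spin-waiting
Interlocked: lock-free atomic updates of the adders count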
private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
    CheckDisposed();

    cancellationToken.ThrowIfCancellationRequested();

    if (IsAddingCompleted)
    {
        throw new InvalidOperationException(SR.BlockingCollection_Completed);
    }

    bool waitForSemaphoreWasSuccessful = true;

    if (_freeNodes != null)
    {
        //If the _freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
        //was called concurrently with Adding which is not supported by BlockingCollection.
        CancellationTokenSource? linkedTokenSource = null;
        try
        {
            waitForSemaphoreWasSuccessful = _freeNodes.Wait(0, default);
            if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
            {
                linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
                    cancellationToken, _producersCancellationTokenSource.Token);
                waitForSemaphoreWasSuccessful = _freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
            }
        }
        catch (OperationCanceledException)
        {
            //if cancellation was via external token, throw an OCE
            cancellationToken.ThrowIfCancellationRequested();

            //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
            //Debug.Assert(_ProducersCancellationTokenSource.Token.IsCancellationRequested);

            throw new InvalidOperationException
                (SR.BlockingCollection_Add_ConcurrentCompleteAdd);
        }
        finally
        {
            if (linkedTokenSource != null)
            {
                linkedTokenSource.Dispose();
            }
        }
    }
    if (waitForSemaphoreWasSuccessful)
    {
        // Update the adders count if the complete adding was not requested, otherwise
        // spins until all adders finish then throw IOE
        // The idea behind to spin until all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
        // not been finished yet
        SpinWait spinner = default;
        while (true)
        {
            int observedAdders = _currentAdders;
            if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
            {
                spinner.Reset();
                // CompleteAdding is requested, spin then throw
                while (_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
                throw new InvalidOperationException(SR.BlockingCollection_Completed);
            }

            if (Interlocked.CompareExchange(ref _currentAdders, observedAdders + 1, observedAdders) == observedAdders)
            {
                Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread exceeded the maximum limit.");
                break;
            }
            spinner.SpinOnce(sleep1Threshold: -1);
        }

        // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
        // 1- _collection.TryAdd threw an exception
        // 2- _collection.TryAdd succeeded
        // 3- _collection.TryAdd returned false
        // so we put the decrement code in the finally block
        try
        {
            //TryAdd is guaranteed to find a place to add the element. Its return value depends
            //on the semantics of the underlying store. Some underlying stores will not add an already
            //existing item and thus TryAdd returns false indicating that the size of the underlying
            //store did not increase.

            bool addingSucceeded = false;
            try
            {
                //The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
                //This fixes bug #702328, case 2 of 2.
                cancellationToken.ThrowIfCancellationRequested();
                addingSucceeded = _collection.TryAdd(item);
            }
            catch
            {
                //TryAdd did not result in increasing the size of the underlying store and hence we need
                //to increment back the count of the _freeNodes semaphore.
                if (_freeNodes != null)
                {
                    _freeNodes.Release();
                }
                throw;
            }
            if (addingSucceeded)
            {
                //After adding an element to the underlying storage, signal to the consumers
                //waiting on _occupiedNodes that there is a new item added ready to be consumed.
                _occupiedNodes.Release();
            }
            else
            {
                throw new InvalidOperationException(SR.BlockingCollection_Add_Failed);
            }
        }
        finally
        {
            // decrement the adders count
            Debug.Assert((_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
            Interlocked.Decrement(ref _currentAdders);
        }
    }
    return waitForSemaphoreWasSuccessful;
}
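From the caller's side, the two semaphores in the method above show up as blocking behavior: Add blocks while a bounded collection is full, and consumers block while it is empty. A minimal producer/consumer sketch using the public API follows; the class BlockingDemo is a name invented for this example.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class BlockingDemo
{
    public static int Run()
    {
        // Capacity 2: the producer blocks in Add whenever two items are pending.
        using var queue = new BlockingCollection<int>(boundedCapacity: 2);
        int sum = 0;

        var consumer = new Thread(() =>
        {
            // Blocks while the collection is empty; ends after CompleteAdding
            // has been called and the remaining items have been drained.
            foreach (int x in queue.GetConsumingEnumerable())
            {
                sum += x;
            }
        });
        consumer.Start();

        for (int i = 1; i <= 10; i++)
        {
            queue.Add(i);
        }
        queue.CompleteAdding();

        consumer.Join();
        return sum;
    }
}
```

BlockingDemo.Run() returns 55, the sum of 1 through 10, once the consumer has drained the collection.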
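ConcurrentBag
ThreadLocal<WorkStealingQueue> _locals: one work-stealing queue per thread
WorkStealingQueue: volatile, SpinLock and Interlocked. Volatile access reads the latest value instead of a copy cached per thread, SpinLock waits without a CPU context switch, and Interlocked makes the update atomic at the hardware level. Note that the first two excerpts below come from ThreadPoolWorkQueue (the thread pool's own work-stealing queue), which illustrates the same design; the last excerpt is ConcurrentBag's Add.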
internal sealed partial class ThreadPoolWorkQueue
{
    internal static class WorkStealingQueueList
    {
#pragma warning disable CA1825 // avoid the extra generic instantation for Array.Empty<T>(); this is the only place we'll ever create this array
        private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0];
#pragma warning restore CA1825

        public static WorkStealingQueue[] Queues => _queues;

        public static void Add(WorkStealingQueue queue)
        {
            Debug.Assert(queue != null);
            while (true)
            {
                WorkStealingQueue[] oldQueues = _queues;
                Debug.Assert(Array.IndexOf(oldQueues, queue) < 0);

                var newQueues = new WorkStealingQueue[oldQueues.Length + 1];
                Array.Copy(oldQueues, newQueues, oldQueues.Length);
                newQueues[^1] = queue;
                if (Interlocked.CompareExchange(ref _queues, newQueues, oldQueues) == oldQueues)
                {
                    break;
                }
            }
        }
private SpinLock m_foreignLock = new SpinLock(enableThreadOwnerTracking: false);

public void LocalPush(object obj)
{
    int tail = m_tailIndex;

    // We're going to increment the tail; if we'll overflow, then we need to reset our counts
    if (tail == int.MaxValue)
    {
        tail = LocalPush_HandleTailOverflow();
    }

    // When there are at least 2 elements' worth of space, we can take the fast path.
    if (tail < m_headIndex + m_mask)
    {
        Volatile.Write(ref m_array[tail & m_mask], obj);
        m_tailIndex = tail + 1;
    }
    else
    {
        // We need to contend with foreign pops, so we lock.
        bool lockTaken = false;
        try
        {
            m_foreignLock.Enter(ref lockTaken);

            int head = m_headIndex;
            int count = m_tailIndex - m_headIndex;

            // If there is still space (one left), just add the element.
            if (count >= m_mask)
            {
                // We're full; expand the queue by doubling its size.
                var newArray = new object?[m_array.Length << 1];
                for (int i = 0; i < m_array.Length; i++)
                    newArray[i] = m_array[(i + head) & m_mask];

                // Reset the field values, incl. the mask.
                m_array = newArray;
                m_headIndex = 0;
                m_tailIndex = tail = count;
                m_mask = (m_mask << 1) | 1;
            }

            Volatile.Write(ref m_array[tail & m_mask], obj);
            m_tailIndex = tail + 1;
        }
        finally
        {
            if (lockTaken)
                m_foreignLock.Exit(useMemoryBarrier: false);
        }
    }
}
public void Add(T item) =>
    GetCurrentThreadWorkStealingQueue(forceCreate: true)!
    .LocalPush(item, ref _emptyToNonEmptyListTransitionCount);
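From the outside, the per-thread queues are invisible; callers only see an unordered bag. A minimal usage sketch with the public ConcurrentBag<T> API follows; the class BagDemo is a name invented for this example.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BagDemo
{
    public static (int count, long sum) Run()
    {
        var bag = new ConcurrentBag<int>();

        // Each worker thread adds to its own thread-local queue, so adds rarely contend.
        Parallel.For(0, 1000, i => bag.Add(i));

        int count = bag.Count;

        // TryTake prefers the calling thread's own queue and steals from others otherwise.
        long sum = 0;
        while (bag.TryTake(out int value))
        {
            sum += value;
        }
        return (count, sum);
    }
}
```

BagDemo.Run() returns a count of 1000 and a sum of 499500 (0 + 1 + ... + 999); the order in which items come out is unspecified.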