[Adapdev-commits] Adapdev/src/Adapdev/Threading CallerThreadContext.cs,NONE,1.1 Exceptions.cs,NONE,1
Status: Beta
Brought to you by:
intesar66
From: Sean M. <int...@us...> - 2005-11-14 04:14:21
|
Update of /cvsroot/adapdev/Adapdev/src/Adapdev/Threading In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv15710/src/Adapdev/Threading Added Files: CallerThreadContext.cs Exceptions.cs SmartThreadPool.cs STPStartInfo.cs WorkItem.cs WorkItemsQueue.cs Log Message: Added new objects for multi-threading Modified ForeignKeyAssociation to display the full foreign key name Fixed some bugs w/ the unit test multi-threading --- NEW FILE: WorkItem.cs --- // Ami Bar // am...@gm... using System; using System.Threading; using System.Diagnostics; namespace Adapdev.Threading { #region WorkItem Delegate /// <summary> /// A delegate that represents the method to run as the work item /// </summary> /// <param name="state">A state object for the method to run</param> public delegate object WorkItemCallback(object state); /// <summary> /// A delegate to call after the WorkItemCallback completed /// </summary> /// <param name="wir">The work item result object</param> public delegate void PostExecuteWorkItemCallback(IWorkItemResult wir); #endregion #region IWorkItemResult interface /// <summary> /// IWorkItemResult interface /// </summary> public interface IWorkItemResult { /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits. /// </summary> /// <returns>The result of the work item</returns> object GetResult(); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout. /// </summary> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException object GetResult( int millisecondsTimeout, bool exitContext); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout. /// </summary> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException object GetResult( TimeSpan timeout, bool exitContext); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout or until the cancelWaitHandle is signaled. /// </summary> /// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 for infinite</param> /// <param name="exitContext"> /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. /// </param> /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the blocking if needed</param> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException /// On cancel throws WorkItemCancelException object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout or until the cancelWaitHandle is signaled. /// </summary> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException /// On cancel throws WorkItemCancelException object GetResult( TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits. /// </summary> /// <param name="e">Filled with the exception if one was thrown</param> /// <returns>The result of the work item</returns> object GetResult(out Exception e); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout. /// </summary> /// <param name="e">Filled with the exception if one was thrown</param> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException object GetResult( int millisecondsTimeout, bool exitContext, out Exception e); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout. /// </summary> /// <param name="e">Filled with the exception if one was thrown</param> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException object GetResult( TimeSpan timeout, bool exitContext, out Exception e); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout or until the cancelWaitHandle is signaled. /// </summary> /// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 for infinite</param> /// <param name="exitContext"> /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. /// </param> /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the blocking if needed</param> /// <param name="e">Filled with the exception if one was thrown</param> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException /// On cancel throws WorkItemCancelException object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout or until the cancelWaitHandle is signaled. /// </summary> /// <returns>The result of the work item</returns> /// <param name="e">Filled with the exception if one was thrown</param> /// On timeout throws WorkItemTimeoutException /// On cancel throws WorkItemCancelException object GetResult( TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e); /// <summary> /// Gets an indication whether the asynchronous operation has completed. /// </summary> bool IsCompleted { get; } /// <summary> /// Gets an indication whether the asynchronous operation has been canceled. /// </summary> bool IsCanceled { get; } /// <summary> /// Gets a user-defined object that qualifies or contains information about an asynchronous operation. /// </summary> object State { get; } /// <summary> /// Cancel the work item if it didn't start running yet. /// </summary> /// <returns>Returns true on success or false if the work item is in progress or already completed</returns> bool Cancel(); } #endregion #region WorkItem class /// <summary> /// Holds a callback delegate and the state for that delegate. /// </summary> internal class WorkItem : IDisposable { #region WorkItemState enum /// <summary> /// Indicates the state of the work item in the thread pool /// </summary> private enum WorkItemState { InQueue, InProgress, Completed, Canceled, } #endregion #region Member Variables /// <summary> /// Callback delegate for the callback. /// </summary> private WorkItemCallback _callback; /// <summary> /// Callback delegate for the the post execute. /// </summary> private PostExecuteWorkItemCallback _postExecuteWorkItemCallback; /// <summary> /// State with which to call the callback delegate. /// </summary> private object _state; /// <summary> /// Stores the caller's context /// </summary> private CallerThreadContext _callerContext; /// <summary> /// Holds the result of the mehtod /// </summary> private object _result; /// <summary> /// Hold the exception if the method threw it /// </summary> private Exception _exception; /// <summary> /// Hold the state of the work item /// </summary> private WorkItemState _workItemState; /// <summary> /// A ManualResetEvent to indicate that the result is ready /// </summary> private ManualResetEvent _workItemCompleted; /// <summary> /// A reference count to the _workItemCompleted. /// When it reaches to zero _workItemCompleted is Closed /// </summary> private int _workItemCompletedRefCount; /// <summary> /// Represents the result state of the work item /// </summary> private WorkItemResult _workItemResult; /// <summary> /// Indicates when to call to the post execute /// </summary> private CallToPostExecute _callToPostExecute; #endregion #region Construction /// <summary> /// Initialize the callback holding object. /// </summary> /// <param name="callback">Callback delegate for the callback.</param> /// <param name="state">State with which to call the callback delegate.</param> /// /// We assume that the WorkItem object is created within the thread /// that meant to run the callback public WorkItem( WorkItemCallback callback, object state, bool useCallerContext, PostExecuteWorkItemCallback postExecuteWorkItemCallback, CallToPostExecute callToPostExecute) { if (useCallerContext) { _callerContext = CallerThreadContext.Capture(); } _postExecuteWorkItemCallback = postExecuteWorkItemCallback; _callToPostExecute = callToPostExecute; _callback = callback; _state = state; _workItemState = WorkItemState.InQueue; _workItemCompleted = null; _workItemCompletedRefCount = 0; _workItemResult = new WorkItemResult(this); } #endregion #region Methods /// <summary> /// Change the state of the work item to in progress if it wasn't canceled. /// </summary> /// <returns> /// Return true on success or false in case the work item was canceled. /// If the work item needs to run a post execute then the method will return true. /// </returns> public bool StartingWorkItem() { lock(this) { if (WorkItemState.Canceled == _workItemState) { bool result = false; if ((_postExecuteWorkItemCallback != null) && ((_callToPostExecute & CallToPostExecute.WhenWorkItemCanceled) == CallToPostExecute.WhenWorkItemCanceled)) { result = true; } return result; } Debug.Assert(WorkItemState.InQueue == _workItemState); SetWorkItemState(WorkItemState.InProgress); } return true; } /// <summary> /// Execute the work item and the post execute /// </summary> public void Execute() { CallToPostExecute currentCallToPostExecute = 0; // Execute the work item if we are in the correct state switch(_workItemState) { case WorkItemState.InProgress: currentCallToPostExecute |= CallToPostExecute.WhenWorkItemNotCanceled; ExecuteWorkItem(); break; case WorkItemState.Canceled: currentCallToPostExecute |= CallToPostExecute.WhenWorkItemCanceled; break; default: Debug.Assert(false); throw new NotSupportedException(); } // Run the post execute as needed if ((currentCallToPostExecute & _callToPostExecute) != 0) { PostExecute(); } } /// <summary> /// Execute the work item /// </summary> private void ExecuteWorkItem() { CallerThreadContext ctc = null; if (null != _callerContext) { ctc = CallerThreadContext.Capture(); CallerThreadContext.Apply(_callerContext); } Exception exception = null; object result = null; try { result = _callback(_state); } catch (Exception e) { // Save the exception so we can rethrow it later exception = e; } if (null != _callerContext) { CallerThreadContext.Apply(ctc); } SetResult(result, exception); } /// <summary> /// Runs the post execute callback /// </summary> private void PostExecute() { if (null != _postExecuteWorkItemCallback) { try { _postExecuteWorkItemCallback(this._workItemResult); } catch (Exception e) { Debug.Assert(null != e); } } } /// <summary> /// Set the result of the work item to return /// </summary> /// <param name="result">The result of the work item</param> internal void SetResult(object result, Exception exception) { _result = result; _exception = exception; SignalComplete(false); } /// <summary> /// Returns the work item result /// </summary> /// <returns>The work item result</returns> internal IWorkItemResult GetWorkItemResult() { return _workItemResult; } /// <summary> /// Wait for all work items to complete /// </summary> /// <param name="workItemResults">Array of work item result objects</param> /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> /// <param name="exitContext"> /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. /// </param> /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> /// <returns> /// true when every work item in workItemResults has completed; otherwise false. /// </returns> internal static bool WaitAll( IWorkItemResult [] workItemResults, int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { bool success; WaitHandle [] waitHandles = new WaitHandle[workItemResults.Length];; GetWaitHandles(workItemResults, waitHandles); if ((null == cancelWaitHandle) && (waitHandles.Length <= 64)) { success = WaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext); } else { success = true; int millisecondsLeft = millisecondsTimeout; DateTime start = DateTime.Now; WaitHandle [] whs; if (null != cancelWaitHandle) { whs = new WaitHandle [] { null, cancelWaitHandle }; } else { whs = new WaitHandle [] { null }; } bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout); // Iterate over the wait handles and wait for each one to complete. // We cannot use WaitHandle.WaitAll directly, because the cancelWaitHandle // won't affect it. // Each iteration we update the time left for the timeout. for(int i = 0; i < workItemResults.Length; ++i) { // WaitAny don't work with negative numbers if (!waitInfinitely && (millisecondsLeft < 0)) { success = false; break; } whs[0] = waitHandles[i]; int result = WaitHandle.WaitAny(whs, millisecondsLeft, exitContext); if((result > 0) || (WaitHandle.WaitTimeout == result)) { success = false; break; } if(!waitInfinitely) { // Update the time left to wait TimeSpan ts = DateTime.Now - start; millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds; } } } // Release the wait handles ReleaseWaitHandles(workItemResults); return success; } /// <summary> /// Waits for any of the work items in the specified array to complete, cancel, or timeout /// </summary> /// <param name="workItemResults">Array of work item result objects</param> /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param> /// <param name="exitContext"> /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. /// </param> /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param> /// <returns> /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. /// </returns> internal static int WaitAny( IWorkItemResult [] workItemResults, int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { WaitHandle [] waitHandles = null; if (null != cancelWaitHandle) { waitHandles = new WaitHandle[workItemResults.Length+1]; GetWaitHandles(workItemResults, waitHandles); waitHandles[workItemResults.Length] = cancelWaitHandle; } else { waitHandles = new WaitHandle[workItemResults.Length]; GetWaitHandles(workItemResults, waitHandles); } int result = WaitHandle.WaitAny(waitHandles, millisecondsTimeout, exitContext); // Treat cancel as timeout if (null != cancelWaitHandle) { if (result == workItemResults.Length) { result = WaitHandle.WaitTimeout; } } ReleaseWaitHandles(workItemResults); return result; } /// <summary> /// Fill an array of wait handles with the work items wait handles. /// </summary> /// <param name="workItemResults">An array of work item results</param> /// <param name="waitHandles">An array of wait handles to fill</param> private static void GetWaitHandles( IWorkItemResult [] workItemResults, WaitHandle [] waitHandles) { for(int i = 0; i < workItemResults.Length; ++i) { WorkItemResult wir = workItemResults[i] as WorkItemResult; Debug.Assert(null != wir, "All workItemResults must be WorkItemResult objects"); waitHandles[i] = wir.GetWorkItem().GetWaitHandle(); } } /// <summary> /// Release the work items' wait handles /// </summary> /// <param name="workItemResults">An array of work item results</param> private static void ReleaseWaitHandles(IWorkItemResult [] workItemResults) { for(int i = 0; i < workItemResults.Length; ++i) { WorkItemResult wir = workItemResults[i] as WorkItemResult; wir.GetWorkItem().ReleaseWaitHandle(); } } #endregion #region Private Members /// <summary> /// Sets the work item's state /// </summary> /// <param name="workItemState">The state to set the work item to</param> private void SetWorkItemState(WorkItemState workItemState) { lock(this) { _workItemState = workItemState; } } /// <summary> /// Signals that work item has been completed or canceled /// </summary> /// <param name="canceled">Indicates that the work item has been canceled</param> private void SignalComplete(bool canceled) { SetWorkItemState(canceled ? WorkItemState.Canceled : WorkItemState.Completed); lock(this) { // If someone is waiting then signal. if (null != _workItemCompleted) { _workItemCompleted.Set(); } } } #endregion #region Members exposed by WorkItemResult /// <summary> /// Cancel the work item if it didn't start running yet. /// </summary> /// <returns>Returns true on success or false if the work item is in progress or already completed</returns> private bool Cancel() { lock(this) { switch(_workItemState) { case WorkItemState.Canceled: //Debug.WriteLine("Work item already canceled"); return true; case WorkItemState.Completed: case WorkItemState.InProgress: //Debug.WriteLine("Work item cannot be canceled"); return false; case WorkItemState.InQueue: // Signal to the wait for completion that the work // item has been completed (canceled). There is no // reason to wait for it to get out of the queue SignalComplete(true); //Debug.WriteLine("Work item canceled"); return true; } } return false; } /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits for the result, timeout, or cancel. /// In case of error the method throws and exception /// </summary> /// <returns>The result of the work item</returns> private object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { Exception e = null; object result = GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out e); if (null != e) { throw e; } return result; } /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits for the result, timeout, or cancel. /// In case of error the e argument is filled with the exception /// </summary> /// <returns>The result of the work item</returns> private object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e) { e = null; // Check for cancel if (WorkItemState.Canceled == _workItemState) { throw new WorkItemCancelException("Work item canceled"); } // Check for completion if (IsCompleted) { e = _exception; return _result; } // If no cancelWaitHandle is provided if (null == cancelWaitHandle) { WaitHandle wh = GetWaitHandle(); bool timeout = !wh.WaitOne(millisecondsTimeout, exitContext); ReleaseWaitHandle(); if (timeout) { throw new WorkItemTimeoutException("Work item timeout"); } } else { WaitHandle wh = GetWaitHandle(); int result = WaitHandle.WaitAny(new WaitHandle[] { wh, cancelWaitHandle }); ReleaseWaitHandle(); switch(result) { case 0: // The work item signaled // Note that the signal could be also as a result of canceling the // work item (not the get result) break; case 1: case WaitHandle.WaitTimeout: throw new WorkItemTimeoutException("Work item timeout"); default: Debug.Assert(false); break; } } // Check for cancel if (WorkItemState.Canceled == _workItemState) { throw new WorkItemCancelException("Work item canceled"); } Debug.Assert(IsCompleted); e = _exception; // Return the result return _result; } /// <summary> /// A wait handle to wait for completion, cancel, or timeout /// </summary> private WaitHandle GetWaitHandle() { lock(this) { if (null == _workItemCompleted) { _workItemCompleted = new ManualResetEvent(IsCompleted); } ++_workItemCompletedRefCount; } return _workItemCompleted; } private void ReleaseWaitHandle() { lock(this) { if (null != _workItemCompleted) { --_workItemCompletedRefCount; if (0 == _workItemCompletedRefCount) { _workItemCompleted.Close(); _workItemCompleted = null; } } } } /// <summary> /// Returns true when the work item has completed or canceled /// </summary> private bool IsCompleted { get { lock(this) { return ((_workItemState == WorkItemState.Completed) || (_workItemState == WorkItemState.Canceled)); } } } /// <summary> /// Returns true when the work item has canceled /// </summary> public bool IsCanceled { get { lock(this) { return (_workItemState == WorkItemState.Canceled); } } } #endregion #region WorkItemResult class private class WorkItemResult : IWorkItemResult { /// <summary> /// A back reference to the work item /// </summary> private WorkItem _workItem; public WorkItemResult(WorkItem workItem) { _workItem = workItem; } internal WorkItem GetWorkItem() { return _workItem; } #region IWorkItemResult Members public bool IsCompleted { get { return _workItem.IsCompleted; } } public bool IsCanceled { get { return _workItem.IsCanceled; } } public object GetResult() { return _workItem.GetResult(Timeout.Infinite, true, null); } public object GetResult(int millisecondsTimeout, bool exitContext) { return _workItem.GetResult(millisecondsTimeout, exitContext, null); } public object GetResult(TimeSpan timeout, bool exitContext) { return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, null); } public object GetResult(int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { return _workItem.GetResult(millisecondsTimeout, exitContext, cancelWaitHandle); } public object GetResult(TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle) { return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); } public object GetResult(out Exception e) { return _workItem.GetResult(Timeout.Infinite, true, null, out e); } public object GetResult(int millisecondsTimeout, bool exitContext, out Exception e) { return _workItem.GetResult(millisecondsTimeout, exitContext, null, out e); } public object GetResult(TimeSpan timeout, bool exitContext, out Exception e) { return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, null, out e); } public object GetResult(int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e) { return _workItem.GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out e); } public object GetResult(TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e) { return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle, out e); } public bool Cancel() { return _workItem.Cancel(); } public object State { get { return _workItem._state; } } #endregion } #endregion #region IDisposable Members public void Dispose() { IDisposable disp = _state as IDisposable; if (null != disp) { disp.Dispose(); _state = null; } } #endregion } #endregion } --- NEW FILE: WorkItemsQueue.cs --- // Ami Bar // am...@gm... using System; using System.Collections; using System.Threading; namespace Adapdev.Threading { #region WorkItemsQueue class /// <summary> /// WorkItemsQueue class. /// </summary> internal class WorkItemsQueue : IDisposable { #region Member variables /// /// Waiters queue (implemented as stack). /// The head is a dummy and is not used. /// private WaiterEntry _headWaiterEntry = new WaiterEntry(); /// <summary> /// Waiters count /// </summary> private int _waitersCount = 0; /// <summary> /// Work items queue /// </summary> private Queue _workItems = new Queue(); /// <summary> /// Indicate that work items are allowed to be queued /// </summary> private bool _isWorkItemsQueueActive = true; /// <summary> /// A slot in the thread local storage to save some data /// </summary> private static readonly LocalDataStoreSlot _slot = Thread.AllocateDataSlot(); private bool _isDisposed = false; #endregion #region Public properties /// <summary> /// Returns the current number of work items in the queue /// </summary> public int Count { get { lock(this) { ValidateNotDisposed(); return _workItems.Count; } } } #endregion #region Public methods /// <summary> /// Enqueue a work item to the queue. /// </summary> public bool EnqueueWorkItem(WorkItem workItem) { // A work item cannot be null, since null is used in the // WaitForWorkItem() method to indicate timeout or cancel if (null == workItem) { throw new ArgumentNullException("workItem" , "workItem cannot be null"); } bool enqueue = true; // First check if there is a waiter waiting for work item. During // the check, timed out waiters are ignored. If there is no // waiter then the work item is queued. lock(this) { ValidateNotDisposed(); if (!_isWorkItemsQueueActive) { return false; } while(_waitersCount > 0) { // Dequeue a waiter. WaiterEntry waiterEntry = PopWaiter(); // Signal the waiter. On success break the loop if (waiterEntry.Signal(workItem)) { enqueue = false; break; } } if (enqueue) { // Enqueue the work item _workItems.Enqueue(workItem); } } return true; } /// <summary> /// Waits for a work item or exits on timeout or cancel /// </summary> /// <param name="millisecondsTimeout">Timeout in milliseconds</param> /// <param name="cancelEvent">Cancel wait handle</param> /// <returns>Returns true if the resource was granted</returns> public WorkItem DequeueWorkItem( int millisecondsTimeout, WaitHandle cancelEvent) { /// This method cause the caller to wait for a work item. /// If there is at least one waiting work item then the /// method returns immidiately with true. /// /// If there are no waiting work items then the caller /// is queued between other waiters for a work item to arrive. /// /// If a work item didn't come within millisecondsTimeout or /// the user canceled the wait by signaling the cancelEvent /// then the method returns false to indicate that the caller /// didn't get a work item. WaiterEntry waiterEntry = null; WorkItem workItem = null; lock(this) { ValidateNotDisposed(); // If there are waiting work items then take one and return. if (_workItems.Count > 0) { workItem = _workItems.Dequeue() as WorkItem; return workItem; } // No waiting work items ... else { // Get the wait entry for the waiters queue waiterEntry = GetThreadWaiterEntry(); // Put the waiter with the other waiters PushWaiter(waiterEntry); } } // Prepare array of wait handle for the WaitHandle.WaitAny() WaitHandle [] waitHandles = new WaitHandle [] { waiterEntry.WaitHandle, cancelEvent }; // Wait for an available resource, cancel event, or timeout. // During the wait we are supposes to exit the synchronization // domain. (Placing true as the third argument of the WaitAny()) // It just doesn't work, I don't know why, so I have lock(this) // statments insted of one. int index = WaitHandle.WaitAny( waitHandles, millisecondsTimeout, true); lock(this) { // success is true if it got a work item. bool success = (0 == index); // The timeout variable is used only for readability. // (We treat cancel as timeout) bool timeout = !success; // On timeout update the waiterEntry that it is timed out if (timeout) { // The Timeout() fails if the waiter has already been signaled timeout = waiterEntry.Timeout(); // On timeout remove the waiter from the queue. // Note that the complexity is O(1). if(timeout) { RemoveWaiter(waiterEntry, false); } // Again readability success = !timeout; } // On success return the work item if (success) { workItem = waiterEntry.WorkItem; if (null == workItem) { workItem = _workItems.Dequeue() as WorkItem; } } } // On failure return null. return workItem; } /// <summary> /// Cleanup the work items queue, hence no more work /// items are allowed to be queue /// </summary> protected virtual void Cleanup() { lock(this) { // Deactivate only once if (!_isWorkItemsQueueActive) { return; } // Don't queue more work items _isWorkItemsQueueActive = false; foreach(WorkItem workItem in _workItems) { workItem.Dispose(); } // Clear the work items that are already queued _workItems.Clear(); // Note: // I don't iterate over the queue and dispose of work items's states, // since if a work item has a state object that is still in use in the // application then I must not dispose it. // Tell the waiters that they were timed out. // It won't signal them to exit, but to ignore their // next work item. while(_waitersCount > 0) { WaiterEntry waiterEntry = PopWaiter(); waiterEntry.Timeout(); } } } #endregion #region Private methods /// <summary> /// Returns the WaiterEntry of the current thread /// </summary> /// <returns></returns> /// In order to avoid creation and destuction of WaiterEntry /// objects each thread stores its own WaiterEntry. private WaiterEntry GetThreadWaiterEntry() { WaiterEntry waiterEntry = Thread.GetData(_slot) as WaiterEntry; if (null == waiterEntry) { waiterEntry = new WaiterEntry(); Thread.SetData(_slot, waiterEntry); } waiterEntry.Reset(); return waiterEntry; } #region Waiters stack methods /// /// Push a new waiter into the waiter's stack /// /// A waiter to put in the stack private void PushWaiter(WaiterEntry newWaiterEntry) { // Remove the waiter if it is already in the stack and // update waiter's count as needed RemoveWaiter(newWaiterEntry, false); // If the stack is empty then newWaiterEntry is the new head of the stack if (null == _headWaiterEntry._nextWaiterEntry) { _headWaiterEntry._nextWaiterEntry = newWaiterEntry; newWaiterEntry._prevWaiterEntry = _headWaiterEntry; } // If the stack is not empty then put newWaiterEntry as the new head // of the stack. else { // Save the old first waiter entry WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; // Update the links _headWaiterEntry._nextWaiterEntry = newWaiterEntry; newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry; newWaiterEntry._prevWaiterEntry = _headWaiterEntry; oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry; } // Increment the number of waiters ++_waitersCount; } /// /// Pop a waiter from the waiter's stack /// /// Returns the first waiter in the stack private WaiterEntry PopWaiter() { // Store the current stack head WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; // Store the new stack head WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry; // Update the old stack head list links and decrement the number // waiters. RemoveWaiter(oldFirstWaiterEntry, true); // Update the new stack head _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry; if (null != newHeadWaiterEntry) { newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry; } // Return the old stack head return oldFirstWaiterEntry; } /// <summary> /// Remove a waiter from the stack /// </summary> /// <param name="waiterEntry">A waiter entry to remove</param> /// <param name="popDecrement">If true the waiter count is always decremented</param> private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement) { // Store the prev entry in the list WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry; // Store the next entry in the list WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry; // A flag to indicate if we need to decrement the waiters count. // If we got here from PopWaiter then we must decrement. // If we got here from PushWaiter then we decrement only if // the waiter was already in the stack. bool decrementCounter = popDecrement; // Null the waiter's entry links waiterEntry._prevWaiterEntry = null; waiterEntry._nextWaiterEntry = null; // If the waiter entry had a prev link then update it. // It also means that the waiter is already in the list and we // need to decrement the waiters count. if (null != prevWaiterEntry) { prevWaiterEntry._nextWaiterEntry = nextWaiterEntry; decrementCounter = true; } // If the waiter entry had a next link then update it. // It also means that the waiter is already in the list and we // need to decrement the waiters count. if (null != nextWaiterEntry) { nextWaiterEntry._prevWaiterEntry = prevWaiterEntry; decrementCounter = true; } // Decrement the waiters count if needed if (decrementCounter) { --_waitersCount; } } #endregion #endregion #region WaiterEntry class // A waiter entry in the _waiters queue. private class WaiterEntry : IDisposable { #region Member variables /// <summary> /// Event to signal the waiter that it got the work item. /// </summary> private AutoResetEvent _waitHandle = new AutoResetEvent(false); /// <summary> /// Flag to know if this waiter already quited from the queue /// because of a timeout. /// </summary> private bool _isTimedout = false; /// <summary> /// Flag to know if the waiter was signaled and got a work item. /// </summary> private bool _isSignaled = false; /// <summary> /// A work item that passed directly to the waiter withou going /// through the queue /// </summary> private WorkItem _workItem = null; private bool _isDisposed = false; // Linked list members internal WaiterEntry _nextWaiterEntry = null; internal WaiterEntry _prevWaiterEntry = null; #endregion #region Construction public WaiterEntry() { Reset(); } #endregion #region Public methods public WaitHandle WaitHandle { get { return _waitHandle; } } public WorkItem WorkItem { get { lock(this) { return _workItem; } } } /// <summary> /// Signal the waiter that it got a work item. /// </summary> /// <returns>Return true on success</returns> /// The method fails if Timeout() preceded its call public bool Signal(WorkItem workItem) { lock(this) { if (!_isTimedout) { _workItem = workItem; _isSignaled = true; _waitHandle.Set(); return true; } } return false; } /// <summary> /// Mark the wait entry that it has been timed out /// </summary> /// <returns>Return true on success</returns> /// The method fails if Signal() preceded its call public bool Timeout() { lock(this) { // Time out can happen only if the waiter wasn't marked as // signaled if (!_isSignaled) { // We don't remove the waiter from the queue, the DequeueWorkItem // method skips _waiters that were timed out. _isTimedout = true; return true; } } return false; } /// <summary> /// Reset the wait entry so it can be used again /// </summary> public void Reset() { _workItem = null; _isTimedout = false; _isSignaled = false; _waitHandle.Reset(); } /// <summary> /// Free resources /// </summary> public void Close() { if (null != _waitHandle) { _waitHandle.Close(); _waitHandle = null; } } #endregion #region IDisposable Members public void Dispose() { if (!_isDisposed) { Close(); _isDisposed = true; } } ~WaiterEntry() { Dispose(); } #endregion } #endregion #region IDisposable Members public void Dispose() { if (!_isDisposed) { Cleanup(); _isDisposed = true; GC.SuppressFinalize(this); } } ~WorkItemsQueue() { Cleanup(); } private void ValidateNotDisposed() { if(_isDisposed) { throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown"); } } #endregion } #endregion } --- NEW FILE: CallerThreadContext.cs --- // Ami Bar // am...@gm... using System; using System.Threading; using System.Globalization; using System.Security.Principal; using System.Reflection; using System.Runtime.Remoting.Contexts; namespace Adapdev.Threading { #region CallerThreadContext class /// <summary> /// This class stores the caller thread context in order to restore /// it when the work item is executed in the context of the thread /// from the pool. /// Note that we can't store the thread's CompressedStack, because /// it throws a security exception /// </summary> internal class CallerThreadContext { private CultureInfo _culture = null; private CultureInfo _cultureUI = null; private IPrincipal _principal; private System.Runtime.Remoting.Contexts.Context _context; private static FieldInfo _fieldInfo = GetFieldInfo(); private static FieldInfo GetFieldInfo() { Type threadType = typeof(Thread); return threadType.GetField( "m_Context", BindingFlags.Instance | BindingFlags.NonPublic); } /// <summary> /// Constructor /// </summary> private CallerThreadContext() { } /// <summary> /// Captures the current thread context /// </summary> /// <returns></returns> public static CallerThreadContext Capture() { CallerThreadContext callerThreadContext = new CallerThreadContext(); Thread thread = Thread.CurrentThread; callerThreadContext._culture = thread.CurrentCulture; callerThreadContext._cultureUI = thread.CurrentUICulture; callerThreadContext._principal = Thread.CurrentPrincipal; callerThreadContext._context = Thread.CurrentContext; return callerThreadContext; } /// <summary> /// Applies the thread context stored earlier /// </summary> /// <param name="callerThreadContext"></param> public static void Apply(CallerThreadContext callerThreadContext) { Thread thread = Thread.CurrentThread; thread.CurrentCulture = callerThreadContext._culture; thread.CurrentUICulture = callerThreadContext._cultureUI; Thread.CurrentPrincipal = callerThreadContext._principal; // Uncomment the following block to enable the Thread.CurrentThread /* if (null != _fieldInfo) { _fieldInfo.SetValue( Thread.CurrentThread, callerThreadContext._context); } */ } } #endregion } --- NEW FILE: STPStartInfo.cs --- // Ami Bar // am...@gm... using System; namespace Adapdev.Threading { /// <summary> /// Summary description for STPStartInfo. /// </summary> public class STPStartInfo { /// <summary> /// Idle timeout in milliseconds. /// If a thread is idle for _idleTimeout milliseconds then /// it may quit. /// </summary> private int _idleTimeout; /// <summary> /// The lower limit of threads in the pool. /// </summary> private int _minWorkerThreads; /// <summary> /// The upper limit of threads in the pool. /// </summary> private int _maxWorkerThreads; /// <summary> /// Use the caller's security context /// </summary> private bool _useCallerContext; /// <summary> /// Dispose of the state object of a work item /// </summary> private bool _disposeOfStateObjects; /// <summary> /// The option to run the post execute /// </summary> private CallToPostExecute _callToPostExecute; /// <summary> /// A post execute callback to call when none is provided in /// the QueueWorkItem method. /// </summary> private PostExecuteWorkItemCallback _postExecuteWorkItemCallback; public STPStartInfo() { _idleTimeout = SmartThreadPool.DefaultIdleTimeout; _minWorkerThreads = SmartThreadPool.DefaultMinWorkerThreads; _maxWorkerThreads = SmartThreadPool.DefaultMaxWorkerThreads; _useCallerContext = SmartThreadPool.DefaultUseCallerContext; _disposeOfStateObjects = SmartThreadPool.DefaultDisposeOfStateObjects; _callToPostExecute = SmartThreadPool.DefaultCallToPostExecute; _postExecuteWorkItemCallback = SmartThreadPool.DefaultPostExecuteWorkItemCallback; } public STPStartInfo(STPStartInfo stpStartInfo) { _idleTimeout = stpStartInfo._idleTimeout; _minWorkerThreads = stpStartInfo._minWorkerThreads; _maxWorkerThreads = stpStartInfo._maxWorkerThreads; _useCallerContext = stpStartInfo._useCallerContext; _disposeOfStateObjects = stpStartInfo._disposeOfStateObjects; _callToPostExecute = stpStartInfo._callToPostExecute; _postExecuteWorkItemCallback = stpStartInfo._postExecuteWorkItemCallback; } public int IdleTimeout { get { return _idleTimeout; } set { _idleTimeout = value; } } public int MinWorkerThreads { get { return _minWorkerThreads; } set { _minWorkerThreads = value; } } public int MaxWorkerThreads { get { return _maxWorkerThreads; } set { _maxWorkerThreads = value; } } public bool UseCallerContext { get { return _useCallerContext; } set { _useCallerContext = value; } } public bool DisposeOfStateObjects { get { return _disposeOfStateObjects; } set { _disposeOfStateObjects = value; } } public CallToPostExecute CallToPostExecute { get { return _callToPostExecute; } set { _callToPostExecute = value; } } public PostExecuteWorkItemCallback PostExecuteWorkItemCallback { get { return _postExecuteWorkItemCallback; } set { _postExecuteWorkItemCallback = value; } } } } --- NEW FILE: SmartThreadPool.cs --- // Ami Bar // am...@gm... // // Smart Thread Pool in C#. // 7 Aug 2004 - Initial release // 14 Sep 2004 - Bug fixes // 15 Oct 2004 - Added new features // - Work items return result. // - Support waiting synchronization for multiple work items. // - Work items can be cancelled. // - Passage of the caller threads context to the thread in the pool. // - Minimal usage of WIN32 handles. // - Minor bug fixes. // 26 Dec 2004 - Changes: // - Removed static constructors. // - Added finalizers. // - Changed Exceptions so they are serializable. // - Fixed the bug in one of the SmartThreadPool constructors. // - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters. // The SmartThreadPool.WaitAny() is still limited by the .NET Framework. // - Added PostExecute with options on which cases to call it. // - Added option to dispose of the state objects. // - Added a WaitForIdle() method that waits until the work items queue is empty. // - Added an STPStartInfo class for the initialization of the thread pool. // - Changed exception handling so if a work item throws an exception it // is rethrown at GetResult(), rather then firing an UnhandledException event. // Note that PostExecute exception are always ignored. // 25 Mar 2005 - Fixed bug: // - Fixed bug where work items got lost. It could happen sometimes, but especially // when the idle timeout is small. // 3 Jul 2005 - Fixed bug: // - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, // hardly reconstructed // 16 Aug 2005 - Fixed bug: // - Fixed bug where the InUseThreads becomes negative when cancelling work items using System; using System.Security; using System.Threading; using System.Collections; using System.Diagnostics; namespace Adapdev.Threading { #region CallToPostExecute enumerator [Flags] public enum CallToPostExecute { Never = 0x00, WhenWorkItemCanceled = 0x01, WhenWorkItemNotCanceled = 0x02, Always = WhenWorkItemCanceled | WhenWorkItemNotCanceled, } #endregion #region SmartThreadPool class /// <summary> /// Smart thread pool class. /// </summary> public class SmartThreadPool : IDisposable { #region Default Constants /// <summary> /// Default minimum number of threads the thread pool contains. (0) /// </summary> public const int DefaultMinWorkerThreads = 0; /// <summary> /// Default maximum number of threads the thread pool contains. (25) /// </summary> public const int DefaultMaxWorkerThreads = 25; /// <summary> /// Default idle timeout in milliseconds. (One minute) /// </summary> public const int DefaultIdleTimeout = 60*1000; // One minute /// <summary> /// Indicate to copy the security context of the caller and then use it in the call. (false) /// </summary> public const bool DefaultUseCallerContext = false; /// <summary> /// Indicate to dispose of the state objects if they support the IDispose interface. (false) /// </summary> public const bool DefaultDisposeOfStateObjects = false; /// <summary> /// The default option to run the post execute /// </summary> public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always; /// <summary> /// The default post execute method to run. /// When null it means not to call it. /// </summary> public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback = null; #endregion #region Member Variables /// <summary> /// Hashtable of all the threads in the thread pool. /// </summary> private Hashtable _workerThreads = Hashtable.Synchronized(new Hashtable()); /// <summary> /// Queue of work items. /// </summary> private WorkItemsQueue _workItemsQueue = new WorkItemsQueue(); /// <summary> /// Number of threads that currently work (not idle). /// </summary> private int _inUseWorkerThreads = 0; /// <summary> /// Start information to use. /// It is simpler than providing many constructors. /// </summary> private STPStartInfo _stpStartInfo = new STPStartInfo(); /// <summary> /// Total number of work items that are stored in the work items queue /// plus the work items that the threads in the pool are working on. /// </summary> private int _currentWorkItemsCount = 0; /// <summary> /// Signaled when the thread pool is idle, i.e. no thread is busy /// and the work items queue is empty /// </summary> private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); /// <summary> /// An event to signal all the threads to quit immediately. /// </summary> private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false); /// <summary> /// A flag to indicate the threads to quit. /// </summary> private bool _shutdown = false; /// <summary> /// Counts the threads created in the pool. /// It is used to name the threads. /// </summary> private int _threadCounter = 0; /// <summary> /// Indicate that the SmartThreadPool has been disposed /// </summary> private bool _isDisposed = false; #endregion #region Construction and Finalization /// <summary> /// Constructor /// </summary> public SmartThreadPool() { Start(); } /// <summary> /// Constructor /// </summary> /// <param name="idleTimeout">Idle timeout in milliseconds</param> public SmartThreadPool(int idleTimeout) { _stpStartInfo.IdleTimeout = idleTimeout; Start(); } /// <summary> /// Constructor /// </summary> /// <param name="idleTimeout">Idle timeout in milliseconds</param> /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> public SmartThreadPool( int idleTimeout, int maxWorkerThreads) { _stpStartInfo.IdleTimeout = idleTimeout; _stpStartInfo.MaxWorkerThreads = maxWorkerThreads; Start(); } /// <summary> /// Constructor /// </summary> /// <param name="idleTimeout">Idle timeout in milliseconds</param> /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> /// <param name="minWorkerThreads">Lower limit of threads in the pool</param> public SmartThreadPool( int idleTimeout, int maxWorkerThreads, int minWorkerThreads) { _stpStartInfo.IdleTimeout = idleTimeout; _stpStartInfo.MaxWorkerThreads = maxWorkerThreads; _stpStartInfo.MinWorkerThreads = minWorkerThreads; Start(); } /// <summary> /// Constructor /// </summary> /// <param name="idleTimeout">Idle timeout in milliseconds</param> /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> /// <param name="minWorkerThreads">Lower limit of threads in the pool</param> /// <param name="useCallerContext">Indicate to copy the security context of the caller and then use it in the call</param> /// <param name="disposeOfStateObjects">Indicate to dispose of the state objects if they support the IDispose interface</param> public SmartThreadPool(STPStartInfo stpStartInfo) { _stpStartInfo = new STPStartInfo(stpStartInfo); Start(); } private void Start() { ValidateSTPStartInfo(); StartThreads(_stpStartInfo.MinWorkerThreads); } private void ValidateSTPStartInfo() { if (_stpStartInfo.MinWorkerThreads < 0) { throw new ArgumentOutOfRangeException( "MinWorkerThreads", "MinWorkerThreads cannot be negative"); } if (_stpStartInfo.MaxWorkerThreads <= 0) { throw new ArgumentOutOfRangeException( "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero"); } if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads) { throw new ArgumentOutOfRangeException( "MinWorkerThreads, maxWorkerThreads", "MaxWorkerThreads must be greater or equal to MinWorkerThreads"); } } #endregion #region Thread Processing /// <summary> /// Waits on the queue for a work item, shutdown, or timeout. /// </summary> /// <returns> /// Returns the WaitingCallback or null in case of timeout or shutdown. /// </returns> private WorkItem Dequeue() { WorkItem workItem = _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent); return workItem; } /// <summary> /// Put a new work item in the queue /// </summary> /// <param name="workItem">A work item to queue</param> private void Enqueue(WorkItem workItem) { // Make sure the workItem is not null Debug.Assert(null != workItem); IncrementWorkItemsCount(); _workItemsQueue.EnqueueWorkItem(workItem); // If all the threads are busy then try to create a new one if ((InUseThreads + WaitingCallbacks) > _workerThreads.Count) { StartThreads(1); } } private void IncrementWorkItemsCount() { int count = Interlocked.Increment(ref _currentWorkItemsCount); if (count == 1) { _isIdleWaitHandle.Reset(); } } private void DecrementWorkItemsCount() { int count = Interlocked.Decrement(ref _currentWorkItemsCount); if (count == 0) { _isIdleWaitHandle.Set(); } } /// <summary> /// Inform that the current thread is about to quit or quiting. /// The same thread may call this method more than once. /// </summary> private void InformCompleted() { // There is no need to lock the two methods together // since only the current thread removes itself // and the _workerThreads is a synchronized hashtable if (_workerThreads.Contains(Thread.CurrentThread)) { _workerThreads.Remove(Thread.CurrentThread); } } /// <summary> /// Starts new threads /// </summary> /// <param name="threadsCount">The number of threads to start</param> private void StartThreads(int threadsCount) { lock(_workerThreads.SyncRoot) { // Don't start threads on shut down if (_shutdown) { return; } for(int i = 0; i < threadsCount; ++i) { // Don't create more threads then the upper limit if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads) { return; } // Create a new thread Thread workerThread = new Thread(new ThreadStart(ProcessQueuedItems)); // Configure the new thread and start it workerThread.Name = "STP Thread #" + _threadCounter; workerThread.IsBackground = true; workerThread.Start(); ++_threadCounter; // Add the new thread to the hashtable and update its creation // time. _workerThreads[workerThread] = DateTime.Now; } } } /// <summary> /// A worker thread method that processes work items from the work items queue. /// </summary> private void ProcessQueuedItems() { try { bool bInUseWorkerThreadsWasIncremented = false; // Process until shutdown. while(!_shutdown) { // Update the last time this thread was seen alive. // It's good for debugging. _workerThreads[Thread.CurrentThread] = DateTime.Now; // Wait for a work item, shutdown, or timeout WorkItem workItem = Dequeue(); // Update the last time this thread was seen alive. // It's good for debugging. _workerThreads[Thread.CurrentThread] = DateTime.Now; // On timeout or shut down. if (null == workItem) { // Double lock for quit. if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) { lock(_workerThreads.SyncRoot) { if (_workerThreads.Count > _stpStartInfo.MinWo... [truncated message content] |