[Adapdev-commits] Adapdev/src/Adapdev/Threading CallerThreadContext.cs,1.2,1.3 DelegateAdapter.cs,1.
Status: Beta
Brought to you by:
intesar66
From: Sean M. <int...@us...> - 2005-11-16 05:33:38
|
Update of /cvsroot/adapdev/Adapdev/src/Adapdev/Threading In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv19977/src/Adapdev/Threading Added Files: CallerThreadContext.cs DelegateAdapter.cs Exceptions.cs STPStartInfo.cs SmartThreadPool.cs ThreadPoolWait.cs WorkItem.cs WorkItemsQueue.cs Log Message: Reposting to the repository after it got hosed --- NEW FILE: WorkItem.cs --- // Ami Bar // am...@gm... using System; using System.Threading; using System.Diagnostics; namespace Adapdev.Threading { #region WorkItem Delegate /// <summary> /// A delegate that represents the method to run as the work item /// </summary> /// <param name="state">A state object for the method to run</param> public delegate object WorkItemCallback(object state); /// <summary> /// A delegate to call after the WorkItemCallback completed /// </summary> /// <param name="wir">The work item result object</param> public delegate void PostExecuteWorkItemCallback(IWorkItemResult wir); #endregion #region IWorkItemResult interface /// <summary> /// IWorkItemResult interface /// </summary> public interface IWorkItemResult { /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits. /// </summary> /// <returns>The result of the work item</returns> object GetResult(); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout. /// </summary> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException object GetResult( int millisecondsTimeout, bool exitContext); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout. /// </summary> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException object GetResult( TimeSpan timeout, bool exitContext); /// <summary> /// Get the result of the work item. 
/// If the work item didn't run yet then the caller waits until timeout or until the cancelWaitHandle is signaled. /// </summary> /// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 for infinite</param> /// <param name="exitContext"> /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. /// </param> /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the blocking if needed</param> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException /// On cancel throws WorkItemCancelException object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout or until the cancelWaitHandle is signaled. /// </summary> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException /// On cancel throws WorkItemCancelException object GetResult( TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits. /// </summary> /// <param name="e">Filled with the exception if one was thrown</param> /// <returns>The result of the work item</returns> object GetResult(out Exception e); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout. /// </summary> /// <param name="e">Filled with the exception if one was thrown</param> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException object GetResult( int millisecondsTimeout, bool exitContext, out Exception e); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout. 
/// </summary> /// <param name="e">Filled with the exception if one was thrown</param> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException object GetResult( TimeSpan timeout, bool exitContext, out Exception e); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout or until the cancelWaitHandle is signaled. /// </summary> /// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 for infinite</param> /// <param name="exitContext"> /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. /// </param> /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the blocking if needed</param> /// <param name="e">Filled with the exception if one was thrown</param> /// <returns>The result of the work item</returns> /// On timeout throws WorkItemTimeoutException /// On cancel throws WorkItemCancelException object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e); /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits until timeout or until the cancelWaitHandle is signaled. /// </summary> /// <returns>The result of the work item</returns> /// <param name="e">Filled with the exception if one was thrown</param> /// On timeout throws WorkItemTimeoutException /// On cancel throws WorkItemCancelException object GetResult( TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e); /// <summary> /// Gets an indication whether the asynchronous operation has completed. /// </summary> bool IsCompleted { get; } /// <summary> /// Gets an indication whether the asynchronous operation has been canceled. 
/// </summary> bool IsCanceled { get; } /// <summary> /// Gets a user-defined object that qualifies or contains information about an asynchronous operation. /// </summary> object State { get; } /// <summary> /// Cancel the work item if it didn't start running yet. /// </summary> /// <returns>Returns true on success or false if the work item is in progress or already completed</returns> bool Cancel(); } #endregion #region WorkItem class /// <summary> /// Holds a callback delegate and the state for that delegate. /// </summary> internal class WorkItem : IDisposable { #region WorkItemState enum /// <summary> /// Indicates the state of the work item in the thread pool /// </summary> private enum WorkItemState { InQueue, InProgress, Completed, Canceled, } #endregion #region Member Variables /// <summary> /// Callback delegate for the callback. /// </summary> private WorkItemCallback _callback; /// <summary> /// Callback delegate for the post execute. /// </summary> private PostExecuteWorkItemCallback _postExecuteWorkItemCallback; /// <summary> /// State with which to call the callback delegate. /// </summary> private object _state; /// <summary> /// Stores the caller's context /// </summary> private CallerThreadContext _callerContext; /// <summary> /// Holds the result of the method /// </summary> private object _result; /// <summary> /// Holds the exception if the method threw it /// </summary> private Exception _exception; /// <summary> /// Holds the state of the work item /// </summary> private WorkItemState _workItemState; /// <summary> /// A ManualResetEvent to indicate that the result is ready /// </summary> private ManualResetEvent _workItemCompleted; /// <summary> /// A reference count to the _workItemCompleted.
/// When it reaches to zero _workItemCompleted is Closed /// </summary> private int _workItemCompletedRefCount; /// <summary> /// Represents the result state of the work item /// </summary> private WorkItemResult _workItemResult; /// <summary> /// Indicates when to call to the post execute /// </summary> private CallToPostExecute _callToPostExecute; #endregion #region Construction /// <summary> /// Initialize the callback holding object. /// </summary> /// <param name="callback">Callback delegate for the callback.</param> /// <param name="state">State with which to call the callback delegate.</param> /// /// We assume that the WorkItem object is created within the thread /// that meant to run the callback public WorkItem( WorkItemCallback callback, object state, bool useCallerContext, PostExecuteWorkItemCallback postExecuteWorkItemCallback, CallToPostExecute callToPostExecute) { if (useCallerContext) { _callerContext = CallerThreadContext.Capture(); } _postExecuteWorkItemCallback = postExecuteWorkItemCallback; _callToPostExecute = callToPostExecute; _callback = callback; _state = state; _workItemState = WorkItemState.InQueue; _workItemCompleted = null; _workItemCompletedRefCount = 0; _workItemResult = new WorkItemResult(this); } #endregion #region Methods /// <summary> /// Change the state of the work item to in progress if it wasn't canceled. /// </summary> /// <returns> /// Return true on success or false in case the work item was canceled. /// If the work item needs to run a post execute then the method will return true. 
/// </returns> public bool StartingWorkItem() { lock(this) { if (WorkItemState.Canceled == _workItemState) { bool result = false; if ((_postExecuteWorkItemCallback != null) && ((_callToPostExecute & CallToPostExecute.WhenWorkItemCanceled) == CallToPostExecute.WhenWorkItemCanceled)) { result = true; } return result; } Debug.Assert(WorkItemState.InQueue == _workItemState); SetWorkItemState(WorkItemState.InProgress); } return true; } /// <summary> /// Execute the work item and the post execute /// </summary> public void Execute() { CallToPostExecute currentCallToPostExecute = 0; // Execute the work item if we are in the correct state switch(_workItemState) { case WorkItemState.InProgress: currentCallToPostExecute |= CallToPostExecute.WhenWorkItemNotCanceled; ExecuteWorkItem(); break; case WorkItemState.Canceled: currentCallToPostExecute |= CallToPostExecute.WhenWorkItemCanceled; break; default: Debug.Assert(false); throw new NotSupportedException(); } // Run the post execute as needed if ((currentCallToPostExecute & _callToPostExecute) != 0) { PostExecute(); } } /// <summary> /// Execute the work item /// </summary> private void ExecuteWorkItem() { CallerThreadContext ctc = null; if (null != _callerContext) { ctc = CallerThreadContext.Capture(); CallerThreadContext.Apply(_callerContext); } Exception exception = null; object result = null; try { result = _callback(_state); } catch (Exception e) { // Save the exception so we can rethrow it later exception = e; } if (null != _callerContext) { CallerThreadContext.Apply(ctc); } SetResult(result, exception); } /// <summary> /// Runs the post execute callback /// </summary> private void PostExecute() { if (null != _postExecuteWorkItemCallback) { try { _postExecuteWorkItemCallback(this._workItemResult); } catch (Exception e) { Debug.Assert(null != e); } } } /// <summary> /// Set the result of the work item to return /// </summary> /// <param name="result">The result of the work item</param> internal void SetResult(object 
result, Exception exception)
{
	_result = result;
	_exception = exception;
	SignalComplete(false);
}

/// <summary>
/// Returns the work item result
/// </summary>
/// <returns>The work item result</returns>
internal IWorkItemResult GetWorkItemResult()
{
	return _workItemResult;
}

/// <summary>
/// Wait for all work items to complete
/// </summary>
/// <param name="workItemResults">Array of work item result objects</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
/// <returns>
/// true when every work item in workItemResults has completed (an empty
/// array counts as completed); false when the timeout elapsed or the
/// cancelWaitHandle was signaled first.
/// </returns>
internal static bool WaitAll(
	IWorkItemResult [] workItemResults,
	int millisecondsTimeout,
	bool exitContext,
	WaitHandle cancelWaitHandle)
{
	// Nothing to wait for. Return before reaching WaitHandle.WaitAll,
	// which rejects an empty array of handles.
	if (0 == workItemResults.Length)
	{
		return true;
	}

	bool success;
	WaitHandle [] waitHandles = new WaitHandle[workItemResults.Length];
	GetWaitHandles(workItemResults, waitHandles);

	// GetWaitHandles() took a reference on every work item's event, so
	// ReleaseWaitHandles() must run even when one of the waits below throws;
	// otherwise the reference counts never reach zero and the events are
	// never closed.
	try
	{
		// WaitHandle.WaitAll accepts at most 64 handles and cannot be
		// interrupted by the cancelWaitHandle, so it is used only when
		// neither restriction matters.
		if ((null == cancelWaitHandle) && (waitHandles.Length <= 64))
		{
			success = WaitHandle.WaitAll(waitHandles, millisecondsTimeout, exitContext);
		}
		else
		{
			success = true;
			int millisecondsLeft = millisecondsTimeout;

			// Measure elapsed time in UTC so a daylight-saving or time-zone
			// change during the wait cannot distort the remaining timeout.
			DateTime start = DateTime.UtcNow;

			// Slot 0 is replaced with each work item's handle in turn;
			// slot 1 (when present) is the cancel handle.
			WaitHandle [] whs;
			if (null != cancelWaitHandle)
			{
				whs = new WaitHandle [] { null, cancelWaitHandle };
			}
			else
			{
				whs = new WaitHandle [] { null };
			}

			bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);

			// Iterate over the wait handles and wait for each one to complete.
			// We cannot use WaitHandle.WaitAll directly, because the cancelWaitHandle
			// won't affect it.
			// Each iteration we update the time left for the timeout.
			for(int i = 0; i < workItemResults.Length; ++i)
			{
				// WaitAny doesn't work with negative numbers
				// (other than Timeout.Infinite)
				if (!waitInfinitely && (millisecondsLeft < 0))
				{
					success = false;
					break;
				}

				whs[0] = waitHandles[i];
				int result = WaitHandle.WaitAny(whs, millisecondsLeft, exitContext);

				// Index 1 is the cancel handle; WaitTimeout means time ran out.
				if((result > 0) || (WaitHandle.WaitTimeout == result))
				{
					success = false;
					break;
				}

				if(!waitInfinitely)
				{
					// Update the time left to wait
					TimeSpan ts = DateTime.UtcNow - start;
					millisecondsLeft = millisecondsTimeout - (int)ts.TotalMilliseconds;
				}
			}
		}
	}
	finally
	{
		// Release the wait handles
		ReleaseWaitHandles(workItemResults);
	}

	return success;
}

/// <summary>
/// Waits for any of the work items in the specified array to complete, cancel, or timeout
/// </summary>
/// <param name="workItemResults">Array of work item result objects</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
/// <returns>
/// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
/// </returns> internal static int WaitAny( IWorkItemResult [] workItemResults, int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { WaitHandle [] waitHandles = null; if (null != cancelWaitHandle) { waitHandles = new WaitHandle[workItemResults.Length+1]; GetWaitHandles(workItemResults, waitHandles); waitHandles[workItemResults.Length] = cancelWaitHandle; } else { waitHandles = new WaitHandle[workItemResults.Length]; GetWaitHandles(workItemResults, waitHandles); } int result = WaitHandle.WaitAny(waitHandles, millisecondsTimeout, exitContext); // Treat cancel as timeout if (null != cancelWaitHandle) { if (result == workItemResults.Length) { result = WaitHandle.WaitTimeout; } } ReleaseWaitHandles(workItemResults); return result; } /// <summary> /// Fill an array of wait handles with the work items wait handles. /// </summary> /// <param name="workItemResults">An array of work item results</param> /// <param name="waitHandles">An array of wait handles to fill</param> private static void GetWaitHandles( IWorkItemResult [] workItemResults, WaitHandle [] waitHandles) { for(int i = 0; i < workItemResults.Length; ++i) { WorkItemResult wir = workItemResults[i] as WorkItemResult; Debug.Assert(null != wir, "All workItemResults must be WorkItemResult objects"); waitHandles[i] = wir.GetWorkItem().GetWaitHandle(); } } /// <summary> /// Release the work items' wait handles /// </summary> /// <param name="workItemResults">An array of work item results</param> private static void ReleaseWaitHandles(IWorkItemResult [] workItemResults) { for(int i = 0; i < workItemResults.Length; ++i) { WorkItemResult wir = workItemResults[i] as WorkItemResult; wir.GetWorkItem().ReleaseWaitHandle(); } } #endregion #region Private Members /// <summary> /// Sets the work item's state /// </summary> /// <param name="workItemState">The state to set the work item to</param> private void SetWorkItemState(WorkItemState workItemState) { lock(this) { _workItemState = workItemState; } } /// 
<summary> /// Signals that work item has been completed or canceled /// </summary> /// <param name="canceled">Indicates that the work item has been canceled</param> private void SignalComplete(bool canceled) { SetWorkItemState(canceled ? WorkItemState.Canceled : WorkItemState.Completed); lock(this) { // If someone is waiting then signal. if (null != _workItemCompleted) { _workItemCompleted.Set(); } } } #endregion #region Members exposed by WorkItemResult /// <summary> /// Cancel the work item if it didn't start running yet. /// </summary> /// <returns>Returns true on success or false if the work item is in progress or already completed</returns> private bool Cancel() { lock(this) { switch(_workItemState) { case WorkItemState.Canceled: //Debug.WriteLine("Work item already canceled"); return true; case WorkItemState.Completed: case WorkItemState.InProgress: //Debug.WriteLine("Work item cannot be canceled"); return false; case WorkItemState.InQueue: // Signal to the wait for completion that the work // item has been completed (canceled). There is no // reason to wait for it to get out of the queue SignalComplete(true); //Debug.WriteLine("Work item canceled"); return true; } } return false; } /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits for the result, timeout, or cancel. /// In case of error the method throws and exception /// </summary> /// <returns>The result of the work item</returns> private object GetResult( int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { Exception e = null; object result = GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out e); if (null != e) { throw e; } return result; } /// <summary> /// Get the result of the work item. /// If the work item didn't run yet then the caller waits for the result, timeout, or cancel. 
/// In case of error the e argument is filled with the exception
/// </summary>
/// <param name="millisecondsTimeout">Timeout in milliseconds, or -1 for infinite</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the blocking if needed; may be null</param>
/// <param name="e">Filled with the exception if one was thrown by the work item</param>
/// <returns>The result of the work item</returns>
/// On timeout, or when the cancelWaitHandle is signaled, throws WorkItemTimeoutException
/// When the work item itself was canceled throws WorkItemCancelException
private object GetResult(
	int millisecondsTimeout,
	bool exitContext,
	WaitHandle cancelWaitHandle,
	out Exception e)
{
	e = null;

	// Check for cancel
	if (WorkItemState.Canceled == _workItemState)
	{
		throw new WorkItemCancelException("Work item canceled");
	}

	// Check for completion
	if (IsCompleted)
	{
		e = _exception;
		return _result;
	}

	// If no cancelWaitHandle is provided
	if (null == cancelWaitHandle)
	{
		WaitHandle wh = GetWaitHandle();
		bool timeout;
		try
		{
			timeout = !wh.WaitOne(millisecondsTimeout, exitContext);
		}
		finally
		{
			// Always give back the reference taken by GetWaitHandle(),
			// even when the wait itself throws.
			ReleaseWaitHandle();
		}

		if (timeout)
		{
			throw new WorkItemTimeoutException("Work item timeout");
		}
	}
	else
	{
		WaitHandle wh = GetWaitHandle();
		int result;
		try
		{
			// The timeout and exitContext must be passed on here as well;
			// without them the wait is unbounded and the WaitTimeout case
			// below can never be reached.
			result = WaitHandle.WaitAny(
				new WaitHandle[] { wh, cancelWaitHandle },
				millisecondsTimeout,
				exitContext);
		}
		finally
		{
			ReleaseWaitHandle();
		}

		switch(result)
		{
			case 0:
				// The work item signaled
				// Note that the signal could be also as a result of canceling the
				// work item (not the get result)
				break;
			case 1:
				// The cancelWaitHandle was signaled; canceling the wait is
				// reported the same way as a timeout.
			case WaitHandle.WaitTimeout:
				throw new WorkItemTimeoutException("Work item timeout");
			default:
				Debug.Assert(false);
				break;
		}
	}

	// Check for cancel
	if (WorkItemState.Canceled == _workItemState)
	{
		throw new WorkItemCancelException("Work item canceled");
	}

	Debug.Assert(IsCompleted);

	e = _exception;

	// Return the result
	return _result;
}

/// <summary>
/// A wait handle to wait for completion, cancel, or timeout.
/// The event is created on first use, already signaled when the work item
/// has completed or been canceled, and each call adds one reference that
/// must be given back with ReleaseWaitHandle().
/// </summary>
private WaitHandle GetWaitHandle()
{
	lock(this)
	{
		if (null == _workItemCompleted)
		{
			_workItemCompleted = new ManualResetEvent(IsCompleted);
		}
		++_workItemCompletedRefCount;
	}
	return _workItemCompleted;
}

/// <summary>
/// Gives back one reference taken by GetWaitHandle(). When the reference
/// count drops to zero the event is closed and the field is cleared.
/// </summary>
private void ReleaseWaitHandle()
{
	lock(this)
	{
		if (null != _workItemCompleted)
		{
			--_workItemCompletedRefCount;
			if (0 == _workItemCompletedRefCount)
			{
				_workItemCompleted.Close();
				_workItemCompleted = null;
			}
		}
	}
}

/// <summary>
/// Returns true when the work item has completed or canceled
/// </summary>
private bool IsCompleted
{
	get
	{
lock(this) { return ((_workItemState == WorkItemState.Completed) || (_workItemState == WorkItemState.Canceled)); } } } /// <summary> /// Returns true when the work item has canceled /// </summary> public bool IsCanceled { get { lock(this) { return (_workItemState == WorkItemState.Canceled); } } } #endregion #region WorkItemResult class private class WorkItemResult : IWorkItemResult { /// <summary> /// A back reference to the work item /// </summary> private WorkItem _workItem; public WorkItemResult(WorkItem workItem) { _workItem = workItem; } internal WorkItem GetWorkItem() { return _workItem; } #region IWorkItemResult Members public bool IsCompleted { get { return _workItem.IsCompleted; } } public bool IsCanceled { get { return _workItem.IsCanceled; } } public object GetResult() { return _workItem.GetResult(Timeout.Infinite, true, null); } public object GetResult(int millisecondsTimeout, bool exitContext) { return _workItem.GetResult(millisecondsTimeout, exitContext, null); } public object GetResult(TimeSpan timeout, bool exitContext) { return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, null); } public object GetResult(int millisecondsTimeout, bool exitContext, WaitHandle cancelWaitHandle) { return _workItem.GetResult(millisecondsTimeout, exitContext, cancelWaitHandle); } public object GetResult(TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle) { return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); } public object GetResult(out Exception e) { return _workItem.GetResult(Timeout.Infinite, true, null, out e); } public object GetResult(int millisecondsTimeout, bool exitContext, out Exception e) { return _workItem.GetResult(millisecondsTimeout, exitContext, null, out e); } public object GetResult(TimeSpan timeout, bool exitContext, out Exception e) { return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, null, out e); } public object GetResult(int millisecondsTimeout, bool 
exitContext, WaitHandle cancelWaitHandle, out Exception e) { return _workItem.GetResult(millisecondsTimeout, exitContext, cancelWaitHandle, out e); } public object GetResult(TimeSpan timeout, bool exitContext, WaitHandle cancelWaitHandle, out Exception e) { return _workItem.GetResult((int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle, out e); } public bool Cancel() { return _workItem.Cancel(); } public object State { get { return _workItem._state; } } #endregion } #endregion #region IDisposable Members public void Dispose() { IDisposable disp = _state as IDisposable; if (null != disp) { disp.Dispose(); _state = null; } } #endregion } #endregion } --- NEW FILE: WorkItemsQueue.cs --- // Ami Bar // am...@gm... using System; using System.Collections; using System.Threading; namespace Adapdev.Threading { #region WorkItemsQueue class /// <summary> /// WorkItemsQueue class. /// </summary> internal class WorkItemsQueue : IDisposable { #region Member variables /// /// Waiters queue (implemented as stack). /// The head is a dummy and is not used. /// private WaiterEntry _headWaiterEntry = new WaiterEntry(); /// <summary> /// Waiters count /// </summary> private int _waitersCount = 0; /// <summary> /// Work items queue /// </summary> private Queue _workItems = new Queue(); /// <summary> /// Indicate that work items are allowed to be queued /// </summary> private bool _isWorkItemsQueueActive = true; /// <summary> /// A slot in the thread local storage to save some data /// </summary> private static readonly LocalDataStoreSlot _slot = Thread.AllocateDataSlot(); private bool _isDisposed = false; #endregion #region Public properties /// <summary> /// Returns the current number of work items in the queue /// </summary> public int Count { get { lock(this) { ValidateNotDisposed(); return _workItems.Count; } } } #endregion #region Public methods /// <summary> /// Enqueue a work item to the queue. 
/// </summary> public bool EnqueueWorkItem(WorkItem workItem) { // A work item cannot be null, since null is used in the // DequeueWorkItem() method to indicate timeout or cancel if (null == workItem) { throw new ArgumentNullException("workItem" , "workItem cannot be null"); } bool enqueue = true; // First check if there is a waiter waiting for work item. During // the check, timed out waiters are ignored. If there is no // waiter then the work item is queued. lock(this) { ValidateNotDisposed(); if (!_isWorkItemsQueueActive) { return false; } while(_waitersCount > 0) { // Dequeue a waiter. WaiterEntry waiterEntry = PopWaiter(); // Signal the waiter. On success break the loop if (waiterEntry.Signal(workItem)) { enqueue = false; break; } } if (enqueue) { // Enqueue the work item _workItems.Enqueue(workItem); } } return true; } /// <summary> /// Waits for a work item or exits on timeout or cancel /// </summary> /// <param name="millisecondsTimeout">Timeout in milliseconds</param> /// <param name="cancelEvent">Cancel wait handle</param> /// <returns>The dequeued work item, or null on timeout or cancel</returns> public WorkItem DequeueWorkItem( int millisecondsTimeout, WaitHandle cancelEvent) { /// This method causes the caller to wait for a work item. /// If there is at least one waiting work item then the /// method returns immediately with it. /// /// If there are no waiting work items then the caller /// is queued between other waiters for a work item to arrive. /// /// If a work item didn't come within millisecondsTimeout or /// the user canceled the wait by signaling the cancelEvent /// then the method returns null to indicate that the caller /// didn't get a work item. WaiterEntry waiterEntry = null; WorkItem workItem = null; lock(this) { ValidateNotDisposed(); // If there are waiting work items then take one and return. if (_workItems.Count > 0) { workItem = _workItems.Dequeue() as WorkItem; return workItem; } // No waiting work items ...
else { // Get the wait entry for the waiters queue waiterEntry = GetThreadWaiterEntry(); // Put the waiter with the other waiters PushWaiter(waiterEntry); } } // Prepare array of wait handles for the WaitHandle.WaitAny() WaitHandle [] waitHandles = new WaitHandle [] { waiterEntry.WaitHandle, cancelEvent }; // Wait for an available resource, cancel event, or timeout. // During the wait we are supposed to exit the synchronization // domain. (Placing true as the third argument of the WaitAny()) // It just doesn't work, I don't know why, so I have two lock(this) // statements instead of one. int index = WaitHandle.WaitAny( waitHandles, millisecondsTimeout, true); lock(this) { // success is true if it got a work item. bool success = (0 == index); // The timeout variable is used only for readability. // (We treat cancel as timeout) bool timeout = !success; // On timeout update the waiterEntry that it is timed out if (timeout) { // The Timeout() fails if the waiter has already been signaled timeout = waiterEntry.Timeout(); // On timeout remove the waiter from the queue. // Note that the complexity is O(1). if(timeout) { RemoveWaiter(waiterEntry, false); } // Again readability success = !timeout; } // On success return the work item if (success) { workItem = waiterEntry.WorkItem; if (null == workItem) { workItem = _workItems.Dequeue() as WorkItem; } } } // On failure return null.
return workItem; } /// <summary> /// Cleanup the work items queue, hence no more work /// items are allowed to be queued /// </summary> protected virtual void Cleanup() { lock(this) { // Deactivate only once if (!_isWorkItemsQueueActive) { return; } // Don't queue more work items _isWorkItemsQueueActive = false; foreach(WorkItem workItem in _workItems) { workItem.Dispose(); } // Clear the work items that are already queued _workItems.Clear(); // NOTE(review): // The loop above calls Dispose() on every queued work item, and // WorkItem.Dispose() disposes the work item's state object when it // implements IDisposable. A state object that is still in use in the // application is therefore disposed here - confirm this is intended. // Tell the waiters that they were timed out. // It won't signal them to exit, but to ignore their // next work item. while(_waitersCount > 0) { WaiterEntry waiterEntry = PopWaiter(); waiterEntry.Timeout(); } } } #endregion #region Private methods /// <summary> /// Returns the WaiterEntry of the current thread /// </summary> /// <returns>The calling thread's WaiterEntry, reset for reuse</returns> /// In order to avoid creation and destruction of WaiterEntry /// objects each thread stores its own WaiterEntry. private WaiterEntry GetThreadWaiterEntry() { WaiterEntry waiterEntry = Thread.GetData(_slot) as WaiterEntry; if (null == waiterEntry) { waiterEntry = new WaiterEntry(); Thread.SetData(_slot, waiterEntry); } waiterEntry.Reset(); return waiterEntry; } #region Waiters stack methods /// <summary> /// Push a new waiter into the waiter's stack /// </summary> /// <param name="newWaiterEntry">A waiter to put in the stack</param> private void PushWaiter(WaiterEntry newWaiterEntry) { // Remove the waiter if it is already in the stack and // update waiter's count as needed RemoveWaiter(newWaiterEntry, false); // If the stack is empty then newWaiterEntry is the new head of the stack if (null == _headWaiterEntry._nextWaiterEntry) { _headWaiterEntry._nextWaiterEntry = newWaiterEntry; newWaiterEntry._prevWaiterEntry = _headWaiterEntry; } // If the stack is not empty then put newWaiterEntry as the new head // of the stack.
else { // Save the old first waiter entry WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; // Update the links _headWaiterEntry._nextWaiterEntry = newWaiterEntry; newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry; newWaiterEntry._prevWaiterEntry = _headWaiterEntry; oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry; } // Increment the number of waiters ++_waitersCount; } /// /// Pop a waiter from the waiter's stack /// /// Returns the first waiter in the stack private WaiterEntry PopWaiter() { // Store the current stack head WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry; // Store the new stack head WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry; // Update the old stack head list links and decrement the number // waiters. RemoveWaiter(oldFirstWaiterEntry, true); // Update the new stack head _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry; if (null != newHeadWaiterEntry) { newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry; } // Return the old stack head return oldFirstWaiterEntry; } /// <summary> /// Remove a waiter from the stack /// </summary> /// <param name="waiterEntry">A waiter entry to remove</param> /// <param name="popDecrement">If true the waiter count is always decremented</param> private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement) { // Store the prev entry in the list WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry; // Store the next entry in the list WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry; // A flag to indicate if we need to decrement the waiters count. // If we got here from PopWaiter then we must decrement. // If we got here from PushWaiter then we decrement only if // the waiter was already in the stack. bool decrementCounter = popDecrement; // Null the waiter's entry links waiterEntry._prevWaiterEntry = null; waiterEntry._nextWaiterEntry = null; // If the waiter entry had a prev link then update it. 
// It also means that the waiter is already in the list and we // need to decrement the waiters count. if (null != prevWaiterEntry) { prevWaiterEntry._nextWaiterEntry = nextWaiterEntry; decrementCounter = true; } // If the waiter entry had a next link then update it. // It also means that the waiter is already in the list and we // need to decrement the waiters count. if (null != nextWaiterEntry) { nextWaiterEntry._prevWaiterEntry = prevWaiterEntry; decrementCounter = true; } // Decrement the waiters count if needed if (decrementCounter) { --_waitersCount; } } #endregion #endregion #region WaiterEntry class // A waiter entry in the _waiters queue. private class WaiterEntry : IDisposable { #region Member variables /// <summary> /// Event to signal the waiter that it got the work item. /// </summary> private AutoResetEvent _waitHandle = new AutoResetEvent(false); /// <summary> /// Flag to know if this waiter already quit the queue /// because of a timeout. /// </summary> private bool _isTimedout = false; /// <summary> /// Flag to know if the waiter was signaled and got a work item. /// </summary> private bool _isSignaled = false; /// <summary> /// A work item that was passed directly to the waiter without going /// through the queue /// </summary> private WorkItem _workItem = null; private bool _isDisposed = false; // Linked list members internal WaiterEntry _nextWaiterEntry = null; internal WaiterEntry _prevWaiterEntry = null; #endregion #region Construction public WaiterEntry() { Reset(); } #endregion #region Public methods public WaitHandle WaitHandle { get { return _waitHandle; } } public WorkItem WorkItem { get { lock(this) { return _workItem; } } } /// <summary> /// Signal the waiter that it got a work item.
/// </summary> /// <returns>Return true on success</returns> /// The method fails if Timeout() preceded its call public bool Signal(WorkItem workItem) { lock(this) { if (!_isTimedout) { _workItem = workItem; _isSignaled = true; _waitHandle.Set(); return true; } } return false; } /// <summary> /// Mark the wait entry that it has been timed out /// </summary> /// <returns>Return true on success</returns> /// The method fails if Signal() preceded its call public bool Timeout() { lock(this) { // Time out can happen only if the waiter wasn't marked as // signaled if (!_isSignaled) { // We don't remove the waiter from the queue, the DequeueWorkItem // method skips _waiters that were timed out. _isTimedout = true; return true; } } return false; } /// <summary> /// Reset the wait entry so it can be used again /// </summary> public void Reset() { _workItem = null; _isTimedout = false; _isSignaled = false; _waitHandle.Reset(); } /// <summary> /// Free resources /// </summary> public void Close() { if (null != _waitHandle) { _waitHandle.Close(); _waitHandle = null; } } #endregion #region IDisposable Members public void Dispose() { if (!_isDisposed) { Close(); _isDisposed = true; } } ~WaiterEntry() { Dispose(); } #endregion } #endregion #region IDisposable Members public void Dispose() { if (!_isDisposed) { Cleanup(); _isDisposed = true; GC.SuppressFinalize(this); } } ~WorkItemsQueue() { Cleanup(); } private void ValidateNotDisposed() { if(_isDisposed) { throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown"); } } #endregion } #endregion } --- NEW FILE: CallerThreadContext.cs --- // Ami Bar // am...@gm... 
using System;
using System.Threading;
using System.Globalization;
using System.Security.Principal;
using System.Reflection;
using System.Runtime.Remoting.Contexts;

namespace Adapdev.Threading
{
    #region CallerThreadContext class

    /// <summary>
    /// This class stores the caller thread context in order to restore
    /// it when the work item is executed in the context of the thread
    /// from the pool.
    /// Note that we can't store the thread's CompressedStack, because
    /// it throws a security exception
    /// </summary>
    internal class CallerThreadContext
    {
        // Values captured from the calling thread by Capture()
        private CultureInfo _culture = null;
        private CultureInfo _cultureUI = null;
        private IPrincipal _principal;
        private System.Runtime.Remoting.Contexts.Context _context;

        // Reflection handle to the non-public Thread.m_Context instance field.
        // It is only used by the disabled block at the end of Apply().
        private static FieldInfo _fieldInfo = GetFieldInfo();

        /// <summary>
        /// Looks up the non-public instance field "m_Context" of Thread.
        /// </summary>
        /// <returns>The field info, or null if Thread has no such field</returns>
        private static FieldInfo GetFieldInfo()
        {
            Type threadType = typeof(Thread);
            return threadType.GetField(
                "m_Context",
                BindingFlags.Instance | BindingFlags.NonPublic);
        }

        /// <summary>
        /// Constructor. Private: instances are created by Capture() only.
        /// </summary>
        private CallerThreadContext()
        {
        }

        /// <summary>
        /// Captures the current thread context: culture, UI culture,
        /// principal and remoting context.
        /// </summary>
        /// <returns>A snapshot of the current thread's context</returns>
        public static CallerThreadContext Capture()
        {
            CallerThreadContext callerThreadContext = new CallerThreadContext();

            Thread thread = Thread.CurrentThread;
            callerThreadContext._culture = thread.CurrentCulture;
            callerThreadContext._cultureUI = thread.CurrentUICulture;
            callerThreadContext._principal = Thread.CurrentPrincipal;
            callerThreadContext._context = Thread.CurrentContext;
            return callerThreadContext;
        }

        /// <summary>
        /// Applies the thread context stored earlier to the current thread.
        /// Only the culture, UI culture and principal are restored.
        /// </summary>
        /// <param name="callerThreadContext">A context returned by Capture()</param>
        /// <exception cref="ArgumentNullException">callerThreadContext is null</exception>
        public static void Apply(CallerThreadContext callerThreadContext)
        {
            // Fail with a descriptive exception before touching the thread,
            // instead of a NullReferenceException on the first field access.
            if (null == callerThreadContext)
            {
                throw new ArgumentNullException("callerThreadContext");
            }

            Thread thread = Thread.CurrentThread;
            thread.CurrentCulture = callerThreadContext._culture;
            thread.CurrentUICulture = callerThreadContext._cultureUI;
            Thread.CurrentPrincipal = callerThreadContext._principal;

            // Uncomment the following block to also write the captured context
            // into the current thread's private m_Context field.
            /*
            if (null != _fieldInfo)
            {
                _fieldInfo.SetValue(
                    Thread.CurrentThread,
                    callerThreadContext._context);
            }
            */
        }
    }

    #endregion
}

--- NEW FILE: DelegateAdapter.cs ---
#region Original Copyright 2003 Richard Lowe
/**
    Taken from the following blog: http://blogs.geekdojo.net/richard/archive/2003/12/19/492.aspx

    Usage:

    using System;
    using System.Collections;
    using System.Threading;
    using Timing;
    using DelegateAdapter;

    public class Program
    {
        // Create any method and a corresponding delegate:
        public delegate double WorkMethodHandler(double factor, string name);

        public static double WorkMethod(double factor, string name)
        {
            Console.WriteLine(name);
            return 3.14159 * factor;
        }

        public static void Main()
        {
            // Create the DelegateAdapter with the appropriate method and arguments:
            DelegateAdapter adapter = new DelegateAdapter(new WorkMethodHandler(WorkMethod), 3.123456789, "Richard");

            // Automatically creates new ThreadStart and passes to the Thread constructor.
            // The adapter is implicitly convertible to a ThreadStart, which is why this works.
            Thread worker = new Thread(adapter);

            // change the arguments:
            adapter.Args = new object[] {9.14159d, "Roberto"};

            // run it:
            worker.Start();

            // wait to exit:
            worker.Join();

            // get result:
            Console.WriteLine(adapter.ReturnValue);
        }
    }
**/
#endregion

#region Modified Copyright / License Information

/*

Copyright 2004 - 2005 Adapdev Technologies, LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
============================
Author Log
============================
III     Full Name
SMM     Sean McCormack (Adapdev)


============================
Change Log
============================
III     MMDDYY  Change

*/
#endregion

namespace DelegateAdapter
{
    using System;
    using System.Threading;

    /// <summary>
    /// Uses DynamicInvoke to allow any method to be easily mapped to the ThreadStart, WaitCallback or TimerCallback.
    /// </summary>
    public class DelegateAdapter
    {
        private object[] _args;

        /// <summary>
        /// The arguments passed to the target delegate by the parameterless Execute().
        /// </summary>
        public object[] Args
        {
            get { return _args; }
            set { _args = value; }
        }

        private Delegate _target;
        private object _returnValue;

        /// <summary>
        /// The return value, if any of the last execution of this DelegateAdapter's target method.
        /// </summary>
        public object ReturnValue
        {
            get { return _returnValue; }
        }

        /// <summary>
        /// Creates an instance of DelegateAdapter given any delegate.
        /// </summary>
        /// <param name="target">The delegate that will be invoked when one of the output delegates is invoked.</param>
        /// <exception cref="ArgumentNullException">target is null</exception>
        public DelegateAdapter(Delegate target) : this(target, null)
        {
        }

        /// <summary>
        /// Creates an instance of DelegateAdapter given any delegate and its parameters to pass.
        /// </summary>
        /// <param name="target">The delegate that will be invoked when one of the output delegates is invoked.</param>
        /// <param name="args">The arguments that will be passed to the target delegate.</param>
        /// <exception cref="ArgumentNullException">target is null</exception>
        public DelegateAdapter(Delegate target, params object[] args)
        {
            // The target cannot be replaced after construction, so a null target
            // could only ever fail later inside Execute. Reject it here, where
            // the caller can still see the mistake.
            if (null == target)
            {
                throw new ArgumentNullException("target");
            }

            _target = target;
            _args = args;
        }

        /// <summary>
        /// Dynamically invokes the target delegate with the provided arguments.
        /// </summary>
        public void Execute()
        {
            _returnValue = _target.DynamicInvoke(_args);
        }

        /// <summary>
        /// Dynamically invokes the target delegate with the state object provided by the caller. *Note* ignores any Args passed to the DelegateAdapter.
        /// </summary>
        /// <param name="state">
        /// If this is an object[] it is used as the argument list; any other
        /// value (including null) is passed as the single argument.
        /// </param>
        public void Execute(object state)
        {
            // One type test instead of "is" followed by "as"
            object[] stateArgs = state as object[];
            if (null == stateArgs)
            {
                stateArgs = new object[] {state};
            }

            _returnValue = _target.DynamicInvoke(stateArgs);
        }

        /// <summary>
        /// Creates a new, unique ThreadStart delegate for use with the Thread class.
        /// </summary>
        /// <returns>The new ThreadStart delegate</returns>
        public ThreadStart CreateThreadStart()
        {
            return new ThreadStart(Execute);
        }

        /// <summary>
        /// Creates a new, unique WaitCallback delegate for use with the ThreadPool class.
        /// </summary>
        /// <returns>The new WaitCallback delegate</returns>
        public WaitCallback CreateWaitCallback()
        {
            return new WaitCallback(Execute);
        }

        /// <summary>
        /// Creates a new, unique TimerCallback delegate for use with the Timer class.
        /// </summary>
        /// <returns>The new TimerCallback delegate</returns>
        public TimerCallback CreateTimerCallback()
        {
            return new TimerCallback(Execute);
        }

        /// <summary>
        /// Converts the adapter to a ThreadStart by calling CreateThreadStart().
        /// </summary>
        public static implicit operator ThreadStart(DelegateAdapter adapter)
        {
            return adapter.CreateThreadStart();
        }

        /// <summary>
        /// Converts the adapter to a WaitCallback by calling CreateWaitCallback().
        /// </summary>
        public static implicit operator WaitCallback(DelegateAdapter adapter)
        {
            return adapter.CreateWaitCallback();
        }

        /// <summary>
        /// Converts the adapter to a TimerCallback by calling CreateTimerCallback().
        /// </summary>
        public static implicit operator TimerCallback(DelegateAdapter adapter)
        {
            return adapter.CreateTimerCallback();
        }
    }
}

--- NEW FILE: STPStartInfo.cs ---
// Ami Bar
// am...@gm...

using System;

namespace Adapdev.Threading
{
    /// <summary>
    /// The start-up settings of a SmartThreadPool. A new instance is
    /// initialized with the SmartThreadPool.Default* values.
    /// </summary>
    public class STPStartInfo
    {
        /// <summary>
        /// Idle timeout in milliseconds.
        /// If a thread is idle for _idleTimeout milliseconds then
        /// it may quit.
        /// </summary>
        private int _idleTimeout;

        /// <summary>
        /// The lower limit of threads in the pool.
        /// </summary>
        private int _minWorkerThreads;

        /// <summary>
        /// The upper limit of threads in the pool.
        /// </summary>
        private int _maxWorkerThreads;

        /// <summary>
        /// Use the caller's security context
        /// </summary>
        private bool _useCallerContext;

        /// <summary>
        /// Dispose of the state object of a work item
        /// </summary>
        private bool _disposeOfStateObjects;

        /// <summary>
        /// The option to run the post execute
        /// </summary>
        private CallToPostExecute _callToPostExecute;

        /// <summary>
        /// A post execute callback to call when none is provided in
        /// the QueueWorkItem method.
        /// </summary>
        private PostExecuteWorkItemCallback _postExecuteWorkItemCallback;

        /// <summary>
        /// Creates start info holding the SmartThreadPool default values.
        /// </summary>
        public STPStartInfo()
        {
            _idleTimeout = SmartThreadPool.DefaultIdleTimeout;
            _minWorkerThreads = SmartThreadPool.DefaultMinWorkerThreads;
            _maxWorkerThreads = SmartThreadPool.DefaultMaxWorkerThreads;
            _useCallerContext = SmartThreadPool.DefaultUseCallerContext;
            _disposeOfStateObjects = SmartThreadPool.DefaultDisposeOfStateObjects;
            _callToPostExecute = SmartThreadPool.DefaultCallToPostExecute;
            _postExecuteWorkItemCallback = SmartThreadPool.DefaultPostExecuteWorkItemCallback;
        }

        /// <summary>
        /// Copy constructor: creates start info with the same values as
        /// another instance.
        /// </summary>
        /// <param name="stpStartInfo">The start info to copy</param>
        /// <exception cref="ArgumentNullException">stpStartInfo is null</exception>
        public STPStartInfo(STPStartInfo stpStartInfo)
        {
            // Name the offending argument instead of failing with a
            // NullReferenceException on the first field read.
            if (null == stpStartInfo)
            {
                throw new ArgumentNullException("stpStartInfo");
            }

            _idleTimeout = stpStartInfo._idleTimeout;
            _minWorkerThreads = stpStartInfo._minWorkerThreads;
            _maxWorkerThreads = stpStartInfo._maxWorkerThreads;
            _useCallerContext = stpStartInfo._useCallerContext;
            _disposeOfStateObjects = stpStartInfo._disposeOfStateObjects;
            _callToPostExecute = stpStartInfo._callToPostExecute;
            _postExecuteWorkItemCallback = stpStartInfo._postExecuteWorkItemCallback;
        }

        /// <summary>
        /// Idle timeout in milliseconds.
        /// </summary>
        public int IdleTimeout
        {
            get { return _idleTimeout; }
            set { _idleTimeout = value; }
        }

        /// <summary>
        /// The lower limit of threads in the pool.
        /// </summary>
        public int MinWorkerThreads
        {
            get { return _minWorkerThreads; }
            set { _minWorkerThreads = value; }
        }

        /// <summary>
        /// The upper limit of threads in the pool.
        /// </summary>
        public int MaxWorkerThreads
        {
            get { return _maxWorkerThreads; }
            set { _maxWorkerThreads = value; }
        }

        /// <summary>
        /// Use the caller's security context
        /// </summary>
        public bool UseCallerContext
        {
            get { return _useCallerContext; }
            set { _useCallerContext = value; }
        }

        /// <summary>
        /// Dispose of the state object of a work item
        /// </summary>
        public bool DisposeOfStateObjects
        {
            get { return _disposeOfStateObjects; }
            set { _disposeOfStateObjects = value; }
        }

        /// <summary>
        /// The option to run the post execute
        /// </summary>
        public CallToPostExecute CallToPostExecute
        {
            get { return _callToPostExecute; }
            set { _callToPostExecute = value; }
        }

        /// <summary>
        /// A post execute callback to call when none is provided in
        /// the QueueWorkItem method.
        /// </summary>
        public PostExecuteWorkItemCallback PostExecuteWorkItemCallback
        {
            get { return _postExecuteWorkItemCallback; }
            set { _postExecuteWorkItemCallback = value; }
        }
    }
}

--- NEW FILE: SmartThreadPool.cs ---
// Ami Bar
// am...@gm...
//
// Smart Thread Pool in C#.
// 7 Aug 2004 - Initial release
// 14 Sep 2004 - Bug fixes
// 15 Oct 2004 - Added new features
//      - Work items return result.
//      - Support waiting synchronization for multiple work items.
//      - Work items can be cancelled.
//      - Passage of the caller threads context to the thread in the pool.
//      - Minimal usage of WIN32 handles.
//      - Minor bug fixes.
// 26 Dec 2004 - Changes:
//      - Removed static constructors.
//      - Added finalizers.
//      - Changed Exceptions so they are serializable.
//      - Fixed the bug in one of the SmartThreadPool constructors.
//      - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
//        The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
//      - Added PostExecute with options on which cases to call it.
//      - Added option to dispose of the state objects.
//      - Added a WaitForIdle() method that waits until the work items queue is empty.
//      - Added an STPStartInfo class for the initialization of the thread pool.
//      - Changed exception handling so if a work item throws an exception it
//        is rethrown at GetResult(), rather then firing an UnhandledException event.
//        Note that PostExecute exception are always ignored.
// 25 Mar 2005 - Fixed bug:
//      - Fixed bug where work items got lost. It could happen sometimes, but especially
//        when the idle timeout is small.
// 3 Jul 2005 - Fixed bug: // - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, // hardly reconstructed // 16 Aug 2005 - Fixed bug: // - Fixed bug where the InUseThreads becomes negative when cancelling work items using System; using System.Security; using System.Threading; using System.Collections; using System.Diagnostics; namespace Adapdev.Threading { #region CallToPostExecute enumerator [Flags] public enum CallToPostExecute { Never = 0x00, WhenWorkItemCanceled = 0x01, WhenWorkItemNotCanceled = 0x02, Always = WhenWorkItemCanceled | WhenWorkItemNotCanceled, } #endregion #region SmartThreadPool class /// <summary> /// Smart thread pool class. /// </summary> public class SmartThreadPool : IDisposable { #region Default Constants /// <summary> /// Default minimum number of threads the thread pool contains. (0) /// </summary> public const int DefaultMinWorkerThreads = 0; /// <summary> /// Default maximum number of threads the thread pool contains. (25) /// </summary> public const int DefaultMaxWorkerThreads = 25; /// <summary> /// Default idle timeout in milliseconds. (One minute) /// </summary> public const int DefaultIdleTimeout = 60*1000; // One minute /// <summary> /// Indicate to copy the security context of the caller and then use it in the call. (false) /// </summary> public const bool DefaultUseCallerContext = false; /// <summary> /// Indicate to dispose of the state objects if they support the IDispose interface. (false) /// </summary> public const bool DefaultDisposeOfStateObjects = false; /// <summary> /// The default option to run the post execute /// </summary> public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always; /// <summary> /// The default post execute method to run. /// When null it means not to call it. 
/// </summary> public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback = null; #endregion #region Member Variables /// <summary> /// Hashtable of all the threads in the thread pool. /// </summary> private Hashtable _workerThreads = Hashtable.Synchronized(new Hashtable()); /// <summary> /// Queue of work items. /// </summary> private WorkItemsQueue _workItemsQueue = new WorkItemsQueue(); /// <summary> /// Number of threads that currently work (not idle). /// </summary> private int _inUseWorkerThreads = 0; /// <summary> /// Start information to use. /// It is simpler than providing many constructors. /// </summary> private STPStartInfo _stpStartInfo = new STPStartInfo(); /// <summary> /// Total number of work items that are stored in the work items queue /// plus the work items that the threads in the pool are working on. /// </summary> private int _currentWorkItemsCount = 0; /// <summary> /// Signaled when the thread pool is idle, i.e. no thread is busy /// and the work items queue is empty /// </summary> private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); /// <summary> /// An event to signal all the threads to quit immediately. /// </summary> private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false); /// <summary> /// A flag to indicate the threads to quit. /// </summary> private bool _shutdown = false; /// <summary> /// Counts the threads created in the pool. /// It is used to name the threads. 
/// </summary> private int _threadCounter = 0; /// <summary> /// Indicate that the SmartThreadPool has been disposed /// </summary> private bool _isDisposed = false; #endregion #region Construction and Finalization /// <summary> /// Constructor /// </summary> public SmartThreadPool() { Start(); } /// <summary> /// Constructor /// </summary> /// <param name="idleTimeout">Idle timeout in milliseconds</param> public SmartThreadPool(int idleTimeout) { _stpStartInfo.IdleTimeout = idleTimeout; Start(); } /// <summary> /// Constructor /// </summary> /// <param name="idleTimeout">Idle timeout in milliseconds</param> /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> public SmartThreadPool( int idleTimeout, int maxWorkerThreads) { _stpStartInfo.IdleTimeout = idleTimeout; _stpStartInfo.MaxWorkerThreads = maxWorkerThreads; Start(); } /// <summary> /// Constructor /// </summary> /// <param name="idleTimeout">Idle timeout in milliseconds</param> /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> /// <param name="minWorkerThreads">Lower limit of threads in the pool</param> public SmartThreadPool( int idleTimeout, int maxWorkerThreads, int minWorkerThreads) { _stpStartInfo.IdleTimeout = idleTimeout; _stpStartInfo.MaxWorkerThreads = maxWorkerThreads; _stpStartInfo.MinWorkerThreads = minWorkerThreads; Start(); } /// <summary> /// Constructor /// </summary> /// <param name="idleTimeout">Idle timeout in milliseconds</param> /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param> /// <param name="minWorkerThreads">Lower limit of threads in the pool</param> /// <param name="useCallerContext">Indicate to copy the security context of the caller and then use it in the call</param> /// <param name="disposeOfStateObjects">Indicate to dispose of the state objects if they support the IDispose interface</param> public SmartThreadPool(STPStartInfo stpStartInfo) { _stpStartInfo = new STPStartInfo(stpStartInfo); Start(); 
} private void Start() { ValidateSTPStartInfo(); StartThreads(_stpStartInfo.MinWorkerThreads); } private void ValidateSTPStartInfo() { if (_stpStartInfo.MinWorkerThreads < 0) { throw new ArgumentOutOfRangeException( "MinWorkerThreads", "MinWorkerThreads cannot be negative"); } if (_stpStartInfo.MaxWorkerThreads <= 0) { throw new ArgumentOutOfRangeException( "MaxWorkerThread... [truncated message content] |