Skip to content

Commit

Permalink
Reuse threads in ThreadUtils (#2152)
Browse files Browse the repository at this point in the history
* Delete LimitedConcurrencyLevelTaskScheduler

It's internal and no one's using it.  And if someone wants this behavior, ConcurrentExclusiveSchedulerPair in the framework can be used, e.g. replacing `new LimitedConcurrencyLevelTaskScheduler(level)` with `new ConcurrentExclusiveSchedulerPair(level).ConcurrentScheduler`.

* Reuse threads in ThreadUtils

ThreadUtils.CreateBackgroundThread is being truthful to its name and creating a new thread for every call.  However, even for very simple ML.NET usage, such as the MulticlassClassification_Iris demo app, this method is being called ~110,000 times, resulting in ~110,000 threads getting created and destroyed.  That adds measurable overhead in normal runs, but when the debugger is attached it makes the solution effectively unusable, as every thread creation/destruction is tracked by Visual Studio, leading to significant overheads that make an F5 execution last "forever" (== I gave up waiting).

The right solution is for the higher-level algorithms and architecture to be better about its need for threads, creating them only when necessary and otherwise relying on the .NET ThreadPool for execution, via either ThreadPool.QueueUserWorkItem or via Task.Run or the like.

However, as an immediate stop-gap that significantly helps the situation, this commit allows the created threads to be reused for a period of time such that not every call ends up creating a new thread.  In my runs:
- The same demo that app that created ~110K threads now creates only ~32.
- With ctrl-F5 (e.g. no debugger attached), its previously took ~13 seconds, and with this change now takes ~6.
- With F5 (debugger attached), execution previously took "forever", now takes ~190 seconds (still too high, but vastly improved).

The CreateBackgroundThread method is renamed to StartBackgroundThread, and returns a Task instead of a Thread, such that callers may synchronize with that instead with Thread.Join.  In several cases, this avoids additional threads from being consumed, where callers were blocking a thread pool thread doing a synchronous wait for all tasks, and where now that's entirely avoided via Task.WhenAll.  Eventually, more call sites can be fixed as well, as more of the code base is moved to async/await; for now this just handles the obvious ones that don't require any significant restructuring.

* Address PR feedback
  • Loading branch information
stephentoub authored and justinormont committed Jan 21, 2019
1 parent d11630b commit 922d7e5
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 225 deletions.
307 changes: 139 additions & 168 deletions src/Microsoft.ML.Core/Utilities/ThreadUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,157 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.ML.Internal.Utilities
{
internal static partial class Utils
{
public static Thread CreateBackgroundThread(ParameterizedThreadStart start)
{
return new Thread(start)
{
IsBackground = true
};
}
public static Task RunOnBackgroundThread(Action start) =>
ImmediateBackgroundThreadPool.Queue(start);

public static Thread CreateBackgroundThread(ThreadStart start)
{
return new Thread(start)
{
IsBackground = true
};
}
public static Task RunOnBackgroundThread(Action<object> start, object obj) =>
ImmediateBackgroundThreadPool.Queue(start, obj);

public static Thread CreateForegroundThread(ParameterizedThreadStart start)
{
return new Thread(start)
{
IsBackground = false
};
}
public static Thread RunOnForegroundThread(ParameterizedThreadStart start) =>
new Thread(start) { IsBackground = false };

public static Thread CreateForegroundThread(ThreadStart start)
/// <summary>
/// Naive thread pool focused on reducing the latency to execution of chunky work items as much as possible.
/// If a thread is ready to process a work item the moment a work item is queued, it's used, otherwise
/// a new thread is created. This is meant as a stop-gap measure for workloads that would otherwise be
/// creating a new thread for every work item.
/// </summary>
private static class ImmediateBackgroundThreadPool
{
return new Thread(start)
/// <summary>How long should threads wait around for additional work items before retiring themselves.</summary>
private const int IdleMilliseconds = 1_000;
/// <summary>The queue of work items. Also used as a lock to protect all relevant state.</summary>
private static readonly Queue<(Delegate, object, TaskCompletionSource<bool>)> _queue = new Queue<(Delegate, object, TaskCompletionSource<bool>)>();
/// <summary>The number of threads currently waiting for work to arrive.</summary>
private static int _availableThreads = 0;

/// <summary>
/// Queues an <see cref="Action"/> delegate to be executed immediately on another thread,
/// and returns a <see cref="Task"/> that represents its eventual completion. The task will
/// always end in the <see cref="TaskStatus.RanToCompletion"/> state; if the delegate throws
/// an exception, it'll be allowed to propagate on the thread, crashing the process.
/// </summary>
public static Task Queue(Action threadStart) => Queue((Delegate)threadStart, null);

/// <summary>
/// Queues an <see cref="Action{Object}"/> delegate and associated state to be executed immediately on another thread,
/// and returns a <see cref="Task"/> that represents its eventual completion. The task will
/// always end in the <see cref="TaskStatus.RanToCompletion"/> state; if the delegate throws
/// an exception, it'll be allowed to propagate on the thread, crashing the process.
/// </summary>
public static Task Queue(Action<object> threadStart, object state) => Queue((Delegate)threadStart, state);

private static Task Queue(Delegate threadStart, object state)
{
IsBackground = false
};
// Create the TaskCompletionSource used to represent this work.
// Call sites only care about completion, not about the distinction between
// success and failure and do not expect exceptions to be propagated in this manner,
// so only SetResult is used.
var tcs = new TaskCompletionSource<bool>(TaskContinuationOptions.RunContinuationsAsynchronously);

// Queue the work for a thread to pick up. If no thread is immediately available, it will create one.
Enqueue((threadStart, state, tcs));

// Return the task.
return tcs.Task;

void CreateThread()
{
// Create a new background thread to run the work.
var t = new Thread(() =>
{
// Repeatedly get the next item and invoke it, setting its TCS when we're done.
// This will wait for up to the idle time before giving up and exiting.
while (TryDequeue(out (Delegate action, object state, TaskCompletionSource<bool> tcs) item))
{
try
{
if (item.action is Action<object> pts)
{
pts(item.state);
}
else
{
((Action)item.action)();
}
}
finally
{
item.tcs.SetResult(true);
}
}
});
t.IsBackground = true;
t.Start();
}

void Enqueue((Delegate, object, TaskCompletionSource<bool>) item)
{
// Enqueue the work. If there are currently fewer threads waiting
// for work than there are work items in the queue, create another
// thread. This is a heuristic, in that we might end up creating
// more threads than are truly needed, but this whole type is being
// used to replace a previous solution where every work item created
// its own thread, so this is an improvement regardless of any
// such inefficiencies.
lock (_queue)
{
_queue.Enqueue(item);

if (_queue.Count <= _availableThreads)
{
Monitor.Pulse(_queue);
return;
}
}

// No thread was currently available. Create one.
CreateThread();
}

bool TryDequeue(out (Delegate action, object state, TaskCompletionSource<bool> tcs) item)
{
// Dequeues the next item if one is available. Before checking,
// the available thread count is increased, so that enqueuers can
// see how many threads are currently waiting, with the count
// decreased after. Each time it waits, it'll wait for at most
// the idle timeout before giving up.
lock (_queue)
{
_availableThreads++;
try
{
while (_queue.Count == 0)
{
if (!Monitor.Wait(_queue, IdleMilliseconds))
{
if (_queue.Count > 0)
{
break;
}

item = default;
return false;
}
}
}
finally
{
_availableThreads--;
}

item = _queue.Dequeue();
return true;
}
}
}
}
}

Expand Down Expand Up @@ -126,146 +239,4 @@ public void ThrowIfSet(IExceptionContext ectx)
throw ectx.Except(_ex, "Exception thrown in {0}", _component);
}
}

/// <summary>
/// Provides a task scheduler that ensures a maximum concurrency level while
/// running on top of the ThreadPool.
/// </summary>
[BestFriend]
internal sealed class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
// Whether the current thread is processing work items.
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;

//The list of tasks to be executed.
// protected by lock(_tasks).
private readonly LinkedList<Task> _tasks;

//The maximum concurrency level allowed by this scheduler.
private readonly int _concurrencyLevel;

// Currently queued or running delegates.
// protected by lock(_tasks).
private int _queuedOrRunningDelegatesCount;

// Gets the maximum concurrency level supported by this scheduler.
public override int MaximumConcurrencyLevel => _concurrencyLevel;

/// <summary>
/// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
/// specified concurrency level.
/// </summary>
public LimitedConcurrencyLevelTaskScheduler(int concurrencyLevel)
{
Contracts.Assert(concurrencyLevel >= 1);
_tasks = new LinkedList<Task>();
_concurrencyLevel = concurrencyLevel;
_queuedOrRunningDelegatesCount = 0;
}

// Queues a task to the scheduler.
protected override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
_tasks.AddLast(task);
if (_queuedOrRunningDelegatesCount < _concurrencyLevel)
{
++_queuedOrRunningDelegatesCount;
NotifyThreadPoolOfPendingWork();
}
}
}

// Attempts to execute the specified task on the current thread.
// Returns whether the task could be executed on the current thread.
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining.
if (!_currentThreadIsProcessingItems)
return false;

// If the task was previously queued, remove it from the queue.
if (taskWasPreviouslyQueued)
TryDequeue(task);

// Try to run the task.
return base.TryExecuteTask(task);
}

// Attempts to remove a previously scheduled task from the scheduler.
// Returns whether the task could be found and removed.
protected override bool TryDequeue(Task task)
{
lock (_tasks)
return _tasks.Remove(task);
}

// Gets an enumerable of the tasks currently scheduled on this scheduler.
// Returns an enumerable of the tasks currently scheduled.
protected override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken)
return _tasks.ToArray();
else
throw Contracts.ExceptNotSupp();
}
finally
{
if (lockTaken)
Monitor.Exit(_tasks);
}
}

// Informs the ThreadPool that there's work to be executed for this scheduler.
private void NotifyThreadPoolOfPendingWork()
{
WaitCallback action = (state =>
{// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_queuedOrRunningDelegatesCount;
break;
}

// Get the next item from the queue.
item = _tasks.First.Value;
_tasks.RemoveFirst();
}

// Execute the task we pulled out of the queue.
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread.
finally
{
_currentThreadIsProcessingItems = false;
}
});
// Core CLR doesn't have UnsafeQueueUserWorkItem .
// In CLR world unsafe version is faster, but this is not the case for Core CLR.
// more context can be found here: https://github.com/dotnet/coreclr/issues/1607
ThreadPool.UnsafeQueueUserWorkItem(action, null);
}
}
}
Loading

0 comments on commit 922d7e5

Please sign in to comment.