Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New base::task features #132

Open
wants to merge 6 commits into
base: beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions base/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,54 @@

namespace base {

task::task() : m_running(false), m_completed(false)
task::task() : m_state(state::READY)
{
}

task::~task()
{
// The task must not be running when we are destroying it.
ASSERT(!m_running);

// m_completed can be false in this case if the task was never
// started (i.e. the user never called task::start()).
// ASSERT(m_completed);
ASSERT(m_state != state::RUNNING);
}

task_token& task::start(thread_pool& pool)
{
// Cannot start the task if it's already running
ASSERT(!m_running);
// Cannot start the task if it's already running or enqueued
ASSERT(m_state != state::RUNNING && m_state != state::ENQUEUED);

// Reset flags for a running task
m_running = true;
m_completed = false;
m_state = state::ENQUEUED;
m_token.reset();

pool.execute([this] { in_worker_thread(); });
m_token.m_work = pool.execute([this] { in_worker_thread(); });
return m_token;
}

bool task::try_skip(thread_pool& pool)
{
bool skipped = pool.try_skip(m_token.m_work);
if (skipped) {
m_token.m_canceled = true;
call_finished();
}

return skipped;
}

void task::call_finished()
{
if (m_finished) {
try {
m_finished(m_token);
}
catch (const std::exception& ex) {
LOG(ERROR, "Exception executing 'finished' callback: %s\n", ex.what());
}
}
}

void task::in_worker_thread()
{
m_state = state::RUNNING;
try {
if (!m_token.canceled())
m_execute(m_token);
Expand All @@ -53,11 +71,9 @@ void task::in_worker_thread()
LOG(FATAL, "Exception running task: %s\n", ex.what());
}

m_running = false;
m_state = state::FINISHED;

// This must be the latest statement in the worker thread (see
// task::complete() comment)
m_completed = true;
call_finished();
}

} // namespace base
25 changes: 20 additions & 5 deletions base/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,34 +50,49 @@ class task_token {
std::atomic<bool> m_canceled;
std::atomic<float> m_progress;
float m_progress_min, m_progress_max;
const thread_pool::work* m_work = nullptr;
};

class task {
public:
enum class state {
READY, // task is created an ready to be started
ENQUEUED, // task is enqueued in the thread pool waiting for execution
RUNNING, // task is being executed
FINISHED // task finished execution by either success, error, or cancellation
};

typedef std::function<void(task_token&)> func_t;

task();
~task();

void on_execute(func_t&& f) { m_execute = std::move(f); }
void on_finished(func_t&& f) { m_finished = std::move(f); }

task_token& start(thread_pool& pool);
bool try_skip(thread_pool& pool);

bool running() const { return m_state == state::RUNNING; }

bool running() const { return m_running; }
// Returns true when the task is enqueued in the thread pool's work queue,
// and false when the task is actually being executed.
bool enqueued() const { return m_state == state::ENQUEUED; }

// Returns true when the task is completed (whether it was
// canceled or not). If this is true, it's safe to delete the task
// instance (it will not be used anymore by any othe background
// instance (it will not be used anymore by any other background
// thread).
bool completed() const { return m_completed; }
bool completed() const { return m_state == state::FINISHED; }

private:
void in_worker_thread();
void call_finished();

std::atomic<bool> m_running;
std::atomic<bool> m_completed;
std::atomic<state> m_state;
task_token m_token;
func_t m_execute;
func_t m_finished = nullptr;
};

} // namespace base
Expand Down
23 changes: 19 additions & 4 deletions base/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,27 @@ thread_pool::~thread_pool()
join_all();
}

void thread_pool::execute(std::function<void()>&& func)
const thread_pool::work* thread_pool::execute(std::function<void()>&& func)
{
thread_pool::work_ptr work = std::make_unique<thread_pool::work>(std::move(func));
const thread_pool::work* result = work.get();
const std::unique_lock lock(m_mutex);
ASSERT(m_running);
m_work.push(std::move(func));
m_work.push_back(std::move(work));
m_cv.notify_one();
return result;
}

bool thread_pool::try_skip(const work* w)
{
std::unique_lock<std::mutex> lock(m_mutex);
for (auto it = m_work.begin(); it != m_work.end(); ++it) {
if (w == it->get()) {
m_work.erase(it);
return true;
}
}
return false;
}

void thread_pool::wait_all()
Expand Down Expand Up @@ -79,9 +94,9 @@ void thread_pool::worker()
m_cv.wait(lock, [this]() -> bool { return !m_running || !m_work.empty(); });
running = m_running;
if (m_running && !m_work.empty()) {
func = std::move(m_work.front());
func = std::move(m_work.front()->m_func);
++m_doingWork;
m_work.pop();
m_work.pop_front();
}
}
try {
Expand Down
23 changes: 20 additions & 3 deletions base/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,37 @@
#pragma once

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace base {

class thread_pool {
public:
class work {
friend class thread_pool;

public:
work(std::function<void()>&& func) { m_func = std::move(func); }

private:
std::function<void()> m_func = nullptr;
};

typedef std::unique_ptr<work> work_ptr;

thread_pool(const size_t n);
~thread_pool();

void execute(std::function<void()>&& func);
const work* execute(std::function<void()>&& func);

// Tries to skip the work if it was not started yet, in other words, it
// removes the specified work from the queue if possible. Returns true if it
// was able to do so, or false otherwise.
bool try_skip(const work* w);

// Waits until the queue is empty.
void wait_all();
Expand All @@ -39,7 +56,7 @@ class thread_pool {
std::mutex m_mutex;
std::condition_variable m_cv;
std::condition_variable m_cvWait;
std::queue<std::function<void()>> m_work;
std::deque<work_ptr> m_work;
int m_doingWork;
};

Expand Down