Skip to content

Commit

Permalink
trace
Browse files Browse the repository at this point in the history
  • Loading branch information
ii14 committed Apr 24, 2024
1 parent 0a5abff commit 5614982
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 3 deletions.
16 changes: 15 additions & 1 deletion src/fzx/fzx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,31 @@ void Fzx::setThreads(unsigned threads) noexcept

void Fzx::start()
{
// TODO: assert this object can't be reused
if (mRunning)
return;
mRunning = true;

ASSERT(mCallback);

#ifdef FZX_TRACE
ASSERT(mTracer == nullptr);
mTracer = mTrace.createTracer('m');
#endif

for (size_t i = 0; i < mThreads; ++i) {
auto& worker = mWorkers.emplace_back(std::make_unique<Worker>());
worker->mIndex = i;
worker->mPool = this;
#ifdef FZX_TRACE
worker->mTracer = mTrace.createTracer("0123456789ABCDEF"[i]);
#endif
}
for (const auto& worker : mWorkers)

mTrace.start();
for (const auto& worker : mWorkers) {
worker->mThread = std::thread { &Worker::run, worker.get() };
}
}

void Fzx::stop()
Expand Down Expand Up @@ -118,6 +130,8 @@ CommitResult Fzx::commit()
mJobVersion.fetch_add(1, std::memory_order_release);
}

TRACE(mTracer, "commit");

// Wake up worker threads
for (auto& worker : mWorkers)
worker->mEvents.post(Worker::kJob);
Expand Down
6 changes: 6 additions & 0 deletions src/fzx/fzx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "fzx/matched_item.hpp"
#include "fzx/query.hpp"
#include "fzx/worker.hpp"
#include "fzx/trace.hpp"

namespace fzx {

Expand Down Expand Up @@ -125,6 +126,11 @@ struct Fzx
std::shared_ptr<Query> mQuery;
std::shared_ptr<ItemQueue> mQueue;

public:
Trace mTrace;
private:
TraceTLS* mTracer { nullptr };

/// Worker threads. This vector is shared with workers, so after
/// starting and before joining threads, it cannot be modified.
std::vector<std::unique_ptr<Worker>> mWorkers {};
Expand Down
6 changes: 6 additions & 0 deletions src/fzx/macros.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@
# define FZX_PRAGMA_MSVC(x)
#endif

#define STR_(x) #x
#define STR(x) STR_(x)

#define CAT_(a, b) a ## b
#define CAT(a, b) CAT_(a, b)

namespace fzx {

template <typename T>
Expand Down
57 changes: 57 additions & 0 deletions src/fzx/trace.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed under LGPLv3 - see LICENSE file for details.

#include "fzx/trace.hpp"

namespace fzx {

static struct timespec tsSub(struct timespec a, struct timespec b)
{
struct timespec r; // NOLINT(cppcoreguidelines-pro-type-member-init)
r.tv_sec = a.tv_sec - b.tv_sec;
r.tv_nsec = a.tv_nsec - b.tv_nsec;
if (r.tv_nsec < 0) {
--r.tv_sec;
r.tv_nsec += 1000000000L;
}
return r;
}

TraceTLS* Trace::createTracer(char name)
{
auto p = std::make_unique<TraceTLS>();
p->mName = name;
return mTracers.emplace_back(std::move(p)).get();
}

void Trace::start()
{
clock_gettime(CLOCK_MONOTONIC, &mTimestamp);
}

void Trace::dump(FILE* file) const
{
// auto positions = std::make_unique<size_t[]>(mTracers.size());
// for (size_t i = 0; i < mTracers.size(); ++i)
// positions[i] = 0;


for (const auto& tracer : mTracers) {
uint64_t prev = 0;
for (const auto& event : tracer->mEvents) {
auto ts = tsSub(event.mTimestamp, mTimestamp);
uint64_t time = static_cast<uint64_t>(ts.tv_nsec)
+ static_cast<uint64_t>(ts.tv_sec) * 1000000000ULL;
uint64_t diff = time - prev;
uint64_t tms = time / 1000000ULL;
uint64_t tus = time % 1000000ULL;
uint64_t dus = diff % 1000000ULL;
uint64_t dms = diff / 1000000ULL;
fprintf(file, "%6zu.%06zu %c %6zu.%06zu %s\n",
tms, tus, tracer->mName, dms, dus, event.mMessage);
prev = time;
}
}
fflush(file);
}

} // namespace fzx
60 changes: 60 additions & 0 deletions src/fzx/trace.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed under LGPLv3 - see LICENSE file for details.

#pragma once

#include <atomic>
#include <cstdio>
#include <ctime>
#include <string>
#include <utility>
#include <vector>
#include <memory>

#include "fzx/macros.hpp"

#define FZX_TRACE

namespace fzx {

struct TraceEvent
{
struct timespec mTimestamp;
const char* mMessage;
const char* mSource;
};

struct TraceTLS
{
TraceTLS() noexcept = default;

char mName { 0 };
std::vector<TraceEvent> mEvents;

void trace(const char* msg, const char* file)
{
TraceEvent ev; // NOLINT(cppcoreguidelines-pro-type-member-init)
clock_gettime(CLOCK_MONOTONIC, &ev.mTimestamp);
ev.mMessage = msg;
ev.mSource = file;
mEvents.push_back(ev);
}
};

struct Trace
{
Trace() noexcept = default;
TraceTLS* createTracer(char name);
void start();
void dump(FILE* file) const;

std::vector<std::unique_ptr<TraceTLS>> mTracers;
struct timespec mTimestamp { 0, 0 };
};

#ifdef FZX_TRACE
# define TRACE(ptr, msg) (ptr)->trace(msg, __FILE__ ":" STR(__LINE__))
#else
# define TRACE(ptr, msg)
#endif

} // namespace fzx
3 changes: 3 additions & 0 deletions src/fzx/tui/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ int main(int argc, char** argv)
}
}
}
app.mFzx.stop();
app.mTTY.close();
app.mFzx.mTrace.dump(stdout);

if (app.mStatus == fzx::Status::ExitSuccess) {
if (app.mSelection.empty()) {
std::cout << app.currentItem() << std::endl;
Expand Down
18 changes: 16 additions & 2 deletions src/fzx/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,19 @@ try {
auto loadJob = [&]() -> bool { return mPool->loadJob(job, jobVersion); };

wait:
TRACE(mTracer, "wait");
// Wait until we get a new event.
uint32_t ev = mEvents.wait();
TRACE(mTracer, "wakeup");

if (ev & kStop)
if (ev & kStop) {
TRACE(mTracer, "stop");
return;
}

if ((ev & kJob) && loadJob()) {
match:
TRACE(mTracer, "match");
// A new job invalidates any merged results we got so far.
mergeState.reset();

Expand All @@ -76,6 +81,7 @@ try {
const auto& query = *job.mQuery;
results.mItems.reserve(job.mItems.size());
for (;;) {
TRACE(mTracer, "chunk");
// Reserve a chunk of items.
// TODO: I think there might be a possibility of internal queue counter overflowing here?
const size_t start = std::min(queue.take(kChunkSize), job.mItems.size());
Expand All @@ -93,14 +99,18 @@ try {
// Ignore kMerge events from other workers, we don't care about
// them at this stage, as we don't even have out own results yet.
ev = mEvents.get();
if (ev & kStop)
if (ev & kStop) {
TRACE(mTracer, "stop");
return;
}
if ((ev & kJob) && loadJob())
goto match;
}

TRACE(mTracer, "sort enter");
// Sort the local batch of items.
std::sort(results.mItems.begin(), results.mItems.end());
TRACE(mTracer, "sort leave");

// For workers with no children, jump straight to publishing results.
if (mergeState.done())
Expand All @@ -110,6 +120,7 @@ try {
goto wait; // No new job, no merging left to do.
}

TRACE(mTracer, "merge enter");
// Merge results from other threads.
for (uint8_t i = 0; i < mergeState.size(); ++i) {
// Already got results from this worker, try the next one.
Expand Down Expand Up @@ -147,12 +158,14 @@ try {

mergeState.set(i); // Mark this worker as merged.
}
TRACE(mTracer, "merge leave");

// Not all results have been merged yet, we're still waiting for someone.
if (!mergeState.done())
goto wait;

publish:
TRACE(mTracer, "publish enter");
// Store results and notify whoever is responsible for handling them.
mOutput.store(std::move(results));
results = {};
Expand All @@ -165,6 +178,7 @@ try {
mPool->mWorkers[MergeState::parent(mIndex)]->mEvents.post(kMerge);
}

TRACE(mTracer, "publish leave");
goto wait;
} catch (const std::exception& e) {
std::strncpy(mErrorMsg, e.what(), std::size(mErrorMsg));
Expand Down
2 changes: 2 additions & 0 deletions src/fzx/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "fzx/matched_item.hpp"
#include "fzx/query.hpp"
#include "fzx/string.hpp"
#include "fzx/trace.hpp"

namespace fzx {

Expand Down Expand Up @@ -45,6 +46,7 @@ struct Worker
Events mEvents;
Fzx* mPool { nullptr };
uint8_t mIndex { 0 };
TraceTLS* mTracer { nullptr };

char mErrorMsg[256] {}; ///< Populated before mError becomes true.
std::atomic<bool> mError { false }; ///< A critical error has occurred.
Expand Down

0 comments on commit 5614982

Please sign in to comment.