Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove CommandQueue redirecting usages straight to HWCQ #17219

Merged
merged 10 commits into from
Jan 29, 2025
1 change: 0 additions & 1 deletion tests/tt_metal/tt_metal/api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ set(UNIT_TESTS_API_SRC
${CMAKE_CURRENT_SOURCE_DIR}/test_banked.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_bit_utils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_buffer_region.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_CommandQueue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_direct.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_dram_to_l1_multicast.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_dram.cpp
Expand Down
152 changes: 0 additions & 152 deletions tests/tt_metal/tt_metal/api/test_CommandQueue.cpp

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -462,52 +462,6 @@ bool test_EnqueueWriteBuffer_and_EnqueueReadBuffer_multi_queue(
namespace basic_tests {
namespace dram_tests {

TEST_F(CommandQueueBufferFixture, DISABLED_TestAsyncBufferRW) {
// Test Async Enqueue Read and Write + Get Addr + Buffer Allocation and Deallocation
auto& command_queue = this->device_->command_queue();
auto current_mode = CommandQueue::default_mode();
command_queue.set_mode(CommandQueue::CommandQueueMode::ASYNC);
Program program;
for (int j = 0; j < 10; j++) {
// Asynchronously initialize a buffer on device
uint32_t first_buf_value = j + 1;
uint32_t second_buf_value = j + 2;
uint32_t first_buf_size = 4096;
uint32_t second_buf_size = 2048;
// Asynchronously allocate buffer on device
std::shared_ptr<Buffer> buffer =
Buffer::create(this->device_, first_buf_size, first_buf_size, BufferType::DRAM);
std::shared_ptr<uint32_t> allocated_buffer_address = std::make_shared<uint32_t>();
EnqueueGetBufferAddr(this->device_->command_queue(), allocated_buffer_address.get(), buffer.get(), true);
// Ensure returned addr is correct
EXPECT_EQ((*allocated_buffer_address), buffer->address());

std::shared_ptr<std::vector<uint32_t>> vec =
std::make_shared<std::vector<uint32_t>>(first_buf_size / 4, first_buf_value);
std::vector<uint32_t> readback_vec = {};
// Write first vector to existing on device buffer.
EnqueueWriteBuffer(this->device_->command_queue(), buffer, vec, false);
// Reallocate the vector in the main thread after asynchronously pushing it (ensure that worker still has access
// to this data)
vec = std::make_shared<std::vector<uint32_t>>(second_buf_size / 4, second_buf_value);
// Simulate what tt-eager does: Share buffer ownership with program
AssignGlobalBufferToProgram(buffer, program);
// Reallocate buffer (this is safe, since the program also owns the existing buffer, which will not be
// deallocated)
buffer = Buffer::create(this->device_, second_buf_size, second_buf_size, BufferType::DRAM);
// Write second vector to second buffer
EnqueueWriteBuffer(this->device_->command_queue(), buffer, vec, false);
// Have main thread give up ownership immediately after writing
vec.reset();
// Read both buffer and ensure data is correct
EnqueueReadBuffer(this->device_->command_queue(), buffer, readback_vec, true);
for (int i = 0; i < readback_vec.size(); i++) {
EXPECT_EQ(readback_vec[i], second_buf_value);
}
}
command_queue.set_mode(current_mode);
}

TEST_F(CommandQueueSingleCardBufferFixture, WriteOneTileToDramBank0) {
TestBufferConfig config = {.num_pages = 1, .page_size = 2048, .buftype = BufferType::DRAM};
for (IDevice* device : devices_) {
Expand Down
14 changes: 0 additions & 14 deletions tests/tt_metal/tt_metal/integration/test_flatten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,3 @@ TEST_F(DispatchFixture, TensixFlatten) {
ASSERT_TRUE(test_flatten::flatten(this, this->devices_.at(id), num_tiles_r, num_tiles_c));
}
}

TEST_F(CommandQueueProgramFixture, DISABLED_TensixTestAsyncFlattenStress) {
auto& command_queue = this->device_->command_queue();
auto current_mode = CommandQueue::default_mode();
command_queue.set_mode(CommandQueue::CommandQueueMode::ASYNC);
uint32_t num_tiles_r = 2;
uint32_t num_tiles_c = 2;
if (!this->IsSlowDispatch()) {
num_tiles_r = 1;
num_tiles_c = 1;
}
ASSERT_TRUE(test_flatten::flatten_stress(this->device_, num_tiles_r, num_tiles_c));
command_queue.set_mode(current_mode);
}
126 changes: 2 additions & 124 deletions tt_metal/api/tt-metalium/command_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,18 @@
#include "worker_config_buffer.hpp"
#include "program_impl.hpp"
#include "trace_buffer.hpp"
#include "hardware_command_queue.hpp"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to fix file names and includes in a separate PR


namespace tt::tt_metal {
inline namespace v0 {

class CommandQueue;
class BufferRegion;
class Event;
class Trace;
using RuntimeArgs = std::vector<std::variant<Buffer*, uint32_t>>;

} // namespace v0

class HWCommandQueue;

// Only contains the types of commands which are enqueued onto the device
enum class EnqueueCommandType {
ENQUEUE_READ_BUFFER,
Expand All @@ -55,10 +53,6 @@ enum class EnqueueCommandType {

string EnqueueCommandTypeToString(EnqueueCommandType ctype);

class CommandInterface;

using WorkerQueue = LockFreeQueue<CommandInterface>;

class Command {
public:
Command() {}
Expand Down Expand Up @@ -205,121 +199,6 @@ class EnqueueTerminateCommand : public Command {
constexpr bool has_side_effects() { return false; }
};

struct RuntimeArgsMetadata {
CoreCoord core_coord;
std::shared_ptr<RuntimeArgs> runtime_args_ptr;
std::shared_ptr<Kernel> kernel;
std::vector<uint32_t> update_idx;
};

// Common interface for all command queue types
struct CommandInterface {
EnqueueCommandType type;
std::optional<bool> blocking;
std::optional<std::variant<std::reference_wrapper<Buffer>, std::shared_ptr<Buffer>>> buffer;
Program* program;
std::optional<RuntimeArgsMetadata> runtime_args_md;
std::optional<const Buffer*> shadow_buffer;
std::optional<HostDataType> src;
std::optional<void*> dst;
std::optional<std::shared_ptr<Event>> event;
std::optional<uint32_t> trace_id;
std::optional<BufferRegion> region;
tt::stl::Span<const SubDeviceId> sub_device_ids;
};

inline namespace v0 {

class CommandQueue {
friend class Trace;

public:
enum class CommandQueueMode {
PASSTHROUGH = 0,
ASYNC = 1,
TRACE = 2,
};
enum class CommandQueueState {
IDLE = 0,
RUNNING = 1,
TERMINATE = 2,
};

CommandQueue() = delete;

CommandQueue(IDevice* device, uint32_t id, CommandQueueMode mode = CommandQueue::default_mode());
~CommandQueue();

// Trace queue constructor
CommandQueue(Trace& trace);

// Getters for private members
IDevice* device() const { return this->device_ptr; }
uint32_t id() const { return this->cq_id; }

// Blocking method to wait for all commands to drain from the queue
// Optional if used in passthrough mode (async_mode = false)
void wait_until_empty();

// Schedule a command to be run on the device
// Blocking if in passthrough mode. Non-blocking if in async mode
void run_command(CommandInterface&& command);

// API for setting/getting the mode of the command queue
// TODO: disallow changing the mode of the queue. This is error prone, because changing mode requires
// accordingly updating the higher-level abstractions.
void set_mode(const CommandQueueMode& mode);
CommandQueueMode get_mode() const { return this->mode; }

// Reference to the underlying hardware command queue, non-const because side-effects are allowed
HWCommandQueue& hw_command_queue();

// The empty state of the worker queue
bool empty() const { return this->worker_queue.empty(); }

// Dump methods for name and pending commands in the queue
void dump();
std::string name();

static CommandQueueMode default_mode() {
// Envvar is used for bringup and debug only. Will be removed in the future and should not be relied on in
// production.
static int value =
parse_env<int>("TT_METAL_CQ_ASYNC_MODE", /*default_value=*/static_cast<int>(CommandQueueMode::PASSTHROUGH));
return static_cast<CommandQueue::CommandQueueMode>(value);
}
// Determine if any CQ is using Async mode
static bool async_mode_set() { return num_async_cqs > 0; }

private:
// Initialize Command Queue Mode based on the env-var. This will be default, unless the user excplictly sets the
// mode using set_mode.
CommandQueueMode mode;
CommandQueueState worker_state;
std::unique_ptr<std::thread> worker_thread;
WorkerQueue worker_queue;
uint32_t cq_id = 0;
IDevice* device_ptr = nullptr;
Trace* trace_ptr = nullptr;

void start_worker();
void stop_worker();
void run_worker();
void run_command_impl(const CommandInterface& command);

bool async_mode() { return this->mode == CommandQueueMode::ASYNC; }
bool trace_mode() { return this->mode == CommandQueueMode::TRACE; }
bool passthrough_mode() { return this->mode == CommandQueueMode::PASSTHROUGH; }

std::atomic<std::size_t> worker_thread_id = -1;
std::atomic<std::size_t> parent_thread_id = -1;
// Track the number of CQs using async vs pt mode
inline static uint32_t num_async_cqs = 0;
inline static uint32_t num_passthrough_cqs = 0;
};

} // namespace v0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for removing this class and cleaning things up. We have a bunch of command structs defined in this file, ex: EnqueueTraceCommand, EnqueueProgramCommand and EnqueueTerminateCommand.
These are used inside the individual enqueue impls to do command specific processing. These should not be in this file. Can we please move them?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Want to make this in the PR after this is merged together with file names.


// Primitives used to place host only operations on the SW Command Queue.
// These are used in functions exposed through tt_metal.hpp or host_api.hpp
void EnqueueGetBufferAddr(CommandQueue& cq, uint32_t* dst_buf_addr, const Buffer* buffer, bool blocking);
Expand All @@ -337,5 +216,4 @@ void EnqueueAddBufferToProgram(

} // namespace tt::tt_metal

std::ostream& operator<<(std::ostream& os, tt::tt_metal::EnqueueCommandType const& type);
std::ostream& operator<<(std::ostream& os, tt::tt_metal::CommandQueue::CommandQueueMode const& type);
std::ostream& operator<<(std::ostream& os, const tt::tt_metal::EnqueueCommandType& type);
Loading
Loading