From 4e697bb7873c3edd07bd57fdf711688d0194919d Mon Sep 17 00:00:00 2001 From: Alejandro Saucedo Date: Thu, 15 Oct 2020 21:40:31 +0100 Subject: [PATCH] Updated to current multiple queue implementation --- single_include/kompute/Kompute.hpp | 41 +++++++++--- src/Manager.cpp | 47 +++++++++----- src/include/kompute/Manager.hpp | 41 +++++++++--- test/TestAsyncOperations.cpp | 101 +++++++++++++++++------------ 4 files changed, 151 insertions(+), 79 deletions(-) diff --git a/single_include/kompute/Kompute.hpp b/single_include/kompute/Kompute.hpp index ee8705d..a986042 100755 --- a/single_include/kompute/Kompute.hpp +++ b/single_include/kompute/Kompute.hpp @@ -1253,19 +1253,22 @@ class Manager Manager(); /** - Similar to base constructor but allows the user to provide the device - they would like to create the resources on. + * Similar to base constructor but allows the user to provide the device + * they would like to create the resources on. + * + * @param physicalDeviceIndex The index of the physical device to use + * @param totalQueues The total number of compute queues to create. */ - Manager(uint32_t physicalDeviceIndex); + Manager(uint32_t physicalDeviceIndex, uint32_t totalComputeQueues = 1); /** * Manager constructor which allows your own vulkan application to integrate * with the vulkan kompute use. * * @param instance Vulkan compute instance to base this application - * @physicalDevice Vulkan physical device to use for application - * @device Vulkan logical device to use for all base resources - * @physicalDeviceIndex Index for vulkan physical device used + * @param physicalDevice Vulkan physical device to use for application + * @param device Vulkan logical device to use for all base resources + * @param physicalDeviceIndex Index for vulkan physical device used */ Manager(std::shared_ptr instance, std::shared_ptr physicalDevice, @@ -1290,6 +1293,16 @@ class Manager std::weak_ptr getOrCreateManagedSequence( std::string sequenceName); + /** + * Create a new managed Kompute sequence so it's available within the manager. + * + * @param sequenceName The name for the named sequence to be created + * @param queueIndex The queue to use from the available queues + * @return Weak pointer to the manager owned sequence resource + */ + std::weak_ptr createManagedSequence( + std::string sequenceName, uint32_t queueIndex = 0); + /** * Operation that adds extra operations to existing or new created * sequences. @@ -1342,7 +1355,12 @@ class Manager std::weak_ptr sqWeakPtr = this->getOrCreateManagedSequence(sequenceName); - if (std::shared_ptr sq = sqWeakPtr.lock()) { + std::unordered_map>::iterator found = + this->mManagedSequences.find(sequenceName); + + if (found == this->mManagedSequences.end()) { + std::shared_ptr sq = found->second; + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence BEGIN"); sq->begin(); @@ -1355,6 +1373,9 @@ class Manager SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence EVAL"); sq->evalAsync(); } + else { + SPDLOG_ERROR("Kompute Manager evalOpAsync sequence [{}] not found", sequenceName); + } SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence SUCCESS"); } @@ -1382,7 +1403,7 @@ class Manager SPDLOG_DEBUG("Kompute Manager evalOpAwait running sequence SUCCESS"); } else { - SPDLOG_ERROR("Sequence not found"); + SPDLOG_ERROR("Kompute Manager evalOpAwait Sequence not found"); } } @@ -1437,7 +1458,7 @@ class Manager std::shared_ptr mDevice = nullptr; bool mFreeDevice = false; uint32_t mComputeQueueFamilyIndex = -1; - std::shared_ptr mComputeQueue = nullptr; + std::vector> mComputeQueues; // -------------- ALWAYS OWNED RESOURCES std::unordered_map> @@ -1452,7 +1473,7 @@ class Manager // Create functions void createInstance(); - void createDevice(); + void createDevice(uint32_t totalComputeQueues); }; } // End namespace kp diff --git a/src/Manager.cpp b/src/Manager.cpp index 1debb4c..58f0678 100644 --- a/src/Manager.cpp +++ b/src/Manager.cpp @@ -25,15 +25,15 @@ debugMessageCallback(VkDebugReportFlagsEXT flags, #endif Manager::Manager() - : Manager(0) + : Manager(0, 1) {} -Manager::Manager(uint32_t physicalDeviceIndex) +Manager::Manager(uint32_t physicalDeviceIndex, uint32_t totalComputeQueues) { this->mPhysicalDeviceIndex = physicalDeviceIndex; this->createInstance(); - this->createDevice(); + this->createDevice(totalComputeQueues); } Manager::Manager(std::shared_ptr instance, @@ -98,19 +98,27 @@ Manager::getOrCreateManagedSequence(std::string sequenceName) this->mManagedSequences.find(sequenceName); if (found == this->mManagedSequences.end()) { - std::shared_ptr sq = - std::make_shared(this->mPhysicalDevice, - this->mDevice, - this->mComputeQueue, - this->mComputeQueueFamilyIndex); - sq->init(); - this->mManagedSequences.insert({ sequenceName, sq }); - return sq; + return this->createManagedSequence(sequenceName); } else { return found->second; } } +std::weak_ptr +Manager::createManagedSequence(std::string sequenceName, uint32_t queueIndex) { + + SPDLOG_DEBUG("Kompute Manager createManagedSequence with sequenceName: {} and queueIndex: {}", sequenceName, queueIndex); + + std::shared_ptr sq = + std::make_shared(this->mPhysicalDevice, + this->mDevice, + this->mComputeQueues[queueIndex], + this->mComputeQueueFamilyIndex); + sq->init(); + this->mManagedSequences.insert({ sequenceName, sq }); + return sq; +} + void Manager::createInstance() { @@ -197,7 +205,7 @@ Manager::createInstance() } void -Manager::createDevice() +Manager::createDevice(uint32_t totalComputeQueues) { SPDLOG_DEBUG("Kompute Manager creating Device"); @@ -248,7 +256,7 @@ Manager::createDevice() } const float defaultQueuePriority(0.0f); - const uint32_t defaultQueueCount(1); + const uint32_t defaultQueueCount(totalComputeQueues); vk::DeviceQueueCreateInfo deviceQueueCreateInfo( vk::DeviceQueueCreateFlags(), this->mComputeQueueFamilyIndex, @@ -264,9 +272,16 @@ Manager::createDevice() &deviceCreateInfo, nullptr, this->mDevice.get()); SPDLOG_DEBUG("Kompute Manager device created"); - this->mComputeQueue = std::make_shared(); - this->mDevice->getQueue( - this->mComputeQueueFamilyIndex, 0, this->mComputeQueue.get()); + for (uint32_t i = 0; i < totalComputeQueues; i++) + { + std::shared_ptr currQueue = std::make_shared(); + + this->mDevice->getQueue( + this->mComputeQueueFamilyIndex, i, currQueue.get()); + + this->mComputeQueues.push_back(currQueue); + } + SPDLOG_DEBUG("Kompute Manager compute queue obtained"); } diff --git a/src/include/kompute/Manager.hpp b/src/include/kompute/Manager.hpp index e795bbf..e270983 100644 --- a/src/include/kompute/Manager.hpp +++ b/src/include/kompute/Manager.hpp @@ -25,19 +25,22 @@ class Manager Manager(); /** - Similar to base constructor but allows the user to provide the device - they would like to create the resources on. + * Similar to base constructor but allows the user to provide the device + * they would like to create the resources on. + * + * @param physicalDeviceIndex The index of the physical device to use + * @param totalQueues The total number of compute queues to create. */ - Manager(uint32_t physicalDeviceIndex); + Manager(uint32_t physicalDeviceIndex, uint32_t totalComputeQueues = 1); /** * Manager constructor which allows your own vulkan application to integrate * with the vulkan kompute use. * * @param instance Vulkan compute instance to base this application - * @physicalDevice Vulkan physical device to use for application - * @device Vulkan logical device to use for all base resources - * @physicalDeviceIndex Index for vulkan physical device used + * @param physicalDevice Vulkan physical device to use for application + * @param device Vulkan logical device to use for all base resources + * @param physicalDeviceIndex Index for vulkan physical device used */ Manager(std::shared_ptr instance, std::shared_ptr physicalDevice, @@ -62,6 +65,16 @@ class Manager std::weak_ptr getOrCreateManagedSequence( std::string sequenceName); + /** + * Create a new managed Kompute sequence so it's available within the manager. + * + * @param sequenceName The name for the named sequence to be created + * @param queueIndex The queue to use from the available queues + * @return Weak pointer to the manager owned sequence resource + */ + std::weak_ptr createManagedSequence( + std::string sequenceName, uint32_t queueIndex = 0); + /** * Operation that adds extra operations to existing or new created * sequences. @@ -114,7 +127,12 @@ class Manager std::weak_ptr sqWeakPtr = this->getOrCreateManagedSequence(sequenceName); - if (std::shared_ptr sq = sqWeakPtr.lock()) { + std::unordered_map>::iterator found = + this->mManagedSequences.find(sequenceName); + + if (found == this->mManagedSequences.end()) { + std::shared_ptr sq = found->second; + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence BEGIN"); sq->begin(); @@ -127,6 +145,9 @@ class Manager SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence EVAL"); sq->evalAsync(); } + else { + SPDLOG_ERROR("Kompute Manager evalOpAsync sequence [{}] not found", sequenceName); + } SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence SUCCESS"); } @@ -154,7 +175,7 @@ class Manager SPDLOG_DEBUG("Kompute Manager evalOpAwait running sequence SUCCESS"); } else { - SPDLOG_ERROR("Sequence not found"); + SPDLOG_ERROR("Kompute Manager evalOpAwait Sequence not found"); } } @@ -209,7 +230,7 @@ class Manager std::shared_ptr mDevice = nullptr; bool mFreeDevice = false; uint32_t mComputeQueueFamilyIndex = -1; - std::shared_ptr mComputeQueue = nullptr; + std::vector> mComputeQueues; // -------------- ALWAYS OWNED RESOURCES std::unordered_map> @@ -224,7 +245,7 @@ class Manager // Create functions void createInstance(); - void createDevice(); + void createDevice(uint32_t totalComputeQueues); }; } // End namespace kp diff --git a/test/TestAsyncOperations.cpp b/test/TestAsyncOperations.cpp index ba8b203..15fd9ca 100644 --- a/test/TestAsyncOperations.cpp +++ b/test/TestAsyncOperations.cpp @@ -9,21 +9,6 @@ TEST(TestAsyncOperations, TestManagerAsync) { uint32_t size = 100000; - std::vector data(size, 0.0); - std::vector resultSync(size, 100000); - std::vector resultAsync(size, 200000); - - std::shared_ptr tensorA{ new kp::Tensor(data) }; - std::shared_ptr tensorB{ new kp::Tensor(data) }; - std::shared_ptr tensorC{ new kp::Tensor(data) }; - std::shared_ptr tensorD{ new kp::Tensor(data) }; - std::shared_ptr tensorE{ new kp::Tensor(data) }; - std::shared_ptr tensorF{ new kp::Tensor(data) }; - - kp::Manager mgr; - - mgr.evalOpDefault({ tensorA, tensorB, tensorC, tensorD, tensorE, tensorF }); - std::string shader(R"( #version 450 @@ -44,52 +29,82 @@ TEST(TestAsyncOperations, TestManagerAsync) } )"); + std::vector data(size, 0.0); + std::vector resultSync(size, 100000); + std::vector resultAsync(size, 100000); + + std::shared_ptr tensorSyncA{ new kp::Tensor(data) }; + std::shared_ptr tensorSyncB{ new kp::Tensor(data) }; + std::shared_ptr tensorSyncC{ new kp::Tensor(data) }; + std::shared_ptr tensorSyncD{ new kp::Tensor(data) }; + std::shared_ptr tensorSyncE{ new kp::Tensor(data) }; + std::shared_ptr tensorSyncF{ new kp::Tensor(data) }; + + kp::Manager mgr; + + mgr.evalOpDefault({ tensorSyncA, tensorSyncB, tensorSyncC, tensorSyncD, tensorSyncE, tensorSyncF }); + auto startSync = std::chrono::high_resolution_clock::now(); mgr.evalOpDefault>( - { tensorA, tensorB }, std::vector(shader.begin(), shader.end())); + { tensorSyncA, tensorSyncB }, std::vector(shader.begin(), shader.end())); mgr.evalOpDefault>( - { tensorC, tensorD }, std::vector(shader.begin(), shader.end())); + { tensorSyncC, tensorSyncD }, std::vector(shader.begin(), shader.end())); mgr.evalOpDefault>( - { tensorE, tensorF }, std::vector(shader.begin(), shader.end())); + { tensorSyncE, tensorSyncF }, std::vector(shader.begin(), shader.end())); + + mgr.evalOpDefault({ tensorSyncB, tensorSyncD, tensorSyncF }); auto endSync = std::chrono::high_resolution_clock::now(); - - mgr.evalOpDefault({ tensorB, tensorD, tensorF }); - - EXPECT_EQ(tensorB->data(), resultSync); - EXPECT_EQ(tensorD->data(), resultSync); - EXPECT_EQ(tensorF->data(), resultSync); - auto durationSync = std::chrono::duration_cast(endSync - startSync).count(); - auto startAsync = std::chrono::high_resolution_clock::now(); + EXPECT_EQ(tensorSyncB->data(), resultSync); + EXPECT_EQ(tensorSyncD->data(), resultSync); + EXPECT_EQ(tensorSyncF->data(), resultSync); - mgr.evalOpAsync>( - { tensorA, tensorB }, "asyncOne", std::vector(shader.begin(), shader.end())); + //std::shared_ptr tensorAsyncA{ new kp::Tensor(data) }; + //std::shared_ptr tensorAsyncB{ new kp::Tensor(data) }; + //std::shared_ptr tensorAsyncC{ new kp::Tensor(data) }; + //std::shared_ptr tensorAsyncD{ new kp::Tensor(data) }; + //std::shared_ptr tensorAsyncE{ new kp::Tensor(data) }; + //std::shared_ptr tensorAsyncF{ new kp::Tensor(data) }; - mgr.evalOpAsync>( - { tensorC, tensorD }, "asyncTwo", std::vector(shader.begin(), shader.end())); + //kp::Manager mgrAsync(0, 1); - mgr.evalOpAsync>( - { tensorE, tensorF }, "asyncThree", std::vector(shader.begin(), shader.end())); + //mgrAsync.evalOpDefault({ tensorAsyncA, tensorAsyncB, tensorAsyncC, tensorAsyncD, tensorAsyncE, tensorAsyncF }); - mgr.evalOpAwait("asyncOne"); - mgr.evalOpAwait("asyncTwo"); - mgr.evalOpAwait("asyncThree"); + //mgrAsync.createManagedSequence("async0", 0); + ////mgrAsync.createManagedSequence("async1", 1); + ////mgrAsync.createManagedSequence("async2", 2); - auto endAsync = std::chrono::high_resolution_clock::now(); + //auto startAsync = std::chrono::high_resolution_clock::now(); - auto durationAsync = std::chrono::duration_cast(endAsync - startAsync).count(); + //mgrAsync.evalOpAsync>( + // { tensorAsyncA, tensorAsyncB }, "async0", std::vector(shader.begin(), shader.end())); - mgr.evalOpDefault({ tensorB, tensorD, tensorF }); + ////mgrAsync.evalOpAsync>( + //// { tensorAsyncC, tensorAsyncD }, "async1", std::vector(shader.begin(), shader.end())); - EXPECT_EQ(tensorB->data(), resultAsync); - EXPECT_EQ(tensorD->data(), resultAsync); - EXPECT_EQ(tensorF->data(), resultAsync); + ////mgrAsync.evalOpAsync>( + //// { tensorAsyncE, tensorAsyncF }, "async2", std::vector(shader.begin(), shader.end())); - SPDLOG_DEBUG("Total Sync: {}", durationSync); - SPDLOG_DEBUG("Total Async: {}", durationAsync); + //mgrAsync.evalOpAwait("async0"); + ////mgrAsync.evalOpAwait("async1"); + ////mgrAsync.evalOpAwait("async2"); + + //mgrAsync.evalOpDefault({ tensorAsyncB }); + ////mgrAsync.evalOpDefault({ tensorAsyncD }); + ////mgrAsync.evalOpDefault({ tensorAsyncF }); + + //auto endAsync = std::chrono::high_resolution_clock::now(); + //auto durationAsync = std::chrono::duration_cast(endAsync - startAsync).count(); + + //EXPECT_EQ(tensorAsyncB->data(), resultAsync); + ////EXPECT_EQ(tensorAsyncD->data(), resultAsync); + ////EXPECT_EQ(tensorAsyncF->data(), resultAsync); + + ////SPDLOG_DEBUG("Total Sync: {}", durationSync); + //SPDLOG_DEBUG("Total Async: {}", durationAsync); }