Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 38 additions & 26 deletions dali/operators/imgcodec/image_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "dali/pipeline/operator/checkpointing/stateless_operator.h"
#include "dali/pipeline/operator/common.h"
#include "dali/pipeline/operator/operator.h"
#include "dali/pipeline/util/new_thread_pool.h"

#if not(WITH_DYNAMIC_NVIMGCODEC_ENABLED)
nvimgcodecStatus_t get_libjpeg_turbo_extension_desc(nvimgcodecExtensionDesc_t *ext_desc);
Expand Down Expand Up @@ -228,9 +229,9 @@ class ImageDecoder : public StatelessOperator<Backend> {
num_threads_ = spec.GetArgument<int>("num_threads");
GetDecoderSpecificArguments(spec);

thread_pool_ = std::make_unique<NewThreadPool>(num_threads_, device_id_,
spec.GetArgument<bool>("affine"), "MixedDecoder");
if (std::is_same<MixedBackend, Backend>::value) {
thread_pool_ = std::make_unique<ThreadPool>(num_threads_, device_id_,
spec.GetArgument<bool>("affine"), "MixedDecoder");
if (spec_.HasArgument("cache_size"))
cache_ = std::make_unique<CachedDecoderImpl>(spec_);
}
Expand Down Expand Up @@ -410,23 +411,29 @@ class ImageDecoder : public StatelessOperator<Backend> {
nvimgcodecStatus_t schedule(int device_id, int sample_idx, void *task_context,
void (*task)(int thread_id, int sample_idx, void *task_context)) {
assert(tp_);
nvimgcodec_scheduled_tasks_.emplace_back([=](int tid) { task(tid, sample_idx, task_context); });
nvimgcodec_scheduled_tasks_.emplace_back([=]() {
task(NewThreadPool::this_thread_idx(), sample_idx, task_context);
});
return NVIMGCODEC_STATUS_SUCCESS;
}

nvimgcodecStatus_t run(int device_id) {
assert(tp_);
if (!job_)
job_.emplace();
for (int i = 0; i < static_cast<int>(nvimgcodec_scheduled_tasks_.size()); i++) {
tp_->AddWork(std::move(nvimgcodec_scheduled_tasks_[i]), -i);
job_->AddTask(std::move(nvimgcodec_scheduled_tasks_[i]));
}
nvimgcodec_scheduled_tasks_.clear();
tp_->RunAll(false);
job_->Run(*tp_, false);
return NVIMGCODEC_STATUS_SUCCESS;
}

nvimgcodecStatus_t wait(int device_id) {
assert(tp_);
tp_->WaitForWork();
if (job_) {
job_->Wait();
job_.reset();
}
return NVIMGCODEC_STATUS_SUCCESS;
}

Expand Down Expand Up @@ -525,8 +532,8 @@ class ImageDecoder : public StatelessOperator<Backend> {
throw std::runtime_error(make_string("Invalid sample_type: ", sample_type));
}

ThreadPool *GetThreadPool(const Workspace &ws) {
return std::is_same<MixedBackend, Backend>::value ? thread_pool_.get() : &ws.GetThreadPool();
NewThreadPool *GetThreadPool(const Workspace &ws) {
return thread_pool_.get();
}

bool SetupImpl(std::vector<OutputDesc> &output_desc, const Workspace &ws) override {
Expand Down Expand Up @@ -674,7 +681,7 @@ class ImageDecoder : public StatelessOperator<Backend> {
TensorListShape<> out_shape(nsamples, 3);

const bool use_cache = cache_ && cache_->IsCacheEnabled() && dtype_ == DALI_UINT8;
auto setup_block = [&](int block_idx, int nblocks, int tid) {
auto setup_block = [&](int block_idx, int nblocks) {
int i_start = nsamples * block_idx / nblocks;
int i_end = nsamples * (block_idx + 1) / nblocks;
DomainTimeRange tr("Setup #" + std::to_string(block_idx) + "/" + std::to_string(nblocks),
Expand Down Expand Up @@ -753,25 +760,26 @@ class ImageDecoder : public StatelessOperator<Backend> {

if (ntasks < 2) {
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
setup_block(0, 1, -1); // run all in current thread
setup_block(0, 1); // run all in current thread
} else {
Job job;
int block_idx = 0;
atomic_idx_.store(0);
auto setup_task = [&, nblocks](int tid) {
auto setup_task = [&, nblocks]() {
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
int block_idx;
while ((block_idx = atomic_idx_.fetch_add(1)) < nblocks) {
setup_block(block_idx, nblocks, tid);
setup_block(block_idx, nblocks);
}
};

for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
tp_->AddWork(setup_task, -task_idx);
job.AddTask(setup_task, -task_idx);
}
assert(ntasks >= 2);
tp_->RunAll(false); // start work but not wait
setup_task(-1); // last task in current thread
tp_->WaitForWork(); // wait for the other threads
job.Run(*tp_, false); // start work but not wait
setup_task(); // last task in current thread
job.Wait(); // wait for the other threads
}

// Allocate the memory for the outputs...
Expand Down Expand Up @@ -846,7 +854,10 @@ class ImageDecoder : public StatelessOperator<Backend> {
// before it issues stream synchronization with the user stream. Even if we didn't have that
// race, we probably want to wait for all threads to finish anyway because we can't
// guarantee that the thread pool from the workspace outlives RunImplImpl call.
tp_->WaitForWork();
if (job_) {
job_->Wait();
job_.reset();
}
}
if (decode_status_size != nsamples_decode)
throw std::runtime_error("Failed to run decoder");
Expand All @@ -859,12 +870,13 @@ class ImageDecoder : public StatelessOperator<Backend> {
}
}
if (any_need_processing) {
Job job;
for (size_t idx = 0; idx < nsamples_decode; idx++) {
size_t orig_idx = decode_sample_idxs_[idx];
auto st_ptr = state_[orig_idx].get();
if (st_ptr->need_processing) {
tp_->AddWork(
[&, out = output[orig_idx], st_ptr, orig_idx](int tid) {
job.AddTask(
[&, out = output[orig_idx], st_ptr, orig_idx]() {
DomainTimeRange tr(make_string("Convert #", orig_idx), DomainTimeRange::kOrange);
auto &st = *st_ptr;
if constexpr (std::is_same<MixedBackend, Backend>::value) {
Expand All @@ -878,11 +890,10 @@ class ImageDecoder : public StatelessOperator<Backend> {
st.req_layout, st.orig_img_type, ROI{}, nvimgcodecOrientation_t{});
st.host_buf.reset();
}
},
-idx);
}, -idx);
}
}
tp_->RunAll(true);
job.Run(*tp_, true);
}
}

Expand All @@ -906,7 +917,7 @@ class ImageDecoder : public StatelessOperator<Backend> {
}
}

std::unique_ptr<ThreadPool> thread_pool_;
std::unique_ptr<NewThreadPool> thread_pool_;
std::unique_ptr<CachedDecoderImpl> cache_;

NvImageCodecInstance instance_ = {};
Expand Down Expand Up @@ -936,7 +947,8 @@ class ImageDecoder : public StatelessOperator<Backend> {
bool use_orientation_ = true;
int max_batch_size_ = 1;
int num_threads_ = -1;
ThreadPool *tp_ = nullptr;
NewThreadPool *tp_ = nullptr;
std::optional<IncrementalJob> job_;
std::vector<std::unique_ptr<SampleState>> state_;
std::vector<nvimgcodecCodeStream_t> batch_encoded_streams_;
std::vector<nvimgcodecImage_t> batch_images_;
Expand All @@ -952,7 +964,7 @@ class ImageDecoder : public StatelessOperator<Backend> {
std::vector<nvimgcodecExtensionDesc_t> extensions_descs_;
std::vector<nvimgcodecExtension_t> extensions_;

std::vector<std::function<void(int)>> nvimgcodec_scheduled_tasks_;
std::vector<std::function<void()>> nvimgcodec_scheduled_tasks_;
};

} // namespace imgcodec
Expand Down
76 changes: 76 additions & 0 deletions dali/pipeline/util/new_thread_pool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cstdlib>
#include <typeinfo>
#include <utility>
#include "dali/pipeline/util/new_thread_pool.h"
#include "dali/core/device_guard.h"
#include "dali/core/nvtx.h"
#include "dali/util/nvml.h"

namespace dali {

// Constructs the pool and starts `num_threads` worker threads.
//
// @param num_threads  Number of worker threads to spawn.
// @param device_id    CUDA device the workers should be associated with;
//                     CPU_ONLY_DEVICE_ID (or nullopt) means a CPU-only pool.
// @param set_affinity If true (and NVML is available), pin worker threads to CPU cores.
// @param name         Human-readable pool name, embedded in worker thread names.
NewThreadPool::NewThreadPool(
    int num_threads,
    std::optional<int> device_id,
    bool set_affinity,
    std::string name)
: name_(std::move(name)) {
  // Normalize: a CPU-only pool has no associated device.
  if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID)
    device_id = std::nullopt;
  // BUG FIX: `device_id_` was never assigned from the parameter, so the
  // `device_id_.has_value()` check in OnThreadStart could never succeed and
  // worker threads never installed a DeviceGuard.
  device_id_ = device_id;
#if NVML_ENABLED
  // We use NVML only for setting thread affinity
  if (device_id_.has_value() && set_affinity) {
    nvml_handle_ = nvml::NvmlInstance::CreateNvmlInstance();
  }
#endif
  Init(num_threads, [=, this](int thread_idx) {
    return OnThreadStart(thread_idx, set_affinity);
  });
}

// Per-thread initialization hook invoked by the pool for each worker thread.
//
// Names the thread "[DALI][NT<idx>]<pool name>", optionally activates the
// pool's CUDA device (the DeviceGuard is stored in the returned std::any so
// it lives — and restores the previous device — for the whole thread
// lifetime), and, in NVML-enabled builds, pins the thread to a CPU core.
// Affinity failures are logged via DALI_WARN and are never fatal.
std::any NewThreadPool::OnThreadStart(int thread_idx, bool set_affinity) {
  std::string name = make_string("[DALI][NT", thread_idx, "]", name_);
  SetThreadName(name.c_str());
  // Holds the DeviceGuard (if any); returned to the pool, destroyed on thread exit.
  std::any dg;
  if (device_id_.has_value())
    dg.emplace<DeviceGuard>(*device_id_);
#if NVML_ENABLED
  try {
    if (set_affinity) {
      // DALI_AFFINITY_MASK is a comma-separated list of core ids, one entry
      // per worker thread, indexed by thread_idx.
      const char *env_affinity = std::getenv("DALI_AFFINITY_MASK");
      int core = -1;  // -1 = no explicit core; nvml::SetCPUAffinity chooses one
      if (env_affinity) {
        const auto &vec = string_split(env_affinity, ',');
        if ((size_t)thread_idx < vec.size()) {
          core = std::stoi(vec[thread_idx]);
        } else {
          // Mask present but too short — fall back to automatic placement.
          DALI_WARN("DALI_AFFINITY_MASK environment variable is set, "
                    "but does not have enough entries: thread_id (", thread_idx,
                    ") vs #entries (", vec.size(), "). Ignoring...");
        }
      }
      nvml::SetCPUAffinity(core);
    }
  } catch (const std::exception &e) {
    // Affinity is best-effort: log and continue with default placement.
    DALI_WARN("Couldn't set thread affinity in thread ", thread_idx, " of thread pool \"",
              name_, "\". Exception ", typeid(e).name(), ": ", e.what());
  } catch (...) {
    DALI_WARN("Couldn't set thread affinity in thread ", thread_idx, " of thread pool \"",
              name_, "\". Unknown error.");
  }
#endif
  return dg;
}

} // namespace dali
42 changes: 42 additions & 0 deletions dali/pipeline/util/new_thread_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_
#define DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_

#include <any>
#include <optional>
#include <string>
#include "dali/core/exec/thread_pool_base.h"
#if NVML_ENABLED
#include "dali/util/nvml.h"
#endif

namespace dali {

/// @brief Thread pool built on ThreadPoolBase whose worker threads each get
/// a descriptive thread name, optionally an active CUDA device, and
/// (in NVML-enabled builds) optional CPU-core affinity honoring the
/// DALI_AFFINITY_MASK environment variable.
class DLL_PUBLIC NewThreadPool : public ThreadPoolBase {
 public:
  /// @param num_threads  number of worker threads to spawn
  /// @param device_id    CUDA device for the workers; nullopt or
  ///                     CPU_ONLY_DEVICE_ID means a CPU-only pool
  /// @param set_affinity pin worker threads to CPU cores (NVML builds only)
  /// @param name         pool name, embedded in worker thread names
  NewThreadPool(int num_threads, std::optional<int> device_id, bool set_affinity, std::string name);

 private:
  // Per-thread startup hook: sets the thread name, installs an optional
  // DeviceGuard (kept alive inside the returned std::any), applies affinity.
  std::any OnThreadStart(int thread_idx, bool set_affinity);
  std::optional<int> device_id_;  // device associated with the pool; nullopt for CPU-only
  std::string name_;              // pool name, used in thread names and warnings
#if NVML_ENABLED
  nvml::NvmlInstance nvml_handle_;  // keeps NVML alive while affinity may be used
#endif
};

} // namespace dali

#endif // DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_