Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions cpp/src/io/csv/csv_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ CUDF_KERNEL void __launch_bounds__(csvparse_block_dim)
* @param[out] columns The output column data
* @param[out] valids The bitmaps indicating whether column fields are valid
* @param[out] valid_counts The number of valid fields in each column
* @param[out] is_quoted_flags Per-column boolean arrays tracking which rows were quoted fields
*/
CUDF_KERNEL void __launch_bounds__(csvparse_block_dim)
convert_csv_to_cudf(cudf::io::parse_options_view options,
Expand All @@ -304,7 +305,8 @@ CUDF_KERNEL void __launch_bounds__(csvparse_block_dim)
device_span<cudf::data_type const> dtypes,
device_span<void* const> columns,
device_span<cudf::bitmask_type* const> valids,
device_span<size_type> valid_counts)
device_span<size_type> valid_counts,
device_span<bool* const> is_quoted_flags)
{
auto const raw_csv = data.data();
// thread IDs range per block, so also need the block id.
Expand Down Expand Up @@ -338,15 +340,19 @@ CUDF_KERNEL void __launch_bounds__(csvparse_block_dim)
field_start = trimmed_field.first;
field_end = trimmed_field.second;
}
bool* const is_quoted_output =
is_quoted_flags.empty() ? nullptr : is_quoted_flags[actual_col];
if (is_valid) {
// Type dispatcher does not handle STRING
if (dtypes[actual_col].id() == cudf::type_id::STRING) {
auto end = next_delimiter;
auto end = next_delimiter;
bool was_quoted = false;
if (not options.keepquotes) {
if (not options.detect_whitespace_around_quotes) {
if ((*field_start == options.quotechar) && (*(end - 1) == options.quotechar)) {
++field_start;
--end;
was_quoted = true;
}
} else {
// If the string is quoted, whitespace around the quotes gets removed as well
Expand All @@ -355,9 +361,12 @@ CUDF_KERNEL void __launch_bounds__(csvparse_block_dim)
(*(trimmed_field.second - 1) == options.quotechar)) {
field_start = trimmed_field.first + 1;
end = trimmed_field.second - 1;
was_quoted = true;
}
}
}
// Track whether this field was quoted (for doublequote unescaping)
if (is_quoted_output != nullptr) { is_quoted_output[rec_id] = was_quoted; }
auto str_list = static_cast<std::pair<char const*, size_t>*>(columns[actual_col]);
str_list[rec_id].first = field_start;
str_list[rec_id].second = end - field_start;
Expand All @@ -380,6 +389,7 @@ CUDF_KERNEL void __launch_bounds__(csvparse_block_dim)
auto str_list = static_cast<std::pair<char const*, size_t>*>(columns[actual_col]);
str_list[rec_id].first = nullptr;
str_list[rec_id].second = 0;
if (is_quoted_output != nullptr) { is_quoted_output[rec_id] = false; }
}
++actual_col;
}
Expand Down Expand Up @@ -676,11 +686,12 @@ CUDF_KERNEL void __launch_bounds__(rowofs_block_dim)
ctx = make_char_context(ROW_CTX_NONE, ROW_CTX_QUOTE, ROW_CTX_NONE, 1, 0, 1);
}
} else if (c == quotechar) {
if (c_prev == delimiter || c_prev == quotechar) {
// Quoted string after delimiter, quoted string ending in delimiter, or double-quote
ctx = make_char_context(ROW_CTX_QUOTE, ROW_CTX_NONE);
// Only enter quote mode if the field starts with a quote; ignore quotes in the middle of a field.
if (c_prev == delimiter) {
// Quoted string after delimiter - enter quote mode
ctx = make_char_context(ROW_CTX_QUOTE, ROW_CTX_QUOTE);
} else {
// Closing or ignored quote
// Quote in middle of field or closing quote - ignore
ctx = make_char_context(ROW_CTX_NONE, ROW_CTX_NONE);
}
} else {
Expand Down Expand Up @@ -817,15 +828,23 @@ void decode_row_column_data(cudf::io::parse_options_view const& options,
device_span<void* const> columns,
device_span<cudf::bitmask_type* const> valids,
device_span<size_type> valid_counts,
device_span<bool* const> is_quoted_flags,
rmm::cuda_stream_view stream)
{
// Calculate actual block count to use based on records count
auto const block_size = csvparse_block_dim;
auto const num_rows = row_offsets.size() - 1;
auto const grid_size = cudf::util::div_rounding_up_safe<size_t>(num_rows, block_size);

convert_csv_to_cudf<<<grid_size, block_size, 0, stream.value()>>>(
options, data, column_flags, row_offsets, dtypes, columns, valids, valid_counts);
convert_csv_to_cudf<<<grid_size, block_size, 0, stream.value()>>>(options,
data,
column_flags,
row_offsets,
dtypes,
columns,
valids,
valid_counts,
is_quoted_flags);
}

uint32_t __host__ gather_row_offsets(parse_options_view const& options,
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/io/csv/csv_gpu.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2019-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -207,6 +207,8 @@ cudf::detail::host_vector<column_type_histogram> detect_column_types(
* @param[out] columns Device memory output of column data
* @param[out] valids Device memory output of column valids bitmap data
* @param[out] valid_counts Device memory output of the number of valid fields in each column
* @param[out] is_quoted Per-column boolean arrays indicating which rows were quoted fields
* (nullptr entries mean the column doesn't need quote tracking)
* @param[in] stream CUDA stream to use
*/
void decode_row_column_data(cudf::io::parse_options_view const& options,
Expand All @@ -217,6 +219,7 @@ void decode_row_column_data(cudf::io::parse_options_view const& options,
device_span<void* const> columns,
device_span<cudf::bitmask_type* const> valids,
device_span<size_type> valid_counts,
device_span<bool* const> is_quoted,
rmm::cuda_stream_view stream);

} // namespace gpu
Expand Down
178 changes: 150 additions & 28 deletions cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2019-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

Expand All @@ -14,8 +14,14 @@
#include "io/utilities/hostdevice_vector.hpp"
#include "io/utilities/parsing_utils.cuh"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/host_worker_pool.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/io/csv.hpp>
Expand All @@ -24,14 +30,17 @@
#include <cudf/io/detail/csv.hpp>
#include <cudf/io/types.hpp>
#include <cudf/logger.hpp>
#include <cudf/strings/detail/copy_if_else.cuh>
#include <cudf/strings/detail/replace.hpp>
#include <cudf/table/table.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/count.h>
#include <thrust/host_vector.h>
#include <thrust/iterator/counting_iterator.h>

Expand Down Expand Up @@ -614,17 +623,25 @@ void infer_column_types(parse_options const& parse_opts,
}
}

std::vector<column_buffer> decode_data(parse_options const& parse_opts,
host_span<column_parse::flags const> column_flags,
std::vector<std::string> const& column_names,
device_span<char const> data,
device_span<uint64_t const> row_offsets,
host_span<data_type const> column_types,
int32_t num_records,
int32_t num_actual_columns,
int32_t num_active_columns,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
/**
 * @brief Result of decode_data containing column buffers and quoted field tracking
 *
 * Returned by `decode_data` as an aggregate `{buffers, is_quoted_flags}`, so the member
 * order below must stay in sync with that return statement.
 */
struct decode_result {
/// Decoded output column buffers built by `decode_data`; their null counts are already
/// filled in from the per-column valid counts before the result is returned.
std::vector<column_buffer> buffers;
/// Device arrays of `num_records` flags, one per STRING column only. Non-STRING columns
/// get no entry, so this vector is indexed by the ordinal of the string column (0 for the
/// first STRING column, 1 for the second, ...), NOT by the column index used for `buffers`.
/// A flag is written as true where the conversion kernel stripped the enclosing quote
/// characters from the field, and false otherwise (including invalid/null fields).
std::vector<rmm::device_uvector<bool>> is_quoted_flags;
};

decode_result decode_data(parse_options const& parse_opts,
host_span<column_parse::flags const> column_flags,
std::vector<std::string> const& column_names,
device_span<char const> data,
device_span<uint64_t const> row_offsets,
host_span<data_type const> column_types,
int32_t num_records,
int32_t num_actual_columns,
int32_t num_active_columns,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// Alloc output; columns' data memory is still expected for empty dataframe
std::vector<column_buffer> out_buffers;
Expand All @@ -648,6 +665,16 @@ std::vector<column_buffer> decode_data(parse_options const& parse_opts,
h_valid[i] = out_buffers[i].null_mask();
}

// Allocate is_quoted_flags arrays for string columns to track which fields were quoted
std::vector<rmm::device_uvector<bool>> is_quoted_flags_storage;
auto h_is_quoted_flags = cudf::detail::make_host_vector<bool*>(num_active_columns, stream);
for (int i = 0; i < num_active_columns; ++i) {
if (column_types[i].id() == type_id::STRING) {
is_quoted_flags_storage.emplace_back(num_records, stream);
h_is_quoted_flags[i] = is_quoted_flags_storage.back().data();
}
}

auto d_valid_counts = cudf::detail::make_zeroed_device_uvector_async<size_type>(
num_active_columns, stream, cudf::get_current_device_resource_ref());

Expand All @@ -660,14 +687,15 @@ std::vector<column_buffer> decode_data(parse_options const& parse_opts,
make_device_uvector_async(h_data, stream, cudf::get_current_device_resource_ref()),
make_device_uvector_async(h_valid, stream, cudf::get_current_device_resource_ref()),
d_valid_counts,
make_device_uvector_async(h_is_quoted_flags, stream, cudf::get_current_device_resource_ref()),
stream);

auto const h_valid_counts = cudf::detail::make_host_vector(d_valid_counts, stream);
for (int i = 0; i < num_active_columns; ++i) {
out_buffers[i].null_count() = num_records - h_valid_counts[i];
}

return out_buffers;
return {std::move(out_buffers), std::move(is_quoted_flags_storage)};
}

cudf::detail::host_vector<data_type> determine_column_types(
Expand Down Expand Up @@ -909,7 +937,7 @@ table_with_metadata read_csv(cudf::io::datasource* source,
auto out_columns = std::vector<std::unique_ptr<cudf::column>>();
out_columns.reserve(column_types.size());
if (num_records != 0) {
auto out_buffers = decode_data( //
auto decode_result = decode_data( //
parse_opts,
column_flags,
column_names,
Expand All @@ -922,23 +950,117 @@ table_with_metadata read_csv(cudf::io::datasource* source,
stream,
mr);

cudf::string_scalar quotechar_scalar(std::string(1, parse_opts.quotechar), true, stream);
cudf::string_scalar dblquotechar_scalar(std::string(2, parse_opts.quotechar), true, stream);
out_columns.resize(column_types.size());

auto& out_buffers = decode_result.buffers;
auto& is_quoted_flags = decode_result.is_quoted_flags;

bool const doublequote_enabled = (parse_opts.quotechar != '\0' && parse_opts.doublequote);

// Identify string columns that need doublequote processing
std::vector<size_t> string_col_indices;
if (doublequote_enabled) {
for (size_t i = 0; i < column_types.size(); ++i) {
if (column_types[i].id() == type_id::STRING) { string_col_indices.push_back(i); }
}
}

// Process string columns with doublequote handling in parallel using thread pool
auto const num_string_cols = string_col_indices.size();
if (num_string_cols > 0) {
auto const quotechar = parse_opts.quotechar;
cudf::string_scalar quotechar_scalar(std::string(1, quotechar), true, stream);
cudf::string_scalar dblquotechar_scalar(std::string(2, quotechar), true, stream);
constexpr size_t max_tasks = 4;
auto const cols_per_task = cudf::util::div_rounding_up_safe(num_string_cols, max_tasks);
auto const num_tasks = cudf::util::div_rounding_up_safe(num_string_cols, cols_per_task);
auto streams = cudf::detail::fork_streams(stream, num_tasks);

auto process_string_column = [&](size_t str_col_idx, rmm::cuda_stream_view col_stream) {
auto const col_idx = string_col_indices[str_col_idx];
auto const& is_quoted = device_span<bool>(is_quoted_flags[str_col_idx]);
auto* buffer = &out_buffers[col_idx];

// Count how many rows were quoted to determine the fast path
auto const num_quoted = thrust::count(
rmm::exec_policy_nosync(col_stream), is_quoted.begin(), is_quoted.end(), true);
if (num_quoted == 0) {
// Fast path: no rows were quoted, skip replacement entirely
out_columns[col_idx] = make_column(*buffer, nullptr, std::nullopt, col_stream);
} else {
auto replaced_all_col = cudf::strings::detail::replace(
cudf::make_strings_column(*buffer->_strings, col_stream)->view(),
dblquotechar_scalar,
quotechar_scalar,
-1,
col_stream,
mr);
if (std::cmp_equal(num_quoted, num_records)) {
// Fast path: all rows were quoted, apply replacement to all
out_columns[col_idx] = std::move(replaced_all_col);
} else {
// Need to replace only the quoted rows
auto const replaced_all_view =
cudf::column_device_view::create(replaced_all_col->view(), col_stream);
auto const replaced_all_iter = cudf::detail::make_optional_iterator<cudf::string_view>(
*replaced_all_view, cudf::nullate::DYNAMIC{replaced_all_col->nullable()});

auto const* original_pairs = buffer->_strings->data();
auto const original_iter = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0),
cuda::proclaim_return_type<cuda::std::optional<cudf::string_view>>(
[original_pairs] __device__(
size_type idx) -> cuda::std::optional<cudf::string_view> {
auto const& p = original_pairs[idx];
return p.first != nullptr
? cuda::std::optional<cudf::string_view>{cudf::string_view{p.first,
p.second}}
: cuda::std::nullopt;
}));

out_columns[col_idx] = cudf::strings::detail::copy_if_else(
replaced_all_iter,
replaced_all_iter + num_records,
original_iter,
[is_quoted] __device__(size_type idx) { return is_quoted[idx]; },
col_stream,
mr);
}
}
};

std::vector<std::future<void>> tasks;
tasks.reserve(num_tasks);

for (size_t task_id = 0; task_id < num_tasks; ++task_id) {
auto const start_col = task_id * cols_per_task;
auto const end_col = std::min(start_col + cols_per_task, num_string_cols);
auto const col_stream = streams[task_id];
tasks.emplace_back(
cudf::detail::host_worker_pool().submit_task([&, start_col, end_col, col_stream]() {
for (size_t str_col_idx = start_col; str_col_idx < end_col; ++str_col_idx) {
process_string_column(str_col_idx, col_stream);
}
}));
}

for (auto& task : tasks) {
task.get();
}

cudf::detail::join_streams(streams, stream);
}

// Create output columns for the columns that were not processed in the parallel loop
for (size_t i = 0; i < column_types.size(); ++i) {
metadata.schema_info.emplace_back(out_buffers[i].name);
if (column_types[i].id() == type_id::STRING && parse_opts.quotechar != '\0' &&
parse_opts.doublequote) {
// PANDAS' default behavior of enabling doublequote for two consecutive
// quotechars in quoted fields results in reduction to a single quotechar
// TODO: Would be much more efficient to perform this operation in-place
// during the conversion stage
std::unique_ptr<column> col = cudf::make_strings_column(*out_buffers[i]._strings, stream);
out_columns.emplace_back(cudf::strings::detail::replace(
col->view(), dblquotechar_scalar, quotechar_scalar, -1, stream, mr));
} else {
out_columns.emplace_back(make_column(out_buffers[i], nullptr, std::nullopt, stream));
if (!out_columns[i]) {
out_columns[i] = make_column(out_buffers[i], nullptr, std::nullopt, stream);
}
}

for (size_t i = 0; i < column_types.size(); ++i) {
metadata.schema_info.emplace_back(out_buffers[i].name);
}
} else {
// Create empty columns
for (auto column_type : column_types) {
Expand Down
Loading