CUDA Cooperative Groups Sum Reduction Examples
Advantages
- Warp-native instructions: avoids shared-memory traffic and reduces latency
- Automatic synchronization: no manual __syncthreads() calls needed
- Flexible thread groups: supports tiles of arbitrary size
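All of the code below uses the cg:: namespace prefix; a minimal preamble consistent with those snippets would be:

#include <cooperative_groups.h>
#include <cooperative_groups/reduce.h>   // cg::reduce, cg::plus

namespace cg = cooperative_groups;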
Vector Sum (Warp Level)
Worked Example
Assumptions:
- Input: input = [1, 2, 3, ..., 64] (length 64).
- Configuration: blockDim.x = 32, gridDim.x = 2 (2 thread blocks, one warp per block; the kernel below loads by global index, so each warp sees its own 32-element slice).

Warp 0 (block 0, threads 0-31):
- Reduces [1, 2, ..., 32] → sum = 528
- Writes output[0] = 528

Warp 1 (block 1, threads 32-63):
- Reduces [33, 34, ..., 64] → sum = 1552
- Writes output[1] = 1552

Output: output = [528, 1552]
__global__ void sum_kernel(int* input, int* output) {
    auto block = cg::this_thread_block();
    auto tile = cg::tiled_partition<32>(block);          // 32-thread warp tile
    // Each thread loads one input element (global index, so each block reads its own slice).
    int val = input[blockIdx.x * block.size() + block.thread_rank()];
    int sum = cg::reduce(tile, val, cg::plus<int>());     // warp-level sum reduction (shfl-based)
    if (tile.thread_rank() == 0) {                        // the first thread of the warp writes the result
        output[blockIdx.x] = sum;
    }
}
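A matching host-side launch for the worked example above (the device buffer names d_input and d_output are placeholders, not part of the original):

// Two blocks of one warp each; each warp reduces its 32-element slice.
sum_kernel<<<2, 32>>>(d_input, d_output);   // expected: output = {528, 1552}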
Matrix Row Sum (Block Level)
Worked Example
Assumptions:
- Input matrix (2×4):
$$ \text{matrix} = \begin{bmatrix} 1 & 2 & 3 & 4 \\ 5 & 6 & 7 & 8 \end{bmatrix} $$
- Configuration: blockDim.x = 2 (2 threads per row), gridDim.x = 2 (2 thread blocks, one per row).
Execution:
Row 0 (blockIdx.x = 0):
- Thread 0: i = 0, 2 → sum = 1 + 3 = 4
- Thread 1: i = 1, 3 → sum = 2 + 4 = 6
- Reduction: total_sum = 4 + 6 = 10
- Output: row_sums[0] = 10
Row 1 (blockIdx.x = 1):
- Thread 0: i = 0, 2 → sum = 5 + 7 = 12
- Thread 1: i = 1, 3 → sum = 6 + 8 = 14
- Reduction: total_sum = 12 + 14 = 26
- Output: row_sums[1] = 26
Final result:
$$ \text{result}=\begin{bmatrix} 10 \\ 26 \end{bmatrix} $$
__global__ void row_sum(float* matrix, float* row_sums, int cols) {
    auto block = cg::this_thread_block();
    float sum = 0.0f;
    // Each thread handles a strided subset of this block's row.
    for (int i = block.thread_rank(); i < cols; i += block.size()) {
        sum += matrix[blockIdx.x * cols + i];
    }
    // Block-level reduction.
    float total_sum = cg::reduce(block, sum, cg::plus<float>());
    if (block.thread_rank() == 0) {
        row_sums[blockIdx.x] = total_sum;   // write the result
    }
}
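For the 2×4 matrix above, a matching launch would use one block per row with two threads each (pointer names are placeholders):

// gridDim.x = number of rows, blockDim.x = threads cooperating on one row.
row_sum<<<2, 2>>>(d_matrix, d_row_sums, /*cols=*/4);   // expected: row_sums = {10.0f, 26.0f}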
Matrix Sum (Warp Level + Block Level)
Phase 1 (partialBlockReduceKernel): splits each column of the input matrix A (width × height) into blocks of rows and computes one partial sum per column and per block. The results are stored in partial_sums (width × numReduceBlocks).
Phase 2 (blockReduce): reduces each column of partial_sums once more to produce the final result (width × 1).
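In formula form (c indexes the width dimension, b the partial blocks; this simply restates the two phases above):

$$ \text{partial\_sums}[c][b] = \sum_{i \,\in\, \text{rows of block } b} A[c][i], \qquad \text{result}[c] = \sum_{b=0}^{\text{numReduceBlocks}-1} \text{partial\_sums}[c][b] $$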
Worked Example
Assumptions:
- Input matrix A with width = 2 and height = 8, shown with one column of height elements per reduced vector:
$$ A = \begin{bmatrix} 1 & 5 \\ 2 & 6 \\ 3 & 7 \\ 4 & 8 \\ 1 & 5 \\ 2 & 6 \\ 3 & 7 \\ 4 & 8 \\ \end{bmatrix} $$
- Parameters: width = 2, height = 8, blockDim.x = 4, gridDim.x = 2 (and gridDim.y = width = 2, so each column gets its own blockIdx.y).
Phase 1:
Blocks with blockIdx.x = 0 (rows 0-3, one block per column via blockIdx.y):
- Column 0: threads 0-3 process A[0][0..3] → partial sum 1 + 2 + 3 + 4 = 10.
- Column 1: threads 0-3 process A[1][0..3] → partial sum 5 + 6 + 7 + 8 = 26.
Blocks with blockIdx.x = 1 (rows 4-7):
- Column 0: threads 0-3 process A[0][4..7] → partial sum 1 + 2 + 3 + 4 = 10.
- Column 1: threads 0-3 process A[1][4..7] → partial sum 5 + 6 + 7 + 8 = 26.
partial_sums:
$$ \text{partial\_sums} = \begin{bmatrix} 10 & 26 \\ 10 & 26 \end{bmatrix} $$
Phase 2:
Sum each column of partial_sums:
- Column 0: 10 + 10 = 20.
- Column 1: 26 + 26 = 52.
Final result:
$$ \text{result} = \begin{bmatrix} 20 \\ 52 \end{bmatrix} $$
template<typename Ty>
struct AddOp {
    __device__ Ty initial() const { return Ty::zero(); }          // identity element (zero)
    __device__ Ty operator()(Ty a, Ty b) const { return a + b; }  // addition
    __device__ void evalAssign(Ty& a, Ty b) const { a += b; }     // in-place accumulation
    __device__ Ty reduce(const auto& group, Ty val) {
        return cg::reduce(group, val, cg::plus<Ty>());             // warp-level reduction
    }
    __device__ void final_block_reduction_async(const auto& group, Ty* dst, Ty val) {
        AddOpFinalReduce<Ty>::final_block_reduction_async(group, dst, val);
    }
};
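AddOp only assumes the element type exposes a static zero() plus the usual addition operators. A minimal hypothetical stand-in (kb31_t itself is project-specific and not shown in this post) could look like:

// Hypothetical element type sketching the interface AddOp relies on.
struct demo_elem_t {
    unsigned int v;
    __device__ static demo_elem_t zero() { return {0u}; }                         // AddOp::initial()
    __device__ demo_elem_t operator+(demo_elem_t o) const { return {v + o.v}; }   // cg::plus / operator()
    __device__ demo_elem_t& operator+=(demo_elem_t o) { v += o.v; return *this; } // AddOp::evalAssign
};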
template<typename F>
RustCudaError
vectorSum(F* in, F* result, size_t width, size_t height, cudaStream_t stream) {
dim3 blockDim(512, 1, 1);
size_t numReduceBlocks = (((height - 1) / blockDim.x + 1) - 1) / 8 + 1;
dim3 gridDim(numReduceBlocks, width, 1);
// Allocate the partial sums and set them to zero.
F* partial_sums;
CUDA_OK(
cudaMallocAsync(&partial_sums, sizeof(F) * gridDim.x * width, stream)
);
size_t numTiles = blockDim.x / 32;
AddOp<F> op;
partialBlockReduceKernel<<<
gridDim,
blockDim,
numTiles * blockDim.y * sizeof(F),
stream>>>(in, partial_sums, width, height, op);
size_t new_height = gridDim.x;
gridDim.x = (((new_height - 1) / blockDim.x + 1) - 1) / 32 + 1;
// Initialize the result value.
//
// *Warning*: this assumes the zero of `F` is just given by the zero byte pattern.
CUDA_OK(cudaMemsetAsync(result, 0, sizeof(F) * width, stream));
// Compute the final results from the partial evaluations.
blockReduce<<<gridDim, blockDim, 0, stream>>>(
partial_sums,
result,
width,
new_height,
op
);
// Free the memory used for partial sums.
CUDA_OK(cudaFreeAsync(partial_sums, stream));
return CUDA_SUCCESS_MOON;
}
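As a concrete sizing check (illustrative numbers, not from the original): with blockDim.x = 512 and height = 100000, the formula above yields

$$ \text{numReduceBlocks} = \left\lceil \frac{\lceil 100000 / 512 \rceil}{8} \right\rceil = \left\lceil \frac{196}{8} \right\rceil = 25, $$

so each of the 25 blocks per column strides over roughly eight block-widths of rows in the first phase, and the second launch only has to reduce a column of 25 partial sums.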
template<typename F, typename TyOp>
__global__ void partialBlockReduceKernel(
F* A,
F* partial,
size_t width,
size_t height,
TyOp&& op
) {
auto block = cg::this_thread_block();
auto tile = cg::tiled_partition<32>(block);
F thread_val = op.initial();
size_t batchIdx = blockDim.y * blockIdx.y + threadIdx.y;
if (batchIdx >= width) {
return;
}
    // Grid-stride loop over this column's rows to accumulate the thread-local partial sum.
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < height;
i += blockDim.x * gridDim.x) {
op.evalAssign(thread_val, A[batchIdx * height + i]);
}
// Allocate shared memory
extern __shared__ unsigned char memory[];
F* shared = reinterpret_cast<F*>(memory);
// Warp-level reduction within tiles
thread_val = partialBlockReduce(block, tile, thread_val, shared, op);
// Write the result to the partial_sums array
if (block.thread_rank() == 0) {
partial[batchIdx * gridDim.x + blockIdx.x] = shared[0];
}
}
template<typename F, typename TyOp, typename TyBlock, typename TyTile>
__device__ F partialBlockReduce(
const TyBlock& block,
const TyTile& tile,
F val,
F* shared,
TyOp&& op
) {
// Warp-level reduction within tiles
val = op.reduce(tile, val);
// Only the first thread of each warp writes to shared memory
if (tile.thread_rank() == 0) {
shared[tile.meta_group_rank()] = val;
}
block.sync(); // Synchronize after warp-level reduction
// Perform tree-based reduction on shared memory
for (int stride = (block.size() / tile.size()) / 2; stride > 0;
stride /= 2) {
if (block.thread_rank() < stride) {
op.evalAssign(
shared[block.thread_rank()],
shared[block.thread_rank() + stride]
);
}
block.sync(); // Synchronize after each step
}
return shared[0];
}
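As a quick trace of the tree reduction above: with blockDim.x = 512 and 32-thread tiles, 16 warp sums land in shared[0..15], and the stride sequence 8 → 4 → 2 → 1 folds them pairwise until shared[0] holds the whole block's sum.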
template<typename F, typename TyOp>
__global__ void
blockReduce(F* A, F* result, size_t width, size_t height, TyOp&& op) {
auto block = cg::this_thread_block();
auto tile = cg::tiled_partition<32>(block);
size_t batchIdx = blockDim.y * blockIdx.y + threadIdx.y;
if (batchIdx >= width) {
return;
}
F thread_val = op.initial();
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < height;
i += blockDim.x * gridDim.x) {
op.evalAssign(thread_val, A[batchIdx * height + i]);
}
op.final_block_reduction_async(tile, &result[batchIdx], thread_val);
block.sync();
}
template<typename Ty>
struct AddOpFinalReduce {
template<typename TyGroup>
__device__ __forceinline__ static void
final_block_reduction_async(const TyGroup& group, Ty* dst, Ty val);
};
template<>
struct AddOpFinalReduce<kb31_t> {
template<typename TyGroup>
__device__ __forceinline__ static void
final_block_reduction_async(const TyGroup& group, kb31_t* dst, kb31_t val) {
cuda::atomic_ref<kb31_t, cuda::thread_scope_block> atomic(dst[0]);
// reduce thread sums across the tile, add the result to the atomic
return cg::reduce_update_async(group, atomic, val, cg::plus<kb31_t>());
}
};
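The same pattern extends to other element types. As a hypothetical example (not part of the original code), a float specialization would mirror the kb31_t one, relying on cuda::atomic_ref from <cuda/atomic> and cg::reduce_update_async from <cooperative_groups/reduce.h>:

template<>
struct AddOpFinalReduce<float> {
    template<typename TyGroup>
    __device__ __forceinline__ static void
    final_block_reduction_async(const TyGroup& group, float* dst, float val) {
        // Hypothetical float variant: reduce across the tile, then atomically add into *dst.
        cuda::atomic_ref<float, cuda::thread_scope_block> atomic(dst[0]);
        cg::reduce_update_async(group, atomic, val, cg::plus<float>());
    }
};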