CUDA Cooperative Groups Sum Reduction Examples

Advantages

  • Warp-native instructions: reductions can avoid shared-memory accesses and reduce latency
  • Implicit synchronization within a group: no manual __syncthreads() calls are needed
  • Flexible thread groups: a block can be partitioned into tiles of various sizes
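
All of the kernels below assume the Cooperative Groups headers and the usual namespace alias; a minimal sketch of the preamble they rely on (the alias name cg matches the code in this note):

#include <cooperative_groups.h>
#include <cooperative_groups/reduce.h>

namespace cg = cooperative_groups;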

Vector Sum (Warp Level)

Worked Example

Assumptions:

  • Input: input = [1, 2, 3, ..., 64] (length 64).
  • Configuration: blockDim.x = 32, gridDim.x = 2 (2 thread blocks, one warp per block); a host-side launch sketch follows the kernel below.

    • Warp 0 (block 0, threads 0-31):

      • reduces [1, 2, ..., 32], sum = 528
      • output[0] = 528
    • Warp 1 (block 1, threads 32-63):

      • reduces [33, 34, ..., 64], sum = 1552
      • output[1] = 1552
  • Output: output = [528, 1552]

__global__ void sum_kernel(int* input, int* output) {
    auto block = cg::this_thread_block();
    auto tile = cg::tiled_partition<32>(block); // 32-thread warp tile

    // Each thread loads one input element, indexed across the whole grid
    int val = input[blockIdx.x * blockDim.x + block.thread_rank()];
    int sum = cg::reduce(tile, val, cg::plus<int>()); // warp-level sum reduction (uses shfl instructions)

    if (tile.thread_rank() == 0) { // the first thread of the warp writes the result
        output[blockIdx.x] = sum;
    }
}
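
To make the worked example concrete, here is a host-side launch sketch matching the configuration above (2 blocks of 32 threads). It is a minimal sketch: the buffer names h_in, d_in, d_out and the lack of error handling are illustrative choices, not part of the original code.

// Sketch: host-side launch for the example above (assumed names, no error handling).
int main() {
    const int n = 64, blocks = 2, threads = 32;
    int h_in[n], h_out[blocks];
    for (int i = 0; i < n; ++i) h_in[i] = i + 1;  // input = [1, 2, ..., 64]

    int *d_in, *d_out;
    cudaMalloc(&d_in, n * sizeof(int));
    cudaMalloc(&d_out, blocks * sizeof(int));
    cudaMemcpy(d_in, h_in, n * sizeof(int), cudaMemcpyHostToDevice);

    sum_kernel<<<blocks, threads>>>(d_in, d_out);

    cudaMemcpy(h_out, d_out, blocks * sizeof(int), cudaMemcpyDeviceToHost);
    // Expected: h_out = {528, 1552}
    cudaFree(d_in);
    cudaFree(d_out);
    return 0;
}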

Matrix Row Sum (Block Level)

Worked Example

Assumptions:

  • Input matrix (2×4):

    $$ \text{matrix} = \begin{bmatrix} 1 & 2 & 3 & 4 \\ 5 & 6 & 7 & 8 \end{bmatrix} $$

  • Configuration: blockDim.x = 2 (2 threads per row), gridDim.x = 2 (2 thread blocks, one per row); a host-side launch sketch follows the kernel below.

Execution:

  1. Row 0 (blockIdx.x = 0):

    • thread 0: i = 0, 2 → sum = 1 + 3 = 4
    • thread 1: i = 1, 3 → sum = 2 + 4 = 6
    • reduction: total_sum = 4 + 6 = 10
    • output: row_sums[0] = 10
  2. Row 1 (blockIdx.x = 1):

    • thread 0: i = 0, 2 → sum = 5 + 7 = 12
    • thread 1: i = 1, 3 → sum = 6 + 8 = 14
    • reduction: total_sum = 12 + 14 = 26
    • output: row_sums[1] = 26
  3. Final result:

    $$ \text{result}=\begin{bmatrix} 10 \\ 26 \end{bmatrix} $$

__global__ void row_sum(float* matrix, float* row_sums, int cols) {
    auto block = cg::this_thread_block();
    float sum = 0.0f;

    // Each thread processes part of the elements of one row
    for (int i = block.thread_rank(); i < cols; i += block.size()) {
        sum += matrix[blockIdx.x * cols + i];
    }

    // Block-level reduction
    float total_sum = cg::reduce(block, sum, cg::plus<float>());

    if (block.thread_rank() == 0) {
        row_sums[blockIdx.x] = total_sum; // write the result
    }
}
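
A host-side launch sketch for the row-sum example above, with one block per row and two threads per block as in the worked example; d_matrix, d_row_sums and the helper name are illustrative assumptions.

// Sketch: launching row_sum for the 2x4 example matrix (assumed buffer names).
void run_row_sum_example() {
    const int rows = 2, cols = 4;
    float h_matrix[rows * cols] = {1, 2, 3, 4, 5, 6, 7, 8};  // row-major
    float h_row_sums[rows];

    float *d_matrix, *d_row_sums;
    cudaMalloc(&d_matrix, sizeof(h_matrix));
    cudaMalloc(&d_row_sums, rows * sizeof(float));
    cudaMemcpy(d_matrix, h_matrix, sizeof(h_matrix), cudaMemcpyHostToDevice);

    row_sum<<<rows, 2>>>(d_matrix, d_row_sums, cols);  // gridDim.x = 2, blockDim.x = 2

    cudaMemcpy(h_row_sums, d_row_sums, rows * sizeof(float), cudaMemcpyDeviceToHost);
    // Expected: h_row_sums = {10, 26}
    cudaFree(d_matrix);
    cudaFree(d_row_sums);
}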

Matrix Sum (Warp Level + Block Level)

Stage 1 (partialBlockReduceKernel): the input matrix A (width × height) is processed column by column, with the rows of each column split across thread blocks, each of which computes a partial sum. The results are stored in partial_sums (width × numReduceBlocks).

Stage 2 (blockReduce): the partial sums of each column are reduced once more, giving the final result result (width × 1).
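
Written out, stage 1 partitions the row indices of each column j (0 ≤ j < width) into numReduceBlocks disjoint sets S_b, one per blockIdx.x = b, and stage 2 folds the per-block partial sums back together:

$$ \text{partial\_sums}[j][b] = \sum_{i \in S_b} A[j][i], \qquad \text{result}[j] = \sum_{b=0}^{\text{numReduceBlocks}-1} \text{partial\_sums}[j][b] = \sum_{i=0}^{\text{height}-1} A[j][i] $$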

Worked Example

Assumptions:

  • Input matrix A (width = 2, height = 8; shown below with one column of A per displayed column):

    $$ A= \begin{bmatrix} 1 & 5 \\ 2 & 6 \\ 3 & 7 \\ 4 & 8 \\ 1 & 5 \\ 2 & 6 \\ 3 & 7 \\ 4 & 8 \\ \end{bmatrix} $$

  • Parameters: width = 2, height = 8, blockDim.x = 4, gridDim.x = 2 (and gridDim.y = width = 2, so each blockIdx.y handles one column).

Stage 1:

  1. Blocks with blockIdx.x = 0 (rows 0-3; the column is selected by blockIdx.y):

    • column 0: threads 0-3 process A[0][0..3] → partial sum 1+2+3+4 = 10
    • column 1: threads 0-3 process A[1][0..3] → partial sum 5+6+7+8 = 26
  2. Blocks with blockIdx.x = 1 (rows 4-7):

    • column 0: threads 0-3 process A[0][4..7] → partial sum 1+2+3+4 = 10
    • column 1: threads 0-3 process A[1][4..7] → partial sum 5+6+7+8 = 26
  3. partial_sums:

    $$ \text{partial\_sums} = \begin{bmatrix} 10 & 26 \\ 10 & 26 \end{bmatrix} $$

Stage 2:

  • Sum each column of partial_sums:

    • column 0: 10 + 10 = 20
    • column 1: 26 + 26 = 52
  • Final result:

    $$ \text{result}=\begin{bmatrix} 20 \\ 52 \end{bmatrix} $$
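
In memory, the partial sums of column batchIdx are stored contiguously and indexed by blockIdx.x (see partialBlockReduceKernel below). For this example, the value written by blockIdx.x = 1 for column batchIdx = 1 lands at:

$$ \text{partial\_sums}[\text{batchIdx} \cdot \text{gridDim.x} + \text{blockIdx.x}] = \text{partial\_sums}[1 \cdot 2 + 1] = \text{partial\_sums}[3] = 26 $$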

template<typename Ty>
struct AddOp {
    __device__ Ty initial() const { return Ty::zero(); }  // identity element (the zero of Ty)
    __device__ Ty operator()(Ty a, Ty b) const { return a + b; }  // addition
    __device__ void evalAssign(Ty& a, Ty b) const { a += b; }     // in-place accumulation
    __device__ Ty reduce(const auto& group, Ty val) {
        return cg::reduce(group, val, cg::plus<Ty>());  // warp-level (tile) reduction
    }
    }
    __device__ void final_block_reduction_async(const auto& group, Ty* dst, Ty val) {
        AddOpFinalReduce<Ty>::final_block_reduction_async(group, dst, val);
    }
};
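
AddOp<Ty> only compiles for element types that provide a static zero() and the + operators it uses. As a sketch, a hypothetical modular field type satisfying those requirements could look like the following; mod31_t, its modulus, and the member names are illustrative assumptions only (the real kb31_t used later is defined elsewhere in the project).

// Hypothetical element type usable with AddOp<Ty> (illustrative only).
struct mod31_t {
    unsigned int v;                                  // value in [0, P)
    static constexpr unsigned int P = 2147483647u;   // 2^31 - 1, assumed modulus

    __host__ __device__ static mod31_t zero() { return {0}; }

    __host__ __device__ friend mod31_t operator+(mod31_t a, mod31_t b) {
        unsigned int s = a.v + b.v;                  // no overflow: both operands are < 2^31
        return {s >= P ? s - P : s};
    }
    __host__ __device__ mod31_t& operator+=(mod31_t o) { return *this = *this + o; }
};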


template<typename F>
RustCudaError
vectorSum(F* in, F* result, size_t width, size_t height, cudaStream_t stream) {
    dim3 blockDim(512, 1, 1);
    size_t numReduceBlocks = (((height - 1) / blockDim.x + 1) - 1) / 8 + 1;
    dim3 gridDim(numReduceBlocks, width, 1);

    // Allocate the partial sums and set them to zero.
    F* partial_sums;
    CUDA_OK(
        cudaMallocAsync(&partial_sums, sizeof(F) * gridDim.x * width, stream)
    );

    size_t numTiles = blockDim.x / 32;

    AddOp<F> op;

    partialBlockReduceKernel<<<
        gridDim,
        blockDim,
        numTiles * blockDim.y * sizeof(F),
        stream>>>(in, partial_sums, width, height, op);

    size_t new_height = gridDim.x;
    gridDim.x = (((new_height - 1) / blockDim.x + 1) - 1) / 32 + 1;

    // Initialize the result value.
    //
    // *Warning*: this assumes the zero of `F` is just given by the zero byte pattern.
    CUDA_OK(cudaMemsetAsync(result, 0, sizeof(F) * width, stream));
    // Compute the final results from the partial evaluations.
    blockReduce<<<gridDim, blockDim, 0, stream>>>(
        partial_sums,
        result,
        width,
        new_height,
        op
    );
    // Free the memory used for partial sums.
    CUDA_OK(cudaFreeAsync(partial_sums, stream));

    return CUDA_SUCCESS_MOON;
}
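
The launch-geometry arithmetic above is nested ceiling division: numReduceBlocks = ceil(ceil(height / blockDim.x) / 8). As a concrete check, assuming height = 10000 and blockDim.x = 512:

$$ \left\lceil \frac{10000}{512} \right\rceil = 20, \qquad \text{numReduceBlocks} = \left\lceil \frac{20}{8} \right\rceil = 3 $$

The second launch shrinks the grid the same way: gridDim.x = ceil(ceil(3 / 512) / 32) = 1, so a single block per column finishes the reduction.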


template<typename F, typename TyOp>
__global__ void partialBlockReduceKernel(
    F* A,
    F* partial,
    size_t width,
    size_t height,
    TyOp&& op
) {
    auto block = cg::this_thread_block();
    auto tile = cg::tiled_partition<32>(block);

    F thread_val = op.initial();

    size_t batchIdx = blockDim.y * blockIdx.y + threadIdx.y;
    if (batchIdx >= width) {
        return;
    }

    // Stride loop to accumulate partial sum
    // Each thread covers part of the rows of the current column
    for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < height;
         i += blockDim.x * gridDim.x) {
        op.evalAssign(thread_val, A[batchIdx * height + i]);
    }

    // Allocate shared memory
    extern __shared__ unsigned char memory[];
    F* shared = reinterpret_cast<F*>(memory);

    // Block-level reduction: warp reduce within tiles, then a shared-memory tree reduce
    thread_val = partialBlockReduce(block, tile, thread_val, shared, op);

    // Write the result to the partial_sums array
    if (block.thread_rank() == 0) {
        partial[batchIdx * gridDim.x + blockIdx.x] = shared[0];
    }
}
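
The dynamic shared-memory size passed at the stage-1 launch, numTiles * blockDim.y * sizeof(F), reserves one slot in the shared array above per 32-thread tile. With the default blockDim.x = 512 and blockDim.y = 1:

$$ \text{numTiles} = \frac{\text{blockDim.x}}{32} = \frac{512}{32} = 16, \qquad \text{shared memory} = 16 \times 1 \times \text{sizeof}(F) \ \text{bytes} $$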

template<typename F, typename TyOp, typename TyBlock, typename TyTile>
__device__ F partialBlockReduce(
    const TyBlock& block,
    const TyTile& tile,
    F val,
    F* shared,
    TyOp&& op
) {
    // Warp-level reduction within tiles
    val = op.reduce(tile, val);

    // Only the first thread of each warp writes to shared memory
    if (tile.thread_rank() == 0) {
        shared[tile.meta_group_rank()] = val;
    }
    block.sync();  // Synchronize after warp-level reduction

    // Perform tree-based reduction on shared memory
    for (int stride = (block.size() / tile.size()) / 2; stride > 0;
         stride /= 2) {
        if (block.thread_rank() < stride) {
            op.evalAssign(
                shared[block.thread_rank()],
                shared[block.thread_rank() + stride]
            );
        }
        block.sync();  // Synchronize after each step
    }

    return shared[0];
}
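
For the same default launch, partialBlockReduce first leaves 16 warp sums in shared memory, and the tree loop above then halves the number of live partial sums at each step:

$$ \text{stride} = 8 \rightarrow 4 \rightarrow 2 \rightarrow 1, \qquad 16 \ \text{warp sums} \rightarrow 8 \rightarrow 4 \rightarrow 2 \rightarrow 1 $$

After the last step, shared[0] holds the block's total, which is what partialBlockReduceKernel writes to partial_sums.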

template<typename F, typename TyOp>
__global__ void
blockReduce(F* A, F* result, size_t width, size_t height, TyOp&& op) {
    auto block = cg::this_thread_block();
    auto tile = cg::tiled_partition<32>(block);

    size_t batchIdx = blockDim.y * blockIdx.y + threadIdx.y;
    if (batchIdx >= width) {
        return;
    }

    F thread_val = op.initial();

    for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < height;
         i += blockDim.x * gridDim.x) {
        op.evalAssign(thread_val, A[batchIdx * height + i]);
    }

    op.final_block_reduction_async(tile, &result[batchIdx], thread_val);
    block.sync();
}

template<typename Ty>
struct AddOpFinalReduce {
    template<typename TyGroup>
    __device__ __forceinline__ static void
    final_block_reduction_async(const TyGroup& group, Ty* dst, Ty val);
};

template<>
struct AddOpFinalReduce<kb31_t> {
    template<typename TyGroup>
    __device__ __forceinline__ static void
    final_block_reduction_async(const TyGroup& group, kb31_t* dst, kb31_t val) {
        cuda::atomic_ref<kb31_t, cuda::thread_scope_block> atomic(dst[0]);
        // reduce thread sums across the tile, add the result to the atomic
        return cg::reduce_update_async(group, atomic, val, cg::plus<kb31_t>());
    }
};