首页 > 解决方案 > 如何使用 cuda 计算矩阵每一行中元素的顺序?

问题描述

我正在寻找如何argsort沿着矩阵的行或列处理 cuda/thrust。

这意味着给定一个矩阵,例如:

A = [[ 3.4257, -1.2345,  0.6232, -0.1354], 
     [-1.6639,  0.1557, -0.1763,  1.0257], 
     [0.6863,  0.0992,  1.4487,  0.0157]].

我需要计算每一行中元素的顺序,所以输出是:

index = [[1, 3, 2, 0],
         [0, 2, 1, 3],
         [3, 1, 0, 2]]

请问我该怎么做?

标签: sortingcudathrust

解决方案


这可以使用thrust::sort. 我们需要一组行索引和一组列索引。行索引是为了确保排序顺序在行之间分段。列索引是排序后给我们的结果。

将值、行索引、列索引压缩在一起。创建一个排序函子,首先对行排序,然后对值排序。输出是重新排列的列索引。

$ cat t114.cu
#include <thrust/sort.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/device_vector.h>
#include <thrust/host_vector.h>
#include <iostream>

using namespace thrust::placeholders;

struct my_sort_functor
{
  template <typename T1, typename T2>
  __host__ __device__
  bool operator()(const T1 &t1, const T2 &t2){
    if  (thrust::get<1>(t1) < thrust::get<1>(t2)) return true;
    if  (thrust::get<1>(t1) > thrust::get<1>(t2)) return false;
    if  (thrust::get<0>(t1) < thrust::get<0>(t2)) return true;
    return false;
  }
};

typedef float mt;
typedef int it;

int main(){

  mt A[] = { 3.4257, -1.2345,  0.6232, -0.1354,
            -1.6639,  0.1557, -0.1763,  1.0257,
             0.6863,  0.0992,  1.4487,  0.0157};
  const int rows = 3;
  const int cols = 4;
  thrust::device_vector<mt> d_A(A, A+rows*cols);
  thrust::device_vector<it> row_idx(d_A.size());
  thrust::device_vector<it> col_idx(d_A.size());
  thrust::sequence(row_idx.begin(), row_idx.end());
  thrust::sequence(col_idx.begin(), col_idx.end());
  thrust::transform(row_idx.begin(), row_idx.end(), row_idx.begin(), _1/cols);
  thrust::transform(col_idx.begin(), col_idx.end(), col_idx.begin(), _1%cols);
  auto my_zip_iterator = thrust::make_zip_iterator(thrust::make_tuple(d_A.begin(), row_idx.begin(), col_idx.begin()));
  thrust::sort(my_zip_iterator, my_zip_iterator+rows*cols, my_sort_functor());
  thrust::host_vector<it> h_col_idx = col_idx;
  thrust::copy_n(h_col_idx.begin(), rows*cols, std::ostream_iterator<it>(std::cout, ","));
  std::cout << std::endl;
}
$ nvcc -o t114 t114.cu
$ ./t114
1,3,2,0,0,2,1,3,3,1,0,2,
$

这是另一种方法。这种方法不会重新排序数据,而只是产生排序结果。我们只需要一个索引并即时计算行索引,而列索引仅在确定有序结果后计算。

$ cat t114.cu
#include <thrust/sort.h>
#include <thrust/device_vector.h>
#include <thrust/host_vector.h>
#include <thrust/copy.h>
#include <iostream>

using namespace thrust::placeholders;
typedef float mt;
typedef int it;

struct my_sort_functor
{
  mt *d;
  it cols;
  my_sort_functor(mt *_d, it _cols) : d(_d), cols(_cols) {};
  __host__ __device__
  bool operator()(const it &t1, const it &t2){
    it row1 = t1/cols;
    it row2 = t2/cols;
    if  (row1 < row2) return true;
    if  (row1 > row2) return false;
    if  (d[t1] < d[t2]) return true;
    return false;
  }
};

int main(){

  mt A[] = { 3.4257, -1.2345,  0.6232, -0.1354,
            -1.6639,  0.1557, -0.1763,  1.0257,
             0.6863,  0.0992,  1.4487,  0.0157};
  const int rows = 3;
  const int cols = 4;
  thrust::device_vector<mt> d_A(A, A+rows*cols);
  thrust::device_vector<it> idx(d_A.size());
  thrust::sequence(idx.begin(), idx.end());
  thrust::sort(idx.begin(), idx.end(), my_sort_functor(thrust::raw_pointer_cast(d_A.data()), cols));
  thrust::transform(idx.begin(), idx.end(), idx.begin(), _1%cols);
  thrust::host_vector<it> h_idx = idx;
  thrust::copy_n(h_idx.begin(), rows*cols, std::ostream_iterator<it>(std::cout, ","));
  std::cout << std::endl;
}
$ nvcc -o t114 t114.cu
$ ./t114
1,3,2,0,0,2,1,3,3,1,0,2,
$

我通常倾向于第二种方法,因为它只移动 1/3 的数据,因为它的性能更高。然而它也在做两个整数除法,所以它可能是一个洗牌。我们可以在第二种方法中预先计算行索引,代价是在排序期间移动两倍的数据,但避免了即时除法操作。

如果数组维度足够小(比如行和列维度都小于 65536),我们甚至可以预先计算行索引和列索引,并在索引的高位编码行,在低位编码列,以便只正在移动单个索引数量。更好的是:

$ cat t114.cu
#include <thrust/sort.h>
#include <thrust/device_vector.h>
#include <thrust/host_vector.h>
#include <thrust/sequence.h>
#include <thrust/transform.h>
#include <iostream>

using namespace thrust::placeholders;
typedef float mt;
typedef unsigned it;

struct my_sort_functor
{
  mt *d;
  it cols;
  my_sort_functor(mt *_d, it _cols) : d(_d), cols(_cols) {};
  __host__ __device__
  bool operator()(const it &t1, const it &t2){
    it row1 = t1>>16;
    it row2 = t2>>16;
    if  (row1 < row2) return true;
    if  (row1 > row2) return false;
    it col1 = t1&65535;
    it col2 = t2&65535;
    it i1 = row1*cols+col1;
    it i2 = row2*cols+col2;
    if  (d[i1] < d[i2]) return true;
    return false;
  }
};

struct my_transform_functor
{
  it cols;
  my_transform_functor(it _cols) : cols(_cols) {};
  __host__ __device__
    it operator()(const it &t1){
      it row = t1/cols;
      it col = t1 - row*cols;
      return (row << 16) + col;
    }
};

int main(){

  mt A[] = { 3.4257, -1.2345,  0.6232, -0.1354,
            -1.6639,  0.1557, -0.1763,  1.0257,
             0.6863,  0.0992,  1.4487,  0.0157};
  // assume rows and cols are each less than 65536
  const int rows = 3;
  const int cols = 4;
  thrust::device_vector<mt> d_A(A, A+rows*cols);
  thrust::device_vector<it> idx(d_A.size());
  thrust::sequence(idx.begin(), idx.end());
  thrust::transform(idx.begin(), idx.end(), idx.begin(), my_transform_functor(cols));
  thrust::sort(idx.begin(), idx.end(), my_sort_functor(thrust::raw_pointer_cast(d_A.data()), cols));
  thrust::host_vector<it> h_idx = idx;
  for (int i = 0; i < rows*cols; i++) std::cout << (h_idx[i]&65535) << ",";
  std::cout << std::endl;
}
$ nvcc -o t114 t114.cu
$ ./t114
1,3,2,0,0,2,1,3,3,1,0,2,
$

只移动一个数量,没有动态划分。


推荐阅读