This chapter covers how THTensor parallelizes its computations and how that computation is performance-optimized.

THTensorMath

Taking THTensor_(cadd) as an example, this section walks through how a tensor operation is parallelized in practice.

void THTensor_(cadd)(THTensor *r_, THTensor *t, scalar_t value, THTensor *src)
{
  THTensor_(resizeAs)(r_, t);
  int64_t r_Size = THTensor_(nElement)(r_);
  int64_t srcSize = THTensor_(nElement)(src);
  int r_Contig = THTensor_(isContiguous)(r_);
  int tContig = THTensor_(isContiguous)(t);
  int srcContig = THTensor_(isContiguous)(src);
  if (srcSize == r_Size) {
    if (r_Contig && tContig && srcContig) {
      if(r_ == t) {
        // in-place case (r_ == t): add the two vectors via the BLAS library
        THBlas_(axpy)(THTensor_(nElement)(t), value, src->data<scalar_t>(), 1, r_->data<scalar_t>(), 1);
      } else {
        TH_TENSOR_APPLY3_CONTIG(scalar_t, r_, scalar_t, t, scalar_t, src, THVector_(cadd)(r__data, t_data, src_data, value, r__len););
      }
    } else {
      TH_TENSOR_APPLY3_PARALLEL(r_Size, r_Contig, tContig, srcContig, scalar_t, r_, scalar_t, t, scalar_t, src, *r__data = *t_data + value * *src_data;, UNCERTAIN_TH_OMP_OVERHEAD_THRESHOLD);
    }
  } else {
    TH_TENSOR_APPLY3(scalar_t, r_, scalar_t, t, scalar_t, src, *r__data = *t_data + value * *src_data;);
  }
}
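To summarize the branching: when all three tensors are contiguous and the operation is in place (r_ == t), the whole computation is delegated to BLAS axpy; when they are contiguous but not aliased, TH_TENSOR_APPLY3_CONTIG applies the vectorized THVector_(cadd); when the tensors are non-contiguous but have matching sizes, TH_TENSOR_APPLY3_PARALLEL parallelizes over logical indices; and when the sizes differ, the plain serial TH_TENSOR_APPLY3 is used.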

THBlas_(axpy): the implementation
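For reference, axpy computes y = a * x + y over strided vectors. The sketch below illustrates only that contract; the function name is hypothetical, and the real THBlas_(axpy) delegates to an optimized BLAS routine when one is available, falling back to a plain loop otherwise.

#include <cstdint>

/* Sketch of the axpy contract: y[i*incy] += a * x[i*incx].
   The name axpy_sketch is hypothetical; the real THBlas_(axpy)
   forwards to an optimized BLAS routine (e.g. cblas_daxpy) when
   available and otherwise falls back to a loop like this one. */
void axpy_sketch(int64_t n, double a,
                 const double *x, int64_t incx,
                 double *y, int64_t incy)
{
  for (int64_t i = 0; i < n; i++) {
    y[i * incy] += a * x[i * incx];
  }
}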

TH_TENSOR_APPLY3_CONTIG handles the case where all three tensors are contiguous; its last argument becomes part of a lambda body. There are two implementations, one based on OpenMP and one on TBB; we take the OpenMP one as the example.

is_parallel_region first checks, via omp_in_parallel(), whether we are already inside a parallel region; if so, the expression is executed directly on the current thread. Otherwise the work is handed to at::parallel_for, the parallel primitive PyTorch implements itself.
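To make the calling convention concrete, here is a usage sketch (the function name and grain size are illustrative, not from the TH source): at::parallel_for splits the range [begin, end) into chunks of at least grain_size elements and invokes the lambda once per chunk, potentially on different worker threads.

#include <ATen/Parallel.h>
#include <cstdint>

// Usage sketch: element-wise r = t + value * src over contiguous buffers.
// at::parallel_for only splits the range when (end - begin) exceeds the
// grain size, so small tensors stay on the calling thread.
void cadd_contig_sketch(float *r, const float *t, const float *src,
                        float value, int64_t n)
{
  at::parallel_for(0, n, /*grain_size=*/2048, [&](int64_t begin, int64_t end) {
    for (int64_t i = begin; i < end; i++) {
      r[i] = t[i] + value * src[i];
    }
  });
}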

TH_TENSOR_APPLY3_PARALLEL handles non-contiguous tensors (e.g. tensors that have been transposed or otherwise transformed). For readability, the code below keeps the bookkeeping for only one of the three tensors.

#define TH_TENSOR_APPLY3_PARALLEL(SIZE, CONTIG1, CONTIG2, CONTIG3, TYPE1, TENSOR1, TYPE2, TENSOR2, TYPE3, TENSOR3, CODE, THRESHOLD) \
{ \
  /* for advanced searching index */ \
  if (CONTIG1 && CONTIG2 && CONTIG3) { /* all three tensors are contiguous */ \
    TYPE1 *rp = THTensor_getStoragePtr(TENSOR1)->data<TYPE1>()+TENSOR1->storage_offset(); \
    TYPE2 *tp = THTensor_getStoragePtr(TENSOR2)->data<TYPE2>()+TENSOR2->storage_offset(); \
    TYPE3 *srcp = THTensor_getStoragePtr(TENSOR3)->data<TYPE3>()+TENSOR3->storage_offset(); \
    if (tp != (TYPE2*)rp) { /* does the result tensor alias the input tensor? */ \
      at::parallel_for(0, SIZE, (THRESHOLD * 10), [&](int64_t begin, int64_t end) { \
        PRAGMA_IVDEP \
        for (auto iter = begin; iter < end; iter++) { \
          TYPE1 *TENSOR1##_data = rp+iter; /* TENSOR2/TENSOR3 pointers elided */ \
          CODE \
        } \
      }); \
    } else { \
      at::parallel_for(0, SIZE, (THRESHOLD * 10), [&](int64_t begin, int64_t end) { \
        PRAGMA_SIMD \
        for (auto iter = begin; iter < end; iter++) { \
          TYPE1 *TENSOR1##_data = rp+iter; /* TENSOR2/TENSOR3 pointers elided */ \
          CODE \
        } \
      }); \
    } \
  } else { \
    int TH_TENSOR_APPLY_hasFinished = 0; \
    int64_t TH_TENSOR_dim_index = 0; \
    __TH_TENSOR_APPLYX_PREAMBLE(TYPE1, TENSOR1, -1, 1) /* the key step! */ \
    if (0 == TH_TENSOR_APPLY_hasFinished) { \
      auto TENSOR1##_i_local = TENSOR1##_i; \
      auto TENSOR1##_data_local = TENSOR1##_data; \
      at::parallel_for(0, SIZE, THRESHOLD, [&](int64_t begin, int64_t end) { \
        auto TENSOR1##_i = TENSOR1##_i_local; \
        auto TENSOR1##_data = TENSOR1##_data_local; \
        ptrdiff_t line_index_start = begin; \
        ptrdiff_t line_seg_length = (end - begin); \
        __TH_TENSOR_APPLYX_CAL_MEMORY_OFFSET(TENSOR1); /* convert the tensor index into a memory offset */ \
        TENSOR1##_data += TENSOR1##_memory_offset; \
        ptrdiff_t count = 0; \
        ptrdiff_t TENSOR1##_start = TENSOR1##_counter_tmp[TENSOR1##_dim - 1]; \
        while (count < line_seg_length) { \
          for (TENSOR1##_i=TENSOR1##_start, TENSOR2##_i=TENSOR2##_start, TENSOR3##_i=TENSOR3##_start; (count<line_seg_length)&&(TENSOR1##_i<TENSOR1##_size)&&(TENSOR2##_i<TENSOR2##_size)&&(TENSOR3##_i<TENSOR3##_size); ++TENSOR1##_i, ++TENSOR2##_i, ++TENSOR3##_i, ++count) { \
            CODE \
            TENSOR1##_data += TENSOR1##_stride; /* computation within a single contiguous segment */ \
          } \
          if (count < line_seg_length) { \
            __TH_TENSOR_APPLYX_UPDATE_COUNTERS_PARALLEL(TENSOR1); \
          } \
        } \
        if (TENSOR1##_counter_tmp != NULL) { \
          THFree(TENSOR1##_counter_tmp); \
        } \
      }); \
    } \
    if (TENSOR1##_counter != NULL) { \
      THFree(TENSOR1##_counter); \
    } \
  } \
}
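In the non-contiguous branch, each worker receives a range [begin, end) of logical element indices, converts its starting index into a memory offset once, and then walks the tensor as a sequence of contiguous "lines": within a line it simply advances by the stride, and when a line is exhausted it updates the per-dimension counters to jump to the start of the next one.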

__TH_TENSOR_APPLYX_PREAMBLE

1. Starting with the outermost index, loop until reaching a dimension where the data is no longer contiguous, i.e. where the stride at that dimension is not equal to the size of the tensor defined by the outer dimensions. The principle is simple: just compare the stride against the corresponding tensor size.

2. For each element in the remaining non-contiguous (inner) tensor B, compute its memory offset as storageOffset + stride_B * index_B, as sketched below.
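A minimal sketch of that index-to-offset rule (the helper name is hypothetical; the actual work is done inside __TH_TENSOR_APPLYX_CAL_MEMORY_OFFSET): the offset is the storage offset plus the stride-weighted sum of the per-dimension indices recovered from the logical position.

#include <cstdint>
#include <vector>

// Sketch of the index -> memory-offset rule used by the APPLY macros:
// offset = storage_offset + sum over d of stride[d] * index[d].
// "linear" is the element's position in logical (row-major) order.
int64_t memory_offset_sketch(int64_t linear, int64_t storage_offset,
                             const std::vector<int64_t> &sizes,
                             const std::vector<int64_t> &strides)
{
  int64_t offset = storage_offset;
  for (int64_t d = (int64_t)sizes.size() - 1; d >= 0; d--) {
    int64_t index_d = linear % sizes[d];  // counter for dimension d
    offset += strides[d] * index_d;
    linear /= sizes[d];
  }
  return offset;
}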

Vectorized instructions and dynamic dispatch

A brief introduction to SIMD
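SIMD (Single Instruction, Multiple Data) instruction sets such as SSE and AVX let one instruction operate on several packed elements at once; an AVX register, for instance, holds eight floats, so one add instruction performs eight additions. As an illustration (hypothetical function name, not the TH implementation), an AVX variant of cadd could look like this:

#include <immintrin.h>
#include <cstddef>

// Hypothetical AVX sketch of cadd: z = x + c * y, eight floats at a time,
// with a scalar loop for the leftover tail elements. Illustration only.
static void cadd_avx_sketch(float *z, const float *x, const float *y,
                            float c, ptrdiff_t n)
{
  const __m256 cv = _mm256_set1_ps(c);
  ptrdiff_t i = 0;
  for (; i + 8 <= n; i += 8) {
    __m256 xv = _mm256_loadu_ps(x + i);
    __m256 yv = _mm256_loadu_ps(y + i);
    _mm256_storeu_ps(z + i, _mm256_add_ps(xv, _mm256_mul_ps(yv, cv)));
  }
  for (; i < n; i++) {
    z[i] = x[i] + c * y[i];  // scalar tail
  }
}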

When PyTorch calls THVector_(cadd), what actually runs is THVector_(cadd_DISPATCHPTR)(z, x, y, c, n).

This is in essence a function pointer, which initially points at the default implementation, THVector_(cadd_DEFAULT).
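For reference, the default implementation is a plain scalar loop; a minimal sketch of its shape (the real THVector_(cadd_DEFAULT) follows the same contract, with some manual loop unrolling on top):

#include <cstddef>

// Sketch of the scalar fallback the dispatch pointer starts out with:
// z = x + c * y, one element per iteration.
static void cadd_default_sketch(float *z, const float *x, const float *y,
                                float c, ptrdiff_t n)
{
  for (ptrdiff_t i = 0; i < n; i++) {
    z[i] = x[i] + c * y[i];
  }
}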

THVector_(cadd_DISPATCHTABLE)[] is the array in which the implementations for the different vector instruction sets are registered.

Code location: TH/vector/simd.h

#define INIT_DISPATCH_PTR(OP)    \
  do {                           \
    size_t i;                    \
    for (i = 0; i < sizeof(THVector_(OP ## _DISPATCHTABLE)) / sizeof(FunctionDescription); ++i) { \
      THVector_(OP ## _DISPATCHPTR) = reinterpret_cast<decltype(THVector_(OP ## _DISPATCHPTR))>(THVector_(OP ## _DISPATCHTABLE)[i].function); \
      if (THVector_(OP ## _DISPATCHTABLE)[i].supportedSimdExt & hostSimdExts) {                   \
        break;                                                                                     \
      }                                                                                            \
    }                                                                                              \
  } while(0)

struct THVector_(startup) {
  THVector_(startup)() {
    uint32_t hostSimdExts = detectHostSIMDExtensions();  // detect which SIMD instruction sets the host supports
    INIT_DISPATCH_PTR(fill);  // dynamic dispatch, as above: initialize each op's function pointer
    INIT_DISPATCH_PTR(cadd);
    INIT_DISPATCH_PTR(adds);
    //...
  }
};
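Two details make this work. First, the dispatch table is ordered from the most specific instruction set down to the always-supported default, so the loop in INIT_DISPATCH_PTR stops at the best implementation the host actually supports. Second, because the initialization lives in the constructor of THVector_(startup), instantiating the struct as a static global is enough to run the dispatch setup at library-load time, so every pointer is resolved once, before any tensor math executes.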