cuda
Параллельное сокращение (например, как суммировать массив)
Поиск…
замечания
Алгоритм параллельной регрессии обычно относится к алгоритму, который объединяет массив элементов, создавая единственный результат. Типичными проблемами, которые относятся к этой категории, являются:
- суммирование всех элементов в массиве
- нахождение максимума в массиве
В общем случае параллельное редукция может быть применено для любого двоичного ассоциативного оператора , т. Е. (A*B)*C = A*(B*C)
. С таким оператором * алгоритм параллельной редукции повторно группирует аргументы массива в парах. Каждая пара вычисляется параллельно с другими, уменьшая общий размер массива за один шаг. Процесс повторяется до тех пор, пока не будет существовать только один элемент.

Если оператор коммутативен (т. Е. A*B = B*A
) в дополнение к ассоциативному, алгоритм может парировать в другой схеме. С теоретической точки зрения это не имеет никакого значения, но на практике это дает лучшую схему доступа к памяти:

Не все ассоциативные операторы коммутативны - например, возьмем матричное умножение.
Одноблочная параллельная редукция для коммутативного оператора
Простейшим подходом к параллельному сокращению CUDA является назначение отдельного блока для выполнения задачи:
static const int arraySize = 10000;
static const int blockSize = 1024;
__global__ void sumCommSingleBlock(const int *a, int *out) {
int idx = threadIdx.x;
int sum = 0;
for (int i = idx; i < arraySize; i += blockSize)
sum += a[i];
__shared__ int r[blockSize];
r[idx] = sum;
__syncthreads();
for (int size = blockSize/2; size>0; size/=2) { //uniform
if (idx<size)
r[idx] += r[idx+size];
__syncthreads();
}
if (idx == 0)
*out = r[0];
}
...
sumCommSingleBlock<<<1, blockSize>>>(dev_a, dev_out);
Это наиболее возможно, когда размер данных не очень большой (около нескольких тысяч элементов). Обычно это происходит, когда сокращение является частью некоторой более крупной программы CUDA. Если вход соответствует blockSize
с самого начала, первый цикл for
может быть полностью удален.
Обратите внимание, что на первом этапе, когда есть больше элементов, чем потоки, мы добавляем вещи полностью независимо. Только когда проблема сводится к blockSize
, фактические триггеры параллельной редукции. Тот же код может быть применен к любому другому коммутативному ассоциативному оператору, такому как умножение, минимум, максимум и т. Д.
Обратите внимание, что алгоритм можно сделать быстрее, например, используя параллельное сокращение уровня деформации.
Одноблочная параллельная редукция для некоммутативного оператора
Выполнение параллельной редукции для некоммутативного оператора немного более активно, по сравнению с коммутативной версией. В этом примере мы по-прежнему используем дополнение для целых чисел для простоты. Его можно было бы заменить, например, матричным умножением, которое действительно является некоммутативным. Заметим, что при этом 0 следует заменить нейтральным элементом умножения - то есть единичной матрицей.
static const int arraySize = 1000000;
static const int blockSize = 1024;
__global__ void sumNoncommSingleBlock(const int *gArr, int *out) {
int thIdx = threadIdx.x;
__shared__ int shArr[blockSize*2];
__shared__ int offset;
shArr[thIdx] = thIdx<arraySize ? gArr[thIdx] : 0;
if (thIdx == 0)
offset = blockSize;
__syncthreads();
while (offset < arraySize) { //uniform
shArr[thIdx + blockSize] = thIdx+offset<arraySize ? gArr[thIdx+offset] : 0;
__syncthreads();
if (thIdx == 0)
offset += blockSize;
int sum = shArr[2*thIdx] + shArr[2*thIdx+1];
__syncthreads();
shArr[thIdx] = sum;
}
__syncthreads();
for (int stride = 1; stride<blockSize; stride*=2) { //uniform
int arrIdx = thIdx*stride*2;
if (arrIdx+stride<blockSize)
shArr[arrIdx] += shArr[arrIdx+stride];
__syncthreads();
}
if (thIdx == 0)
*out = shArr[0];
}
...
sumNoncommSingleBlock<<<1, blockSize>>>(dev_a, dev_out);
В первом цикле while выполняется столько, сколько есть элементов ввода, кроме потоков. На каждой итерации выполняется одно сокращение, и результат сжимается в первую половину массива shArr
. Вторая половина заполняется новыми данными.
Когда все данные загружаются из gArr
, выполняется второй цикл. Теперь мы больше не __syncthreads()
результат (который __syncthreads()
дополнительных __syncthreads()
). На каждом шаге поток n получает 2*n
активный элемент и добавляет его с помощью 2*n+1
элемента:

Существует множество способов дальнейшей оптимизации этого простого примера, например, путем сокращения уровня деформации и устранения конфликтов в банках с разделяемой памятью.
Многоблочное параллельное сокращение для коммутативного оператора
Многоблочный подход к параллельному сокращению CUDA создает дополнительную проблему по сравнению с одноблочным подходом, поскольку блоки ограничены в общении. Идея состоит в том, чтобы позволить каждому блоку вычислить часть входного массива, а затем иметь один заключительный блок для объединения всех частичных результатов. Для этого можно запустить два ядра, неявно создав точку синхронизации по всей сетке.
static const int wholeArraySize = 100000000;
static const int blockSize = 1024;
static const int gridSize = 24; //this number is hardware-dependent; usually #SM*2 is a good number.
__global__ void sumCommMultiBlock(const int *gArr, int arraySize, int *gOut) {
int thIdx = threadIdx.x;
int gthIdx = thIdx + blockIdx.x*blockSize;
const int gridSize = blockSize*gridDim.x;
int sum = 0;
for (int i = gthIdx; i < arraySize; i += gridSize)
sum += gArr[i];
__shared__ int shArr[blockSize];
shArr[thIdx] = sum;
__syncthreads();
for (int size = blockSize/2; size>0; size/=2) { //uniform
if (thIdx<size)
shArr[thIdx] += shArr[thIdx+size];
__syncthreads();
}
if (thIdx == 0)
gOut[blockIdx.x] = shArr[0];
}
__host__ int sumArray(int* arr) {
int* dev_arr;
cudaMalloc((void**)&dev_arr, wholeArraySize * sizeof(int));
cudaMemcpy(dev_arr, arr, wholeArraySize * sizeof(int), cudaMemcpyHostToDevice);
int out;
int* dev_out;
cudaMalloc((void**)&dev_out, sizeof(int)*gridSize);
sumCommMultiBlock<<<gridSize, blockSize>>>(dev_arr, wholeArraySize, dev_out);
//dev_out now holds the partial result
sumCommMultiBlock<<<1, blockSize>>>(dev_out, gridSize, dev_out);
//dev_out[0] now holds the final result
cudaDeviceSynchronize();
cudaMemcpy(&out, dev_out, sizeof(int), cudaMemcpyDeviceToHost);
cudaFree(dev_arr);
cudaFree(dev_out);
return out;
}
В идеале желательно запустить достаточно блоков, чтобы насытить все многопроцессоры на GPU при полной занятости. Превышение этого числа - в частности, запуск как можно большего количества потоков, чем есть элементы в массиве, - является контрпродуктивным. Это не увеличивает необработанную вычислительную мощность, но предотвращает использование очень эффективного первого цикла.
Также можно получить тот же результат, используя одно ядро, с помощью защиты последнего блока :
static const int wholeArraySize = 100000000;
static const int blockSize = 1024;
static const int gridSize = 24;
__device__ bool lastBlock(int* counter) {
__threadfence(); //ensure that partial result is visible by all blocks
int last = 0;
if (threadIdx.x == 0)
last = atomicAdd(counter, 1);
return __syncthreads_or(last == gridDim.x-1);
}
__global__ void sumCommMultiBlock(const int *gArr, int arraySize, int *gOut, int* lastBlockCounter) {
int thIdx = threadIdx.x;
int gthIdx = thIdx + blockIdx.x*blockSize;
const int gridSize = blockSize*gridDim.x;
int sum = 0;
for (int i = gthIdx; i < arraySize; i += gridSize)
sum += gArr[i];
__shared__ int shArr[blockSize];
shArr[thIdx] = sum;
__syncthreads();
for (int size = blockSize/2; size>0; size/=2) { //uniform
if (thIdx<size)
shArr[thIdx] += shArr[thIdx+size];
__syncthreads();
}
if (thIdx == 0)
gOut[blockIdx.x] = shArr[0];
if (lastBlock(lastBlockCounter)) {
shArr[thIdx] = thIdx<gridSize ? gOut[thIdx] : 0;
__syncthreads();
for (int size = blockSize/2; size>0; size/=2) { //uniform
if (thIdx<size)
shArr[thIdx] += shArr[thIdx+size];
__syncthreads();
}
if (thIdx == 0)
gOut[0] = shArr[0];
}
}
__host__ int sumArray(int* arr) {
int* dev_arr;
cudaMalloc((void**)&dev_arr, wholeArraySize * sizeof(int));
cudaMemcpy(dev_arr, arr, wholeArraySize * sizeof(int), cudaMemcpyHostToDevice);
int out;
int* dev_out;
cudaMalloc((void**)&dev_out, sizeof(int)*gridSize);
int* dev_lastBlockCounter;
cudaMalloc((void**)&dev_lastBlockCounter, sizeof(int));
cudaMemset(dev_lastBlockCounter, 0, sizeof(int));
sumCommMultiBlock<<<gridSize, blockSize>>>(dev_arr, wholeArraySize, dev_out, dev_lastBlockCounter);
cudaDeviceSynchronize();
cudaMemcpy(&out, dev_out, sizeof(int), cudaMemcpyDeviceToHost);
cudaFree(dev_arr);
cudaFree(dev_out);
return out;
}
Обратите внимание, что ядро можно сделать быстрее, например, используя параллельное сокращение уровня деформации.
Многоблочное параллельное сокращение для некоммутативного оператора
Многоблочный подход к параллельному сокращению очень похож на одноблочный подход. Глобальный массив ввода должен быть разделен на разделы, каждый из которых уменьшается на один блок. Когда получается частичный результат из каждого блока, один конечный блок уменьшает их, чтобы получить конечный результат.
-
sumNoncommSingleBlock
объясняется более подробно в примере восстановления с одним блоком. -
lastBlock
принимает только последний его блок. Если вы хотите этого избежать, вы можете разделить ядро на два отдельных вызова.
static const int wholeArraySize = 100000000;
static const int blockSize = 1024;
static const int gridSize = 24; //this number is hardware-dependent; usually #SM*2 is a good number.
__device__ bool lastBlock(int* counter) {
__threadfence(); //ensure that partial result is visible by all blocks
int last = 0;
if (threadIdx.x == 0)
last = atomicAdd(counter, 1);
return __syncthreads_or(last == gridDim.x-1);
}
__device__ void sumNoncommSingleBlock(const int* gArr, int arraySize, int* out) {
int thIdx = threadIdx.x;
__shared__ int shArr[blockSize*2];
__shared__ int offset;
shArr[thIdx] = thIdx<arraySize ? gArr[thIdx] : 0;
if (thIdx == 0)
offset = blockSize;
__syncthreads();
while (offset < arraySize) { //uniform
shArr[thIdx + blockSize] = thIdx+offset<arraySize ? gArr[thIdx+offset] : 0;
__syncthreads();
if (thIdx == 0)
offset += blockSize;
int sum = shArr[2*thIdx] + shArr[2*thIdx+1];
__syncthreads();
shArr[thIdx] = sum;
}
__syncthreads();
for (int stride = 1; stride<blockSize; stride*=2) { //uniform
int arrIdx = thIdx*stride*2;
if (arrIdx+stride<blockSize)
shArr[arrIdx] += shArr[arrIdx+stride];
__syncthreads();
}
if (thIdx == 0)
*out = shArr[0];
}
__global__ void sumNoncommMultiBlock(const int* gArr, int* out, int* lastBlockCounter) {
int arraySizePerBlock = wholeArraySize/gridSize;
const int* gArrForBlock = gArr+blockIdx.x*arraySizePerBlock;
int arraySize = arraySizePerBlock;
if (blockIdx.x == gridSize-1)
arraySize = wholeArraySize - blockIdx.x*arraySizePerBlock;
sumNoncommSingleBlock(gArrForBlock, arraySize, &out[blockIdx.x]);
if (lastBlock(lastBlockCounter))
sumNoncommSingleBlock(out, gridSize, out);
}
В идеале желательно запустить достаточно блоков, чтобы насытить все многопроцессоры на GPU при полной занятости. Превышение этого числа - в частности, запуск как можно большего количества потоков, чем есть элементы в массиве, - является контрпродуктивным. Это не увеличивает необработанную вычислительную мощность, но предотвращает использование очень эффективного первого цикла.
Однопараметрическое параллельное сокращение для коммутативного оператора
Иногда сокращение должно выполняться в очень малом масштабе, как часть большего ядра CUDA. Предположим, например, что входные данные имеют ровно 32 элемента - количество потоков в основе. В таком сценарии для сокращения можно назначить одну деформацию. Учитывая, что warp выполняется в идеальной синхронизации, многие команды __syncthreads()
могут быть удалены - по сравнению с уменьшением уровня блока.
static const int warpSize = 32;
__device__ int sumCommSingleWarp(volatile int* shArr) {
int idx = threadIdx.x % warpSize; //the lane index in the warp
if (idx<16) shArr[idx] += shArr[idx+16];
if (idx<8) shArr[idx] += shArr[idx+8];
if (idx<4) shArr[idx] += shArr[idx+4];
if (idx<2) shArr[idx] += shArr[idx+2];
if (idx==0) shArr[idx] += shArr[idx+1];
return shArr[0];
}
shArr
предпочтительно представляет собой массив в общей памяти. Значение должно быть одинаковым для всех потоков в warp. Если sumCommSingleWarp
вызывается несколькими перекосами, shArr
должен быть разным между искажениями (одинаковыми в каждом warp).
Аргумент shArr
помечен как volatile
чтобы гарантировать, что операции над массивом фактически выполняются там, где это указано. В противном случае повторное назначение shArr[idx]
может быть оптимизировано как присвоение регистру, при этом только конечный атрибут является фактическим хранилищем для shArr
. Когда это происходит, немедленные назначения не видны другим потокам, что приводит к неправильным результатам. Обратите внимание, что вы можете передать нормальный энергонезависимый массив в качестве аргумента volatile, так же, как когда вы передаете не const в качестве параметра const.
Если после сокращения ничего не волнует содержимое shArr[1..31]
, можно еще более упростить код:
static const int warpSize = 32;
__device__ int sumCommSingleWarp(volatile int* shArr) {
int idx = threadIdx.x % warpSize; //the lane index in the warp
if (idx<16) {
shArr[idx] += shArr[idx+16];
shArr[idx] += shArr[idx+8];
shArr[idx] += shArr[idx+4];
shArr[idx] += shArr[idx+2];
shArr[idx] += shArr[idx+1];
}
return shArr[0];
}
В этой установке мы удалили многие условия if
. Дополнительные потоки выполняют некоторые ненужные дополнения, но мы больше не заботимся о содержимом, которое они создают. Поскольку перекосы выполняются в режиме SIMD, мы фактически не экономим время, когда эти потоки ничего не делают. С другой стороны, оценка условий занимает относительно большое количество времени, так как тело этих операторов if
настолько мало. Исходный оператор if
может быть удален, если shArr[32..47]
дополняется 0.
Снижение уровня деформации может быть использовано для ускорения снижения уровня блока:
__global__ void sumCommSingleBlockWithWarps(const int *a, int *out) {
int idx = threadIdx.x;
int sum = 0;
for (int i = idx; i < arraySize; i += blockSize)
sum += a[i];
__shared__ int r[blockSize];
r[idx] = sum;
sumCommSingleWarp(&r[idx & ~(warpSize-1)]);
__syncthreads();
if (idx<warpSize) { //first warp only
r[idx] = idx*warpSize<blockSize ? r[idx*warpSize] : 0;
sumCommSingleWarp(r);
if (idx == 0)
*out = r[0];
}
}
Аргумент &r[idx & ~(warpSize-1)]
в основном равен r + warpIdx*32
. Это эффективно разделяет массив r
на куски из 32 элементов, и каждый кусок присваивается отдельной структуре.
Однопаральное параллельное сокращение для некоммутативного оператора
Иногда сокращение должно выполняться в очень малом масштабе, как часть большего ядра CUDA. Предположим, например, что входные данные имеют ровно 32 элемента - количество потоков в основе. В таком сценарии для сокращения можно назначить одну деформацию. Учитывая, что warp выполняется в идеальной синхронизации, многие команды __syncthreads()
могут быть удалены - по сравнению с уменьшением уровня блока.
static const int warpSize = 32;
__device__ int sumNoncommSingleWarp(volatile int* shArr) {
int idx = threadIdx.x % warpSize; //the lane index in the warp
if (idx%2 == 0) shArr[idx] += shArr[idx+1];
if (idx%4 == 0) shArr[idx] += shArr[idx+2];
if (idx%8 == 0) shArr[idx] += shArr[idx+4];
if (idx%16 == 0) shArr[idx] += shArr[idx+8];
if (idx == 0) shArr[idx] += shArr[idx+16];
return shArr[0];
}
shArr
предпочтительно представляет собой массив в общей памяти. Значение должно быть одинаковым для всех потоков в warp. Если sumCommSingleWarp
вызывается несколькими перекосами, shArr
должен быть разным между искажениями (одинаковыми в каждом warp).
Аргумент shArr
помечен как volatile
чтобы гарантировать, что операции над массивом фактически выполняются там, где это указано. В противном случае повторное назначение shArr[idx]
может быть оптимизировано как присвоение регистру, при этом только конечный атрибут является фактическим хранилищем для shArr
. Когда это происходит, немедленные назначения не видны другим потокам, что приводит к неправильным результатам. Обратите внимание, что вы можете передать нормальный энергонезависимый массив в качестве аргумента volatile, так же, как когда вы передаете не const в качестве параметра const.
Если не нужно окончательное содержание shArr[1..31]
и может shArr[32..47]
с нулями, можно упростить приведенный выше код:
static const int warpSize = 32;
__device__ int sumNoncommSingleWarpPadded(volatile int* shArr) {
//shArr[32..47] == 0
int idx = threadIdx.x % warpSize; //the lane index in the warp
shArr[idx] += shArr[idx+1];
shArr[idx] += shArr[idx+2];
shArr[idx] += shArr[idx+4];
shArr[idx] += shArr[idx+8];
shArr[idx] += shArr[idx+16];
return shArr[0];
}
В этой установке мы удалили все if
таковые условия, которые составляют около половины инструкции. Дополнительные потоки выполняют некоторые ненужные дополнения, сохраняя результат в ячейках shArr
которые в конечном итоге не влияют на конечный результат. Поскольку перекосы выполняются в режиме SIMD, мы фактически не экономим время, когда эти потоки ничего не делают.
Параллельное сокращение одиночной варпы с использованием только регистров
Как правило, сокращение выполняется для глобального или общего массива. Однако, когда редукция выполняется в очень небольшом масштабе, как часть большего ядра CUDA, ее можно выполнить с помощью единственного деформирования. Когда это происходит, на Keppler или более высоких архитектурах (CC> = 3.0), можно использовать функции warp-shuffle, чтобы вообще не использовать общую память.
Предположим, например, что каждый поток в warp содержит одно значение входных данных. Все потоки вместе имеют 32 элемента, которые нам нужно подвести (или выполнить другую ассоциативную операцию)
__device__ int sumSingleWarpReg(int value) {
value += __shfl_down(value, 1);
value += __shfl_down(value, 2);
value += __shfl_down(value, 4);
value += __shfl_down(value, 8);
value += __shfl_down(value, 16);
return __shfl(value,0);
}
Эта версия работает как для коммутативных, так и для некоммутативных операторов.