放置基数排序
这是一个长文本。 请多多包涵。 简而言之,问题是: 是否有可用的就地基数排序算法 ?
初步
我有很多小的固定长度字符串,它们只使用我想分类的字母“A”,“C”,“G”和“T”(是的,你猜对了)。
目前,我使用std::sort
,它在STL的所有常见实现中使用introsort。 这工作得很好。 但是,我相信,基数排序适合我的问题完全设置,并应努力在实践中要好得多。
细节
我已经用非常幼稚的实现测试了这个假设,并且对于相对较小的输入(大约为10,000),这是真实的(至少比两倍快)。 但是,当问题规模变大(N> 5,000,000)时,运行时会严重恶化。
原因很明显:基数排序需要复制整个数据(实际上我的幼稚实现中不止一次)。 这意味着我已经将〜4 GiB放入我的主内存中,这显然会导致性能下降。 即使没有,我也承担不起这么多的记忆,因为问题的规模实际上变得更大了。
用例
理想情况下,此算法应适用于2到100之间的任何字符串长度,适用于DNA和DNA5(允许使用额外的通配符“N”),甚至是带有IUPAC模糊代码的DNA(导致16个不同的值)。 但是,我意识到所有这些情况都不能被覆盖,所以我对任何速度改进感到满意。 代码可以动态决定分派给哪个算法。
研究
不幸的是,关于基数排序的维基百科文章是无用的。 关于就地变体的部分是完全垃圾。 关于基数排序的NIST-DADS部分几乎不存在。 有一篇名为Efficient Adaptive In-Place Radix Sorting的有前景的论文描述了算法“MSL”。 不幸的是,这篇论文也令人失望。
具体来说,有以下几点。
首先,该算法包含几个错误,并留下了很多原因。 特别是,它没有详细说明递归调用(我简单地假设它递增或减少一些指针来计算当前的移位和掩码值)。 此外,它使用函数dest_group
和dest_address
而不给定义。 我没有看到如何有效地实现这些(即,在O(1)中;至少dest_address
不是微不足道的)。
最后但并非最不重要的是,该算法通过将数组索引与输入数组内的元素交换来实现就地性。 这显然只适用于数值数组。 我需要在字符串上使用它。 当然,我可以拧紧强类型,然后继续进行,假设内存将容忍我存储不属于它的索引。 但是,只有当我可以将我的字符串压入32位内存(假设为32位整数)时,它才会起作用。 那只有16个字符(让我们暂时忽略16> log(5,000,000))。
其中一位作者提供的另一篇论文根本没有提供准确的描述,但它使得MSL的运行时间呈现出平坦的错误。
回顾一下 :是否有希望找到一个工作参考实现,或者至少有一个很好的伪代码/一个适用于DNA字符串的工作就地基数排序的描述?
那么,这里是一个简单的DNA的MSD基数排序实现。 它是用D语言编写的,因为这是我使用最多的语言,因此最不可能犯下愚蠢的错误,但它很容易被翻译成其他语言。 它在原地,但需要2 * seq.length
穿过阵列。
void radixSort(string[] seqs, size_t base = 0) {
if(seqs.length == 0)
return;
size_t TPos = seqs.length, APos = 0;
size_t i = 0;
while(i < TPos) {
if(seqs[i][base] == 'A') {
swap(seqs[i], seqs[APos++]);
i++;
}
else if(seqs[i][base] == 'T') {
swap(seqs[i], seqs[--TPos]);
} else i++;
}
i = APos;
size_t CPos = APos;
while(i < TPos) {
if(seqs[i][base] == 'C') {
swap(seqs[i], seqs[CPos++]);
}
i++;
}
if(base < seqs[0].length - 1) {
radixSort(seqs[0..APos], base + 1);
radixSort(seqs[APos..CPos], base + 1);
radixSort(seqs[CPos..TPos], base + 1);
radixSort(seqs[TPos..seqs.length], base + 1);
}
}
很显然,这是DNA特有的,而不是一般的,但它应该是快速的。
编辑:
我很好奇这个代码是否真正起作用,所以我在等待自己的生物信息学代码运行时对它进行了测试/调试。 现在上面的版本实际上已经过测试和工作。 对于每个1000万个5碱基的序列,它比优化的插入片段快大约3倍。
我从来没有见过一种就地基数排序,并且从基数排序的本质来看,我怀疑它只要临时数组适合内存就会比不合适的排序快得多。
原因:
排序对输入数组进行线性读取,但所有写入操作几乎都是随机的。 从某个N向上,这可以归结为每次写入时的高速缓存未命中。 这个缓存未命中会减慢你的算法。 如果存在或不存在,则不会改变这种效果。
我知道这不会直接回答你的问题,但如果排序是一个瓶颈,你可能希望看看排序算法作为预处理步骤 (wiki上的软件堆页面可能会让你开始)。
这可以给一个非常好的缓存局部性提升。 然后,一个不符合文本的基数排序将会表现更好。 写操作仍然是随机的,但至少它们将围绕相同的内存块进行集群,并因此增加缓存命中率。
尽管如此,我不知道它是否能在实践中发挥作用。
顺便说一句:如果你只处理DNA字符串:你可以将一个字符压缩成两位,并将你的数据打包很多。 这将减少内存需求的四分之一以上。 寻址变得更加复杂,但是无论如何,CPU的ALU有很多时间用于缓存未命中。
基于dsimcha的代码,我实现了一个更适合我们使用的框架(SeqAn)的更通用的版本。 实际上,移植代码是非常简单的。 之后我才发现,实际上有关于这个话题的出版物。 伟大的事情是:他们基本上和你们说的一样。 Andersson和Nilsson关于实现Radixsort的论文绝对值得一读。 如果你碰巧知道德语,一定要阅读David Weese的毕业论文,他在那里实现了一个通用的子串索引。 本文的大部分内容都致力于详细分析建立索引的成本,考虑二级存储器和非常大的文件。 他的工作成果实际上是在SeqAn中实施的,而不是在我需要的地方。
只是为了好玩,这里是我写的代码(我不认为有人不使用SeqAn会有任何用处)。 请注意,它仍然不考虑基数大于4.我预计这会对性能产生巨大影响,但不幸的是,我现在根本没有时间去实现这一点。
代码的执行速度比短字符串的Introsort快两倍。 盈亏平衡点的长度约为12-13。 字符串的类型(例如它是否具有4,5或16个不同的值)是相对不重要的。 从人类基因组染色体2中分选> 6,000,000个DNA读取在我的PC上花费超过2秒钟。 只是为了记录,这很快! 特别是考虑到我不使用SIMD或任何其他硬件加速。 此外,valgrind告诉我主要瓶颈是operator new
在字符串赋值中是operator new
的。 它被称为约65,000,000次 - 每个字符串十次! 对于这些字符串, swap
可以进行优化,这是一个失败的赠品:不是复制,而是交换所有字符。 我没有尝试过,但我确信这会带来很大的变化。 而且,再说一遍,如果有人不听:基数的大小对运行时间几乎没有影响 - 这意味着我应该尝试实施FryGuy,Stephan和EvilTeach提出的建议。
嗯,是的,顺便说一下: 缓存局部性是一个值得注意的因素 :从1M字符串开始,运行时不再线性增加。 然而,这可以很容易地修复:我使用插入排序为小的子集(<= 20个字符串) - 而不是随机黑客建议的mergesort。 显然,对于这样的小列表来说,这比mergesort更好(参见我链接的第一篇论文)。
namespace seqan {
template <typename It, typename F, typename T>
inline void prescan(It front, It back, F op, T const& id) {
using namespace std;
if (front == back) return;
typename iterator_traits<It>::value_type accu = *front;
*front++ = id;
for (; front != back; ++front) {
swap(*front, accu);
accu = op(accu, *front);
}
}
template <typename TIter, typename TSize, unsigned int RADIX>
inline void radix_permute(TIter front, TIter back, TSize (& bounds)[RADIX], TSize base) {
for (TIter i = front; i != back; ++i)
++bounds[static_cast<unsigned int>((*i)[base])];
TSize fronts[RADIX];
std::copy(bounds, bounds + RADIX, fronts);
prescan(fronts, fronts + RADIX, std::plus<TSize>(), 0);
std::transform(bounds, bounds + RADIX, fronts, bounds, plus<TSize>());
TSize active_base = 0;
for (TIter i = front; i != back; ) {
if (active_base == RADIX - 1)
return;
while (fronts[active_base] >= bounds[active_base])
if (++active_base == RADIX - 1)
return;
TSize current_base = static_cast<unsigned int>((*i)[base]);
if (current_base <= active_base)
++i;
else
std::iter_swap(i, front + fronts[current_base]);
++fronts[current_base];
}
}
template <typename TIter, typename TSize>
inline void insertion_sort(TIter front, TIter back, TSize base) {
typedef typename Value<TIter>::Type T;
struct {
TSize base, len;
bool operator ()(T const& a, T const& b) {
for (TSize i = base; i < len; ++i)
if (a[i] < b[i]) return true;
else if (a[i] > b[i]) return false;
return false;
}
} cmp = { base, length(*front) }; // No closures yet. :-(
for (TIter i = front + 1; i != back; ++i) {
T value = *i;
TIter j = i;
for ( ; j != front && cmp(value, *(j - 1)); --j)
*j = *(j - 1);
if (j != i)
*j = value;
}
}
template <typename TIter, typename TSize, unsigned int RADIX>
inline void radix(TIter top, TIter front, TIter back, TSize base, TSize (& parent_bounds)[RADIX], TSize next) {
if (back - front > 20) {
TSize bounds[RADIX] = { 0 };
radix_permute(front, back, bounds, base);
// Sort current bucket recursively by suffix.
if (base < length(*front) - 1)
radix(front, front, front + bounds[0], base + 1, bounds, static_cast<TSize>(0));
}
else if (back - front > 1)
insertion_sort(front, back, base);
// Sort next buckets on same level recursively.
if (next == RADIX - 1) return;
radix(top, top + parent_bounds[next], top + parent_bounds[next + 1], base, parent_bounds, next + 1);
}
template <typename TIter>
inline void radix_sort(TIter front, TIter back) {
typedef typename Container<TIter>::Type TStringSet;
typedef typename Value<TStringSet>::Type TString;
typedef typename Value<TString>::Type TChar;
typedef typename Size<TStringSet>::Type TSize;
TSize const RADIX = ValueSize<TChar>::VALUE;
TSize bounds[RADIX];
radix(front, front, back, static_cast<TSize>(0), bounds, RADIX - 1);
}
} // namespace seqan
链接地址: http://www.djcxy.com/p/1909.html
上一篇: Place Radix Sort
下一篇: How do I sort a list of dictionaries by values of the dictionary in Python?