放置基数排序

这是一个长文本。 请多多包涵。 简而言之,问题是: 是否有可用的就地基数排序算法


初步

我有很多小的固定长度字符串,它们只使用我想分类的字母“A”,“C”,“G”和“T”(是的,你猜对了)。

目前,我使用std::sort ,它在STL的所有常见实现中使用introsort。 这工作得很好。 但是,我相信,基数排序适合我的问题完全设置,并应努力在实践中好得多。

细节

我已经用非常幼稚的实现测试了这个假设,并且对于相对较小的输入(大约为10,000),这是真实的(至少比两倍快)。 但是,当问题规模变大(N> 5,000,000)时,运行时会严重恶化。

原因很明显:基数排序需要复制整个数据(实际上我的幼稚实现中不止一次)。 这意味着我已经将〜4 GiB放入我的主内存中,这显然会导致性能下降。 即使没有,我也承担不起这么多的记忆,因为问题的规模实际上变得更大了。

用例

理想情况下,此算法应适用于2到100之间的任何字符串长度,适用于DNA和DNA5(允许使用额外的通配符“N”),甚至是带有IUPAC模糊代码的DNA(导致16个不同的值)。 但是,我意识到所有这些情况都不能被覆盖,所以我对任何速度改进感到满意。 代码可以动态决定分派给哪个算法。

研究

不幸的是,关于基数排序的维基百科文章是无用的。 关于就地变体的部分是完全垃圾。 关于基数排序的NIST-DADS部分几乎不存在。 有一篇名为Efficient Adaptive In-Place Radix Sorting的有前景的论文描述了算法“MSL”。 不幸的是,这篇论文也令人失望。

具体来说,有以下几点。

首先,该算法包含几个错误,并留下了很多原因。 特别是,它没有详细说明递归调用(我简单地假设它递增或减少一些指针来计算当前的移位和掩码值)。 此外,它使用函数dest_groupdest_address而不给定义。 我没有看到如何有效地实现这些(即,在O(1)中;至少dest_address不是微不足道的)。

最后但并非最不重要的是,该算法通过将数组索引与输入数组内的元素交换来实现就地性。 这显然只适用于数值数组。 我需要在字符串上使用它。 当然,我可以拧紧强类型,然后继续进行,假设内存将容忍我存储不属于它的索引。 但是,只有当我可以将我的字符串压入32位内存(假设为32位整数)时,它才会起作用。 那只有16个字符(让我们暂时忽略16> log(5,000,000))。

其中一位作者提供的另一篇论文根本没有提供准确的描述,但它使得MSL的运行时间呈现出平坦的错误。

回顾一下 :是否有希望找到一个工作参考实现,或者至少有一个很好的伪代码/一个适用于DNA字符串的工作就地基数排序的描述?


那么,这里是一个简单的DNA的MSD基数排序实现。 它是用D语言编写的,因为这是我使用最多的语言,因此最不可能犯下愚蠢的错误,但它很容易被翻译成其他语言。 它在原地,但需要2 * seq.length穿过阵列。

void radixSort(string[] seqs, size_t base = 0) {
    if(seqs.length == 0)
        return;

    size_t TPos = seqs.length, APos = 0;
    size_t i = 0;
    while(i < TPos) {
        if(seqs[i][base] == 'A') {
             swap(seqs[i], seqs[APos++]);
             i++;
        }
        else if(seqs[i][base] == 'T') {
            swap(seqs[i], seqs[--TPos]);
        } else i++;
    }

    i = APos;
    size_t CPos = APos;
    while(i < TPos) {
        if(seqs[i][base] == 'C') {
            swap(seqs[i], seqs[CPos++]);
        }
        i++;
    }
    if(base < seqs[0].length - 1) {
        radixSort(seqs[0..APos], base + 1);
        radixSort(seqs[APos..CPos], base + 1);
        radixSort(seqs[CPos..TPos], base + 1);
        radixSort(seqs[TPos..seqs.length], base + 1);
   }
}

很显然,这是DNA特有的,而不是一般的,但它应该是快速的。

编辑:

我很好奇这个代码是否真正起作用,所以我在等待自己的生物信息学代码运行时对它进行了测试/调试。 现在上面的版本实际上已经过测试和工作。 对于每个1000万个5碱基的序列,它比优化的插入片段快大约3倍。


我从来没有见过一种就地基数排序,并且从基数排序的本质来看,我怀疑它只要临时数组适合内存就会比不合适的排序快得多。

原因:

排序对输入数组进行线性读取,但所有写入操作几乎都是随机的。 从某个N向上,这可以归结为每次写入时的高速缓存未命中。 这个缓存未命中会减慢你的算法。 如果存在或不存在,则不会改变这种效果。

我知道这不会直接回答你的问题,但如果排序是一个瓶颈,你可能希望看看排序算法作为预处理步骤 (wiki上的软件堆页面可能会让你开始)。

这可以给一个非常好的缓存局部性提升。 然后,一个不符合文本的基数排序将会表现更好。 写操作仍然是随机的,但至少它们将围绕相同的内存块进行集群,并因此增加缓存命中率。

尽管如此,我不知道它是否能在实践中发挥作用。

顺便说一句:如果你只处理DNA字符串:你可以将一个字符压缩成两位,并将你的数据打包很多。 这将减少内存需求的四分之一以上。 寻址变得更加复杂,但是无论如何,CPU的ALU有很多时间用于缓存未命中。


基于dsimcha的代码,我实现了一个更适合我们使用的框架(SeqAn)的更通用的版本。 实际上,移植代码是非常简单的。 之后我才发现,实际上有关于这个话题的出版物。 伟大的事情是:他们基本上和你们说的一样。 Andersson和Nilsson关于实现Radixsort的论文绝对值得一读。 如果你碰巧知道德语,一定要阅读David Weese的毕业论文,他在那里实现了一个通用的子串索引。 本文的大部分内容都致力于详细分析建立索引的成本,考虑二级存储器和非常大的文件。 他的工作成果实际上是在SeqAn中实施的,而不是在我需要的地方。

只是为了好玩,这里是我写的代码(我不认为有人不使用SeqAn会有任何用处)。 请注意,它仍然不考虑基数大于4.我预计这会对性能产生巨大影响,但不幸的是,我现在根本没有时间去实现这一点。

代码的执行速度比短字符串的Introsort快两倍。 盈亏平衡点的长度约为12-13。 字符串的类型(例如它是否具有4,5或16个不同的值)是相对不重要的。 从人类基因组染色体2中分选> 6,000,000个DNA读取在我的PC上花费超过2秒钟。 只是为了记录,这很快! 特别是考虑到我不使用SIMD或任何其他硬件加速。 此外,valgrind告诉我主要瓶颈是operator new在字符串赋值中是operator new的。 它被称为约65,000,000次 - 每个字符串十次! 对于这些字符串, swap可以进行优化,这是一个失败的赠品:不是复制,而是交换所有字符。 我没有尝试过,但我确信这会带来很大的变化。 而且,再说一遍,如果有人不听:基数的大小对运行时间几乎没有影响 - 这意味着我应该尝试实施FryGuy,Stephan和EvilTeach提出的建议。

嗯,是的,顺便说一下: 缓存局部性是一个值得注意的因素 :从1M字符串开始,运行时不再线性增加。 然而,这可以很容易地修复:我使用插入排序为小的子集(<= 20个字符串) - 而不是随机黑客建议的mergesort。 显然,对于这样的小列表来说,这比mergesort更好(参见我链接的第一篇论文)。

namespace seqan {

template <typename It, typename F, typename T>
inline void prescan(It front, It back, F op, T const& id) {
    using namespace std;
    if (front == back) return;
    typename iterator_traits<It>::value_type accu = *front;
    *front++ = id;
    for (; front != back; ++front) {
        swap(*front, accu);
        accu = op(accu, *front);
    }
}

template <typename TIter, typename TSize, unsigned int RADIX>
inline void radix_permute(TIter front, TIter back, TSize (& bounds)[RADIX], TSize base) {
    for (TIter i = front; i != back; ++i)
        ++bounds[static_cast<unsigned int>((*i)[base])];

    TSize fronts[RADIX];

    std::copy(bounds, bounds + RADIX, fronts);
    prescan(fronts, fronts + RADIX, std::plus<TSize>(), 0);
    std::transform(bounds, bounds + RADIX, fronts, bounds, plus<TSize>());

    TSize active_base = 0;

    for (TIter i = front; i != back; ) {
        if (active_base == RADIX - 1)
            return;
        while (fronts[active_base] >= bounds[active_base])
            if (++active_base == RADIX - 1)
                return;
        TSize current_base = static_cast<unsigned int>((*i)[base]);
        if (current_base <= active_base)
            ++i;
        else
            std::iter_swap(i, front + fronts[current_base]);
        ++fronts[current_base];
    }
}

template <typename TIter, typename TSize>
inline void insertion_sort(TIter front, TIter back, TSize base) {
    typedef typename Value<TIter>::Type T;
    struct {
        TSize base, len;
        bool operator ()(T const& a, T const& b) {
            for (TSize i = base; i < len; ++i)
                if (a[i] < b[i]) return true;
                else if (a[i] > b[i]) return false;
            return false;
        }
    } cmp = { base, length(*front) }; // No closures yet. :-(

    for (TIter i = front + 1; i != back; ++i) {
        T value = *i;
        TIter j = i;
        for ( ; j != front && cmp(value, *(j - 1)); --j)
            *j = *(j - 1);
        if (j != i)
            *j = value;
    }
}

template <typename TIter, typename TSize, unsigned int RADIX>
inline void radix(TIter top, TIter front, TIter back, TSize base, TSize (& parent_bounds)[RADIX], TSize next) {
    if (back - front > 20) {
        TSize bounds[RADIX] = { 0 };
        radix_permute(front, back, bounds, base);

        // Sort current bucket recursively by suffix.
        if (base < length(*front) - 1)
            radix(front, front, front + bounds[0], base + 1, bounds, static_cast<TSize>(0));
    }
    else if (back - front > 1)
        insertion_sort(front, back, base);

    // Sort next buckets on same level recursively.
    if (next == RADIX - 1) return;
    radix(top, top + parent_bounds[next], top + parent_bounds[next + 1], base, parent_bounds, next + 1);
}

template <typename TIter>
inline void radix_sort(TIter front, TIter back) {
    typedef typename Container<TIter>::Type TStringSet;
    typedef typename Value<TStringSet>::Type TString;
    typedef typename Value<TString>::Type TChar;
    typedef typename Size<TStringSet>::Type TSize;

    TSize const RADIX = ValueSize<TChar>::VALUE;
    TSize bounds[RADIX];

    radix(front, front, back, static_cast<TSize>(0), bounds, RADIX - 1);
}

} // namespace seqan
链接地址: http://www.djcxy.com/p/1909.html

上一篇: Place Radix Sort

下一篇: How do I sort a list of dictionaries by values of the dictionary in Python?