Redis中的跳跃表


跳跃表是一种有序链表,它在链表的每个节点维护了一个指针数组,每个指针指向后续的某个节点。这使得访问节点的效率比一般的单链表高。

跳跃表

为什么在redis中使用跳跃表,而不是平衡树或者红黑树等数据结构?redis的作者是这么回答的(原文地址):

1) They are not very memory intensive. It's up to you basically. Changing parameters about the probability of a node to have a given number of levels will make then less memory intensive than btrees.

2) A sorted set is often target of many ZRANGE or ZREVRANGE operations, that is, traversing the skip list as a linked list. With this operation the cache locality of skip lists is at least as good as with other kind of balanced trees.

3) They are simpler to implement, debug, and so forth. For instance thanks to the skip list simplicity I received a patch (already in Redis master) with augmented skip lists implementing ZRANK in O(log(N)). It required little changes to the code.

Salvatore Sanfilippo的回答大意是跳跃表和平衡树对应操作的时间复杂度相当;空间复杂度取决跳跃表的层数;由于在redis中对跳跃表的操作大多数情况是ZRANGE和ZREVRANGE,cache命中率和平衡树相当。

黄健宏写的《Redis设计与实现(第二版)》简单介绍了跳跃表,并未对redis中跳跃表的查找,插入和删除算法做介绍,本博文主要介绍redis中的插入操作。

跳跃表的相关数据结构

redis源码中的redis.h中有跳跃表的数据结构定义:

typedef struct zskiplistNode {
    robj *obj;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

其中zskiplistNode结构体是一个跳跃表的节点,zskiplist结构体表示一个跳跃表,各个成员的意思如下:

  • obj: 该跳跃表节点存贮的对象
  • score: 分值,用来对节点进行排序,如果两个节点score相同,则按照字符串对象比较obj大小
  • backward:指向前一个跳跃表节点
  • level: 一个zskiplistLevel结构数组,每个zskiplistLevel有两个成员,一个是前向指针forward,指向跳跃表中后面的某一个节点,span存贮所指向的节点和当前节点相距多远(两个相邻的跳跃表距离为1)
  • header:指向跳跃表的头节点,跳跃表的头节点中obj,score,backward都为空,仅有level数组
  • tail:指向跳跃表的表尾节点
  • length:跳跃表的长度
  • level:跳跃表中所有节点中level数组的最大长度,也即跳跃表的层数

跳跃表的创建

跳跃表的实现代码在t_zset.c中,创建代码如下:

zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

zslCreateNode创建一个跳跃表节点,其中分配内存大小的计算方法可以参阅文章C语言结构体里的成员数组和指针

zslCreate创建一个跳跃表,其中设置header为一个score为0,obj为NULL的跳跃表节点。

跳跃表新建节点的level数组大小的确定

代码如下:

int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

其中ZSKIPLIST_P是一个小于1的小数,redis中定义为0.25,该算法的特点是分配到的level值越大,概率越低。

跳跃表的节点插入操作

代码如下:

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

代码的关键点在中间的查找循环:

 for (i = zsl->level-1; i >= 0; i--) {
    /* store rank that is crossed to reach the insert position */
    rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
    while (x->level[i].forward &&
        (x->level[i].forward->score < score ||
            (x->level[i].forward->score == score &&
            compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
        rank[i] += x->level[i].span;
        x = x->level[i].forward;
    }
    update[i] = x;
 }

从header节点的level-1层开始,比较该层的forward指针指向的节点的score值是否小于要插入的score,当score相等时,比较字符串对象obj,如果条件成立,跳forward指向节点的同一层,否则,记录当前节点的当前层到update数组(下标是层数,值为节点指针),并向下移动一层(i --),当循环结束时,update记录的节点的level数组中的forward指针需修改为待插入的节点,而新插入节点各层forward要指向的位置就是update节点相应层指向的节点。插入的过程如下:

x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
    x->level[i].forward = update[i]->level[i].forward;
    update[i]->level[i].forward = x;

    /* update span covered by update[i] as x is inserted here */
    x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

可以结合下面的示例图来理解:

实例

span值的计算和forward指针的指向是相关的,仔细推敲一下应该很容易理解。

总结

跳跃表的插入代码的查找循环和查找节点的逻辑是一致的,删除节点的算法也相似,本文不特别介绍。

跳跃表的多层链表还可以支持“记忆化查找”(Search with fingers):记录上一次查找节点成功的位置,在进行新的查找时,如果待查找的值和已记录节点的值的差的绝对值不大,可以从记录的节点开始进行新的查找,如果节点的值分布较平均,这种查找方法可以大大提高查找效率。但如果节点离散化严重,这种查找方法很可能适得其反。