权重洗牌算法的设计与实现

概述

这个算法算法是我在面临为负载均衡分发时思考的,但没想到已经有人和我想到一块了。

场景是我需要做一个分发任务的项目。一开始我希望实现一个打分排序机制来实现对任务的分发,打分机制需要考虑

  • 任务执行端目前能接受的任务数量
  • 任务执行端目前正在排队的任务数量
  • 任务执行端被投诉的数量(分发到任务端失败或被驳回时会被投诉)

因此一开始是希望一个简单的排序机制,对任务进行轮询分发。但分发任务可能是多进程的,很可能会出现大家一起逮着一个高分使劲薅的情况(因为任务执行端的状态返回有延迟),因此打算使用一种洗牌算法,对执行端进行随机排序。但执行端之间还是有不同的,因此需要按照某种方式,让随机情况尽可能的符合分数从高到低的趋势(随机了,但没有完全随机,排序了,但没有完全排序)。

于是我考虑到了洗牌算法,并想着通过重复计数或者区块划分的方式来实现随机获取。

重复计数洗牌

重复计数洗牌可以参考这篇回答的第一个方法。原理就是根据权重数量,举一个简单的例子:

现在有A,B,C,D四个执行端,他们的权重分别是:

  • A: 3
  • B: 2
  • C: 1
  • D: 4

那么,我们可以按照权重数量,生成一个数组:

AAABBCDDDD

接下来,我们可以通过random_shuffle方式,对上面的数组洗牌,变成下面的数组:

DACADBDABD

然后只取该数组中第一次出现的元素,于是就成了:

DACB

于是,我们就可以获得我们想要的结果。

代码实现

代码片段:shuffle.py

这里我使用Python来简单实现,用于原理的展示,因此没有优化大量的拷贝操作,故效率不高。

整体分为两部分,一个是算法的实现,和上面的原理说明一致。一个是测试场景的构建,将1-499的数组随机排序50次,然后取每个数的平均位次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import random
import matplotlib.pyplot as plt
import datetime


def random_shuffle_with_score(in_list: list):
    out_list = []
    tmp_list = in_list.copy()

    item_idx_list = []
    for idx in range(len(tmp_list)):
        item_idx_list += [idx] * tmp_list[idx]

    random.shuffle(item_idx_list)

    for item_idx in item_idx_list:
        if tmp_list[item_idx] == None:
            continue
        out_list.append(tmp_list[item_idx])
        tmp_list[item_idx] = None
    return out_list

start_time = datetime.datetime.now()

x = [x for x in range(1, 500)]
y_s = [0] * len(x)
for i in range(50):
    tmp_y = random_shuffle_with_score(x)
    for idx in range(len(x)):
        y_s[idx] += tmp_y[idx]


y = [y / 50 for y in y_s]

end_time = datetime.datetime.now()

print((end_time - start_time).microseconds)

plt.plot(x, y)
plt.show()

验证结果

我们执行上述代码后,可以得到这样一个折线图,可以看到,权重越高(x轴),排名相对越靠前(y轴)。

验证结果

区块划分洗牌

原理与重复计数类似,但实现起来更复杂一点,效率波动比较大,偶尔效率会远超上述算法。

它的原理是根据权重将权重合并成一个划分为好几块的池子,然后我们将一块一块石头随机地丢到池子里,如果池子没有被石头砸过,那就算“命中”,如果被砸过了,那就向两旁看看最近的池子哪个没有被砸,选旁边的一个没有被砸的池子。这个算法的关键在于如何保证快速命中。

为何考虑快速命中,是因为我们的石头是随机丢下去的,我们不清楚他落的范围应该属于谁(水面太大了,不好一点点定位),因此我们需要一些锚点去快速定位,例如丢在了某个旗子(锚点)旁边,那么我们就基于这个锚点来定位石头的未知。

所以我使用了一个快速查找的map,和一个自定义hash方法,来实现锚点,通过hash后,我们可以猜测这个石头在哪个位置,然后左右查看找出符合条件的池子。

代码实现

于是,就成了这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from operator import truediv
import random
import matplotlib.pyplot as plt
import datetime

ANCHOR_THRESHOLD = 10


class ShuffleStat:
    def __init__(self, pos, weight, idx):
        self.pos = pos
        self.end = pos + weight
        self.idx = idx
        self.is_selected = False

    def is_in(self, pos):
        if pos < self.pos:
            return -1
        if pos >= self.end:
            return 1
        return 0


def random_shuffle_with_score(in_list: list):
    shuffle_item_list = []
    quick_search_map = {}
    out_list = []
    pos = 0
    anchor_count = 0
    split_threshold = max(int(len(in_list) / 20), 1)

    for idx, item in enumerate(in_list):
        shuffle_item_list.append(ShuffleStat(pos, item, idx))
        if anchor_count % ANCHOR_THRESHOLD == 0:
            quick_search_map[int(pos / split_threshold)
                             ] = len(shuffle_item_list) - 1  # 保存下标
        pos += item
        anchor_count += 1

    for i in range(len(in_list)):
        # 拿到一个随机值
        rand_pos = random.randrange(0, pos)
        guess_pos = int(rand_pos / split_threshold)
        get_idx = quick_search_map.get(guess_pos)
        while(get_idx == None):
            guess_pos -= 1
            if guess_pos < 0:
                raise RuntimeError("发生了奇怪的异常,竟然找不到?")

            get_idx = quick_search_map.get(guess_pos)

        got_one_flag = False
        while got_one_flag == False:
            shuffle_item = shuffle_item_list[get_idx]
            ret = shuffle_item.is_in(rand_pos)
            if ret == 0:
                # 找到了,开始往左右向找合适的
                direct = -1  # <0 向右, >0 向左且加1
                while got_one_flag == False:
                    if False == shuffle_item.is_selected:
                        # 就他了
                        out_list.append(in_list[shuffle_item.idx])
                        got_one_flag == True
                        shuffle_item.is_selected = True
                        break
                    else:
                        if direct < 0:
                            # 向右
                            if(get_idx - direct < len(shuffle_item_list)):
                                shuffle_item = shuffle_item_list[get_idx - direct]
                            direct = - direct
                        elif direct > 0:
                            # 向左
                            if(get_idx - direct >= 0):
                                shuffle_item = shuffle_item_list[get_idx - direct]
                            direct = - direct - 1
                        if abs(direct) > len(shuffle_item_list):
                            raise("怎么找不到呢")
                break
            if ret > 0:
                # 应该往右
                get_idx += 1
            elif ret < 0:
                # 应该往左
                get_idx -= 1
            # print(ret)

            if get_idx >= len(shuffle_item_list) or get_idx < 0:
                raise RuntimeError("怎么又没找到?")
    return out_list


start_time = datetime.datetime.now()
x = [x for x in range(1, 500)]
y_s = [0] * len(x)
for i in range(50):
    tmp_y = random_shuffle_with_score(x)
    # print(tmp_y)
    for idx in range(len(x)):
        y_s[idx] += tmp_y[idx]


y = [y / 50 for y in y_s]

end_time = datetime.datetime.now()

print((end_time - start_time).microseconds)

plt.plot(x, y)
plt.show()

验证结果

我们执行上述代码后,可以得到这样一个折线图,可以看到,权重越高(x轴),排名相对越靠前(y轴)。

验证结果2

我们观测到,这个并不如重复计数洗牌在低权重上有比较明显的趋势,但如果数量较多的话,也不为一种不错的随机方式。

比较

经过我的大致测试,性能方面,区块划分洗牌性能要较高与重复计数洗牌,500条记录的数组在50次随机时间大致在282618(毫秒, 区块划分洗牌) 和 314019(毫秒,重复计数洗牌)。