0%

为什么要 One-Hot 编码 ( OHE )

在机器学习中 One-Hot 编码 (One-Hot Encoding, OHE) 是一种经常用到的特征编码方式。对一个有 $n$ 个不同可能取值的特征,可以将其编码为一个长度为 $n$ 的向量。对于第 $i$ 个取值,令编码向量第 $i$ 位取 1 ,其余位取 0 。虽然一眼望去这样和直接将特征编码为 0 至 $n$ 的顺序编码没有什么区别,但是顺序编码在处理没有顺序的特征(国家、语言、物品分类等)时会产生特征取值之间存在顺序的副作用:比如编码为 1 和 2 的特征之间的关系会比编码为 1 和 6 的特征更近。

PySpark 和 Scikit-Learn 中 OHE 的区别

在 Scikit-Learn 和 PySpark 中, OHE 的细节存在一些区别,如果已经习惯了 Scikit-Learn 的输出格式,在用 PySpark 时会产生预料之外的输出。在 Scikit-Learn 的 OneHotEncoder() 中,每一个独立的特征会在输出中单独表达为 DataFrame 的一列。但是在 PySpark 的 OneHotEncoderEstimator 中,编码输出的则是稀疏向量 SparseVector , 并且对于有 $n$ 个值的特征,默认输出的向量维度只有 $n-1$ ,并且最后一个特征会被编码为全零的向量。

另外, PySpark 的 OneHotEncoderEstimator 对非数值特征列进行编码前,需要先通过 StringIndexer 进行顺序编码。

举个🌰

用一个随机生成的数据为例,这个训练数据保存了几个员工的姓名、部门、职位、工作年数、工资以及会说的语言。我们希望对部门和岗位进行 OHE 。

1
2
3
4
5
6
7
8
9
10
11
+---+-------+----------+--------+-----+------+---------------------------+
|id |name |department|title |years|salary|language |
+---+-------+----------+--------+-----+------+---------------------------+
|0 |Arche |IT |Engineer|2 |65000 |[English, Spanish] |
|1 |Beth |Purchase |Manager |5 |100000|[English, French, Spanish] |
|2 |Carl |Marketing |Intern |1 |60000 |[English, Chinese] |
|3 |Dorian |Marketing |Manager |8 |125000|[English, French, Japanese]|
|4 |Escher |IT |Manager |6 |10000 |[English] |
|5 |Faridah|IT |Engineer|7 |80000 |[Chinese, Japanese] |
|6 |Gustav |Marketing |Engineer|2 |75000 |[Spanish, Russian] |
+---+-------+----------+--------+-----+------+---------------------------+

同时,还有一个内容稍有不同的测试数据用于测试编码器在面对新数据和空数据时的行为。

1
2
3
4
5
6
7
8
9
10
11
+---+-------+----------+----------+-----+------+---------------------------------+
|id |name |department|title |years|salary|language |
+---+-------+----------+----------+-----+------+---------------------------------+
|0 |Arche |IT |Engineer |4 |70000 |[English, Spanish, Korean] |
|1 |Beth |Purchase |Manager |6 |110000|[English, French, Spanish, Hindi]|
|2 |Carl |Marketing |Intern |2 |65000 |[English, Chinese, German] |
|3 |Dorian |Marketing |Manager |9 |130000|[English, French, Japanese] |
|4 |Escher |IT |Manager |7 |50000 |[English, Thai] |
|5 |Faridah|IT |null |8 |90000 |[Chinese, Japanese] |
|6 |Gustav |null |Researcher|3 |80000 |[Ukrainian, Vietnamese] |
+---+-------+----------+----------+-----+------+---------------------------------+

第一次尝试:PySpark 的默认行为

如果在初始化 StringIndexer 的时候采用默认参数,原始特征列为 departmentStringIndexer 输出为 department_idxOneHotEncoderEstimator 输出为 department_vec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
si_department = StringIndexer(
inputCol="department",
outputCol="department_idx",
# handleInvalid="error"
)
si_title = StringIndexer(
inputCol="title",
outputCol="title_idx",
# handleInvalid="error"
)
ohe = OneHotEncoderEstimator(
inputCols=[si_department.getOutputCol(), si_title.getOutputCol()],
outputCols=["department_vec", "title_vec"]
)

输出的训练数据中,可以看到编码向量的长度都是 2 (比特征元素数少 1)并且最后一个元素被编码成了零向量,即空稀疏向量。同时,由于 handleInvalid 设置的是 error ,编码器在遇到包含未知/空数据的测试数据直接报错

1
2
3
4
5
6
7
8
9
10
11
+---+-------+----------+--------------+--------------+--------+---------+-------------+
|id |name |department|department_idx|department_vec|title |title_idx|title_vec |
+---+-------+----------+--------------+--------------+--------+---------+-------------+
|0 |Arche |IT |0.0 |(2,[0],[1.0]) |Engineer|1.0 |(2,[1],[1.0])|
|1 |Beth |Purchase |2.0 |(2,[],[]) |Manager |0.0 |(2,[0],[1.0])|
|2 |Carl |Marketing |1.0 |(2,[1],[1.0]) |Intern |2.0 |(2,[],[]) |
|3 |Dorian |Marketing |1.0 |(2,[1],[1.0]) |Manager |0.0 |(2,[0],[1.0])|
|4 |Escher |IT |0.0 |(2,[0],[1.0]) |Manager |0.0 |(2,[0],[1.0])|
|5 |Faridah|IT |0.0 |(2,[0],[1.0]) |Engineer|1.0 |(2,[1],[1.0])|
|6 |Gustav |Marketing |1.0 |(2,[1],[1.0]) |Engineer|1.0 |(2,[1],[1.0])|
+---+-------+----------+--------------+--------------+--------+---------+-------------+

第二次尝试:保留未知数据

以部门特征为例,当初始化 StringIndexer 时设置 handleInvalid="keep" 时,一个单独的最高编码会保留给未知数据。即当某一特征有$n$ 个不同数值时, $0$ 至 $(n-1)$ 会保留给已知特征, $n$ 会保留给未知特征。当 OneHotEncoderEstimator 进行编码时,未知特征则被分配给了空向量,即表中的 (3,[],[])

1
2
3
4
5
6
7
8
9
10
11
+---+-------+----------+--------------+--------------+--------+---------+-------------+
|id |name |department|department_idx|department_vec|title |title_idx|title_vec |
+---+-------+----------+--------------+--------------+--------+---------+-------------+
|0 |Arche |IT |0.0 |(3,[0],[1.0]) |Engineer|1.0 |(3,[1],[1.0])|
|1 |Beth |Purchase |2.0 |(3,[2],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|2 |Carl |Marketing |1.0 |(3,[1],[1.0]) |Intern |2.0 |(3,[2],[1.0])|
|3 |Dorian |Marketing |1.0 |(3,[1],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|4 |Escher |IT |0.0 |(3,[0],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|5 |Faridah|IT |0.0 |(3,[0],[1.0]) |Engineer|1.0 |(3,[1],[1.0])|
|6 |Gustav |Marketing |1.0 |(3,[1],[1.0]) |Engineer|1.0 |(3,[1],[1.0])|
+---+-------+----------+--------------+--------------+--------+---------+-------------+
1
2
3
4
5
6
7
8
9
10
11
+---+-------+----------+--------------+--------------+----------+---------+-------------+
|id |name |department|department_idx|department_vec|title |title_idx|title_vec |
+---+-------+----------+--------------+--------------+----------+---------+-------------+
|0 |Arche |IT |0.0 |(3,[0],[1.0]) |Engineer |1.0 |(3,[1],[1.0])|
|1 |Beth |Purchase |2.0 |(3,[2],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|2 |Carl |Marketing |1.0 |(3,[1],[1.0]) |Intern |2.0 |(3,[2],[1.0])|
|3 |Dorian |Marketing |1.0 |(3,[1],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|4 |Escher |IT |0.0 |(3,[0],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|5 |Faridah|IT |0.0 |(3,[0],[1.0]) |null |3.0 |(3,[],[]) |
|6 |Gustav |null |3.0 |(3,[],[]) |Researcher|3.0 |(3,[],[]) |
+---+-------+----------+--------------+--------------+----------+---------+-------------+

第三次尝试:丢弃未知数据

当初始化 StringIndexer 时设置 handleInvalid="skip" 时,包含未知数据的行会被跳过,即从输出数据中丢弃。但由于这样一来,没有多余的位留给未知数据, OneHotEncoderEstimator 的输出还是会少一位。这时就需要令 OneHotEncoderEstimatorhandleInvalid="keep"

可以看到预测数据的编码输出只剩下了 4 行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
si_department = StringIndexer(
inputCol="department",
outputCol="department_idx",
handleInvalid="skip"
)
si_title = StringIndexer(
inputCol="title",
outputCol="title_idx",
handleInvalid="skip"
)
ohe = OneHotEncoderEstimator(
inputCols=[si_department.getOutputCol(), si_title.getOutputCol()],
outputCols=["department_vec", "title_vec"],
handleInvalid="keep"
)
1
2
3
4
5
6
7
8
9
10
11
+---+-------+----------+--------------+--------------+--------+---------+-------------+
|id |name |department|department_idx|department_vec|title |title_idx|title_vec |
+---+-------+----------+--------------+--------------+--------+---------+-------------+
|0 |Arche |IT |0.0 |(3,[0],[1.0]) |Engineer|1.0 |(3,[1],[1.0])|
|1 |Beth |Purchase |2.0 |(3,[2],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|2 |Carl |Marketing |1.0 |(3,[1],[1.0]) |Intern |2.0 |(3,[2],[1.0])|
|3 |Dorian |Marketing |1.0 |(3,[1],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|4 |Escher |IT |0.0 |(3,[0],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|5 |Faridah|IT |0.0 |(3,[0],[1.0]) |Engineer|1.0 |(3,[1],[1.0])|
|6 |Gustav |Marketing |1.0 |(3,[1],[1.0]) |Engineer|1.0 |(3,[1],[1.0])|
+---+-------+----------+--------------+--------------+--------+---------+-------------+
1
2
3
4
5
6
7
8
9
+---+------+----------+--------------+--------------+--------+---------+-------------+
|id |name |department|department_idx|department_vec|title |title_idx|title_vec |
+---+------+----------+--------------+--------------+--------+---------+-------------+
|0 |Arche |IT |0.0 |(3,[0],[1.0]) |Engineer|1.0 |(3,[1],[1.0])|
|1 |Beth |Purchase |2.0 |(3,[2],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|2 |Carl |Marketing |1.0 |(3,[1],[1.0]) |Intern |2.0 |(3,[2],[1.0])|
|3 |Dorian|Marketing |1.0 |(3,[1],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
|4 |Escher|IT |0.0 |(3,[0],[1.0]) |Manager |0.0 |(3,[0],[1.0])|
+---+------+----------+--------------+--------------+--------+---------+-------------+

后续操作

进行 One-Hot 编码之后的特征可能还需要后续操作。比如 PySpark 的机器学习模型只支持一整个向量的输入,这就需要将编码输出和其他特征组合起来。有时候还需要输出数据具有和 Scikit-Learn 一样的格式,即每个向量的维度单独存为一列。这篇文章就暂时不涉及了。

问题描述

车辆路径问题同样是一个和实际生活非常接近的问题。问题设定中包含 $M$ 个客户,每个客户有需求 $d_i$ 和所在的位置 $(x, y)$ 。位于起点的仓库可以派出 $N$ 辆送货车,每辆车有最大送货容量 $C$ ,车辆基础成本为 $S$,从 a 点开车到 b 点的成本和距离成正比。车辆要覆盖所有客户的需求,不重不漏,并且优化目标为求最少送货成本、需要的送货车辆数量 $N$,以及每辆车的路径。

局部搜索

由于车辆路径问题变量太多,总车辆数、每辆车负责的客户以及客户的顺序均可以调整,因此即时通过构建整数规划的标准形式并通过工具包求解,效率也不会太高。为了完成作业,这里我们只采用局部搜索的方式解决问题。

局部搜索的资料有很多,这一篇文章和我们要解决的问题非常吻合,因此我们尝试复现文章中设计的搜索方法求解问题。

为了避免歧义,我们先定义一些术语:

  • 一辆车从客户 a 直接行驶到客户 b 时,我们称这辆车驶过了一条“边” ( edge )
  • 一辆车连续驶过多个客户,即驶过多条首尾相连的边时,我们称这辆车驶过了一段“路径” ( path )
  • 一辆车从起点出发,途径所有分配给它的客户,最终回到起点时,我们称这辆车驶过了一条“回路” ( route )

有了这些术语,我们就可以定义四种基本的局部搜索模式了。

路径平移 Shift

VRP 路径平移

路径平移,就是将一条回路中的部分路径移动到另一条回路中的某个位置,可以正接也可以反接。如果在不违反限制条件的情况下,可以讲一整条回路平移到另一回路中,那也就相当于合并了两条回路,减少了一辆车的需求。

路径交换 Interchange

VRP 路径交换

路径交换,与平移类似,就是交换两个回路中各自的两端路径,由于两路径均可以在交换之后正接反接,因此一次交换可以产生四种可能的结果。

回路内换边 2-exchange

VRP 回路内换边

回路内换边,和 TSP 的 2-opt 操作相同,即重新连接一条回路内两条边,进行一个“拧麻花”的操作。

回路间首尾重组 Ladder

VRP 回路间首尾重组

回路间首尾重组,就是在两条回路中各自设置一个断点,将两条回路断点两侧的路径重新连接形成两条新回路。根据首尾连接的方式不同,可以产生两种结果。

搜索初始解

局部搜索在操作过程中,只能在有效解间进行搜索并优化目标函数,因此需要找到一个效果不是很好但是可行的解作为搜索的起点。由于车辆路径问题变量多、关联性强、限制条件多,因此很难直接得到一组可行解。在这里我们采用贪心法得到第一组可行解并开始搜索。

搜索的过程是:每当还有顾客没有覆盖到但当前没有车辆有足够容量时,增加一辆货车,从目前没有被覆盖的需求最小的客户开始送货,直至容量不足。如此操作直至所有客户都被覆盖。这样可以保证每个客户覆盖不重不漏,并且车辆容量都足够。

找到初始解之后,就可以开始进行局部搜索,直至达到最大搜索时间、最大搜索次数或者无法通过局部搜索找到更优解为止。求解代码在这里

问题描述

设施选址问题是一个比较具有现实意义的优化问题。问题可以通过 $N$ 个备选的供应设施位置和 $M$ 个客户来描述。选定一个候选位置建立供应设施 $f$ 的时候需要付出 $s_f$ 的建设成本,并且设施的最大供应量不能超过 $cap_f$ ;与之对应每个客户 $c$ 均有一个需求值 $d_c$ 需要被满足;设施 $f$ 和客户 $c$ 之间的距离为两者之间的欧几里得距离 $d(f, c)$ ,送货成本等于送货距离。题目要求每个客户的需求必须满足,且至多只能接收一个供应设施的送货,同时所有设施均不能超负荷运转,即供应量不应超过设施容量。优化目标是在满足所有条件的情况下求出成本最低的设施选址以及送货安排。

整数规划

此题直接通过整数规划就能获得理想的结果。建立规划模型时,需要分别添加两组变量,一组用来表示是否在一个备选位置上建立设施,共 $N$ 个 0-1 变量,另一组用来表示每个客户接受的供应设施,共 $N\times M$ 个 0-1 变量。设置好变量后,将设施位置成本、容量,客户需求以及每个位置和客户之间的距离设置在限制条件中直接求解即可得到最优解,详见完整代码

问题描述

旅行商问题,又称旅行推销员问题,简称 TSP ,在很多科普读物流行文化中都出现过这一问题问题的影子。旅行商问题的基本内容非常简单:一位推销员要去一些城市推销商品,已知城市的位置和两两之间的距离,希望能寻找到一条能便利所有城市,并且每个城市只去一次,最后回到出发城市的路径,以节约时间和旅行成本。

即使不经过任何转化,原始形式的旅行商问题就可以直接应用在路径规划、后勤学、芯片电路设计等场景中,稍加改动,又可以用来解决 DNA 测序等问题的子问题,因此旅行商问题是优化领域中被研究的最多的问题之一。

问题求解

旅行商问题是 NP 困难问题,因此不存在多项式复杂度的最优解法。对于规模较大的问题,可以采用近似解法。

贪心法

用贪心法解 TSP 只需选取一个城市作为起点,然后每一步都选择距离当前位置最近的城市。贪心法通常无法找到非常优秀的解,找到解平均长度比最优解长 25% 左右。但是,存在一些巧妙设计的旅行商问题,使得贪心法反而会找到最差的路径,因此贪心法的使用范围有限。

局部搜索法

局部搜索也可以用来求解 TSP ,最常见的方法是 2-opt ,简单来说就是对路线进行“拧麻花”式的优化。
TSP 2-opt 优化思路
正如图中上半部分那样, $be$ 和 $df$ 两条路线有交叉,如果能将 $cde$ 部分调转重新接入路径,则 TSP 的总距离可以减少。如果能尽可能多地尝试可能的翻转方式,如果翻转之后总路径能缩短则保留翻转并继续搜索,直至无法通过翻转部分路径缩短总距离为止,可以得到一条比较优秀的遍历所有城市的旅行路线。

很多时候, 2-opt “拧麻花” 的效果还不够好,还可以采用同时翻转更多段路径的 k-opt 以及可以动态调整翻转段落数的 v-opt 。这些搜索方法的效果更好,但复杂度也更高。 2-opt 的代码在这里

问题描述

集合覆盖问题是一个经典的组合优化问题,问题通常可以描述为:给定全集 $\mathcal{U}$ 以及 $\mathcal{S}={s_i\in\mathcal{U} \mid s_1, s_2, \ldots, s_n}$ 并且 $\cup_{i=1}^{n} s_i=\mathcal{U}$ 。集合覆盖问题的目标就是找到一个 $\mathcal{S}$ 的最小子集 $\mathcal{C}$,使得 $\mathcal{C}$ 的并集为 $\mathcal{U}$ 。

贪心法

贪心法可以用来求解集合覆盖问题,并且思路很简单:每次选择覆盖了最多尚未被覆盖元素的集合即可。集合覆盖问题是 $\mathcal{NP}$ 完全问题,因此贪心法只能得到 $\mathcal{O}(\log n)$ 近似的近似解。这也是多项式时间内能得到的最好结果了。

整数规划

虽然时间复杂度很高,但整数规划可以求出集合覆盖的最优解,并且形式比较简单,在 gurobi 的帮助下,解决一定规模的问题需要的时间还是可以接受的。

令 $M=|\mathcal{U}|$ ,$N=|\mathcal{S}|$ ,设置变量 $x_0, x_1, \ldots, x_{N-1}$ 和系数 $a_{00}, a_{01}\ldots, a_{0(M-1)},\ldots, a_{(N-1)(M-1)}$ 。其中 $x_i=1$ 表示选择第 $i$ 个集合,反之不选,$a_{ij}=1$ 表示集合 $i$ 中包含 $\mathcal{U}$ 中第 $j$ 个元素,反之没有。为了保证 $\mathcal{U}$ 中所有元素均被覆盖,只需保证每个元素都被包含在至少一个选中的子集中,因此集合覆盖问题的标准形式就可以写出标准形式如下:

$$\begin{align}
\min & \sum i_{i} \
s.t. & \sum_{i=0}^{N-1} a_{ij}x_i\geq 1 & j=1,\ldots, M
\end{align
}$$

完整的解题代码在 Github 上。

我在博客的关于页面放了一个 LeetCode 的进度表,分别标记了每道题是否已经写出代码,以及是否写了解析。手动更新这些内容非常费时费力,同时也容易出错,因此今天试着写了一小段 Python 程序以自动化更新解题进度,主要用到了简单的正则表达式匹配和 python 自带的 filtermap 命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import re
from os import listdir

src_list = listdir('SRC_DIR')
src_list = filter(lambda filename: re.match('\d\d\d', filename), src_list)
src_nums = set(map(lambda filename: int(filename.split('_')[0]), src_list))

post_list = listdir('POST_DIR')
post_list = filter(lambda x: x[:8] == "Leetcode", post_list)
post_nums = set(map(lambda filename: int(filename.split('-')[1]), post_list))

lines = []
with open("index.md", "r") as index:
for line in index:
if re.match('\|(\d)+\|', line):
items = line.split('|')
idx = int(items[1])
if idx in src_nums:
items[4] = u"\U00002713"
if idx in post_nums:
items[5] = u"\U00002713"
lines.append('|'.join(items))
else:
lines.append(line)

for line in lines:
print(line, end='')

我的 LeetCode 代码文件名格式为:###_name.py , Hexo 上解析文章的文件名为: ‘Leetcode-###-name.md’ 。因此我才用的方式为:遍历两个文件夹,获得已经保存的代码和文章对应的题号,保存成 set 。然后再逐行遍历 index.md ,遇到表格第一列是数字的行,就取得题号并判断该题号在 src_numspost_nums 中是否存在。判断完成后将对应的表格列改成 u"\U00002713" 即 ✓ 。

之所以称之为半自动,是因为 index.md 中的题目列表还只能手动添加,目前只添加了前700题,希望以后能有办法把这一步也自动化。

问题描述

着色问题来自于人们绘制地图的需求。在绘制地图时,人们希望相邻的国家使用不同的颜色,并且使用尽可能少的颜色绘制整张地图。

进行数学抽象后,问题就变成了:给定一个无向图 $\mathcal{G}(\mathcal{V},\mathcal{E})$ ,其中 $\mathcal{V}$ 为顶点集合, $\mathcal{E}$ 为边集合,希望找到 $\mathcal{V}$ 的一个划分将其分成 $K$ 组,使得每个划分中的顶点均互不相邻。最小着色问题得目标即是找到满足条件的最小的 $K$ 。

背包问题一样,图着色问题也是 NP 完全问题,因此无法找出多项式时间的最优解法。不过通过贪心法可以快速找到高质量的次优解,并可以以此为基础通过整数规划找到最优解。

贪心法

用贪心法解决着色问题的基本思路非常简单,主要步骤为:

  1. 通过某种贪心策略不断寻找图中的不相连点集
  2. 将每次找到的点集指定一种颜色并从图中去掉这些顶点
  3. 重复1、2两步直至所有定点均被着色

Python 下常用的图论分析软件包 NetworkX 中已经提供了很多常见的贪心策略和着色函数。因此贪心法解染色问题最简单的实现就是对于给定的图 $\mathcal{G}(\mathcal{V},\mathcal{E})$ ,将所有贪心策略均尝试一遍,保留所需颜色数最小的解。贪心解法函数的 python 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def greedy(node_count, edges):
graph = nx.Graph()
graph.add_nodes_from(range(node_count))
graph.add_edges_from(edges)

strategies = [nx.coloring.strategy_largest_first,
nx.coloring.strategy_random_sequential,
nx.coloring.strategy_smallest_last,
nx.coloring.strategy_independent_set,
nx.coloring.strategy_connected_sequential_bfs,
nx.coloring.strategy_connected_sequential_dfs,
nx.coloring.strategy_connected_sequential,
nx.coloring.strategy_saturation_largest_first]

best_color_count, best_coloring = node_count, {i: i for i in range(node_count)}
for strategy in strategies:
curr_coloring = nx.coloring.greedy_color(G=graph, strategy=strategy)
curr_color_count = max(curr_coloring.values()) + 1
if curr_color_count < best_color_count:
best_color_count = curr_color_count
best_coloring = curr_coloring
return best_color_count, 0, [best_coloring[i] for i in range(node_count)]

整数规划

着色问题同样可以转换成标准 ILP 形式之后利用 gurobi 求解。构造方法可以参考一本老教材 Applied Mathematical Programming 中第九章 Integer Programming (整数规划)给出的例子。

标准形式

令 $N=|\mathcal{V}|$ ,设置两组变量 $y_0, y_1, \ldots, y_{N-1}$ 以及 $x_{00}, x_{01}\ldots, x_{0(N-1)},\ldots, x_{(N-1)(N-1)}$ 。其中 $y_k=1$ 表示使用第 $i$ 种颜色 $x_{ik}=1$ 表示顶点 $i$ 使用颜色 $k$ 进行着色。可以列出优化问题标准形式如下:

$$\begin{align}
\min & \sum y_{i} \
s.t. & \sum_{k=0}^{N-1} x_{ik}=1 & i=1,\ldots, N\
& x_{ik} - y_k\leq 0 & i,k=1,\ldots, N \
& x_{ik}+x_{jk}\leq 1 & (i,j)\in\mathcal{E}\
& x_{ik},y_k\in{0,1}
\end{align
}$$

将题目中给定的数据按标准形式输入到 gurobi 中即可求得最优解,但是随着图规模的增加,解题所需的时间也以极快的速度增长。因此,需要对标准形式进行一些改进,进一步提高性能

性能优化1:去除可能的重复解

因为着色问题有对称性,因此通过标准形式可以得到很多等价的着色问题的解法。比如,有5种可能的颜色(1,2,3,4,5)可供选择时,如果最优解的着色数是3,那么选择(1,2,3)和(3,4,5)以及(1,3,5)等任意三种颜色都是等价的,但是在 ILP 中他们表示的是完全不同的解。因此需要增加一条限制:优先选择编号更小的颜色。

此外,对于一个已知的解,对调两组采用了不同颜色的顶点的着色,得到的新解和原来的解也是等价的,但是在 ILP 中对调颜色的解会被认为是不同的。因此需要增加第二条限制,编号小的颜色应该对尽可能多的顶点进行着色。

性能优化2:减少可选的颜色

ILP 标准形式中,由于不知道最少可以使用多少种颜色,因此在初始化时必须使用 $N$ 种颜色,即假设每个定点都需要一种不同的颜色,这样就需要 $N^2+N$ 个变量。但是根据贪心法得出的次优解可以看出:对于作业题目中给出的数据,着色所需的颜色数通常远远小于图的定点数。因此可以在初始化 ILP 之前,先使用贪心法求的次优解所需的颜色数 $m$,并按有 $m$ 个可选颜色设置 ILP ,这样可以大大减少 ILP 的规模。

性能优化3:用贪心法的解初始化 ILP

在用贪心法获得了次优解之后,可以同时使用次优解的值为 ILP 各个变量赋初始值值,减少 ILP 寻找第一个可行解所需的时间。但是根据观察,这个做法带来的性能提升似乎并不明显。

完整解题代码比较长,就不在这里贴了。

简介

在很多和网络有关的场景中,我们常常需要进行一类操作:记录节点之间是否直接相连,然后计算两个节点之间是否能通过某些邻居而间接连接。比如:在社交网络中,我们希望能够快速知道两个人是否可以通过某些共同好友而建立联系;在地图中,我们希望能够快速知道两地之间是否存在一条通路。如果我们只需要判断这样的连接是否存在而不需要知道具体的连接路径,基于并查集的合并-查找算法 (Union Find) 就是一种非常有用的方法。

数据结构和方法

并查集是一种树形的数据结构,其思路就是维护一颗节点的树,当两个不直接相连的节点具有相同的根结点时,就可以认为两个节点存在连接。顾名思义, Union Find 需要用到两个方法:

  • union 即合并,负责记录节点之间的连接关系
  • find 即查找,负责查询输入的两个点之间是否存在相连关系

同时,在初始化时还要制定节点数 n

Quick Find

Quick Find 可以保证在常数时间复杂度内完成查找,采用的思路是维护一颗扁平的树:
在数据结构中添加一个数组 idx 负责记录每个节点的父节点,根节点的 idx 值就是自己,即 idx[i]=i。在每次执行一次 union 操作时,如 union(p, q)idx[p] 的值设置为 q ,并且遍历所有节点,如果 idx[i]==ppi 的父节点,则重新设置 idx[i]=q 。保证每个节点的父节点就是根结点。查找时,调用 find(a, b) ,只需判断 idx[a]==idx[b] 即可,复杂度为 $O(1)$。

虽然 Quick Find 可以非常高效地进行查找,但是每次添加一条新边时,都要扫描一遍所有节点的父节点,添加每个节点的连接关系时都需要 $O(N)$ 次操作,扩展性较差。

Quick Find 的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class quickFind(object):
def __init__(self, n):
self.nodes = n
self.idx = list(range(n))
self.parts = n

def union(self, edge):
p, q = edge[0], edge[1]
idxp, idxq = self.idx[p], self.idx[q]
if idxp == idxq:
return

for i in range(n):
if self.idx[i] == idxp:
self.idx[i] == idxq
self.parts -= 1

def find(self, p, q):
return self.idx[p] == self.idx[q]

Quick Union

Quick Find 的 union 操作效率太低,为了提升 union 效率,可以改用 Quick Union 的做法。 Quick Union 不能保证在常数时间复杂度内完成查找,但是可以快速添加新边。

Quick Union 不再保证树是扁平的,而是在每次执行 union(p, q) 时,找到 pq 所在树的根节点 rprq ,并且将 rq 设置为 rp 的父节点,在查找时,同样找到对比两个点所在的根结点即可。 Quick Union 的时间复杂度就完全取决于树的深度,但是对树的平衡性完全没有控制,因此在最坏情况下,输入的数据是一条链形的结构,即树变成了链表,那么 unionfind 的复杂度都会变成 $O(N)$ ,比 Quick Find 还要差。

Quick Union 的代码如下,由于经产需要查找根结点的操作,因此单独定义了 root 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class quickUnion(object):
def __init__(self, n):
self.nodes = n
self.idx = list(range(n))
self.parts = n

def root(self, node):
while node != self.idx[node]:
node = self.idx[node]
return node

def union(self, edge):
p, q = edge[0], edge[1]
rootp, rootq = self.root(p), self.root(q)
if rootp == rootq:
return
else:
self.idx[rootp] = rootq
self.parts -= 1

def find(self, p, q):
return self.root(p) == self.root(q)

Weighted Quick Union

Quick Union 的时间复杂度高度依赖于树的高度,但是对树的平衡性完全没有控制,会无条件地将 rq 设置为 rp 的父节点。因此,如果能让并查集的树形结构尽可能平衡,树的高度数量级就不会超过 $O(log N)$ ,可以有效降低时间复杂度。

为了实现这一点,我们需要在并查集中增加一个 size 数组,记录每个节点及其子节点的数量总和,这样当合并两棵树时,只需要比较两个根结点的 size 值,将尺寸较小树的根结点的父节点指向尺寸较大树的根结点,就可以尽量保持树的平衡性了。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class weightedQuickUnion(object):
def __init__(self, n):
self.nodes = n
self.idx = list(range(n))
self.size = [1] * n
self.parts = n

def root(self, node):
while node != self.idx[node]:
node = self.idx[node]
return node

def union(self, edge):
p, q = edge[0], edge[1]
rootp, rootq = self.root(p), self.root(q)
if rootp == rootq:
return
elif self.size[rootp] > self.size[rootq]:
self.idx[rootq] = rootp
self.size[rootp] += self.size[rootq]
else:
self.idx[rootp] = rootq
self.size[rootq] += self.size[rootp]
self.parts -= 1

def find(self, p, q):
return self.root(p) == self.root(q)

Weighted Quick Union with Path Compression

Weighted Quick Union 的效果已经很好了,但是并查集的潜力还没有完全发掘出来:既然每次运行 root 方法时,都会找到根结点,那何不直接将途中找到的所有节点直接指向该根结点呢?这样做的代价是:由于 root 方法会经常被调用,频繁地生成和销毁中间节点数组带来的成本会很高。因此我们采用一种开销更低,但是效果接近的路径压缩做法:在寻找根结点时,将所有节点的父节点换成其祖父节点,这样每次执行 root 时,路径的长度都会被压缩一次,额外的内存开销为常数,并且额外的代码只有一行。我们只需要把 root 稍作修改即可:

1
2
3
4
5
6
def root(self, node):
while node != self.idx[node]:
# 将祖父节点变成父节点
self.idx[node] = self.idx[self.idx[node]]
node = self.idx[node]
return node

经过简单的修改,带来的性能提升是惊人的,经过路径压缩之后,unionfind 复杂度的一个比较宽松的上界是 $O(log ^* N)$ ,即迭代对数,其定义为对 $N$ 反复进行对数操作,得到一个小于 1 的结果所需进行的对数运算次数,另一个更紧凑的上界是 $O(\alpha(N))$ 即反阿克曼函数 。这两个函数的增长速度都极为缓慢,对于我们通常能见到的数量级的输入,可以认为这两个函数的值不会超过 5 ,如 $log ^* 2^{65536}=5 $ 。也就是说,经过路径压缩的并查集,可以做到在非常接近常数复杂度的速度下实现 unionfind

复杂度列表

Algorithm Constructor Union Find
Quick Find $O(N)$ $O(N)$ $O(1)$
Quick Union $O(N)$ $O(h)$* $O(h)$
Weighted Quick Union $O(N)$ $O(log N)$ $O(log N)$
Weighted Quick Union with Path Compression $O(N)$ $O(\alpha(N))$ $O(\alpha(N))$
* $h$ 是树的高度

参考资料

Case Study in Algorithms by Robert Sedgewick

题目要求

通过程序检测只包含 .*正则表达式 p与输入的字符串 s 是否匹配。

  • . 可以可以匹配除换行符之外的任意字符
  • * 可以匹配 * 之前的字符重复零次或任意多次

如:

1
2
3
4
5
6
7
isMatch("aa","a") = False
isMatch("aa","aa") = True
isMatch("aaa","aa") = False
isMatch("aa", "a*") = True
isMatch("aa", ".*") = True
isMatch("ab", ".*") = True
isMatch("aab", "c*a*b") = True

思路:

总体来说,这是一道相当难的题目,难点是对于 * 的处理。根据网上的整理,此题可以用动态规划和递归法解决。

动态规划

动态规划的细节在 Discrete Optimization 学习笔记 (1) 背包问题 这篇文章里已经做了简单的介绍。在这道题中,由于星号的特性,一个表达式可以匹配多种长度不同的字符串,因此我们同样需要一个二维的动态规划布尔数组。其中, dp[i][j] 为真表示 s[0:i]p[0:j] 可以匹配,为假则不能匹配。

对于只有字符和 . 的情况,验证匹配非常简单,直接逐个字符判断即可,难点在于对于星号的处理。

isMatch("aab", "c*a*b") 为例, dp 是一个4行6列的二维数组,初始化 dp[0][0]=True 表示空字符串可以和空表达式匹配。在循环时,内循环为 j 外循环为 i

每次循环时,如果不涉及到星号,则只需考虑 s[i - 1] == p[j - 1] or p[j - 1] == '.' 是否成立,成立则令 dp[i][j] = dp[i - 1][j - 1]

如果当前字符为 *,则先使 dp[i][j] = dp[i][j - 2] ,即让 s[0:i] 去匹配当前星号出现之前的表达式,也就是说让该 * 之前的字符出现一次;另外如果 p[j - 1] == s[i - 1] 或者 p[j - 1] == . 则说明在 a*.* 之前的字符和 s 当前位置字符相同,可以尝试让 * 之前的字符出现零次再匹配一次,即 dp[i][j] |= dp[i - 1][j]

最后得到的动态规划数组为:

1
2
3
4
[[True, False, True, False, True, False]
[False, False, False, True, True, False]
[False, False, False, False, True, False]
[False, False, False, False, False, True]]

dp[-1][-1] ,为真,匹配成功。

代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Solution(object):
def isMatch(self, s, p):
"""
:type s: str
:type p: str
:rtype: bool
"""
dp = [[False for i in range(len(p) + 1)] for j in range(len(s) + 1)]
dp[0][0] = True
for j in range(1, len(p) + 1):
if p[j - 1] == '*':
dp[0][j] = dp[0][j - 2]
for i in range(1, len(s) + 1):
for j in range(1, len(p) + 1):
if p[j - 1] == '*':
dp[i][j] = dp[i][j - 2]
if s[i - 1] == p[j - 2] or p[j - 2] == '.':
dp[i][j] |= dp[i - 1][j]
elif p[j - 1] == s[i - 1] or p[j - 1] == '.':
dp[i][j] = dp[i - 1][j - 1]

return dp[-1][-1]

递归

递归只需要从前向后不断检查字当前位置的字符以及之后的尾部字符串是否匹配,最终返回真假即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Solution(object):
def isMatch(self, s, p):
"""
:type s: str
:type p: str
:rtype: bool
"""
if len(p) == 0:
return len(s) == 0
if len(p) == 1 or p[1] != '*':
if len(s) == 0 or (s[0] != p[0] and p[0] != '.'):
return False
else:
return self.isMatch(s[1:], p[1:])
else:
i = -1
while i < len(s) and (i < 0 or p[0] == '.' or p[0] == s[i]):
if self.isMatch(s[i+1:], p[2:]):
return True
i += 1
return False

参考:

  1. Regular Expression Matching 解题报告
  2. Leetcode 分类总结 Regular Expression Matching

题目要求

给出 n 个数字组成的排列 nums ,求出按照字典顺序的下一个排列。

思路:

解决此题需要搜索三个位置,先用两个变量 i-1i 从后向前搜索,直至找到满足 nums[i-1] < nums[i] 的情况;之后再用 j 从后向前搜索,直到出现 nums[j] > nums[i-1] 。之后调换 nums[i-1]nums[j] 对调,再将 nums[i] 之后的所有元素倒序排列,即可得到 nextPermutation() 即下一个排列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Solution(object):
def nextPermutation(self, nums):
"""
:type nums: List[int]
:rtype: void Do not return anything, modify nums in-place instead.
"""
l = len(nums)
for i in reversed(range(l)):
if nums[i - 1] < nums[i]:
break
if i > 0:
for j in reversed(range(l)):
if nums[j] > nums[i - 1]:
nums[i - 1], nums[j] = nums[j], nums[i - 1]
nums[i:] = nums[i:][::-1]
break
else:
nums[:] = nums[::-1]