Moko — Love Between Python & Big Data System
date
May 13, 2025
slug
moko
status
Published
tags
Tech
summary
a note on moko
type
Newsletter
BackGround
Python作为一门data science中频繁使用的语言,易用性强而且生态完善。在大数据场景中,我们会使用Python操作关系型数据(表结构数据)、图结构数据、Tensor等等。他们分别对应了大数据中不同的具体应用场景,从此也衍生出不同的大数据系统——如Spark、Dask、Torch、GRAPE等等。
但是现在越来越多的应用需要多个大数据系统协同工作。以论文中的例子作为Motivation Example。

上述代码中包含三个部分:
- 使用pandas读取关系型数据 (Pandas)
- 将1. 中读取的数据转化为tensor格式并交给模型推理得到可能的欺诈用户 (Tensor)
- 根据2. 中得到的数据进行图计算得到风险路径 (NetworkX)
对于这样的应用来说,作者观察到存在以下优化机会:
- 对于相同的任务来说可能有多个Big Data System可以处理,而且每个系统又有不同的特点,怎么选择出当前应用场景中最合适的系统呢?
- 对于需要多个大数据系统协同合作的任务,跨系统之间的数据交互性能开销极大,如何优化呢?
本文提出了MOKO这个编程框架,用户只需要写出一些上层代码,MOKO框架底层自动将对应的大数据处理任务转化为对对应的Big Data System的调用。同时优化了跨系统之间的数据传输开销。
Challenge
MOKO的设计中主要存在下面几个挑战:
- 优化机会识别:给定一段用户写的Python代码,怎么识别出可以用Big Data System代替处理的那部分代码呢?怎么识别存在优化机会的代码呢?
- 最佳系统选择:对于特定的数据处理任务,如何在多个备选系统中,选出最合适的系统呢?
- 高效数据共享:如何在多个系统之间高效共享数据呢?
MOKO
Architecture
MOKO的架构主要包含四个部分:IR层,physical plan generator,Optimizer,Runtime
下面IR Layer中提到的dialect和task的抽象的作用和区别分别是什么?感觉都是把对一系列Operation的抽象,都是将这一系列Operation映射到某一类系统中去。
IR Layer
该组件接受用户手写的Python代码,将其编译成IR,然后将IRs划分成多个不同的dialects,这些dialects之间互相独立(每个dialect可以理解为可以交给某类Big Data System处理的任务,但是这个步骤应该还无法确定处理该Task的具体系统)
整体来说,IR Layer需要
- 识别出Idiom(几个IR组成的任务,可以映射为用户写的某段代码)
- 通过特定的Idiom Rewriting规则对识别到的Idiom进行优化
Idiom识别
Idiom识别分为两部分:控制面识别和数据面识别。
首先,我们有一系列预先定义好的控制流结构,如for循环或者if条件分支判断。对于用户写的Python代码,我们先是识别控制流结构,也就是识别出某些代码段的控制流结构。然后我们根据代码中具体的数据处理逻辑进行数据面的匹配(如矩阵计算等等)
Idiom改写
识别出Idiom之后,我们会有一系列规则(格式形如<idiom, rewrite>),我们根据我们找到的Idiom,选择对应的改写规则进行优化
这部分解决了前面Challenge中提到的优化机会识别
Generator
该组件将IR Layer生成的IR转化为更底层的IR,并生成一个physical plan。并将一些操作group起来(也就是Task),映射到某类大数据系统,相应地产生对应的数据转换代码(在不同系统之间进行数据转换的代码)
Optimizer
使用cost model选出性能最好的physical plan,然后交给对应的Runtime运行,当Runtime处理完一个Task之后,Optimizer会对剩下的Tasks使用cost model重复刚刚的优化过程。
Runtime
为集成到MOKO的系统提供分布式运行环境(比如把不同的系统分别运行在K8s集群上,根据负载动态扩缩容),同时提供一个cluster level的execution frame存储每个task的中间结果。(底层可能是一个分布式文件系统或者内存上的分布式对象存储)
Data Alignment
之前的跨系统优化工作需要用户自己指定多个系统之间的数据转换逻辑(为了自己的框架能够可扩展)。但是MOKO采取了一个不同的思路。MOKO的思想是用不同的视角来看待数据,不同类型的数据会提供不同的接口。因此MOKO使用了trait作为数据的抽象。
什么是trait?
可以理解为编程语言中的接口类型。对于某个特定系统的特定数据类型可能是多个trait,表示该数据类型同时具备多种不同的能力(接口)。trait描述的是对应数据类型的能力。
有了这种抽象之后,不同系统之间数据类型的转换可以归约到不同trait之间的转换。我们只需要写好不同trait之间转换逻辑即可。
我们以dask.dataframe和Gemini::Graph为例。

dask.dataframe可以用EdgeListTrait、VertexListTrait和ColumnWiseTrait来描述,而Gemini::Graph可以用IncomingAdjListTrait、OutGoingAdjListTrait和ColumnWiseTrait来描述,那么我们只要实现了这几个Trait之间的转换逻辑即可。这时我们就可以用dask.dataframe作为底层physical data对上层应用模拟Gemini::Graph的接口,对上层应用屏蔽底层实现。
既然我们可以用上游系统的对象模拟出下游系统所需要的Traits了,那么下游系统可以使用上游系统的对象吗?
如果不行的话,这篇工作就没那么fancy了。。。
Corner Case:对于一些比较复杂的数据,可能难以用多个Trait描述,则MOKO会将其转换成比较简单的数据类型,然后再用Trait描述。
如果上游系统输出的Trait和下游系统需要的Trait不同,则我们需要解决这个不一致的现象,称之为Trait Lifting。
Trait Lifting有两种情况:
- Method sharing:使用上游任务输出的Traits模拟下游任务所需要的Traits所定义的接口。使用这种方式可以实现zero-copy。
- Physical conversion:需要physical update底层存储以实现下游任务定义的Traits。(如本来上有任务只有IncomingAdjListTrait,我们可以通过改变底层数据结构实现IncomingAdjListTrait和OutGoingAdjListTrait)
由于上游系统的数据类型和下游系统的数据类型可能分别对应多个Traits,因此数据转换方式可能有许多种,如何选出最优数据转换方式,也是一个问题。而这部分就是Optimizer的职责之一
这部分解决了前面Challenge中提到的高效数据共享
Optimizer
前面的欺诈检测应用的IR如图所示,但是这个IR仅仅是一个logical plan,因为从这个IR中我们无法得到底层的具体运行方式、也不知道这个操作会交给哪个系统去执行。

我们需要Optimizer寻找到性能比较好的physical plan。类似于数据库中的解决方案,MOKO也设计了一个基于cost model的physical plan生成方法。
Cost Model
首先需要对问题进行建模
考虑执行路径P=(T, D)
其中:
我们假设这n个task被分发到m个不同的执行引擎执行,则
我们定义:
我们定义两个cost function:
则MOKO采用下面的公式评估P的性能
Optimizer就是根据不同的Lifting Rule和不同的系统选择枚举出不同的P,然后选择cost最低的P作为最后的执行方案。
不同task之间的dependency一定是链状的吗?如果不是的话这样的公式是不是就有问题了
还是用之前的例子说明问题

现在根据左边的logical plan,我们可以根据不同的backend system枚举出不同的execution path。这里由于每条path都只包含一个系统,因此没有展示data alignment的节点。(例子太烂。。。
对于每条路径,MOKO的Optimizer通过前面提到的cost评估公式给出时延开销预测。最后MOKO选出预测性能最好的execution path,生成physical plan(对应图中的Low-level IR),并交给对应的Runtime运行任务。完成该task之后,MOKO将剩余task的logical plan重新交给Optimizer生成新的physical plan,如果新生成的physical plan和之前一致则正常运行。否则采用最新生成的physical plan执行。
这部分解决了前面Challenge中提到的最佳系统选择
总结:
该工作发现Python作为一门易用性极强的语言被用到各种各样的场景中,其中对于大数据背景下的许多场景可能需要多个Big Data System协同工作。但是不同系统之间通过glue code的方式协同可能会导致整体性能下降。MOKO通过MLIR的方式,对用户写的Python脚本进行分析,在多种执行方案中选择出性能最好的执行方案,从单个task来看可能不是最优,但是从整体来看性能确实最好的。我觉得MOKO最重要的思想还是利用了Composed System相比于单个系统所带来的系统视角。可能选择A系统可以很快地处理好TaskC,但是将B系统(A系统的上游系统)的数据转化为A系统的数据可能需要巨大的开销,还不如在B系统中处理TaskC,尽管B系统处理TaskC本身可能性能不是很好。面临这样的Trade off,MOKO通过cost model的方式在确保cost model准确的情况下,帮助系统开发者在前面的Trade Off中做出抉择。