您的当前位置:首页函数式内功心法-08: 流式复合技术之Conduit海纳百川

函数式内功心法-08: 流式复合技术之Conduit海纳百川

2024-12-14 来源:哗拓教育

流式计算是我非常喜欢的技术。无流式,不快活!ALL in streaming!
所以,这次会废话多一点,嘿嘿。

早期学习spark跟storm时候,决然选择clojure发展下去,就是看到了流式引擎未来的辉煌。虽然目前spark在机器学习推动下如日中天,storm随着创始人自立门户成为弃子,但是流式浪潮必将掀起腥风血雨,浴火重生。

我已经深深地感觉到,流式引擎始于数据世界,亦将辉煌于云计算时代。流式技术是一剂能从根本上解决读写分离的良药,能一统计算引擎与查询引擎, 在各个发展方向中不断壮大,最终九九归一,笑傲江湖!

流式引擎应用在哪里?

流式引擎无处不在。从性质上主要分为事件流与数据流。
事件流较为简单,一般相互独立,数据流则复杂一些,亦可以将事件消息作为数据处理,即将事件流作为数据流来玩。在涉及状态管理的情况下,事件与数据流引擎基本无差异。
简单介绍一下各个方面的应用:

  1. 文件系统(ceph)
    a. 从简单的文件系统来说, inotify接口就是一个事件流
    b. 有了事件流filebeat可以调用系统接口得到数据流。
    c. 云存储ceph亦是如此
  2. 数据库(postgresql, elasticsearch)
    a. postgresql数据库的事件流为触发器,可以很方便地使用扩张与外界流通。
    b. postgresql数据库的数据流则为replicate接口,本质上从底层日志文件读取
    c. elasticsearch使用NRT策略实现数据流准实时
  3. 消息队列(rabbitmq, zeromq, kafka)
    最标准的实时流组件自然就是消息队列了
    a. RabbitMQ主要用于事务事件流,保障消息稳定性
    b. ZeroMQ主要用于网络事件流,提高网络的扩展性
    c. kafka主要用于数据流机制,高并发低延迟
  4. 配置管理(zookeeper)
    zookeeper对配置变化进行监听,产生事件流.
  5. 前端展现(websocket, react)
    a. 从后台服务到前端的事件流主要通过websocket触发
    b. 界面ui的事件流触发主要通过facebook的react框架完成

现在可以看到,所有的系统已经完成了实时的闭环.

  1. ui通过react框架完成前端事件流机制.
  2. 前端事件流通过http以及websocket建立后台事件流通讯.
  3. 后台服务事件流通过CSP机制完成业务接口实时响应功能.
  4. 后台服务配置状态通过zookeeper事件流管理
  5. 后台服务数据流通过kafka消息队列机制完成数据整合.
  6. kafka通过postgresql以及es存储引擎完成数据分析功能.

有了事件流及数据流,接着看强大的流式引擎。

  1. 事件流与数据流处理引擎(csp,ksql,conduit)
    常用的事件流引擎就是CSP跟Actor了, 简单快捷!
    常用的数据流引擎kafka streams & KSQL/storm/onyx/flink
    conduit与transducer为可复合的流式引擎,也是本文的核心。
  2. 流式引擎状态存储(rocksdb)
    目前主要采用的状态存储一般为KV存储,主要分为rocksdb以及redis+hbase
    rocksdb更简单好用一些, 在区块链及kafka streams引擎中起着至关作用。

流式引擎相比较于批量引擎强大在何处?

随着响应式编程跟实时计算的兴盛, 批量引擎逐渐在衰落。

  1. 批量引擎没有事件流响应机制。
    所以需要引入调度器事件流机制来连接流程,引起了极大的复杂度跟维护性。并且对于调度机制来说,目前的技术大部分采用手工配置,日积月累形成较大的维护负担。
  2. 批量引擎不能管线运行,造成极大的内存消耗, 存储浪费和时间延迟。
    例如如果有五个表进行join,就得进行四步中间操作,如果采用流式就可以同时一起遍历所有流对kv存储进行操作。加上调度系统的作业依赖,延迟性进一步加大。
  3. 批量引擎无法动态资源管理,对于系统稳定性有极大影响.
    当前分布式批量引擎对于资源的分配都是静态的,对于hdfs来说按照块来化分更是硬伤。由于数据的存储,内存的分配,计算的复杂度没有较好的关联性,需要大量的人工介入与参数调整。而流式系统对于内存消耗较少,存储压力极小,计算复杂度可以自动化动态增加节点进行微调,所以有着更大的扩展性与稳定性。
  4. 批量引擎调试成本太高,人工介入性过多。
    批量引擎失败后,没有自动恢复触发机制,需要人工去干预。对于作业并行以及数据间的相互影响,一旦不慎重考虑,将是一个巨大的恶梦等在前面。而流式处理在设计之处并行上极为独立,相关的数据修复及影响将会自动触发,所以简化了系统设计,将系统的健壮性根值在程序逻辑中,减少了人工性,加强了事件性自主修复功能。由于流式机制的响应式特性,对于排除故障亦是更加方便。
  5. 批量引擎短时间产生极大io及CPU压力,无法与查询引擎共存。
    由于批量引擎处理过程中,带来大量的io及CPU压力,产生jvm gc及磁盘读取过慢,会极大影响外部的查询接口。目前唯一能做的办法是将批量过程隔离开来,仅同步最后结果至服务平台。但是限制还是非常大的,同步数据量占整个io消耗的比重是很难预估的。如果产生的结果大部分要同步,最终批量分隔出来产生的效果并不会明显改善,并且对于分布式系统需要两套集群环境开销,非常不经济。如果采用实时系统,所有的载荷通过实时平摊,加入分片机制扩展缓和,流式计算引擎与服务查询引擎是可以很简单共存的。

前面都是废话,我们要开始正题了

  1. 流式引擎的分类
    a. Tuple事件流架构-storm
    b. Sequence型架构-kafka streams, storm trident, flink
    c. Transducer复合型架构-clojure
    d. Conduit复合型架构-haskell
  2. 深入理解Conduit
    a. Conduit的基本原理
    b. Conduit的高级技巧
  3. Conduit源码剖析
    a. Conduit类型定义
    b. Conduit的传递与融合
    c. Conduit的基本函数-yield, wait, lift, leftover
    d. Conduit的组合函数-yieldMany, awaitForever, fold, map
    e. Conduit的应用函数
    e. Conduit的Chunk机制

一. 流式引擎的分类

前面谈到,数据流是更高级的事件流,因为事件流仅仅关心当前事件并做出响应,而数据流在此之上要考虑更多的状态计算,跨行汇总之类的东西。

a. Tuple事件流架构-storm

当年最早在流式引擎领域称雄的就是storm了,那时候的流式处理比较原始,基于事件流来完成。
比如发送一个tuple事件之后,产生ack应答事件,保障数据处理的完整性。当然底层考虑的问题会比较复杂,一旦涉及网络,消息的保障性就会异常麻烦。

  • 最简单的搞法是最多一次,发送了我就不管了,至少下游是否接受到并处理我不关心,算是脏乱差的搞法,在网络不复杂的情况下已经能处理大部分数据精度要求不高的情况。
  • 接着就是至少一次,发送之后要求接受方应答,如果没有收到应答,定期重新发送。在这种情况下,如果应答发送后没有接受到回应,是会再次发送的。所以存在多次发送的情况。多次发送的情况下游通过消息id信息进行幂等处理一下就好了。
  • 当然大家都想要严格一次的效果。如果前端幂等方便实现当然是最好的。如果不能实现,我们可以对消息进行批次分类,第一批给一个事务id,对没有应答的消息再次发送。对于接受方,可以通过事务id来进行幂等处理。

当然事件流引擎还要处理数据流入速度过快的情况,这就是回压技术。当压力过大时,减缓上游的取数速度。
在事件流引擎里面没有状态管理的概念,所以需要人工管理。

  • 一种是内存性的,基于内存运算,可以通过rpc机制访问。
  • 另一种是借助redis, hbase, rocksdb之类的KV存储引擎完成.
    在这种事件流机制的情况下,开发效率较低,代码复用性差。好在有了事件流机制的保障下,我们可以开始下一阶段的流式处理抽象。

b. Sequence型架构-kafka streams, storm trident, flink

storm的tuple事件流机制之后,函数式技术在数据领域方面的先天优势开始发挥出来。

当把事件流抽象到数据流的概念的时候,我们可以无需额外编码就可以运用映射函数式里的map,filter, concat, reduce, sliding, group by各种内存状态操作。

映射函数式的核心是sequence,就是一个惰性的序列,与java的iterator接口类似。

映射函数式是函数式的基本功能,不够强大,但是应用广泛,包括java streams, spark rdd, scala, flink, storm trident, kafka streams都是映射函数式,属于函数式初级功能。

映射函数式有很大的缺陷,只能向下复合,不能平行复合。拿最基本的平行复合例子来说,比如用映射函数式同时独立计算sum跟count,是很难实现的。
为什么呢? 因为映射式函数是不可侵入的。
当我们计算sum的时候,整个sequence是作为一个整体处理的,没法同时侵入count的过程。之所以映射式函数不可侵入,是因为映射函数式考虑的是整体操作。

我们知道基本的函数式操作是transform/reduce,就是单行转换与跨行聚合操作。而聚合操作这个过程无法平行复合的,因为它需要跨行遍历sequence。当然,如果需要同时进行reduce操作,是可以在reduce里面同时进行两个操作,自己分别管理状态。但是这不是复合操作,没去完全合并独立的两个reduce操作。

基本上市面上的流式引擎大部分落在这个阶段,因为函数式编程仍处于初步阶段。kafka streams在此基础上通过rocksdb立置了状态管理,加上KSQL的解析引擎,在传统流式引擎方向是属于技术比较领先的了。

c. Transducer复合型架构-clojure

作为函数式鼻祖实用性分支的clojure认识到了映射性函数式的局限性,并开创了transducer技术。

transducer技术是什么呢? transducer不再关心sequence。而是关心函数的复合。transducer分为两个过程,transform跟reduce。前面讲过transform过程是可以自由组合的。

所以transducer的核心思想就是,数据不再是sequence,而是transform函数复合调用链,最后进行reduce对结果进行合并。
这个时候,数据流是可侵入的,因为transform的每个操作是单行处理的调用链,只有reduce是负责整体。

那么我看看一下前面这个问题,如何同时计算sum跟count呢?
我们可以使用juxt函数,将调用链同时传递给sum跟count。juxt进行reduce操作的时候,对sum跟count也进行reduce即可。

=> (into {} (x/by-key odd? (x/transjuxt {:sum (x/reduce +) :mean x/avg :count x/count})) (range 256))
{false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}}

对于transducer技术来说,算是一个很大的进步了。transducer在垂直复合的情况下,完全了reduce的水平复合,基本上可以说解决了99%的问题。那么另外的1%呢? transform的水平复合!
由于transform必须调用reduce来完成复合,所以两个transform仍然是无法复合的。比如我要对数据进行奇偶分类,接着对奇数进行平方,对偶数进行3次方。不进行reduce处理,它的中间transform结果是无法流式合并的。

d. Conduit复合型架构-haskell

最后haskell老大忍不住了,他毫不谦虚地喊道,小弟们,老夫就是专业搞复合技术几十年啊!
众人不解,前面该想到的办法都想到了,难道还有方法不成?
haskell老大无所畏慎地说了一句,去中心化。
这里我们就要开始提到conduit的思想了。
conduit基于pipe的概念。数据流可以是pipe, 处理逻辑也可以是pipe。tranform跟reduce是同样的pipe, reduce是有返回值的pipe。有了pipe之后,我们可以构建连接器进行融合。
对于前面的问题: 对数据进行奇偶分类,接着对奇数进行平方,对偶数进行3次方。
我们对于这两个pipe逻辑构建水平连接器, 接着调用垂直连接器从上游获取数据。当从上游获取数据后,水平连接器分别单行调用垂直连接器,输出合并后的结果。一口气分成了分流及合流操作。
对比于前面的transducer思想来说,数据的采集不再区分transform跟reduce,transform是多行结果数据, reduce是单行结果数据,统一由连接器收集组合。

基本思想如此,那我们今天的主角就是conduit,让我们看一看它是具体如此实现的。

二. 深入理解Conduit

1. Conduit的基本原理

Conduit的Pipe分为生产者Pipe, 转换器Pipe, 消费者Pipe。

  • 生产者Pipe是一个递归的惰性数据结构. 包括自己的值与生产下一个生产者Pipe的逻辑。
  • 转换器Pipe则是一个递归的惰性函数结构. 对于每个函数,接受输入值之后,产生新的Pipe. 新的Pipe可以是新的生产者Pipe,或者是部分消费的leftover Pipe,或者递归消费的转换器Pipe.
  • 消费者Pipe则是一种特殊的转换器Pipe,即为最后Folds(同reduce)操作完成的单行值。

所以,整个连接的过程大体上有三种

  • 单个pipe内部组合过程, 也叫自连接传递>>
  • 同种pipe相互组合的过程,叫做Zip
  • 不同种pipe的连接组合过程,叫做融合fuse

a. 单个Pipe内部组合的过程有两种基本操作:

  • 对于生产者Pipe为yield操作,多个yield操作对数据进行递归连接产生数据流。
  • 对于转换器Pipe为await操作,await操作之后可以yield新的数据,leftover将读取的原始数据重新连接回去, 连接新的await递归生成新的数据流

b. 同种Pipe Zip显然有三种:ZipSource,ZipConduit,ZipSink。

  • 由于这里是单线程, 对于生产者ZipSource,仅仅只是将两个数据流结对组合起来,比较简单。
  • ZipSink则是将两个Fold 消费者Pipe结对起来, 比如同时计算的sum跟count。
  • ZipConduit则是分别水平组合两个转换Pipe,将输出的结果依次连接起来,由于不同pipe对于同样的数据输入输出逻辑不一样,所以没有固定的顺序,谁输出就连接谁。

c. 不同Pipe之间融合逻辑比较通用

融合逻辑是Conduit的核心,也是流式架构大杀器。
当然在极少数情况下,融合操作比较困难的情况下,可以选择直接将函数逻辑与Pipe相连,而不一定需要融合。这里不过多展开介绍.

为了进一步理解融合的过程,我们对前面的一些概念进行具体定义。

  • 生产者pipe yield连接出的pipe定义为:
    HaveOutput [next HaveOutput] o
  • 消费者pipe await读取输入的pipe定义为:
    NeedInput (i -> [new Pipe]) (u -> [new Pipe])
    这里await的pipe分为两个分去,一个有数据输入,一个是数据读取结束
  • 消费者pipe await递归连接之后,还可以推回消费值Leftover, 生成新的HaveOutput
    Leftover [new Pipe] l

介绍了基本的Pipe类型,我们可以简单了级一下Pipe的融合过程.
组合时,拨开右边发现是NeedInput,我们从左边读取数据交给右边产生新的Pipe,如果新产生的Pipe依然是NeedInput接着递归下去,直至结束,两边递归完成。当然这里只是最简单的一种情况,完全的情况我们后面在源码里面详细讲解。

2. Conduit的高级技巧

前面讲了基本的传递自连接>>,融合fuse及zip操作。
这里介绍得是一些便于使用的高级功能。

  • Chunk功能
  • 消费策略

a. Chunk功能

由于数据处理中涉及批量读取的过程,即一次读取n条数据。对于常用的sequence方案来说,一般会进行concat手工拆解,带来了不必要的麻烦跟性能开销。
Conduit提供了Chunk原生实现,及基于Chunk函数,可以原生操作在底层的数据中。比如读取文件时,一下子读取1024个字节,分多批次读完。有了chunk函数之后,我们可以直接基于chunk实现丢弃掉2000个字节,而不需要进行对字节拼接连之后再做处理。

b. 消费策略

消费策略分为两种,一种是输出消费策略,另一种是分层消费策略

  • 输出消费的意思是,仅消耗输出需要的数据。如果输出不需要输出,那么融合的过程是不需要去读取父Pipe的数据的。
  • 分层消费策略,就是可以对数据进行分层处理。传统的基于sequence的操作take读取部分操作之后,数据流是直接丢弃的,不能进行分层读取。而在conduit中可以对于前面未消耗的数据递交给下面的逻辑消费,达到分层消费。但在未些情况下,不太明确程序内部逻辑的情况下,可以强制消费读取数据。比如先读取10条数据给子转换器处理,子转换器不知道是否进行了消费,我们可以对未消费的数据进行强制消费操作,最终紧接着的转换逻辑消费的必定是10条之后的数据。

三. Conduit源码剖析,好好玩!

a. Conduit类型定义
b. Conduit的传递与融合
c. Conduit的基本函数-yield, wait, lift, leftover
d. Conduit的组合函数-yieldMany, awaitForever, fold, map
e. Conduit的应用函数
e. Conduit的Chunk机制

显示全文