一致性是分布式系统中常见的问题,大多数业务系统在CAP中都会选择AP而牺牲强一致性,workflow微服务也不例外,这篇文章想结合所开发的workflow微服务的两个典型的业务场景来聊一聊如何保障最终一致性。

UserTask操作的一致性

UserTask是审批流的核心组成部分,也是BPMN中最常见的task类型之一,用以表示需要人工介入的任务,一般需要支持claim,delegate,resolve,complete等操作以满足不同业务场景的需要,目前在我们的实际业务使用中仅支持complete操作,用户的通过或驳回操作底层都是调用complete只是传递不同的参数。 其核心组成有两大块:

  1. user assignment,其中UserTask域只存储user id或group id,而用户域负责根据type和id进行解析;
  2. form,用户审批时一般在UI上需要展示相关信息,因此除了字段类型和数据之外还需要存储UI展示所需要的数据,与UI组件有一定的耦合。 目前我们线上的业务方使用场景如下图:

2022101301 copy

这里可以看到有两个主要的业务场景:1)创建user task实例,需要通知下游Todo模块以保证相关用户可以收到提醒;2)用户完成user task,需要保证Todo被标记为已完成同时通知服务侧执行下一步。

第一个场景的业务为工作流走到userTask的时候执行对应的activityBehavior,根据userId查询candidate并创建task实例,并根据candidates到下游为每个candidate创建Todo,入口在服务侧,如果消息丢失则会导致相关用户收不到未做事项提醒从而导致流程卡住。 这里有一个注意点是下游在做幂等性消费的时候已完成的Todo也需要进行查询,否则可能出现这样一种情况:1. workflow service发送userTaskCreatedEvent(userTask:1, candidate:a),2. 下游收到之后创建一条Todo(userTask:1,user:a,status:new),发送确认消息但丢失或者延迟较大导致workflow service未收到,3. user: a审批完task之后Todo(userTask:1,user:a,status:done),即软删除,4. workflow service重发userTaskCreatedEvent(userTask:1, candidate:a),5. 下游收到之后根据userTask=1 and user=’a’ and isDeleted=0查询Todo返回空,再次创建一条Todo。

第二个场景的业务入口在单体系统中,对应的业务方还没有微服务化,逻辑上来说调用顺序应如:下游业务发送complete event->工作流微服务发送task completed event->Todo标记对应的数据为已完成。但从图上可以看到,Todo模块和下游业务方都在单体系统中,因此为了简化流程,我们将clear Todo和send complete event包装在一个transaction中,具体来说通过将event注册到synchronization上,等tx commit之后再发送event。 至于这里为什么要采用transactional event,是因为业务方在调用完complete task之后往往还有一些额外的操作,比如业务方自实现的状态保存,这些操作有可能会失败导致transaction回滚,如果不把event延迟到tx commit之后再提交,可能出现Todo因为tx rollback而没有清除但event已经发送出去导致服务侧的workflow已经将对应的userTask标记为completed。

可以注意到这里两个场景可以构成一个业务闭环,userTask从创建到完成每一次状态跃迁都是由event驱动,如图所示:

1

已知TaskCreated->TaskNotified的时候可能会出现消息丢失,因此我们会定时轮询状态为TaskCreated并且updated_at超出1min的进行消息重发,而TaskNotified->TaskCompleted也可能出现消息丢失,因为前文提到上游单体系统只是将event的发出时间推迟并没有保证消息不丢,因此在这种场景下,我们同样通过轮询状态为TaskNotified并且updated_at超出1d的task,然后到Todo侧查询是否存在未完成的Todo,如果存在意味着用户确实没有完成相应的task则将updated_at更新为current,如果不存在则说明用户已经完成操作,在workflow服务端将task complete从而保证流程能正常走下去。 至于为什么TaskCreated可以直接跃迁至TaskCompleted,是因为可能出现前文提到的Todo的确认消息丢失或者延迟,用户操作了complete,workflow服务在收到Todo确认之前先收到了complete task的消息,这种情况下也是可以直接将状态标记为completed。

ServiceTask执行的一致性

ServiceTask是BPMN中常见的一种task类型,在内嵌式的业务编排工作流当中常被用做执行本地Java代码以满足业务扩展需求,但在微服务架构下业务代码分布在各个业务节点,flowable默认的实现很明显并不能满足我们的需求,为此我们提供了基于service query(读)和service command(写)的执行策略,当流程实例走到ServiceTask的时候会发送消息对应的topic,下游消费者订阅相关topic之后执行对应的业务代码并通过消息发送执行结果,服务侧会订阅该topic并根据返回结果是否出错决定走什么分支,整体流程如下:

2022101301

其中关于task的输入输出是另外一个大的话题,这里暂且不表,我们仅关注于跨节点可能引入的状态不一致问题。 与UserTask类似,一个完整的业务流程最终会走回服务侧,即ServiceTask执行的开始和结束都在workflow服务端,因此这里也是通过业务自查来保证task被下游执行。 我们在上游(即服务侧)引入本地数据库表,在TaskRequestProduer发送消息之前先将task的相关信息存入表中,状态标记成unfinished,下游收到消息之后根据实际业务场景做幂等处理,当然对于只读场景(上文提到的service query)则没有幂等性要求,下游完成task execution之后则会发送消息到服务侧,考虑到收到消息意味着要调用ServiceTaskActivityBehavior.trigger,此时服务侧会继续执行processInstance直到下一个async continuation才会停止,意味着此刻才会提交topic partition offset,这个时间是不固定的,因此我们在consumer中做了异步trigger,具体来说会将其存成job同时把task表中的数据标记成finished,job后续会被asyncExecutor获取并执行,这里多说一句,job的获取绝大多数情况都是通过TransactionSynchronization注册到当前事务,在afterCommit会直接提交到asyncExecutor执行,避免轮询。除此之外,服务侧每个节点都会有后台线程定时检查超时未完成的task record,若发现有这样的record则会重发消息。

小结

与RocketMQ的事务消息相比,虽然本地消息表的性能会差一大截,但考虑到我们的业务场景消息量并不算大,根据旧系统的数据估算,单个DC每天的消息量大约略高于100w,假设因为引入本地消息表导致的额外SQL性能消耗10ms,那么3台server只需要1h就能完成100w+的消息投递,因此对实际的业务并不会造成什么影响。 除此之外,仅仅在工作流服务侧通过RocketMQ保证消息不丢失还不够,下游在发送complete userTask或者返回serviceTask执行结果的时候也需要保证消息不能丢失,但由于我们的下游是单体系统,其消息中间件也是基于kafka,如果要在下游引入本地消息表,其开发成本也不低,因此相比较于RocketMQ,通过引入中间状态和本地消息表来实现事务一致性更适合我们的业务场景。