MongoDB 基础系列十六:增删查改 CRUD Concepts 之两阶段提交

前言

此篇博文是 Mongdb 基础系列之一;

本文为作者的原创作品,转载需注明出处;

简介

Operations on a single document are always atomic with MongoDB databases; however, operations that involve multiple documents, which are often referred to as “multi-document transactions”, are not atomic. Since documents can be fairly complex and contain multiple “nested” documents, single-document atomicity provides the necessary support for many practical use cases.

注意,作用在多个文档中上的事务,不具备原子性;

备注,前文多次提到当事务作用到一个包含内嵌文档的文档上的时候,不能保证内嵌文档的事务原子性,这里也隐含的提到了这种情况;不过,还没来得及验证这种情况;如果事务原子性不能作用到内嵌文档上,那么 MongoDB 数据库对事务的支持上显得太弱了;

虽然 MongoDB 对事务原子性的限制特别严格,但是我们仍然对多个文档的操作,多个操作之间的事务性有如下的诉求:

  • Atomicity( 事务原子性 ):如果事务中的某一步操作失败,那么前面所执行的步骤需要“回滚”( roll back ),使数据恢复到之前的状态;
  • Consistency( 数据一致性 ):如果是因为网络或者硬件上的错误导致事务失败,那么数据库本身必须能够提供能够将数据进行恢复的功能;

要能够使得 MongoDB 支持 “multi-document transactions” 既多文档的事务操作,必须通过你的应用程序来实现,通过你的应用程序来实现两阶段提交来保证在对多文档操作的事务完整性;下面,我们通过一个例子来看看如何通过一个简单的应用来实现对 MongoDB 的两阶段提交事务操作;

在应用层上实现两阶段提交的例子

概述

考虑这样的一种场景,账户 $A$ 向账户 $B$ 进行转账;在关系型数据库中,事务的实现很简单,只需要在多个 SQL 执行上设置事务即可;但是 MongoDB,你就得自己在应用层模拟实现两阶段提交的方式来保证事务的完整性;下面我们就来看看这个例子,

初始化数据

该例子中需要创建两个 collections,

  • accounts collection 用来存储账户 $A$ 和账户 $B$ 的信息
  • transactions collection 用来存储转账过程的事务信息

下面就来初始化这两个 collections 以及所对应的数据

  • 首先,在 accounts collection 中分别创建两个账户 $A$ 和 $B$ 的数据信息;

    1
    2
    3
    4
    5
    6
    db.accounts.insert(
    [
    { _id: "A", balance: 1000, pendingTransactions: [] },
    { _id: "B", balance: 1000, pendingTransactions: [] }
    ]
    )

    注意,pendingTransactions 字段,该字段中存储了当前账户正在执行的事务操作

  • 然后在 transactions collection 中创建如下的事务数据,

    1
    2
    3
    db.transactions.insert(
    { _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() }
    )

    下面我们分别来看看各个字段的意义,

    • source:源账户的 _id
    • destination:目标账户的 _id
    • state:表示当前事务的状态,其值可以是 initial, pending, applied, done, canceling, and canceled
    • lastModified:最后一次更新的时间;

    上述的记录即表示从账户 $A$ 转账 100 到 $B$ 的初始事务数据,state 为 initial,lastModified 为 current date

转账过程

正常流程

  1. 获取状态为 initial 状态的 Transaction $T_a$

    1
    var t = db.transactions.findOne( { state: "initial" } )

    将会得到如下的数据,

    1
    { "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified" : ISODate("2017-07-27T03:13:22.850Z") }
  2. 将 $T_a$ 的状态设置为 pending

    1
    2
    3
    4
    5
    6
    7
    db.transactions.update(
    { _id: t._id, state: "initial" },
    {
    $set: { state: "pending" },
    $currentDate: { lastModified: true }
    }
    )

    注意,该 update 语句中的 initial 状态作为 update 的匹配条件,目的是为了避免其它进程同时修改 transaction 的状态信息;

    备注,这里为什么一定可以保证其它进程不会同时修改该 document 的状态?如果是熟悉关系型数据库事务特性的读者一定会提出这样一个疑议,假设我们有两个 Processes,Process $P_1$ 和 Process $P_2$,两个 Process 同时获取得到 $T_a$ (同一时间并发读取),然后,对然后,它们一起并发执行上述的 update 语句,在关系型数据库中,如果在没有事务锁的情况下,它们一定可以同时得到 initial 状态的 $T_a$,然后分别对其进行 udpate 操作,然后.. 对然后,整个两阶段事务的完整性就被完全破坏掉了;是的,这种情况在关系型数据库中会发生,而且两阶段事务的完整性的确也就被破坏掉了;但问题是,我们现在使用的是 MongoDB,别忘了,默认情况下,MongoDB 的事务原子性是作用在单个 document 之上的,而且,是默认加排他锁的,也就是说上述的 update() 操作是按照顺序执行的,所以一定不会存在两个事务同时 update $T_a$ 的情况,所以,通过在 update 过程中添加 initial 状态参数就一定能够避免其它进程同时修改 transaction 状态的可能性;

  3. 将事务作用到 account $A$ 和 $B$ 上

    对源账户 $A$ 进行扣减动作

    1
    2
    3
    4
    db.accounts.update(
    { _id: t.source, pendingTransactions: { $ne: t._id } },
    { $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
    )

    注意,在对源账户 $A$ 进行扣减动作之前,需要使用条件 pendingTransactions: { \$ne: t._id },( $ne 表示不包含 ),确保没有其它进程已经对对该账户 $A$ 进行了 $T_a$ 事务;在扣减动作执行成功以后,将事务 $T_a$ 追加入账户 $A$,表示账户 $A$ 正式被纳入两阶段事务中;

    对目标账户 $B$ 进行增加动作

    1
    2
    3
    4
    db.accounts.update(
    { _id: t.destination, pendingTransactions: { $ne: t._id } },
    { $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
    )

    同样,在对账户 $B$ 进行追加 update 操作之前,需要确保没有相同的事务已经作用到 $B$ 上了;同样,在追加动作执行成功以后,将 $T_a$ 追加入 $B$,表示账户 $B$ 正式被纳入两阶段事务中;

  4. 将事务 $T_a$ 的状态设置为applied,表示两阶段事务已经应用到事务数据( 既是账户 $A$ 和 $B$ )上了;

    1
    2
    3
    4
    5
    6
    7
    db.transactions.update(
    { _id: t._id, state: "pending" },
    {
    $set: { state: "applied" },
    $currentDate: { lastModified: true }
    }
    )
  5. 将事务分别从 $A$ 和 $B$ 中的 pendingTransactions 队列中移除;
    从 $A$ 中移除,

    1
    2
    3
    4
    db.accounts.update(
    { _id: t.source, pendingTransactions: t._id },
    { $pull: { pendingTransactions: t._id } }
    )

    从 $B$ 中移除,

    1
    2
    3
    4
    db.accounts.update(
    { _id: t.destination, pendingTransactions: t._id },
    { $pull: { pendingTransactions: t._id } }
    )
  6. 将事务 $T_b$ 的状态设置为done,表示该两阶段事务成功结束,

    1
    2
    3
    4
    5
    6
    7
    db.transactions.update(
    { _id: t._id, state: "applied" },
    {
    $set: { state: "done" },
    $currentDate: { lastModified: true }
    }
    )

异常流程

两阶段提交事务要确保事务中某个操作发生异常,database 会恢复到 consistant 的状态;下面,以 30 分钟为限,如果某个两阶段提交事务再该时间内仍然没有完成,便认为当前事务出现了某种异常,需要执行回滚;

首先来分析一下两阶段事务的所有状态,initial, pending, applied, done, canceling 以及 canceled,很明显,当 transaction 状态处在 pendingapplied 的时候,若发生异常,是需要进行回滚操作的;下面就分别针对这两种状态来分析如何进行回滚操作来保证事务的完整性,

在 Pending 状态下的回滚操作

首先,来分析一下,Pending 状态下当前数据可能存在的情况,

  1. 只执行到了正常流程中的 #2 步,并没有执行到 #2;
  2. 不仅执行到了正常流程中的 #2 步,而且执行到了 #3;
    可以是完全执行完 #3,也可以是部分执行完 #3 的步骤;

所以,由上述分析可知,账户的信息 $A$ 和 $B$ 有可能发生了变换,也有可能没有发生过变化;这里要回滚,要注意的是,如何判断 $A$ 和 $B$ 的变化情况;不过好在,每次更新账户 $A$ 和 $B$ 的时候,我们都需要将 $T_a$ 加入字段 pendingTransactions,所以可以从该字段是否包含 $T_a$ 来判断 $A$ 或者是 $B$ 发生过变化,这样,我们就可以分别针对 $A$ 和 $B$ 来进行回滚操作了;于是,我们便有了如下的回滚操作,

① 找到哪些 Pending 状态的异常事务且需要回滚
1
2
3
4
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);

var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );

可以看到,如果找到比当前时间还小 30 分钟并且处于 pending 状态的 transaction 将会被视为事务异常,将要进行回滚;

② 将 transaction 的状态设置为 canceling
1
2
3
4
5
6
7
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "canceling" },
$currentDate: { lastModified: true }
}
)

表示当前的事务要执行回滚的操作了;

③ 将发生过变化的账户进行回滚

针对账户 $A$ 和 $B$ 需要分别处理;源账户 $A$ 是支出方,所以回滚的时候,需要累加回支付金额;而目标账户 $B$ 是接收方,回滚的时候需要扣减金额;

首先,来看看如何对 $A$ 账户进行回滚,

1
2
3
4
5
6
7
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{
$inc: { balance: t.value},
$pull: { pendingTransactions: t._id }
}
)

$A$ 是支出方,所以,这里对其累加;但是,问题是,如果 $A$ 并没有支付给 $B$ 呢,这样做岂不是让 $A$ 多了 100 块?不用担心,注意这里回滚操作的条件是 pendingTransactions = t._id 正是它保证了 $A$ 一定支付了这笔钱;注意,为了表示操作成功,最后需要将 t._idpendingTransactions 中移除;

再次,来看看如何对 $B$ 账户进行回滚,

1
2
3
4
5
6
7
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{
$inc: { balance: -t.value },
$pull: { pendingTransactions: t._id }
}
)

和对 $A$ 的操作类似,唯一不同的是,这里只需要将 $B$ 账户接收到的款项扣除即可;

④ 将 transaction 的状态设置为 cancelled
1
2
3
4
5
6
7
db.transactions.update(
{ _id: t._id, state: "canceling" },
{
$set: { state: "cancelled" },
$currentDate: { lastModified: true }
}
)

将 transaction 的事务状态设置为 cancelled 既表示回滚操作完成;

发发牢骚

大多数同学学到这里的时候,基本上也就离开了,却忘记了还有一道风景,忘了;笔者的牢骚便是,如果在执行回滚操作的时候,发生了异常,那又该如何?

要回答这个问题,我们首先看看回滚过程中对应事务状态有哪些?一旦进行回滚以后,就两种状态 cancelingcancelled;那么账户 $A$ 和 账户 $B$ 所对应的数据状态又是怎样的呢?三种状态,1. 完全没有回滚;2. 部分回滚;3. 全部回滚了,但是 transaction 还没有来得及将自身的状态设置为 cancelled;这三种情况都可以由 pendingTransactions 来表述,如果完全没有回滚,那么 pendingTransactions 不会发生改变;如果是部分回滚,那么回滚成功以后的账户将会把 t._id 从 pendingTransactions 中移除,表示我已经成功回滚;如果都回滚成功,$A$ 和 $B$ 相关的 t._id 都会从 pendingTransactions 中移除,表示都回滚成功了;

所以,如果在回滚的时候发生错误,因为 pendingTransactions 已经为账户信息的回滚动作保存了当前的状态,所以只需要再次回滚(既重试)直到成功即可;

在 Applied 状态下的回滚操作

当 transaction 处在这种状态下的时候,表示,$A$ 和 $B$ 已经完成了相应的交易操作,数据该扣的扣除了,该加的也加了,表示事务性操作其实都已经完成了;所以到这一步的时候,不应该全部去 roll back 数据;而应该是,继续从正常流程的第 5 步开始执行,直到事务的状态变更为 done 为止;

总结

如果想要使用 MongoDB 来保证复杂事务的完整性,会非常非常的麻烦,需要自己在应用层模拟实现两阶段提交事务才能保证,而该实现过程是非常非常的麻烦的;所以,在复杂的事务场景下,作者强力不推荐使用 MongoDB,而应该考虑结合使用关系型数据库的方式;