【EFCore】笔记19 并发控制

在对数据库中进行数据处理的时候,可能会有并发问题。因为一般数据库的 UPDATE、INSERT 等简单语句并不是原子的,当大量数据库连接同时进行数据更新或插入的时候,可能会发生冲突。

在实际开发中,应该将并发问题在客户端或服务端解决,尽量不要将并发问题引入数据库层面。但如果真的要对数据库进行并发控制,在 EFCore 也是有办法的。EFCore 的并发控制分为悲观、乐观两种策略,分别对应悲观锁和乐观锁。

悲观并发控制即对数据加锁,主要有行锁、表锁等。加锁确保同一时间只有一个连接可以对数据资源进行处理。不同数据库支持在 SQL 脚本中增加锁,且不同数据库的语法不一样,EFCore 本身没有封装数据库悲观锁的使用,所以只能通过程序员手动编写 SQL 脚本来实现。

以 MySQL 为例,通过 SELECT FOR UPDATE 语法可以在选择一行数据的时候加上行锁,Book 的 Sold 属性表示本书是否已经出售:

using(MyDbContext ctx = new MyDbContext())
using(var ts = ctx.Database.BeginTransaction())
{
    Console.WriteLine("WAITING " + DateTime.Now);
    var book = ctx.Books.FromSqlInterpolated<Book>
        (@$"SELECT * FROM T_Books WHERE Id = 1 FOR UPDATE").First();
    if (book.Sold)
    {
        Console.WriteLine("!!!");
        Console.ReadKey();
        return;
    }
    Console.WriteLine("SELECTED " + DateTime.Now);
    Thread.Sleep(10000);
    book.Sold = true;
    ctx.SaveChanges();
    Console.WriteLine("SAVED " + DateTime.Now);
    ts.Commit();
    Console.ReadKey();
}

由于锁需要使用到事务,所以需要用 DbContext.Database.BeginTransaction 方法创建一个事务,然后在所有操作结束后使用 Commit 方法提交事务,这里注意必须在 SaveChanges 后提交,否则锁是无效的。

加上悲观锁后,每一个程序实例运行到 FromSqlInterpolated 这一行时都要先看看锁是否占用,如果有那么则一直等待,直到锁释放。这样的悲观策略对于并发性能的影响是很大的,如果请求很多,那么就需要一直排队,效率就非常低了,并且还可能有死锁问题。

一个更推荐的策略是乐观锁,乐观锁使用 CAS(Compare And Swap)方法,CAS 即通过并发令牌(Concurrency Token)来判断数据是否已经被修过:当程序查询数据时,会通过记录并发令牌来记录当前数据的状态,在程序写入数据时,会将记录的状态与当前的状态对比,如果相同则说明中途没有其他程序修改过数据,那么写入是安全的,如果不同则说明发生了并发冲突。

从这里可以看出,必须保证并发令牌可以唯一的表示数据状态,在 EFCore 中,使用 PropertyBuilder.IsConcurrencyToken 方法来配置并发令牌:

public void Configure(EntityTypeBuilder<Book> builder)
{
    builder.ToTable("T_Books");
    builder.Property(e => e.Title).HasMaxLength(50).IsRequired();
    builder.HasOne<Author>(e => e.Author).WithMany(e => e.Books).IsRequired();
    builder.Property(e => e.Sold).IsConcurrencyToken();
}   

之后就可以正常进行查询了,如果出现并发冲突会抛出DbUpdateConcurrencyException 异常:

using(MyDbContext ctx = new MyDbContext())
{
    Console.WriteLine("WAITING " + DateTime.Now);
    var book = ctx.Books.First();
    if (book.Sold)
    {
        Console.WriteLine("!!!");
        Console.ReadKey();
        return;
    }
    Console.WriteLine("SELECTED " + DateTime.Now);
    Thread.Sleep(10000);
    book.Sold = true;
    ctx.SaveChanges();
    Console.WriteLine("SAVED " + DateTime.Now);
    Console.ReadKey();
}

这里需要注意的是,乐观并发控制在查询时是不等待的,只有在写入时才会检查是否发生并发冲突。这样的好处在于对于并发性能的影响比较小,所以乐观锁一般比悲观锁更好。但如果冲突发生的概率太大的话,乐观锁就会不断地重试,此时处理成本可能会比悲观锁还高。

上面是将一个现有属性设置为并发令牌,这样做可能会有 ABA 问题,或有时你无法给出一个属性可以作为并发令牌的话,就需要引入一个新的属性。

在 EFCore,可以借用属性的 IsRowVersion 配置,如果一个属性被配置为 RowVersion,那么 EFCore 会自动将它设置为并发令牌,并且在对应的数据库分配特殊的时间戳类型给它(在 SQLServer,该类型是 rowversion 或 timestamp),每次插入或更新该行数据时,EFCore 会自动为 RowVersion 列生成一个新的值。

以 SQLServer 为例,在 C# 代码中只需将此属性设置为 byte[] 类型,然后通过 PropertyBuilder.IsRowVersion 方法配置即可:

builder.Property(e => e.Version).IsRowVersion();

不过,取决于数据库,RowVersion 映射的数据类型是不同的。在 SQLServer 中,可以确保 rowversion 或 timestamp 类型的精度足够以应付高并发情况,但在 MySQL 中,可能会出现 timestamp 类型精度不足,导致在高并发下依旧会发生冲突。

本质上 RowVersion 和 ConcurrencyToken 是一样的,都是乐观锁用于判断数据状态的列而已。区别在于 ConcurrencyToken 一般可以使用任何类型的属性,包括现有的属性,它不会自动更新,只能被手动修改,并且可能为空。而 RowVersion 一般只能使用时间戳类型,它会在数据变更的时候自动更新自身,不需要手动修改,并且一般永不为空。

如果 RowVersion 不能满足需求(例如精度不够的情况下),那么也可以使用 Guid 类型的 ConcurrencyToken,只要在每次修改数据的时候手动生成一个新的 Guid 即可。

在某些时候我们可能需要一个 RowVersion,却不想将它作为并发令牌,这个时候就可以这么配置:

builder.Property(e => e.Version).IsRowVersion().IsConcurrencyToken(false);

这样的话,虽然每次修改都会自动更新 RowVersion 的值,但不会在查询的时候进行并发对比,此时的 RowVersion 仅仅是一列会自动更新的普通字段而已。

接下来的问题是,使用乐观锁时,在发生并发冲突后,EFCore 会抛出 DbUpdateConcurrencyException 异常,这个时候我们可能需要进行冲突处理:

using(MyDbContext ctx = new MyDbContext())
{
    bool SaveFailed;
    do
    {
        SaveFailed = false;
        Console.WriteLine("WAITING " + DateTime.Now);
        var book = ctx.Books.First();
        if (book.Sold)
        {
            Console.WriteLine("!!! " + book.Price);
            Console.ReadKey();
            return;
        }
        Console.WriteLine("SELECTED " + DateTime.Now);
        Thread.Sleep(10000); //便于调试
        book.Sold = true;

        try
        {
            ctx.SaveChanges();
        }
        catch (DbUpdateConcurrencyException ex)
        {
            Console.WriteLine("ThrowException " + DateTime.Now);
            SaveFailed = true;
            ex.Entries.Single().Reload();
        }
    } while (SaveFailed);
    Console.WriteLine("SAVED " + DateTime.Now);
    Console.ReadKey();
}

在上面的代码中,使用 try-catch 捕获到异常之后,我们通过 DbUpdateConcurrencyException.Entries 获取发生冲突异常的这些实体对象,然后通过 Reload 方法从数据库重新读取它们的值,这样才能在下次重试中使用新的状态。由于这里发生异常的实体对象只有一个,所以使用了 Entries.Single 方法,如果更新多个实体对象的时候发生了异常冲突,我们可能需要对 Entries 的每一个实体对象都执行 Reload 方法。

注意到上面处理异常的方式是从数据库中读取新的值(Database Wins)。有时候我们可能需要使用用户输入的值来重写(Client Wins),那么应该执行 Entry.OriginalValues.SetValues 方法:

catch (DbUpdateConcurrencyException ex)
{
    Console.WriteLine("ThrowException " + DateTime.Now);
    SaveFailed = true;
    foreach(var e in ex.Entries)
    {
        e.OriginalValues.SetValues(e.CurrentValues);
    }
}

这个时候,下次重试时,实体对象储存的值就是上一次用户输入的保存失败的值。无论是 Database Wins 还是 Client Wins,最终影响的只是实体对象储存的值,并不会影响数据库的数据,在重试的时候还是得通过 SaveChanges 来更新数据库。