# distributed, 2023-05-20
这是一篇原本要作为产品介绍的文章,但种种原因搁置了,最近又在完善相关功能,索性就完善一下发出来。感恩 JmPotato 哥哥的带飞~
代码请见:roadmap & TiKV tracking issue
Flashback(通常指 Oracle Flashback)是用于在用户发生误操作的时候,快速回滚至原先版本,避免产生重大损失的特性。
游戏行业中会不时出现版本错误等问题,定期的备份只能回滚到备份时间点,且浪费资源。TiDB v6.4.0 引入了 FLASHBACK CLUSTER TO TIMESTAMP 语法,其功能是将集群、数据库、数据表的数据恢复到特定的时间点。
在 TiDB 中,存在一些相关的功能:
我们最终采用在多版本并发控制(MVCC)的基础上,取出 TIMESTAMP 之前的最新时间戳数据来覆盖当前的数据。
『详细操作请参考 用户文档』
mysql> CREATE TABLE t(a INT);
Query OK, 0 rows affected (0.09 sec)
mysql> SELECT * FROM t;
Empty set (0.01 sec)
mysql> SELECT now();
+---------------------+
| now() |
+---------------------+
| 2022-09-28 17:24:16 |
+---------------------+
1 row in set (0.02 sec)
mysql> INSERT INTO t VALUES (1);
Query OK, 1 row affected (0.02 sec)
mysql> SELECT * FROM t;
+------+
| a |
+------+
| 1 |
+------+
1 row in set (0.01 sec)
mysql> FLASHBACK CLUSTER TO TIMESTAMP '2022-09-28 17:24:16';
Query OK, 0 rows affected (0.20 sec)
mysql> SELECT * FROM t;
Empty set (0.00 sec)
我们在 Flashback 中采用 Region 锁来阻断 Flashback 过程中的所有读写以及调度,避免产生任何可能导致数据不一致的外部因素。
通过给 Region 先上锁再 Flashback,我们可以获得以下好处:
TiDB 首先会去判断:
让我们从 TiKV 启动开始说起 :)
从 cmd/tikv-server/main.rs 开始,TiKV 完成配置一系列参数后,main 函数末尾走到 server::run_tikv(config) 运行 TiKV server,通过 main 中所配置的参数选择对应 Engine,调用 run_impl 函数,在 run_server 中进行绑定,并启动 grpc_server.start(); 服务。具体的绑定启动流程可以查看此文档。
本文关注的 Flashback 位于 kvproto 对应的此处
rpc KvPrepareFlashbackToVersion(kvrpcpb.PrepareFlashbackToVersionRequest) returns (kvrpcpb.PrepareFlashbackToVersionResponse) {}
rpc KvFlashbackToVersion(kvrpcpb.FlashbackToVersionRequest) returns (kvrpcpb.FlashbackToVersionResponse) {}
回到 TiKV 代码,TiKV 包含多个 gRPC service。其中最重要的一个是 KvService,位于 src/server/service/kv.rs 文件中。包括 TiKV 的 kv_get,kv_scan,kv_prewrite,kv_commit 等事务操作的 API。本文 Flashback 由于采用事务模型,很自然地放在此文件中。
让我们先来纵览一下 Flashback 整体流程,抽出主要代码大致可以看出主要就是四步:
// First Phase
fn future_prepare_flashback_to_version() {
// 1. prepare the raftstore for the later flashback.
send_flashback_msg(.., AdminCmdType::PrepareFlashback);
// 2.prewrite the first user key to prevent `resolved_ts` from advancing.
let (cb, f) = paired_future_callback();
res = storage.sched_txn_command(req.clone().into(), cb);
}
// Second Phase
fn future_flashback_to_version() {
// 3. execute overwrite and commit the first user key.
let (cb, f) = paired_future_callback();
let res = storage_clone.sched_txn_command(req.into(), cb);
// 4. notify raftstore flashback has been finished.
send_flashback_msg(.., AdminCmdType::FinishFlashback);
}
那就让我们首先进入第一阶段:prepare 模块!
Prepare 函数的目的可以概况为:停读停写停调度,持久化 Flashback 状态,停止 resolved_ts 的推进。
首先介绍“停读写停调度”,具体原因已经在背景介绍小节中解释过。为了实现这个目的,直觉上是在所有读写调度任务执行处进行隔断。由于 TiKV 底层采用 Raft 协议,最终会进行 Propose,Commit 然后 Apply 的流程,那么很自然地在进行 Propose 前隔断掉读写调度。
此处剧透 Flashback 实现为:在 Propose 处快速隔断掉 Flashback 过程中的其他读写,在 Apply 处隔断进行兜底。
通过此文章可以了解到在 TiKV 处理 Proposal 的大致流程 TiKV 源码解析系列文章(十八)Raft Propose 的 Commit 和 Apply 情景分析
一言以蔽之:TiKV 使用了两个线程池来处理 Proposal,并且将一个 Raft Peer 分成了两部分:PeerFsm和 ApplyFsm。在处理 Proposal 的过程中,首先由 PeerFsm获取日志并驱动 Raft 内部的状态机,由 ApplyFsm根据已提交日志修改对应数据的状态机(region 信息和用户数据)。
可通过此文章了解 PeerFsm 和 ApplyFsm TiKV 源码解析系列文章(十七)raftstore 概览
在 “PeerFsm获取日志并驱动 Raft 内部的状态机”时,会走到下面函数:
fn propose_raft_command_internal() {
match self.pre_propose_raft_command(&msg) { .. }
if self.fsm.peer.propose(self.ctx, cb, msg, resp, diskfullopt) {
self.fsm.has_ready = true;
}
}
我们发现 PeerFsmDelegate::pre_propose_raft_command 函数会在 propose request 前进行检查。
那么很自然地将判断当前 request 是否为 Flashback 的检查放在此处。
需要关注的一个地方是:
当然不能全盘隔断,Flashback 也需要走 raft 流程,应当给予通行证。
随之我们发现在 RaftCmdRequest 中的 header 里面有个 flags,可以将通行证放在此处。
fn pre_propose_raft_command(
&mut self,
req: &RaftCmdRequest,
) -> Result<Option<RaftCmdResponse>> {
// When in the flashback state, we should not allow any other request to be proposed.
if self.region().is_in_flashback {
let flags = req.get_header().get_flags();
if !flags.contains(FLASHBACK) {
return Err;
}
}
}
在完成“隔断点”的安插后,为了让 Flashback 相关操作不受隔断顺利通行,我们会在 header 处加上 flag。
// First Phase
fn future_prepare_flashback_to_version{
// 1. prepare the raftstore for the later flashback.
send_flashback_msg(.., AdminCmdType::PrepareFlashback);
}
所调用的 start_flashback/end_flashback 函数在加上 flag 后发送 admin req.
async fn start_flashback/end_flashback {
...
req.mut_header()
.set_flags(WriteBatchFlags::FLASHBACK.bits());
// call admin request directly
let raft_router = raft_router.lock().await;
raft_router.send_command(req, cb, ...)
...
}
Admin request 通过 RaftStoreRouter 构成一条 RaftCommand 发送,会按照 Propose 流程,通过 pre_propose 的检查后到 PeerFsmDelegate.fsm.peer.propose 完成 Propose 一条 Raft Log。
之后 PeerFsm 会将 Proposal 以及已提交日志发送给对应的 ApplyFsm 来到 apply 流程。
ApplyFsm 会针对这些日志进行(见 ApplyFsm::handle_apply):
check_flashback_state(self.region.get_is_in_flashback());
pub fn check_flashback_state() -> Result<()> {
// The admin flashback cmd could be proposed/applied under any state.
if AdminCmdType::PrepareFlashback || AdminCmdType::FinishFlashback {
return Ok(());
}
let is_flashback_request = req.get_header().get_flags()
.contains(FLASHBACK);
...
}
完成 check 后执行 ApplyDelegate::exec_admin_cmd ,最终识别 Flashback 标识,到达我们的目的地 exec_flashback 函数。
AdminCmdType::PrepareFlashback | AdminCmdType::FinishFlashback =>
self.exec_flashback(ctx, request),
exec_flashback 放入 region 的元信息中,并完成持久态 RegionLocalState 的更新。
fn exec_flashback() -> Result<> {
let is_in_flashback = req.get_cmd_type() == AdminCmdType::PrepareFlashback;
let mut region = self.region.clone();
region.set_is_in_flashback(is_in_flashback);
put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &old_state)
Ok(ApplyResult::Res(ExecResult::SetFlashbackState { region } )
}
ApplyFSM 在应用一批日志之后会发送一条 ApplyRes 的消息到 PeerFsm。
最终又回到 PeerFsm 的PeerFsmDelegate::handle_msgs 函数,走到 PeerMsg::ApplyRes { res } 分支,调用 PeerFsmDelegate::on_apply_res 完成对 Flashback 的持久态更新。
fn on_set_flashback_state(&mut self, is_in_flashback: bool) {
// update flashback state
self.update_region();
// 此行代码在做的事将在「停读 - ReadLocal & StaleRead 」小节解释
self.fsm.peer.leader_lease_mut().expire_remote_lease();
}
梳理完后,很清晰的看到存在两个对非 Flashback 操作的隔断位置,分别位于 propose 和 apply 处。
那么随之而来的问题便是:在何处加入对 Flashback req 的“通行证”呢?
Flashback 既然可以理解为基于 MVCC 进行实现,那么便也需要通过 MVCC 的读写接口流程。因此溯源一下 MVCC 的读写流程,便能更好地找到“通行证”的放置位置。
让我们首先来梳理一下读流程,可结合 TiKV 源码阅读三部曲(二)读流程 来阅读详细读过程。
在调用到 LocalReader::propose_raft_command 时,发现是通过 LocalReader::pre_propose_raft_command 进行判断 req。
会对 ReadLocal 和 StaleRead 进行特定逻辑处理,其余信息将转发给 RaftStore 来执行,即由 ProposalRouter::send 转发后走到我们之前所提到的 Propose 流程。
fn propose_raft_command() {
match self.pre_propose_raft_command(&req) {
RequestPolicy::ReadLocal => ..
RequestPolicy::StaleRead => ..
// Forward to raftstore.
_ => self.redirect(RaftCommand::new(req, cb)),
}
}
这样很自然地对于除 ReadLocal 和 StaleRead 外的 req,都可以做出以下隔断:
在进入 read 之前,判断一下 req 中是否有 Flashback flag,便实现了在开启 Flashback 之后,只能让 Flashback 相关的读指令通行。
fn exec_snapshot() {
...
if ctx.for_flashback {
flags |= WriteBatchFlags::FLASHBACK.bits();
}
header.set_flags(flags);
...
self.router.read(...)
}
Flashback 执行时将对 exec_snapshot 进行 ctx.for_flashback 的设置,其中 for_flashback 从哪获取的将在下文 「Phase2-1: Exec - 读取阶段」 解释。
正如上小节提到会对 ReadLocal 和 StaleRead 存在着特定逻辑处理,可以作为扩展阅读资料了解 TiKV 功能介绍 - Lease Read && Stale Read 功能的使用场景。
首先看看ReadLocal 的特殊处理即对 Peer 中的 leader_lease 进行检查。当发现不在租期内时,便会转发到正常 Propose 流程中。
因此我们采取:在 prepare Flashback 时,对 lease 手动设置超时,来确保 local read 不会执行。
这便是对之前完成 apply res 之后与更新 region 同时所做的事。
fn on_set_flashback_state(&mut self, is_in_flashback: bool) {
// Update the region meta.
self.update_region()
// Let the leader lease to None to ensure that local reads are not executed.
self.fsm.peer.leader_lease_mut().expire_remote_lease();
}
而对于 StaleRead 的运行前提是需要不停推进 safe ts(即 resolved_ts)。会在 TiDB 检查 resolved_ts,保证 Flashback 的版本不会超过 resolved_ts,因此也完成隔断。
至此对于读的隔断介绍完毕。
可参考 TiKV 源码阅读三部曲(三)写流程 阅读写请求全链路的执行流程。
在写的过程中 RaftKv::exec_write_requests 内部将会走向 router 进行 Propose 流程,因此在此处加上类似上文「读请求」的判断,只能让 Flashback 相关的写指令通行。
fn exec_write_requests() {
...
if txn_extra.for_flashback {
flags |= WriteBatchFlags::FLASHBACK.bits();
}
header.set_flags(flags);
...
self.router.send_command(cmd, cb, extra_opts)?;
}
完成停读写调度后,我们遇到了一个新的问题,那就是持续推进的 resolved ts 会导致 CDC 的 Panic.
一言蔽之 Resolved TS(TODO: 介绍 Resolved TS),由于以下原因:
Resolved TS 组件通过观察 LockCF 的修改,维护一个 StartTS 的最小堆,有 ResolvedTS = max(ResolvedTS, min(StartTS));
Flashback 将以 Region 为单位移除掉所有的锁;
正在进行 Flashback 的 Region 上将不再存在任何的锁,此时 ResolvedTS 将会正常向前推进,无论是否有数据写入。
那么将会带来:改动的 CommitTS < ResolvedTS 的情况(Flashback 的改动自始至终都使用同一个 CommitTS,ResolvedTS 随着推进迟早会超过它)。
为防止 resolved_ts 在我们后续执行 Flashback 前被推进,我们采用:TiDB 请求中带上一个 start_ts,TiKV 会在 CF_WRITE 中选择最新的 user key,以此 start_ts prewrite 上一个锁,将在执行完 Flashback 后 commit 清除掉。
回到最开始的 future_prepare_flashback_to_version 中,首先内部将 req.into。
// First Phase
fn future_prepare_flashback_to_version{
...
let res = storage_clone.sched_txn_command(req.into(), cb);
...
}
在此文件中实现了 PrepareFlashbackToVersionRequest 到 FlashbackToVersionReadPhase 的 from。
impl From<PrepareFlashbackToVersionRequest> for TypedCommand<()> {
fn from(mut req: PrepareFlashbackToVersionRequest) -> Self {
FlashbackToVersionReadPhase { .. }
}
}
因此此处调度的实则是 FlashbackToVersionReadPhase,也即对于 sched_txn_command 将会走到由 FlashbackToVersionReadPhase 提供的 process_read 处,在执行完该函数后会触发 FlashbackToVersion 的 process_write。
fn process_read(self, snapshot: S, statistics: &mut Statistics) -> Result<ProcessResult> {
...
let next_cmd = FlashbackToVersion {
...
}
Ok(ProcessResult::NextCommand {
cmd: Command::FlashbackToVersion(next_cmd),
})
}
大致流程按照如此往复读写读写,直到没有读为止。
由于都是属于 Flashback 操作,需要加上「通行证」后才能被 raftstore 执行。
因此也很合理地在 process_write 加上 Write 的通行证。
fn process_write(mut self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
...
write_data.extra.for_flashback = true;
...
if next_lock_key.is_none() && next_write_key.is_none() {
...
} else {
let next_cmd = FlashbackToVersionReadPhase {
...
}
}
}
而对于 process_read,由于是从 snapshot 中读取,因此会在读取 snapshot 时加上 Read 的通行证。
fn exec_snapshot() {
...
if ctx.for_flashback {
flags |= WriteBatchFlags::FLASHBACK.bits();
}
...
self.router.read( ... )
}
这也便实现了在「停读停写」后能顺利执行 flashback 相关操作。
具体到往复读写的过程,代码位于此处。为了流程更加清晰,我们标识出当前状态:
正如 「停止 resolved _ts 的推进」小节中描述的一样,大致流程如下:
回到最开始的 future_flashback_to_version 中,也会内部进行 req.into 后走到 FlashbackToVersionReadPhase 和 FlashbackToVersion 进行读写,大致流程在「读写阶段介绍」小节已经详细介绍,就不再赘述。
在执行完成 Flashback 后,最后需要做的便是 unset 掉所有为 Flashback 服务的配置。那么便回到了经典 future_flashback_to_version 函数。
fn future_flashback_to_version() -> impl Future<Output = ServerResult<FlashbackToVersionResponse>> {
...
// 3. notify raftstore the flashback has been finished
raft_router_clone.significant_send(region_id, SignificantMsg::FinishFlashback)?;
}
流程与 Prepare 大致相同,发送 Admin 指令,完成持久态的取缔。
至此,完成整个 Flashback!
TODO
现有机制在可用性和易用性上有一些不足: