# distributed, 2023-10-25
This article was originally written as a product introduction but was shelved for various reasons. Recently, as we have been improving the related features, we decided to polish it up and release it. Very excited to have implemented this together with JmPotato~
For the code, please see: roadmap & TiKV tracking issue
Flashback (usually referring to Oracle Flashback) is a feature used to quickly revert to a previous version in case of user errors, to avoid significant losses.
In the gaming industry, for example, faulty version rollouts happen from time to time, and a regular backup can only restore data to the backup point, which also wastes resources. TiDB v6.4.0 introduced the FLASHBACK CLUSTER TO TIMESTAMP syntax, which restores the data of a cluster, database, or table to a specific point in time.
In TiDB, there are some related features:
「For details, please refer to User documentation.」
mysql> CREATE TABLE t(a INT);
Query OK, 0 rows affected (0.09 sec)
mysql> SELECT * FROM t;
Empty set (0.01 sec)
mysql> SELECT now();
+---------------------+
| now()               |
+---------------------+
| 2022-09-28 17:24:16 |
+---------------------+
1 row in set (0.02 sec)
mysql> INSERT INTO t VALUES (1);
Query OK, 1 row affected (0.02 sec)
mysql> SELECT * FROM t;
+------+
| a    |
+------+
|    1 |
+------+
1 row in set (0.01 sec)
mysql> FLASHBACK CLUSTER TO TIMESTAMP '2022-09-28 17:24:16';
Query OK, 0 rows affected (0.20 sec)
mysql> SELECT * FROM t;
Empty set (0.00 sec)
In Flashback, we use Region locks to block all read/write operations and scheduling during the Flashback process to avoid any external factors that might cause data inconsistencies.
By locking the Region before performing Flashback, we can gain the following benefits:
First, TiDB will determine:
Let’s start from the beginning of TiKV startup. :)
Beginning with cmd/tikv-server/main.rs: after TiKV finishes configuring a series of parameters, the main function ends by calling server::run_tikv(config) to run the TiKV server. run_tikv selects the corresponding Engine based on the configuration and calls the run_impl function; further down, run_server binds the address and starts the gRPC service via grpc_server.start(). You can view the specific binding and startup process in this document.
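As a rough sketch (details vary by TiKV version, and load_config is a hypothetical stand-in for the real CLI/config-file parsing), the entry point boils down to:
// cmd/tikv-server/main.rs, heavily simplified.
fn main() {
    // Hypothetical helper: parse CLI flags and the config file into TiKV's config struct.
    let config = load_config();
    // run_tikv picks the storage engine from the config and eventually starts the gRPC server.
    server::run_tikv(config);
}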
The focus of this article, Flashback, is located at the following place in kvproto:
rpc KvPrepareFlashbackToVersion(kvrpcpb.PrepareFlashbackToVersionRequest) returns (kvrpcpb.PrepareFlashbackToVersionResponse) {}
rpc KvFlashbackToVersion(kvrpcpb.FlashbackToVersionRequest) returns (kvrpcpb.FlashbackToVersionResponse) {}
Returning to the TiKV code, TiKV includes multiple gRPC services. One of the most important is the KvService, located in the src/server/service/kv.rs file. It includes the APIs for transaction operations in TiKV, such as kv_get, kv_scan, kv_prewrite, kv_commit, etc. The Flashback feature in this article, since it uses the transaction model, is quite naturally placed in this file.
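For orientation, the two Flashback RPCs enter through handlers shaped roughly like the following (a hedged sketch; the actual macro-generated handlers in src/server/service/kv.rs differ in detail, and the arguments passed to future_prepare_flashback_to_version are abbreviated here):
// Hedged sketch of a KvService handler; the grpcio types are real, the body is approximate.
fn kv_prepare_flashback_to_version(
    &mut self,
    ctx: RpcContext<'_>,
    req: PrepareFlashbackToVersionRequest,
    sink: UnarySink<PrepareFlashbackToVersionResponse>,
) {
    // Build the async body discussed below and forward its result to the gRPC sink.
    let resp_fut = future_prepare_flashback_to_version(/* storage, raft_router, */ req);
    ctx.spawn(async move {
        if let Ok(resp) = resp_fut.await {
            let _ = sink.success(resp).await;
        }
    });
}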
Before we delve deeper into the specific code, let’s take a broad look at the overall Flashback process. We can distill it into four main steps based on the key code components:
// First Phase
fn future_prepare_flashback_to_version() {
    // 1. Prepare the raftstore for the later flashback.
    send_flashback_msg(.., AdminCmdType::PrepareFlashback);
    // 2. Prewrite the first user key to prevent `resolved_ts` from advancing.
    let (cb, f) = paired_future_callback();
    let res = storage.sched_txn_command(req.clone().into(), cb);
}
// Second Phase
fn future_flashback_to_version() {
    // 3. Execute the overwrite and commit the first user key.
    let (cb, f) = paired_future_callback();
    let res = storage_clone.sched_txn_command(req.into(), cb);
    // 4. Notify raftstore that the flashback has been finished.
    send_flashback_msg(.., AdminCmdType::FinishFlashback);
}
Entering the first phase of Flashback operation, the preparation module is critical.
The primary goal of the Prepare function is to halt reads and writes, prevent scheduling, and persist the Flashback state, as well as stop the advancement of resolved_ts.
In the “Preparation” phase, the necessity of stopping all reads, writes, and scheduling operations has already been explained in the Background Introduction section. To achieve this, one would intuitively block these operations at the point of execution. With TiKV using the Raft consensus protocol, the operations eventually go through a Propose, Commit, then Apply process, so it makes sense to intercept reads and writes before Propose.
Spoiler alert for the implementation: the interception happens at the Propose stage, blocking other reads and writes during the Flashback process, with the Apply stage acting as a safety net.
To understand the process of handling a proposal in TiKV, you can refer to this article.
In short, TiKV utilizes two thread pools to handle proposals, and a Raft peer is divided into two parts: PeerFsm and ApplyFsm. During the proposal processing, PeerFsm fetches logs and drives the internal state machine of Raft, while ApplyFsm updates the state machine according to the committed logs, which includes both region information and user data.
For more details on PeerFsm and ApplyFsm, you can refer to this article
During the process where “PeerFsm fetches logs and drives the internal state machine of Raft”, it encounters the following function:
fn propose_raft_command_internal() {
    match self.pre_propose_raft_command(&msg) { .. }
    if self.fsm.peer.propose(self.ctx, cb, msg, resp, diskfullopt) {
        self.fsm.has_ready = true;
    }
}
Upon examining the codebase, we find that the PeerFsmDelegate::pre_propose_raft_command function is indeed the checkpoint where a request is examined before a propose is allowed to proceed.
It’s logical to place the check to determine if the current request is related to Flashback right at this stage.
Key Consideration:
However, we cannot block all operations indiscriminately. Flashback operations themselves need to pass through the Raft process without hindrance. Thus, we should issue a sort of ‘pass’ for them.
Utilizing flags in RaftCmdRequest Header:
We then notice that the RaftCmdRequest structure has a header field that includes flags. This is a suitable place to set flags that can be used as a ‘pass’ for Flashback-related commands, allowing them to be distinguished from regular operations.
fn pre_propose_raft_command(
    &mut self,
    req: &RaftCmdRequest,
) -> Result<Option<RaftCmdResponse>> {
    // When in the flashback state, we should not allow any other request to be proposed.
    if self.region().is_in_flashback {
        let flags = WriteBatchFlags::from_bits_truncate(req.get_header().get_flags());
        if !flags.contains(WriteBatchFlags::FLASHBACK) {
            // Reject the request with a flashback-in-progress error.
            return Err(..);
        }
    }
}
Having set up these “block points” to stop non-Flashback operations, we still need Flashback’s own operations to proceed smoothly and not be blocked, so we add the flag to the request header.
// First Phase
fn future_prepare_flashback_to_version() {
    // 1. Prepare the raftstore for the later flashback.
    send_flashback_msg(.., AdminCmdType::PrepareFlashback);
}
The start_flashback/end_flashback functions invoked here set the flag and then send an admin request:
async fn start_flashback/end_flashback {
    ...
    req.mut_header()
        .set_flags(WriteBatchFlags::FLASHBACK.bits());
    // Call the admin request directly.
    let raft_router = raft_router.lock().await;
    raft_router.send_command(req, cb, ...)
    ...
}
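For reference, assembling such an admin command with kvproto's generated setters looks roughly like this (a hedged sketch; the actual construction inside send_flashback_msg carries more context, such as the peer and region epoch):
// Hedged sketch of building the PrepareFlashback/FinishFlashback admin command.
let mut admin = AdminRequest::default();
admin.set_cmd_type(AdminCmdType::PrepareFlashback); // or AdminCmdType::FinishFlashback
let mut req = RaftCmdRequest::default();
req.mut_header().set_region_id(region_id);
req.mut_header().set_flags(WriteBatchFlags::FLASHBACK.bits()); // the "pass"
req.set_admin_request(admin);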
The Admin request is formed into a RaftCommand via the RaftStoreRouter and sent along. It goes through the proposing process, passing through the pre_propose check, and arrives at PeerFsmDelegate.fsm.peer.propose to complete the proposal of a Raft log.
Subsequently, the PeerFsm will send the Proposal and committed logs to the corresponding ApplyFsm for the apply process.
For each of these logs, the ApplyFsm will (see ApplyFsm::handle_apply) first check the flashback state:
check_flashback_state(self.region.get_is_in_flashback());
pub fn check_flashback_state() -> Result<()> {
    // The admin flashback cmd could be proposed/applied under any state.
    if req.get_admin_request().get_cmd_type() == AdminCmdType::PrepareFlashback
        || req.get_admin_request().get_cmd_type() == AdminCmdType::FinishFlashback
    {
        return Ok(());
    }
    let is_flashback_request = WriteBatchFlags::from_bits_truncate(req.get_header().get_flags())
        .contains(WriteBatchFlags::FLASHBACK);
    ...
}
After the checks are passed, the process executes ApplyDelegate::exec_admin_cmd, which ultimately recognizes the Flashback identifier and reaches our target function exec_flashback.
AdminCmdType::PrepareFlashback | AdminCmdType::FinishFlashback =>
    self.exec_flashback(ctx, request),
When the exec_flashback function is invoked, it updates the Region’s metadata to record whether the Region is entering or leaving the flashback state, and persists that state:
fn exec_flashback() -> Result<> {
    let is_in_flashback = req.get_cmd_type() == AdminCmdType::PrepareFlashback;
    let mut region = self.region.clone();
    region.set_is_in_flashback(is_in_flashback);
    // `old_state` is the region's RegionLocalState, updated with the new region meta
    // and written back to CF_RAFT so the flashback flag is durable.
    put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &old_state)?;
    Ok(ApplyResult::Res(ExecResult::SetFlashbackState { region }))
}
After the ApplyFsm applies a series of Raft logs, it generates an ApplyRes message that encapsulates the outcomes of this apply process. This message is dispatched to the corresponding PeerFsm.
Once received, the PeerFsm processes the message through the PeerFsmDelegate::handle_msgs function, specifically within the PeerMsg::ApplyRes { res } case. It is here that PeerFsmDelegate::on_apply_res is invoked, updating the peer’s in-memory region meta so that it stays consistent with the flashback state the apply just persisted.
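A minimal sketch of that dispatch (types and match arms are approximate, not the exact raftstore code) looks like this; it ends up calling the on_set_flashback_state shown next:
// Hedged sketch: the apply result carries ExecResult::SetFlashbackState,
// which is what triggers on_set_flashback_state below.
fn on_apply_res(&mut self, res: ApplyRes) {
    for exec_result in res.exec_res {
        match exec_result {
            ExecResult::SetFlashbackState { region } => {
                self.on_set_flashback_state(region.get_is_in_flashback())
            }
            _ => { /* other exec results */ }
        }
    }
}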
fn on_set_flashback_state(&mut self, is_in_flashback: bool) {
    // Update the flashback state in the region meta.
    self.update_region();
    // What this line does will be explained in the 「Halting Reads - ReadLocal & StaleRead」 section.
    self.fsm.peer.leader_lease_mut().expire_remote_lease();
}
After a comprehensive analysis, we observe that there are two critical blocks that act as barriers to non-Flashback operations, located at the propose and apply stages, respectively.
This leads to the ensuing inquiry: where should we integrate the ‘pass’ that permits Flashback requests to proceed unhindered?
Considering that Flashback is conceptually an operation built upon Multi-Version Concurrency Control (MVCC) mechanisms, it inherently requires traversal through the established read-write interfaces of MVCC. By retracing the MVCC read-write process, we can better ascertain the strategic location to embed this ‘pass’, ensuring that Flashback requests are granted seamless continuity through the system.
Firstly, let’s comb through the read process, which can be studied in detail in conjunction with TiKV Source Code Reading Part Two: The Read Process.
When LocalReader::propose_raft_command is invoked, the request is classified by LocalReader::pre_propose_raft_command.
Specific logic is applied to ReadLocal and StaleRead, while other requests are forwarded to RaftStore for execution. This forwarding is done via ProposalRouter::send, after which the process enters the Propose flow we previously mentioned.
fn propose_raft_command() {
    match self.pre_propose_raft_command(&req) {
        RequestPolicy::ReadLocal => ..
        RequestPolicy::StaleRead => ..
        // Forward to raftstore.
        _ => self.redirect(RaftCommand::new(req, cb)),
    }
}
Thus, requests other than ReadLocal and StaleRead are naturally interrupted: they are forwarded into the Propose flow, where the checkpoint added earlier rejects anything without the Flashback flag. All that remains is to make sure Flashback’s own reads carry that flag before entering the read process, which achieves the goal that, once Flashback is initiated, only Flashback-related read commands are allowed through.
fn exec_snapshot() {
    ...
    if ctx.for_flashback {
        flags |= WriteBatchFlags::FLASHBACK.bits();
    }
    header.set_flags(flags);
    ...
    self.router.read(...)
}
When Flashback is executed, ctx.for_flashback is set before exec_snapshot runs; how for_flashback gets set will be explained in the later section 「Phase2-1: Exec - Read Stage」.
As mentioned in the previous section, ReadLocal and StaleRead have their own handling, which can be understood further through the reading materials on TiKV’s Lease Read feature and the use cases of the Stale Read functionality.
The special handling for ReadLocal involves checking the leader_lease in the Peer. If it is found to be outside of the lease period, it will be forwarded to the regular Propose process.
Therefore, our approach is: during the preparation of Flashback, to manually set the lease to expire to ensure that local reads will not execute.
This is done together with the region update when the apply result is handled, as shown again below:
fn on_set_flashback_state(&mut self, is_in_flashback: bool) {
    // Update the region meta.
    self.update_region();
    // Set the leader lease to None to ensure that local reads are not executed.
    self.fsm.peer.leader_lease_mut().expire_remote_lease();
}
As for StaleRead, it depends on the continuous advancement of the safe ts (i.e., resolved_ts). TiDB checks that the version used by Flashback does not exceed resolved_ts, which provides a natural cutoff.
With this, the explanation for the interruption of read operations during Flashback is complete.
For further details on the execution process of write requests in TiKV, one can refer to TiKV Source Code Reading (Part III) Write Process.
During the write process, RaftKv::exec_write_requests internally sends the command to the router to initiate the Propose process. Therefore, just as with the read requests discussed previously, a checkpoint is added at this stage so that only Flashback-related write commands are allowed through.
This ensures that during the execution of Flashback, only write operations associated with it can proceed, and all other write requests are effectively halted, preserving the integrity of the Flashback operation.
fn exec_write_requests() {
    ...
    if txn_extra.for_flashback {
        flags |= WriteBatchFlags::FLASHBACK.bits();
    }
    header.set_flags(flags);
    ...
    self.router.send_command(cmd, cb, extra_opts)?;
}
After halting read and write operations, we are faced with a new issue: the continuously advancing resolved timestamp (resolved_ts) could cause a panic in Change Data Capture (CDC).
In short, resolved_ts is an internal mechanism of TiKV (TODO: introduce resolved_ts). The problem arises for the following reasons:
- The Resolved TS component maintains a min-heap of the StartTS of observed locks by watching changes in the LockCF, with the rule ResolvedTS = max(ResolvedTS, min(StartTS)).
- Flashback removes all locks at the granularity of a Region.
- A Region undergoing Flashback therefore has no locks left, so its ResolvedTS keeps advancing as normal, regardless of whether data is being written.
- This leads to a scenario where the CommitTS of the Flashback changes is less than the ResolvedTS (Flashback uses the same CommitTS for all of its changes, and eventually the ResolvedTS surpasses it), which is exactly what CDC cannot tolerate.
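To make the rule concrete, here is a small self-contained sketch (a toy model, not TiKV's actual resolver code) showing how a single remaining lock pins resolved_ts:
use std::cmp::Reverse;
use std::collections::BinaryHeap;
// Toy model of the rule above: resolved_ts may only advance up to the smallest
// StartTS among the currently observed locks.
struct Resolver {
    resolved_ts: u64,
    lock_start_ts: BinaryHeap<Reverse<u64>>, // min-heap of lock StartTS
}
impl Resolver {
    fn advance(&mut self, tso_ts: u64) -> u64 {
        // With no locks, resolved_ts follows the TSO; with locks, it is capped by min(StartTS).
        let cap = self.lock_start_ts.peek().map(|r| r.0).unwrap_or(tso_ts);
        self.resolved_ts = self.resolved_ts.max(cap.min(tso_ts));
        self.resolved_ts
    }
}
fn main() {
    let mut r = Resolver { resolved_ts: 0, lock_start_ts: BinaryHeap::new() };
    assert_eq!(r.advance(100), 100); // no locks: advances freely
    r.lock_start_ts.push(Reverse(120)); // a lock appears, e.g. the one Prepare will prewrite
    assert_eq!(r.advance(200), 120); // pinned: cannot pass the lock's StartTS
}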
To prevent resolved_ts from advancing before we execute Flashback, we employ the following strategy: TiDB includes a start_ts with its requests, and TiKV picks the first user key (looked up in CF_WRITE) and prewrites a lock on it with this start_ts. The lock is committed and cleared only after the Flashback execution is complete.
Returning to the initial future_prepare_flashback_to_version function, it begins by internally converting the request using req.into.
// First Phase
fn future_prepare_flashback_to_version() {
    ...
    let res = storage_clone.sched_txn_command(req.into(), cb);
    ...
}
The From conversion from PrepareFlashbackToVersionRequest to FlashbackToVersionReadPhase is implemented here:
impl From<PrepareFlashbackToVersionRequest> for TypedCommand<()> {
    fn from(mut req: PrepareFlashbackToVersionRequest) -> Self {
        FlashbackToVersionReadPhase { .. }
    }
}
Therefore, what is actually being scheduled here is FlashbackToVersionReadPhase, which means that for sched_txn_command, the process will proceed to the process_read provided by FlashbackToVersionReadPhase. After this function is executed, it will trigger the process_write of FlashbackToVersion.
fn process_read(self, snapshot: S, statistics: &mut Statistics) -> Result<ProcessResult> {
    ...
    let next_cmd = FlashbackToVersion {
        ...
    };
    Ok(ProcessResult::NextCommand {
        cmd: Command::FlashbackToVersion(next_cmd),
    })
}
The general process follows a repetitive read-write-read-write sequence until there are no more reads to perform.
Since all these operations are part of the Flashback process, they must be marked with a “pass” to be executed by raftstore.
Therefore, it is also quite reasonable to add a Write “pass” during the process_write.
fn process_write(mut self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
    ...
    write_data.extra.for_flashback = true;
    ...
    if next_lock_key.is_none() && next_write_key.is_none() {
        ...
    } else {
        let next_cmd = FlashbackToVersionReadPhase {
            ...
        };
    }
}
For process_read, since it is reading from a snapshot, a Read pass is added when reading the snapshot.
fn exec_snapshot() {
    ...
    if ctx.for_flashback {
        flags |= WriteBatchFlags::FLASHBACK.bits();
    }
    ...
    self.router.read( ... )
}
This also ensures that after the “read and write suspension,” Flashback-related operations can be smoothly executed.
Specifically, the code for this repeated read-write loop is here, with the current status marked at each step to make the process clearer.
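As a rough conceptual outline (with hypothetical helper names standing in for process_read / process_write, not the actual TiKV state machine), each round of the loop does something like the following:
// Conceptual outline of the read-write loop; scan_next_batch and overwrite_batch
// are hypothetical stand-ins for process_read and process_write.
loop {
    // Read phase: scan the next batch of locks / writes from the snapshot.
    let batch = scan_next_batch(&snapshot, version)?;
    if batch.is_empty() {
        break; // nothing left to flash back in this region
    }
    // Write phase: roll back the locks or overwrite the keys, then schedule the next read phase.
    overwrite_batch(batch)?;
}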
Just as described in the section 「Halting the Advancement of resolved_ts」, the general process is as follows:
Returning to the initial future_flashback_to_version, the process also internally goes through req.into and proceeds to FlashbackToVersionReadPhase and FlashbackToVersion for reading and writing. The detailed process has been introduced in the section 「Introduction to the Read and Write Phase」 so it is not repeated here.
After the Flashback has been executed, the last thing we need to do is to clear all the state that was set up for it. This brings us back to the familiar future_flashback_to_version function.
fn future_flashback_to_version() -> impl Future<Output = ServerResult<FlashbackToVersionResponse>> {
    ...
    // 4. Notify raftstore that the flashback has been finished.
    raft_router_clone.significant_send(region_id, SignificantMsg::FinishFlashback)?;
}
The process is similar to Prepare: an Admin command is sent so that the persisted flashback state is cleared.
At this point, the entire Flashback is complete!
TODO
The existing mechanism has some shortcomings in terms of usability and ease of use: