Master-Slave Synchronization
Pika master-slave synchronization
This post explores how Pika implements master-slave synchronization. Pika's replication scheme is quite different from Redis's, so the goal here is to walk through the relevant replication flow (based on Pika 3.4).
How Pika master-slave synchronization works
Replication is driven by two groups of threads that are started when the server boots:
- the auxiliary_thread thread
- the pika_repl_client and pika_repl_server thread pools in pika_rm
Let's walk through what each group of threads does.
The auxiliary_thread thread
The auxiliary thread is started in PikaServer's Start function:
ret = pika_auxiliary_thread_->StartThread();
if (ret != pink::kSuccess) {
tables_.clear();
LOG(FATAL) << "Start Auxiliary Thread Error: " << ret << (ret == pink::kCreateThreadError ? ": create thread error " : ": other error");
}
The thread body is the ThreadMain function in pika_auxiliary_thread.cc:
void* PikaAuxiliaryThread::ThreadMain() {
  while (!should_stop()) {  // loop until the thread is asked to stop
    if (g_pika_conf->classic_mode()) {  // check whether we are running in classic or sharding (distributed) mode
      if (g_pika_server->ShouldMetaSync()) {
        g_pika_rm->SendMetaSyncRequest();
      } else if (g_pika_server->MetaSyncDone()) {
        g_pika_rm->RunSyncSlavePartitionStateMachine();
      }
    } else {
      g_pika_rm->RunSyncSlavePartitionStateMachine();  // in sharding mode, drive the slave partition state machine directly
    }
    Status s = g_pika_rm->CheckSyncTimeout(slash::NowMicros());  // check for timed-out peers
    if (!s.ok()) {
      LOG(WARNING) << s.ToString();
    }
    // TODO(whoiami) timeout
    s = g_pika_server->TriggerSendBinlogSync();  // trigger binlog master-slave sync
    if (!s.ok()) {
      LOG(WARNING) << s.ToString();
    }
    // send to peer
    int res = g_pika_server->SendToPeer();  // move pending write tasks onto the worker thread queues
    if (!res) {
      // sleep 100 ms
      mu_.Lock();
      cv_.TimedWait(100);
      mu_.Unlock();
    } else {
      //LOG_EVERY_N(INFO, 1000) << "Consume binlog number " << res;
    }
  }
  return NULL;
}
RunSyncSlavePartitionStateMachine - drive the slave partition state machine
This function implements the state machine used during replication: depending on the state of each slave partition it performs a different action.
Status PikaReplicaManager::RunSyncSlavePartitionStateMachine() {
  slash::RWLock l(&partitions_rw_, false);
  for (const auto& item : sync_slave_partitions_) {  // iterate over every slave partition's sync state
    PartitionInfo p_info = item.first;
    std::shared_ptr<SyncSlavePartition> s_partition = item.second;
    if (s_partition->State() == ReplState::kTryConnect) {  // kTryConnect: send a TrySync request
      LOG(WARNING) << "Partition start, Table Name: "
        << p_info.table_name_ << " Partition Id: " << p_info.partition_id_;
      SendPartitionTrySyncRequest(p_info.table_name_, p_info.partition_id_);
    } else if (s_partition->State() == ReplState::kTryDBSync) {  // kTryDBSync: send a DBSync (full sync) request
      SendPartitionDBSyncRequest(p_info.table_name_, p_info.partition_id_);
    } else if (s_partition->State() == ReplState::kWaitReply) {  // kWaitReply: waiting for the master's reply, do nothing
      continue;
    } else if (s_partition->State() == ReplState::kWaitDBSync) {  // kWaitDBSync: waiting for the DB dump
      std::shared_ptr<Partition> partition =
          g_pika_server->GetTablePartitionById(
              p_info.table_name_, p_info.partition_id_);
      if (partition) {
        partition->TryUpdateMasterOffset();  // try to pick up the offset shipped by the master
      } else {
        LOG(WARNING) << "Partition not found, Table Name: "
          << p_info.table_name_ << " Partition Id: " << p_info.partition_id_;
      }
    } else if (s_partition->State() == ReplState::kConnected
        || s_partition->State() == ReplState::kNoConnect
        || s_partition->State() == ReplState::kDBNoConnect) {  // already connected or not connected at all: nothing to do
      continue;
    }
  }
  return Status::OK();
}
As the state machine shows, every replication step is driven by this function according to the current state of each partition.
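To make those transitions easier to follow, here is a minimal, self-contained sketch of the slave-side states and the action taken for each one on a single tick; only the state names come from the code above, while the Drive function and the Action enum are purely illustrative and not pika's API. Roughly, an incremental sync goes kTryConnect → kWaitReply → kConnected, while a full sync detours through kTryDBSync → kWaitReply → kWaitDBSync before falling back to kTryConnect once the dump has been loaded.

#include <iostream>

// Illustrative copy of the slave-side replication states seen above.
enum class ReplState {
  kNoConnect,    // no replication configured
  kTryConnect,   // should send a TrySync (incremental sync) request
  kTryDBSync,    // should send a DBSync (full sync) request
  kWaitReply,    // request sent, waiting for the master's reply
  kWaitDBSync,   // waiting for the DB dump, keep calling TryUpdateMasterOffset()
  kConnected,    // binlog sync in progress, nothing for the state machine to do
  kDBNoConnect   // replication explicitly stopped
};

// Hypothetical action type, only to visualize one tick of the loop above.
enum class Action { kSendTrySync, kSendDBSync, kUpdateMasterOffset, kNothing };

Action Drive(ReplState state) {
  switch (state) {
    case ReplState::kTryConnect:  return Action::kSendTrySync;
    case ReplState::kTryDBSync:   return Action::kSendDBSync;
    case ReplState::kWaitDBSync:  return Action::kUpdateMasterOffset;
    default:                      return Action::kNothing;  // kWaitReply, kConnected, ...
  }
}

int main() {
  // One auxiliary-thread tick for a partition that was just told "slaveof".
  std::cout << static_cast<int>(Drive(ReplState::kTryConnect)) << std::endl;  // prints 0 (send TrySync)
  return 0;
}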
CheckSyncTimeout - check sync connections for timeouts
Status PikaReplicaManager::CheckSyncTimeout(uint64_t now) {
  slash::RWLock l(&partitions_rw_, false);
  for (auto& iter : sync_master_partitions_) {
    std::shared_ptr<SyncMasterPartition> partition = iter.second;
    Status s = partition->CheckSyncTimeout(now);  // check every master-side sync partition for timed-out slaves
    if (!s.ok()) {
      LOG(WARNING) << "CheckSyncTimeout Failed " << s.ToString();
    }
  }
  for (auto& iter : sync_slave_partitions_) {
    std::shared_ptr<SyncSlavePartition> partition = iter.second;
    Status s = partition->CheckSyncTimeout(now);  // check every slave-side sync partition for a timed-out master
    if (!s.ok()) {
      LOG(WARNING) << "CheckSyncTimeout Failed " << s.ToString();
    }
  }
  return Status::OK();
}
It checks the sync connections on both the master side and the slave side for timeouts.
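As a reminder of what is being iterated above: sync_master_partitions_ and sync_slave_partitions_ are maps inside PikaReplicaManager keyed by partition, one entry per table/partition for each role. The sketch below is only a simplified stand-in for that bookkeeping, with trimmed-down classes and a hypothetical hash functor; it is not pika's real definition.

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

// Simplified stand-ins for the real pika types.
struct PartitionInfo {
  std::string table_name_;
  uint32_t partition_id_ = 0;
  bool operator==(const PartitionInfo& o) const {
    return table_name_ == o.table_name_ && partition_id_ == o.partition_id_;
  }
};

// Hypothetical hash functor so PartitionInfo can be used as a map key.
struct PartitionInfoHash {
  size_t operator()(const PartitionInfo& p) const {
    return std::hash<std::string>()(p.table_name_) ^ std::hash<uint32_t>()(p.partition_id_);
  }
};

class SyncMasterPartition { /* slave nodes, binlog reader, sync window ... */ };
class SyncSlavePartition  { /* master info, ReplState, last recv time ... */ };

// One entry per partition for each role; these are the maps iterated above.
std::unordered_map<PartitionInfo, std::shared_ptr<SyncMasterPartition>, PartitionInfoHash>
    sync_master_partitions_;
std::unordered_map<PartitionInfo, std::shared_ptr<SyncSlavePartition>, PartitionInfoHash>
    sync_slave_partitions_;

int main() {
  PartitionInfo key;
  key.table_name_ = "db0";
  key.partition_id_ = 0;
  sync_master_partitions_[key] = std::make_shared<SyncMasterPartition>();
  return 0;
}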
Status SyncMasterPartition::CheckSyncTimeout(uint64_t now) {
  std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves = GetAllSlaveNodes();
  std::vector<Node> to_del;
  for (auto& slave_iter : slaves) {
    std::shared_ptr<SlaveNode> slave_ptr = slave_iter.second;  // walk every registered slave connection
    slash::MutexLock l(&slave_ptr->slave_mu);
    if (slave_ptr->LastRecvTime() + kRecvKeepAliveTimeout < now) {  // nothing received from this slave within the keepalive window: mark it for removal
      to_del.push_back(Node(slave_ptr->Ip(), slave_ptr->Port()));
    } else if (slave_ptr->LastSendTime() + kSendKeepAliveTimeout < now && slave_ptr->sent_offset == slave_ptr->acked_offset) {  // nothing sent for a while and every sent binlog has been acked: send an empty BinlogChips request as a keepalive ping and refresh the last send time
      std::vector<WriteTask> task;
      RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->TableName(), slave_ptr->PartitionId(), slave_ptr->SessionId());
      WriteTask empty_task(rm_node, BinlogChip(LogOffset(), ""), LogOffset());
      task.push_back(empty_task);
      Status s = g_pika_rm->SendSlaveBinlogChipsRequest(slave_ptr->Ip(), slave_ptr->Port(), task);  // push the keepalive to the slave
      slave_ptr->SetLastSendTime(now);
      if (!s.ok()) {
        LOG(INFO) << "Send ping failed: " << s.ToString();
        return Status::Corruption("Send ping failed: " + slave_ptr->Ip() + ":" + std::to_string(slave_ptr->Port()));
      }
    }
  }
  for (auto& node : to_del) {  // remove every timed-out slave connection
    coordinator_.SyncPros().RemoveSlaveNode(node.Ip(), node.Port());
    g_pika_rm->DropItemInWriteQueue(node.Ip(), node.Port());
    LOG(WARNING) << SyncPartitionInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString();
  }
  return Status::OK();
}
On the master side, this keeps the slave connection bookkeeping up to date: dead slaves are dropped and idle but healthy ones receive a keepalive ping.
Status SyncSlavePartition::CheckSyncTimeout(uint64_t now) {
  slash::MutexLock l(&partition_mu_);
  // no need to do session keepalive return ok
  if (repl_state_ != ReplState::kWaitDBSync && repl_state_ != ReplState::kConnected) {
    return Status::OK();  // only kWaitDBSync and kConnected need keepalive handling
  }
  if (m_info_.LastRecvTime() + kRecvKeepAliveTimeout < now) {
    // update slave state to kTryConnect, and try reconnect to master node
    repl_state_ = ReplState::kTryConnect;
    g_pika_server->SetLoopPartitionStateMachine(true);  // the master has been silent too long: switch to kTryConnect and reconnect
  }
  return Status::OK();
}
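Both sides follow the same timing pattern: compare the last activity timestamp (in microseconds, as returned by slash::NowMicros) against a keepalive constant. The sketch below condenses that logic into two free functions; the constant values used here are assumptions picked for illustration, not necessarily pika's actual definitions.

#include <cstdint>
#include <iostream>

// Assumed values, in microseconds; pika defines its own constants for these.
const uint64_t kSendKeepAliveTimeout = 2 * 1000000;   // ping an idle slave after ~2s of silence from our side
const uint64_t kRecvKeepAliveTimeout = 20 * 1000000;  // a peer silent for ~20s is treated as gone

enum class MasterDecision { kKeep, kPing, kDropSlave };

// Master side: drop slaves that stopped responding, ping slaves we have not written to recently.
MasterDecision MasterCheck(uint64_t now, uint64_t last_recv, uint64_t last_send,
                           bool all_binlog_acked) {
  if (last_recv + kRecvKeepAliveTimeout < now) return MasterDecision::kDropSlave;
  if (last_send + kSendKeepAliveTimeout < now && all_binlog_acked) return MasterDecision::kPing;
  return MasterDecision::kKeep;
}

// Slave side: if the master has been silent too long, go back to kTryConnect and re-sync.
bool SlaveShouldReconnect(uint64_t now, uint64_t last_recv_from_master) {
  return last_recv_from_master + kRecvKeepAliveTimeout < now;
}

int main() {
  uint64_t now = 25 * 1000000;  // 25s after the last contact in this example
  std::cout << (MasterCheck(now, /*last_recv=*/0, /*last_send=*/0, true)
                == MasterDecision::kDropSlave) << std::endl;  // 1: silent for >20s, drop it
  std::cout << SlaveShouldReconnect(now, /*last_recv_from_master=*/0) << std::endl;  // 1: reconnect
  return 0;
}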
TriggerSendBinlogSync - generate the pending binlog send tasks for each slave
Status PikaServer::TriggerSendBinlogSync() {
return g_pika_rm->WakeUpBinlogSync();
}
...
Status PikaReplicaManager::WakeUpBinlogSync() {
  slash::RWLock l(&partitions_rw_, false);
  for (auto& iter : sync_master_partitions_) {
    std::shared_ptr<SyncMasterPartition> partition = iter.second;
    Status s = partition->WakeUpSlaveBinlogSync();  // check whether this master partition needs to produce binlog sync tasks
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}
For every connected slave it checks whether new binlog sync tasks should be generated.
Status SyncMasterPartition::WakeUpSlaveBinlogSync() {
  std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves = GetAllSlaveNodes();
  std::vector<std::shared_ptr<SlaveNode>> to_del;
  for (auto& slave_iter : slaves) {
    std::shared_ptr<SlaveNode> slave_ptr = slave_iter.second;
    slash::MutexLock l(&slave_ptr->slave_mu);
    if (slave_ptr->sent_offset == slave_ptr->acked_offset) {  // everything sent so far has been acked, so more binlog can be shipped
      Status s = ReadBinlogFileToWq(slave_ptr);  // queue binlog write tasks for this slave
      if (!s.ok()) {
        to_del.push_back(slave_ptr);
        LOG(WARNING) << "WakeUpSlaveBinlogSync falied, Delete from RM, slave: " <<
          slave_ptr->ToStringStatus() << " " << s.ToString();
      }
    }
  }
  for (auto& to_del_slave : to_del) {  // drop slaves whose sync failed
    RemoveSlaveNode(to_del_slave->Ip(), to_del_slave->Port());
  }
  return Status::OK();
}
ReadBinlogFileToWq is where the binlog sync tasks for a given slave connection are actually produced.
Status SyncMasterPartition::ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>& slave_ptr) {
  int cnt = slave_ptr->sync_win.Remaining();
  std::shared_ptr<PikaBinlogReader> reader = slave_ptr->binlog_reader;  // the binlog reader bound to this slave
  if (reader == nullptr) {
    return Status::OK();
  }
  std::vector<WriteTask> tasks;
  for (int i = 0; i < cnt; ++i) {
    std::string msg;
    uint32_t filenum;
    uint64_t offset;
    if (slave_ptr->sync_win.GetTotalBinlogSize() > PIKA_MAX_CONN_RBUF_HB * 2) {
      LOG(INFO) << slave_ptr->ToString() << " total binlog size in sync window is :"
                << slave_ptr->sync_win.GetTotalBinlogSize();
      break;  // the sync window already holds too much binlog data
    }
    Status s = reader->Get(&msg, &filenum, &offset);  // read the next binlog entry and its offset
    if (s.IsEndFile()) {
      break;
    } else if (s.IsCorruption() || s.IsIOError()) {
      LOG(WARNING) << SyncPartitionInfo().ToString()
                   << " Read Binlog error : " << s.ToString();
      return s;
    }
    BinlogItem item;
    if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(
          TypeFirst, msg, &item)) {
      LOG(WARNING) << "Binlog item decode failed";
      return Status::Corruption("Binlog item decode failed");
    }
    BinlogOffset sent_b_offset = BinlogOffset(filenum, offset);  // physical offset of the entry being sent
    LogicOffset sent_l_offset = LogicOffset(item.term_id(), item.logic_id());
    LogOffset sent_offset(sent_b_offset, sent_l_offset);
    slave_ptr->sync_win.Push(SyncWinItem(sent_offset, msg.size()));  // record the entry in the sync window
    slave_ptr->SetLastSendTime(slash::NowMicros());  // refresh the last send time
    RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->TableName(), slave_ptr->PartitionId(), slave_ptr->SessionId());
    WriteTask task(rm_node, BinlogChip(sent_offset, msg), slave_ptr->sent_offset);
    tasks.push_back(task);  // wrap the binlog chip into a write task
    slave_ptr->sent_offset = sent_offset;  // advance the sent offset
  }
  if (!tasks.empty()) {
    g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), partition_info_.partition_id_, tasks);  // push the tasks onto the write queue to be sent
  }
  return Status::OK();
}
In short, it reads binlog entries starting from the slave's current offset, wraps them into write tasks, and places them on the send queue to be processed.
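The flow control in this path works like a small sliding window: ReadBinlogFileToWq sends at most sync_win.Remaining() entries per pass, WakeUpSlaveBinlogSync only refills once sent_offset == acked_offset, and every ack from the slave frees space in the window. The sketch below is a minimal model of that bookkeeping under those assumptions; it is not pika's SyncWindow implementation, and the class name and capacity are made up for illustration.

#include <cstddef>
#include <cstdint>
#include <deque>
#include <iostream>

// A minimal model of the per-slave sync window: every in-flight binlog entry
// occupies one slot until the slave acknowledges it.
class SyncWindowModel {
 public:
  explicit SyncWindowModel(size_t capacity) : capacity_(capacity) {}

  // How many more binlog entries may be sent right now.
  size_t Remaining() const { return capacity_ - in_flight_.size(); }

  // Master sends an entry ending at `offset`.
  void Push(uint64_t offset) { in_flight_.push_back(offset); }

  // Slave acks everything up to and including `offset`.
  void Ack(uint64_t offset) {
    while (!in_flight_.empty() && in_flight_.front() <= offset) {
      in_flight_.pop_front();
    }
  }

  // Corresponds to the sent_offset == acked_offset check above.
  bool AllAcked() const { return in_flight_.empty(); }

 private:
  size_t capacity_;
  std::deque<uint64_t> in_flight_;
};

int main() {
  SyncWindowModel win(3);
  win.Push(100); win.Push(200); win.Push(300);  // master ships three entries
  std::cout << win.Remaining() << std::endl;    // 0: stop reading more binlog for now
  win.Ack(200);                                 // slave acks the first two entries
  std::cout << win.Remaining() << " " << win.AllAcked() << std::endl;  // 2 0
  return 0;
}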