为了保证服务的可用性,现代数据库都提供了复制功能,同时在多个进程中维护一致的数据状态。
Redis 支持一主多从的复制架构,该功能被简化成了一条 SLAVEOF
命令,下面通过条命令来解析 Redis 的主从复制机制。
通过 tcpdump 观察
在本机上通过 redis-server 启动两个服务,然后通过 tcpdump 观察主从间的交互情况:
redis-server --port 6379 --requirepass 123456 # 启动 master
redis-server --port 6380 --masterauth 123456 # 启动 slave
tcpdump -t -i lo0 host localhost and port 6379 | awk -F ']' '{print $1"]"$3}'
# 在 localhost:6380 上执行 SLAVEOF localhost 6379 建立同步连接,进入 Full-ReSync 阶段
localhost.59297 > localhost.6379: Flags [S]
localhost.6379 > localhost.59297: Flags [S.]
localhost.59297 > localhost.6379: Flags [P.] "PING"
localhost.6379 > localhost.59297: Flags [P.] "NOAUTH Authentication required."
localhost.59297 > localhost.6379: Flags [P.] "AUTH 123456"
localhost.6379 > localhost.59297: Flags [P.] "OK"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF listening-port 6380"
localhost.6379 > localhost.59297: Flags [P.] "OK":
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF capa eof"
localhost.6379 > localhost.59297: Flags [P.] "OK":
localhost.59297 > localhost.6379: Flags [P.] "PSYNC ? -1"
localhost.6379 > localhost.59297: Flags [P.] "FULLRESYNC 8efb6ca4edf1258c05a5ced43b0c73fe4deb1908 1"
localhost.6379 > localhost.59297: Flags [P.] [|RESP:
localhost.6379 > localhost.59297: Flags [P.] "REDIS0007M-z^Iredis-ver^F3.2.11M-z" [|RESP
# 完成 Full-ReSync 后进入 Propagation 阶段
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "1"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "1"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "15"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "15"
localhost.6379 > localhost.59297: Flags [P.] "SELECT" "0" "SET" "KEY" "VALUE"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "85"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "85"
localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY2" "VALUE2"
localhost.6379 > localhost.59297: Flags [P.] "MSET" "KEY3" "VALUE3" "KEY4" "VALUE4" "KEY5" "VALUE5"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "256"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "256"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "270"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "270"
# 在 localhost:6380 上执行 DEBUG SLEEP 60 模拟网络中断的情况
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY6" "VALUE6"
localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY7" "VALUE7"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.6379 > localhost.59297: Flags [P.] "MSET" "KEY8" "VALUE8" "KEY9" "VALUE9"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.59297 > localhost.6379: Flags [.]
localhost.59297 > localhost.6379: Flags [R.]
# 旧的同步连接断开后重新建立同步连接,进入 Partical-ReSync 阶段
localhost.59313 > localhost.6379: Flags [S]
localhost.6379 > localhost.59313: Flags [S.]
localhost.59313 > localhost.6379: Flags [P.] "PING"
localhost.6379 > localhost.59313: Flags [P.] "NOAUTH Authentication required."
localhost.59313 > localhost.6379: Flags [P.] "AUTH 123456"
localhost.6379 > localhost.59313: Flags [P.] "OK"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF listening-port 6380"
localhost.6379 > localhost.59313: Flags [P.] "OK"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF capa eof"
localhost.6379 > localhost.59313: Flags [P.] "OK"
localhost.59313 > localhost.6379: Flags [P.] "PSYNC 8efb6ca4edf1258c05a5ced43b0c73fe4deb1908 271"
localhost.6379 > localhost.59313: Flags [P.] "CONTINUE"
localhost.6379 > localhost.59313: Flags [P.] "PING" "PING" "SET" "KEY6" "VALUE6" "PING" "SET" "KEY7" "VALUE7" "PING" "MSET" "KEY8" "VALUE8" "KEY9" "VALUE9" "PING" "PING"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "519"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "519"
localhost.6379 > localhost.59313: Flags [P.] "PING"
localhost.59313 > localhost.6379: Flags [P.]: "REPLCONF" "ACK" "533"
localhost.59313 > localhost.6379: Flags [P.]: "REPLCONF" "ACK" "533"
整个过程可以分为 Full-ReSync
, Command-Propagate
, Partical-ReSync
总共 3 阶段:
+----------------------+ +---------------------+
| redisServer (master) | | redisServer (slave) |
| localhost:6379 | | localhost:6380 |
+----------------------+ +---------------------+
| slaves | | master |
+----------------------+ +---------------------+
| |
+----------------+ +-------------+
| redisClient[?] | | redisClient |
+----------------+ +-------------+
|
^ <<<<<<<<<<<<<<<<<< PING <<<<<<<<<<<<<<<<<
| | Step 1 : 检查套接字与 master 状态
| >>>>>>>>>>>>> PONG / NOAUTH >>>>>>>>>>>>>
| |
| <<<<<<<<<<<<<<<<<< AUTH <<<<<<<<<<<<<<<<<
| | Step 2 : 身份验证
| >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>>
| |
| <<<< REPLCONF listening-port [port] <<<<<
| | Step 3 : 发送 slave 端口
Full-ReSync >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>>
| |
| <<<<<< REPLCONF capa [eof|psync2] <<<<<<<
| | Step 4 : 检查命令兼容性
| >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>>
| |
| <<<<<<<<<<<<<< PSYNC ? -1 <<<<<<<<<<<<<<<
| |
| >>>>>> FULLRESYNC [replid] [offset] >>>>> Step 6 : 执行全量同步
| V
| BGSAVE
| V
v >>>>>>>>>>>>> RDB Snapshot >>>>>>>>>>>>>>
^ <<<<<<<<< REPLCONF ACK [offset] <<<<<<<<<
| >>>>>>>>>>>>>>> COMMAND 1 >>>>>>>>>>>>>>>
| >>>>>>>>>>>>>>> COMMAND 2 >>>>>>>>>>>>>>>
| <<<<<<<< REPLCONF ACK [offset+?] <<<<<<<< 心跳检测 & 命令传播
Command-Propagate >>>>>>>>>>>>>>>>>> PING >>>>>>>>>>>>>>>>>
| >>>>>>>>>>>>>>> COMMAND 3 >>>>>>>>>>>>>>>
| <<<<<<<< REPLCONF ACK [offset+?] <<<<<<<<
| <<<<<<<< REPLCONF ACK [offset+?] <<<<<<<<
v >>>>>>>>>>>>>>>>>> PING >>>>>>>>>>>>>>>>>
^ =========================================
| ====== The Same With Full-ReSync ========
| =========================================
| |
Partical-ReSync <<<<<<<< PSYNC [replid] [offset] <<<<<<<< 重连后执行部分同步
| |
| >>>>>>>>>>>>>>> CONTINUE >>>>>>>>>>>>>>>>
| >>>>>>>>>>>>>>> COMMAND N >>>>>>>>>>>>>>>
v >>>>>>>>>>>>>>> COMMAND ... >>>>>>>>>>>>>
PSYNC 命令
最初 Redis 用于同步的命令是SYNC
,每次重连执行该命令时都会生成、传输、加载整个完整的 RDB 快照,严重占用机器资源与网络带宽。为了解决这一问题,后续版本的 Redis 追加了PSYNC
命令,该命令支持以下两种同步模式:
- 全量重新同步
Full-ReSync
- slave 首次连接 master
- master 与 slave 之间的状态差异过大
- 部分重新同步
Partical-ReSync
- 网络抖动导致同步连接断开重连
- sentinel 机制导致 master 节点发生变更
数据结构
下面看看 redisServer 中与PSYNC
相关的数据结构:
struct redisServer {
/*
* 节点ID 与 复制偏移量
*
* 若当前节点是 master
* server.replid 就是 server.runid
*
* 若当前节点原本是 master,转化为 slave 节点后
* server.replid 与 server.master_repl_offset 会被新 master 的同步信息覆盖
*
* 若当前节点原本是 slave,被提升为 master 节点后
* rserver.eplid2 与 server.second_replid_offset 会记录当前节点作为 slave 时的同步信息
*/
char runid[CONFIG_RUN_ID_SIZE+1]; /* 当前节点的运行时ID(每次重启都会发生变化) */
char replid[CONFIG_RUN_ID_SIZE+1]; /* 当前 master 节点的 runid */
char replid2[CONFIG_RUN_ID_SIZE+1]; /* 当前 master 节点作为 slave 节点时连接的 master 的 runid */
long long master_repl_offset; /* 当前 master 节点的复制偏移量 */
long long second_replid_offset; /* 当前 master 节点作为 slave 节点时的同步偏移量 */
/*
* 复制积压缓冲
*
* master 只维护一个全局的 server.repl_backlog,由所有 slave 节点共享
* 为了减少内存占用,server.repl_backlog 仅在 slave 节点存在时按需创建
*/
char *repl_backlog; /* 复制积压缓冲(环形缓冲)*/
long long repl_backlog_size; /* 积压缓冲大小 */
long long repl_backlog_histlen; /* 积压数据长度 */
long long repl_backlog_idx; /* 积压缓冲尾部(可写位置)*/
long long repl_backlog_off; /* 积压缓冲首字节对应的同步偏移量(master offset)*/
}
运行ID
无论主从,每个 Redis 服务器会在启动时生成一个长度为 40 的十六进制字符串作为运行IDrunid
:
- 当 slave 首次请求同步时,会将 master 返回的
server.runid
保存至server.replid
- 当 slave 重新请求同步时,会将之前保存的
server.replid
发送给 master:- 如果该 ID 与当前 master 的
server.runid
不一致,则必须执行一次全量重新同步 - 如果该 ID 与当前 master 的
server.runid
一致,则可以尝试执行部分同步操作
- 如果该 ID 与当前 master 的
复制偏移量
主从双方都会维护一个单位为字节的复制偏移量offset
,通过该偏移量可以判断主从间的状态是否一致:
- master 向 slave 传播 N 字节数据后,会将自己的复制偏移量增加 N
- slave 接收到 master 传来的 N 个字节数据时,会将自己的复制偏移量增加 N
当 master 接收到 REPLCONF ACK
中的偏移量时,可以据此判断发送给 slave 的数据是否发生了丢失,并重发丢失的数据。
积压缓冲
master 端维护了一个定长的积压缓冲队列backlog
。
master 向 slave 传播命令时会同时将命令放入该队列,因此缓冲区里会保留一部分最新的命令。
slave 发出同步请求时,如果 slave 的偏移量之后 (offset+1) 的数据存在于积压缓冲,master 才会执行部分同步。
同步流程
SLAVE 视角
slave 接收到SLAVEOF
命令后,会调用replicaofCommand
开始执行主从同步:
void replicaofCommand(client *c) {
// ...
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
if (server.masterhost) { // 如果接收到的命令是 SLAVE NO ONE 则断开主从同步
// ...
}
} else {
if (c->flags & CLIENT_SLAVE) {
return; // 如果已经是客户端是一个 slave 节点,则拒绝该命令
}
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) {
return; // 如果已经连接上 SLAVEOF 中指定的 master 节点,则直接返回
}
// 如果尚未连接任意 master 节点,则根据 masterhost 与 masterport 建立 TCP 连接
// 并注册监听函数 syncWithMaster
}
}
void syncWithMaster(connection *conn) {
// 向 master 节点发送 PING 命令
if (server.repl_state == REPL_STATE_CONNECTING) {
server.repl_state = REPL_STATE_RECEIVE_PONG;
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL); // 发送 PING 命令
// ...
}
// 监听到 master 对 PING 命令的响应
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-NOPERM",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{
goto error;
}
server.repl_state = REPL_STATE_SEND_AUTH; // 只处理 master 响应值为 PONG、NOAUTH、NOPERM 的情况
}
// 根据 master 对 PING 的响应值,判断是否需要授权
if (server.repl_state == REPL_STATE_SEND_AUTH) {
if (server.masteruser && server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
server.masteruser,server.masterauth,NULL); // 发送 AUTH 命令
// ...
server.repl_state = REPL_STATE_RECEIVE_AUTH;
} else {
// 如果没有设置 server.masteruser 与 server.masterauth 授权信息,则跳过 AUTH
server.repl_state = REPL_STATE_SEND_PORT;
}
}
// 此处略过以下步骤:
// 使用 REPLCONF listening-port 命令将 slave 的端口告知 master
// 使用 REPLCONF ip-address 命令将 slave 的 IP 告知 master
// 使用 REPLCONF capa eof / capa psync2 命令将 slave 兼容性(支持的特性)告知 master
// 开始发送 PSYNC 命令
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}
// 读取 PSYNC 命令的响应
psync_result = slaveTryPartialResynchronization(conn,1);
// 如果监听到 CONTINUE 响应,跳过全量同步
if (psync_result == PSYNC_CONTINUE) return;
// 如果返回值为 PSYNC_FULLRESYNC 或 PSYNC_NOT_SUPPORTED
// 开始执行执行全量同步,注册 readSyncBulkPayload 监听 RDB 文件下载
if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) {
// ...
goto error;
}
server.repl_state = REPL_STATE_TRANSFER;
// ...
}
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
if (!read_reply) {
if (server.cached_master) { // server.cached_master 中存在记录,尝试执行部分同步
psync_replid = server.cached_master->replid;
} else {
psync_replid = "?"; // server.cached_master 中不存在记录,只能执行全量同步
}
// 发起 PSYNC 命令
reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
// ...
return PSYNC_WAIT_REPLY;
}
reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); // 读取 PSYNC 响应
// 如果 master 响应 FULLRESYNC 则直接进行全量同步
if (!strncmp(reply,"+FULLRESYNC",11)) {
// ...
return PSYNC_FULLRESYNC;
}
// 如果 master 响应 CONTINUE 则尝试执行部分同步
if (!strncmp(reply,"+CONTINUE",9)) {
// ...
return PSYNC_CONTINUE;
}
// master 暂时无法处理 PSYNC 命令 —> PSYNC_TRY_LATER
// master 不支持 PSYNC 命令 -> PSYNC_NOT_SUPPORTED
}
MASTER 视角
master 接收到PSYNC
命令后,会调用syncCommand
开启同步流程:
void syncCommand(client *c) {
// 接收到 slave 发送的 PSYNC 命令
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
return; // 无需全量同步,直接返回
}
}
// 若代码运行至此处,意味着部分同步失败,需要执行全量同步
// master 会执行 BGSAVE 命令生成快照并传输给 slave
// 同步 RDB 快照的方式有两种:
// 基于磁盘(Disk-backed):在磁盘生成 RDB 快照文件,然后再传输给 slave
// 无盘(Diskless):直接将 RDB 快照数据写入 slave socket
}
int masterTryPartialResynchronization(client *c) {
long long psync_offset; // 该 slave 最新的同步偏移量
char *master_replid; // slave 同步偏移量对应的 master 的 runid
/*
* 以下情况可以避免全量同步:
* 1. slave 最近一次同步的 master 是当前实例(网络抖动)
* 2. slave 与当前节点原本是同个 master 的从节点,且当前节点的同步偏移量 second_replid_offset 较大(维护重启、故障切换)*/
if (strcasecmp(master_replid, server.replid) &&
(strcasecmp(master_replid, server.replid2) ||psync_offset > server.second_replid_offset))
{
goto need_full_resync; // 不满足 PSYNC 条件,需要执行全量同步
}
/*
* 以下情况只能执行全量同步:
* 1. master 没有初始化积压缓冲
* 2. slave 的同步偏移量落后于积压缓冲 */
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
goto need_full_resync; // 进行全量同步
}
// 若代码运行至此处,意味着可以执行部分同步
listAddNodeTail(server.slaves,c);
// 根据客户端是否兼容 PSYNC2,返回不同的 CONTINUE 响应
if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
} else {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
}
// CONTINUE 命令后面,紧接着就是 server.repl_backlog 的内容
psync_len = addReplyReplicationBacklog(c,psync_offset);
// ...
}
心跳 & 命令传播
Redis 每秒会执行一次定时任务replicationCron
,其中就包含主从同步间的心跳,可以发现主从双方的心跳频率是不一致的:
void replicationCron(void) {
// slave 定时向 master 发送 REPLCONF ACK 命令
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC)) {
addReplyArrayLen(c,3);
addReplyBulkCString(c,"REPLCONF");
addReplyBulkCString(c,"ACK");
addReplyBulkLongLong(c,c->reploff);
}
// master 定时向 slave 发送 PING 命令
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
robj *ping_argv[1];
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
}
}
master 在调用call
函数执行客户端传过来的命令时,会将命令传播给 slave 并同时写入积压缓冲:
void call(client *c, int flags) {
// ...
if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
// 当前命令是否需要传播
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
}
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
// ...
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
// 如果当前节点没有 slave 节点或复制积压缓冲,立即返回
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
// 向 repl_backlog 中批量写入命令
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3]; // 命令缓冲,用于序列化 redis 命令
/* 写入当前批次的命令数量 */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
/* 逐个遍历命令,将其序列化后写入 repl_backlog */
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
// 将命令批量传播给所有 slaves 对应的 client
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
/* 写入当前批次的命令数量 */
addReplyArrayLen(slave,argc);
/* 逐个遍历命令,将传播给 slave 节点 */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
相关参数
slave-serve-stale-data
主从节点断开时或同步未完成时,slave 如何响应客户端请求
- yes:正常响应命令,但是不保证数据质量
- no:拒绝响应命令,返回 SYNC with master in progress
repl-diskless-sync
执行全量同步时,master 如何将 RDB 快照传输给 slave
- no:先在磁盘生成 RDB 文件再进行传输(低带宽网络)
- yes:直接将 RDB 快照写入 slave 的 socket(低速磁盘 + 高带宽网络)
repl-ping-slave-period
master 向 slave 发送 PING
心跳的间隔,默认 10s 发送一次
repl-backlog-size
同步积压缓冲的空间,默认值大小为 1mb。
由于主从连接断开后,所有的命令都会积压在这里,如果该值太小会导致 PSYNC
命令会无法执行部分同步。
如果 master 需要执行大量写命令,或者 slave 需要较长时间才能重连成功,则需要根据实际情况进行估算。
min-slaves-to-write & min-slaves-max-lag
则当不满足下列条件时,master 会拒绝写命令直至恢复:
- 连接当前 master 的 slave 数量大于等于 min-slaves-to-write 个节点连接正常
- 连接正常的 slave 节点中不少于 min-slaves-to-write 个节点的延迟时间小于 *min-slaves-max-lag 秒
启用这两个选项后,写命令大概率能够被复制到 min-slaves-to-write 个从节点中,减少了命令丢失的概率。
至此,对 redis 的主从同步分析完毕,后续将对 redis 的一些其他细节进行分享,感谢观看。