[Learning Design from Source Code] Ant Financial SOFARegistry: Service Registration and the Operation Log
Contents
- [Learning Design from Source Code] Ant Financial SOFARegistry: Service Registration and the Operation Log
- 0x00 Abstract
- 0x01 Overall Business Flow
- 1.1 Service Registration Process
- 1.2 Data Sharding
- 0x02 Basic Data Structures
- 2.1 Publisher
- 2.2 Datum
- 2.3 DatumCache
- 2.4 Operator
- 2.5 Acceptor
- 2.6 Summary
- 0x03 The Ins and Outs of Datum
- 3.1 Inside the Session Server
- 3.2 PublishDataHandler
- 3.3 DataChangeEventCenter
- 3.4 DataChangeEventQueue
- 3.5 DataChangeHandler
- 0x04 Processing in DataChangeHandler
- 0x05 Storage in AbstractAcceptorStore
- 5.1 Bean
- 5.2 StoreServiceFactory
- 5.3 AbstractAcceptorStore
- 5.4 Adding Operators
- 5.5 Usage
- 5.5.1 Scheduler
- 5.5.2 changeDataCheck
- 5.5.2.1 Sending NotifyDataSyncRequest
- 5.5.3 checkAcceptorsChangAndExpired
- 0x06 Acceptor Log Operations
- 6.1 appendOperator
- 6.2 checkExpired
- 0x07 NotifyDataSyncRequest: Notifying Data Synchronization
- 7.1 NotifyDataSyncHandler
- 7.1.1 doHandle
- 7.1.2 executorRequest
- 7.1.3 GetSyncDataHandler
- 7.1.4 SyncDataCallback
- 0x08 SyncDataRequest: The Sync Request Sent Back
- 8.1 SyncDataRequest
- 8.1.1 Where SyncDataRequest Comes From
- 8.2 SyncDataHandler
- 8.3 SyncDataServiceImpl
- 8.4 AbstractAcceptorStore
- 8.5 Acceptor
- 0x09 SyncDataCallback: Callback on the Receiver
- 9.1 DataChangeHandler
- 0x10 Summary
- 0xFF References
0x00 Abstract
SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: across multiple articles and from multiple angles, we work backwards from the source code to summarize how DataServer, and SOFARegistry as a whole, are implemented, so that readers can learn how Alibaba/Ant engineers approach such a design.
This is the fourteenth article of the series. It covers service registration (a service coming online) and the operation log in SOFARegistry. The previous article looked at this from the Session Server's point of view; this one looks at it from the Data Server's point of view.
0x01 Overall Business Flow
Let's first review the overall business flow; the part covered in this article belongs to the data-sharding stage.
1.1 Service Registration Process
Recall how service data flows internally during "one service registration":
- The Client calls publisher.register to register a service with the SessionServer.
- After receiving the service data (a PublisherRegister), the SessionServer writes it into memory (the SessionServer keeps the Client's data in memory so it can later be checked periodically against the DataServer), then locates the corresponding DataServer via consistent hashing on dataInfoId and forwards the PublisherRegister to that DataServer.
- When the DataServer receives the PublisherRegister, it also first writes the data into memory, aggregating all PublisherRegisters by dataInfoId. At the same time, the DataServer notifies all SessionServers of the change event for that dataInfoId; the event carries the dataInfoId and the version number.
- In parallel and asynchronously, the DataServer incrementally synchronizes the data, per dataInfoId, to the other replicas, because on top of consistent-hash sharding the DataServer keeps multiple replicas of each shard (three by default).
- When a SessionServer receives the change notification, it compares the version of that dataInfoId stored in its own memory; if its version is lower than the one sent by the DataServer, it actively fetches the full data for that dataInfoId from the DataServer, i.e. the complete list of PublisherRegisters for that dataInfoId.
- Finally, the SessionServer pushes the data to the relevant Clients, and each Client receives the latest service list resulting from this registration.
For reasons of length, the previous article covered the first two points; this article covers the third and fourth.
1.2 Data Sharding
When a service comes online, the Hash value of the new service's dataInfoId is computed to decide which shard it belongs to; the nearest node on the hash ring is then chosen and the data is stored on that node.
When the DataServer starts, it registers a publishDataProcessor to handle publish requests from service publishers; that processor is PublishDataHandler. Whenever a new service publisher comes online, the DataServer's PublishDataHandler is triggered.
The handler first checks the current node's status: if the node is not in working state, the request fails. If it is working, the onChange method of the data change event center, DataChangeEventCenter, is triggered.
DataChangeEventCenter maintains an array of DataChangeEventQueue instances, each element being an event queue. When the onChange method above is triggered, the Hash of the changed service's dataInfoId is computed to determine which queue the registration data belongs to, and the changed data is wrapped into a data change event and put into that queue.
DataChangeEventQueue#start is invoked on a new thread when DataChangeEventCenter is initialized; it keeps pulling new events from the queue and dispatching them. This is how new data gets added to the node, i.e. how sharding is realized.
At the same time, DataChangeHandler publishes the change event via ChangeNotifier to notify other nodes to synchronize the data.
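To make the two hash computations above concrete — picking an event queue inside one DataServer, and picking the DataServer node that owns a dataInfoId on the consistent-hash ring — here is a minimal, self-contained sketch. It is an illustration only, not SOFARegistry's actual ConsistentHash implementation; the class and method names are hypothetical.
import java.util.SortedMap;
import java.util.TreeMap;

public class ShardingSketch {
    // Mirrors the idea of hash(dataInfoId) used to pick a DataChangeEventQueue index.
    static int queueIndexOf(String dataInfoId, int queueCount) {
        return (dataInfoId.hashCode() & Integer.MAX_VALUE) % queueCount;
    }

    // Toy consistent-hash ring: each node sits at its hash; a dataInfoId is owned by
    // the first node clockwise from the dataInfoId's own hash.
    static String ownerOf(String dataInfoId, TreeMap<Integer, String> ring) {
        SortedMap<Integer, String> tail = ring.tailMap(dataInfoId.hashCode());
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        for (String node : new String[] { "data-server-1", "data-server-2", "data-server-3" }) {
            ring.put(node.hashCode(), node);
        }
        String dataInfoId = "com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001";
        System.out.println("queue index = " + queueIndexOf(dataInfoId, 8));
        System.out.println("owner node  = " + ownerOf(dataInfoId, ring));
    }
}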
0x02 Basic Data Structures
A few related data structures need to be introduced first.
2.1 Publisher
Publisher holds the information of a data publisher.
public class Publisher extends BaseInfo {
private List<ServerDataBox> dataList;
private PublishType publishType = PublishType.NORMAL;
}
2.2 Datum
Datum is the publisher information aggregated from SOFARegistry's own point of view. Its core fields are:
- dataInfoId: the unique identifier of a service, composed of the dataId, the group and the tenant instanceId. For example, in a SOFARPC scenario a dataInfoId typically looks like com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001, where SOFA is the group name and 00001 is the tenant id. group and instanceId exist mainly to partition service data logically, so that data of different groups and instances is completely independent. The model also carries group and instanceId as separate fields, but they are not listed separately here; understanding dataInfoId is enough.
- dataCenter: a physical data center, containing multiple logical units (zones). A zone is a concept from the unitized (cell-based) architecture and represents a logical unit inside a data center. In service discovery, a service is published into a specific zone, while a subscriber can subscribe either at zone granularity or at dataCenter granularity (i.e. the data of all zones in that data center).
- pubMap: the Publishers contained in this Datum;
- version: the corresponding version number.
The code is as follows:
public class Datum implements Serializable {
private String dataInfoId;
private String dataCenter;
private String dataId;
private String instanceId;
private String group;
private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
private long version;
private boolean containsUnPub = false;
}
2.3 DatumCache
DatumCache holds the latest Datum.
public class DatumCache {
@Autowired
private DatumStorage localDatumStorage;
}
The actual storage is done in LocalDatumStorage.
public class LocalDatumStorage implements DatumStorage {
/**
* row: dataCenter
* column: dataInfoId
* value: datum
*/
protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>();
/**
* all datum index
*
* row: ip:port
* column: registerId
* value: publisher
*/
protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();
@Autowired
private DataServerConfig dataServerConfig;
}
2.4 Operator
An Operator is the operation corresponding to one Datum change.
public class Operator {
private Long version;
private Long sourceVersion;
private Datum datum;
private DataSourceTypeEnum sourceType;
}
2.5 Acceptor
Acceptor records all Datum operations, where:
- logOperatorsOrder records the order of the operations;
- logOperators holds all the operations themselves.
public class Acceptor {
private final String dataInfoId;
private final String dataCenter;
private int maxBufferSize;
static final int DEFAULT_DURATION_SECS = 30;
private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();
private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
private final DatumCache datumCache;
}
2.6 Summary
To summarize how these data structures relate:
- Publisher holds the information of a data publisher.
- Datum is the publisher information aggregated from SOFARegistry's own point of view.
- DatumCache holds the latest Datum.
- Operator is the operation corresponding to one Datum change.
- Acceptor records all Datum operations.
A rough code illustration of how they compose follows.
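The snippet below is an illustration only, using the constructor shapes that appear in the excerpts later in this article; it is not actual registration-path code, and the "DefaultDataCenter" string is just a placeholder.
// Illustration only: Publisher -> Datum -> Operator -> Acceptor (the per-version operation log).
void illustrateRelationship(Publisher publisher, DatumCache datumCache) {
    // A Datum aggregates the publishers of one dataInfoId within one data center.
    Datum datum = new Datum(publisher, "DefaultDataCenter");
    // An Operator wraps one Datum change together with its version and source type.
    Operator operator = new Operator(datum.getVersion(), 0L, datum, DataSourceTypeEnum.BACKUP);
    // An Acceptor keeps the ordered, per-version log of Operators for that dataInfoId.
    Acceptor acceptor = new Acceptor(30, datum.getDataInfoId(), datum.getDataCenter(), datumCache);
    acceptor.appendOperator(operator);
}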
0x03 The Ins and Outs of Datum
Let's first review where a Datum comes from and where it goes.
3.1 Inside the Session Server
First, how does the Session Server obtain a Datum?
Inside the Session Server, Datums are stored in the SessionCacheService.
For example, inside DataChangeFetchCloudTask a Datum can be obtained like this:
private Map<String, Datum> getDatumsCache() {
Map<String, Datum> map = new HashMap<>();
NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
Collection<String> dataCenters = nodeManager.getDataCenters();
if (dataCenters != null) {
Collection<Key> keys = dataCenters.stream().
map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
new DatumKey(fetchDataInfoId, dataCenter))).
collect(Collectors.toList());
Map<Key, Value> values = null;
values = sessionCacheService.getValues(keys);
if (values != null) {
values.forEach((key, value) -> {
if (value != null && value.getPayload() != null) {
map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
}
});
}
}
return map;
}
The Session Server then sends a PublishDataRequest to the Data Server.
3.2 PublishDataHandler
Inside the DataServer, PublishDataHandler handles the PublishDataRequest.
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
@Autowired
private ForwardService forwardService;
@Autowired
private SessionServerConnectionFactory sessionServerConnectionFactory;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DatumLeaseManager datumLeaseManager;
@Autowired
private ThreadPoolExecutor publishProcessorExecutor;
@Override
public Object doHandle(Channel channel, PublishDataRequest request) {
Publisher publisher = Publisher.internPublisher(request.getPublisher());
if (forwardService.needForward()) {
CommonResponse response = new CommonResponse();
response.setSuccess(false);
response.setMessage("Request refused, Server status is not working");
return response;
}
dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
if (publisher.getPublishType() != PublishType.TEMPORARY) {
String connectId = WordCache.getInstance().getWordCache(
publisher.getSourceAddress().getAddressString());
sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
connectId);
// record the renew timestamp
datumLeaseManager.renew(connectId);
}
return CommonResponse.buildSuccessResponse();
}
}
3.3 DataChangeEventCenter
The onChange method of DataChangeEventCenter dispatches the change into a queue.
public void onChange(Publisher publisher, String dataCenter) {
int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
if (publisher instanceof UnPublisher) {
datum.setContainsUnPub(true);
}
if (publisher.getPublishType() != PublishType.TEMPORARY) {
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB, datum));
} else {
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB_TEMP, datum));
}
}
3.4 DataChangeEventQueue
Inside DataChangeEventQueue, handleDatum is called to process the event; this is where the Datum gets stored.
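The queue's merging behaviour can be pictured with a small, hypothetical model (not the real DataChangeEventQueue): changes arriving for the same dataInfoId are merged, and whoever calls take() gets the merged result.
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of an event queue that merges changes per dataInfoId.
class SimpleChangeQueue<T> {
    private final BlockingQueue<String> order = new LinkedBlockingQueue<>();
    private final Map<String, T> pending = new ConcurrentHashMap<>();

    // onChange: keep only the latest change per dataInfoId (merge); enqueue the key only once.
    public void onChange(String dataInfoId, T change) {
        if (pending.put(dataInfoId, change) == null) {
            order.offer(dataInfoId);
        }
    }

    // take: blocks until some dataInfoId has changed, then hands back its merged change.
    public T take() throws InterruptedException {
        return pending.remove(order.take());
    }
}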
3.5 DataChangeHandler
DataChangeHandler extracts the ChangeData and then performs the notification.
public void start() {
DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
int queueCount = queues.length;
Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
Executor notifyExecutor = ExecutorFactory
.newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
for (int idx = 0; idx < queueCount; idx++) {
final DataChangeEventQueue dataChangeEventQueue = queues[idx];
final String name = dataChangeEventQueue.getName();
executor.execute(() -> {
while (true) {
final ChangeData changeData = dataChangeEventQueue.take();
notifyExecutor.execute(new ChangeNotifier(changeData, name));
}
});
}
}
The flow so far is shown below:
+
Session Server | Data Server
|
|
|
|
+--------------------------+ PublishDataRequest +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+ | +------+-------------+
^ | |
| getValues | | onChange(Publisher)
| | v
| | +--------+--------------+
+---------+----------+ | | DataChangeEventCenter |
|sessionCacheService | | +--------+--------------+
+--------------------+ | |
| | Datum
| |
| v
| +--------+-------------+
| | DataChangeEventQueue |
| +--------+-------------+
| |
| |
| | ChangeData
| v
| +-------+-----------+
| | DataChangeHandler |
+ +-------------------+
0x04 Processing in DataChangeHandler
Next comes the processing in DataChangeHandler, i.e. the step mentioned in the overview: DataChangeHandler takes the change event and
- turns it into an Operator and puts it into the AbstractAcceptorStore;
- publishes it via ChangeNotifier, notifying other nodes to synchronize the data.
Below we start from the first part — turning the change event into an Operator and putting it into the AbstractAcceptorStore — and walk through the operation-log handling.
As shown in the figure:
+
Session Server | Data Server
|
|
|
+
+--------------------------+ PublishDataRequest +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+ | +------+-------------+
^ | |
| getValues | | onChange(Publisher)
| | v
| | +--------+--------------+
+---------+----------+ | | DataChangeEventCenter |
|sessionCacheService | | +--------+--------------+
+--------------------+ | |
| | Datum
| |
| v
| +--------+-------------+
| | DataChangeEventQueue |
| +--------+-------------+
| |
| |
| | ChangeData
| v
| +-------+-----------+
| | DataChangeHandler |
| +-------+-----------+
| |
| |
| v
| +-------+---------+
| | ChangeNotifier |
| +-------+---------+
| |
| |
| v
| +----------+------------+
| | AbstractAcceptorStore |
| +-----------------------+
+
Who calls Acceptor's appendOperator? The Notifiers do, for example:
public class BackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
And another one:
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
0x05 Storage in AbstractAcceptorStore
AbstractAcceptorStore is the operation-log store; let's analyze it in detail.
5.1 Bean
A Spring bean is provided to hold the operation information.
@Bean
public AcceptorStore localAcceptorStore() {
return new LocalAcceptorStore();
}
5.2 StoreServiceFactory
Its job is to register the various AcceptorStores in storeServiceMap; currently LocalAcceptorStore is the only one.
public class StoreServiceFactory implements ApplicationContextAware {
private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();
/**
* get AcceptorStore by storeType
* @param storeType
* @return
*/
public static AcceptorStore getStoreService(String storeType) {
return storeServiceMap.get(storeType);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);
map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
}
}
5.3 AbstractAcceptorStore
AbstractAcceptorStore is the base implementation of the store. Its key members are:
- acceptors: a matrix keyed by dataCenter and dataInfoId that stores the Acceptor for each combination; that is, every (dataCenter, dataInfoId) pair has one Acceptor holding the Operators under it.
- notifyAcceptorsCache: a matrix keyed by dataCenter and dataInfoId that caches the Acceptors that still need further processing.
- delayQueue: used together with notifyAcceptorsCache. For every Acceptor newly added to notifyAcceptorsCache, a message is put into this queue; when its delay expires, the message is taken out, and the corresponding Acceptor is taken out of notifyAcceptorsCache and processed.
Normally the cache holds an entry and it is consumed when the delayed message is dequeued; if several updates for the same key arrive in the meantime, they only overwrite the existing cache entry, and everything is still handled together once the delay expires.
notifyAcceptorsCache is also organized per data center, and its entries are only removed periodically via removeCache.
public abstract class AbstractAcceptorStore implements AcceptorStore {
private static final int DEFAULT_MAX_BUFFER_SIZE = 30;
@Autowired
protected IMetaServerService metaServerService;
@Autowired
private Exchange boltExchange;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DataServerConnectionFactory dataServerConnectionFactory;
@Autowired
private DatumCache datumCache;
private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors = new ConcurrentHashMap<>();
private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache = new ConcurrentHashMap<>();
    private DelayQueue<DelayItem<Acceptor>> delayQueue = new DelayQueue<>();
}
As shown below:
+-------------------------------+
| [AbstractAcceptorStore]       |
|                               |                     +--> dataInfoId +--> Acceptor +--> Map<Version, Operator>
|   acceptors +--------------------> dataCenter +-----+
|                               |                     +--> dataInfoId +--> Acceptor +--> Map<Version, Operator>
|                               |
|                               |                     +--> dataInfoId +--> Acceptor +--> Map<Version, Operator>
|   notifyAcceptorsCache +---------> dataCenter +-----+
|                               |                     +--> dataInfoId +--> Acceptor +--> Map<Version, Operator>
+-------------------------------+
One thing worth explaining is why delayQueue needs to be a delay queue at all. It is a consequence of SOFA's "second-level service on/offline notification" feature.
To achieve that feature, connection sensitivity matters: in SOFARegistry every Client keeps a long-lived connection to a SessionServer, with bolt-based heartbeats on each connection; if a connection drops, the Client immediately reconnects, so there is always a reliable connection between Client and SessionServer.
Because of this strong connection sensitivity, a purely network-level disconnect — while the process is still alive — makes the Client immediately reconnect to the SessionServer and re-register all of its service data. Large numbers of such brief "service down, then immediately up again" events would confuse and annoy users.
Therefore the DataServer implements delayed merging of data changes internally, and that is exactly what this DelayQueue is for. A self-contained sketch of the idea follows.
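The delay-and-merge idea can be illustrated with the JDK's own java.util.concurrent.DelayQueue. The sketch below is hypothetical — it is not the DataServer's DelayItem class — but it shows why a burst of changes to the same key results in a single, delayed notification.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayMergeSketch {
    // One delayed "please look at this key later" ticket.
    static class Ticket implements Delayed {
        final String key;
        final long fireAt;
        Ticket(String key, long delayMs) {
            this.key = key;
            this.fireAt = System.currentTimeMillis() + delayMs;
        }
        public long getDelay(TimeUnit unit) {
            return unit.convert(fireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final DelayQueue<Ticket> queue = new DelayQueue<>();

    // Repeated updates inside the delay window only overwrite the cached value;
    // only the first one enqueues a ticket, so a burst collapses into one notification.
    public void onChange(String key, String value) {
        if (cache.put(key, value) == null) {
            queue.put(new Ticket(key, 1000));
        }
    }

    // A worker loop blocks here, then notifies with whatever value the burst merged into.
    public String takeMerged() throws InterruptedException {
        Ticket ticket = queue.take();
        return cache.remove(ticket.key);
    }
}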
5.4 Adding Operators
The basic logic of addOperator is:
- extract dataCenter and dataInfoId from the Operator's Datum;
- get the Map<dataInfoId, Acceptor> acceptorMap for that dataCenter from acceptors;
- get the existing Acceptor for that dataInfoId from acceptorMap;
- if the new operator is a SnapshotOperator, clear the previous operator queue;
- otherwise append the new operator;
- call putCache(existAcceptor) to put the current Acceptor into the cache, where the periodic task will process it.
Throughout, putIfAbsent is used, so if several identical values are inserted within a short time the existing value is not replaced, which produces a merging effect (see the short illustration after the code).
@Override
public void addOperator(Operator operator) {
Datum datum = operator.getDatum();
String dataCenter = datum.getDataCenter();
String dataInfoId = datum.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
if (acceptorMap == null) {
Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
if (acceptorMap == null) {
acceptorMap = newMap;
}
}
Acceptor existAcceptor = acceptorMap.get(dataInfoId);
if (existAcceptor == null) {
Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId,
dataCenter, datumCache);
existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
if (existAcceptor == null) {
existAcceptor = newAcceptor;
}
}
if (operator instanceof SnapshotOperator) {
//snapshot: clear the queue, Make other data retrieve the latest memory data
existAcceptor.clearBefore();
} else {
existAcceptor.appendOperator(operator);
}
//put cache
putCache(existAcceptor);
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}
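The merging effect of putIfAbsent is easy to see in isolation: it returns the previous value (or null if there was none), so concurrent callers for the same key all end up sharing one instance. A standalone illustration:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        // First insert: there is no previous value, null is returned, our value wins.
        System.out.println(map.putIfAbsent("dataInfoId-1", "acceptor-A")); // null
        // A second (or concurrent) insert for the same key keeps and returns the existing value,
        // so both callers work on the same shared instance -- the "merging" effect.
        System.out.println(map.putIfAbsent("dataInfoId-1", "acceptor-B")); // acceptor-A
        System.out.println(map.get("dataInfoId-1"));                       // acceptor-A
    }
}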
The job of putCache is to:
- extract dataCenter and dataInfoId from the acceptor;
- get the Map<dataInfoId, Acceptor> acceptorMap for that dataCenter from notifyAcceptorsCache;
- put the acceptor into acceptorMap under its dataInfoId;
- if acceptorMap had no previous value for that key, also put the acceptor into delayQueue.
putIfAbsent is used here as well, so several identical values inserted within a short time do not replace the existing one, again producing the merging effect.
private void putCache(Acceptor acceptor) {
String dataCenter = acceptor.getDataCenter();
String dataInfoId = acceptor.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
if (acceptorMap == null) {
Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
if (acceptorMap == null) {
acceptorMap = newMap;
}
}
Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
if (existAcceptor == null) {
addQueue(acceptor);
}
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}
5.5 Usage
The actual consumption happens in periodic tasks. The purpose of consuming the log is to synchronize the logged operations to the other DataServers.
5.5.1 Scheduler
The Scheduler class hosts the periodic tasks: it starts the thread pools that periodically invoke the AcceptorStore's methods.
public class Scheduler {
private final ScheduledExecutorService scheduler;
public final ExecutorService versionCheckExecutor;
private final ThreadPoolExecutor expireCheckExecutor;
@Autowired
private AcceptorStore localAcceptorStore;
public Scheduler() {
scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));
expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
}
public void startScheduler() {
scheduler.schedule(
new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
30, TimeUnit.SECONDS);
versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
}
}
The corresponding methods in AbstractAcceptorStore are as follows.
5.5.2 changeDataCheck
changeDataCheck runs an endless while(true) loop internally, so it does not need to be rescheduled by a thread pool; it is simply submitted once to versionCheckExecutor.
changeDataCheck blocks on delayQueue: when a new message becomes available it takes out the Acceptor, removes the corresponding Acceptor from notifyAcceptorsCache, and calls notifyChange(acceptor) to process it.
@Override
public void changeDataCheck() {
while (true) {
try {
DelayItem<Acceptor> delayItem = delayQueue.take();
Acceptor acceptor = delayItem.getItem();
removeCache(acceptor); // compare and remove
} catch (InterruptedException e) {
break;
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
}
The cache is consumed via removeCache.
private void removeCache(Acceptor acceptor) {
String dataCenter = acceptor.getDataCenter();
String dataInfoId = acceptor.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
if (acceptorMap != null) {
boolean result = acceptorMap.remove(dataInfoId, acceptor);
if (result) {
//data change notify
notifyChange(acceptor);
}
}
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}
5.5.2.1 Sending NotifyDataSyncRequest
Inside removeCache, the notification is performed through notifyChange, whose logic is:
- extract the dataInfoId from the acceptor;
- obtain, via the meta service, the IPs of the DataServer nodes responsible for that dataInfoId;
- iterate over the IPs and notify each one through the bolt server with syncServer.sendSync, i.e. send a NotifyDataSyncRequest to the DataServer at that IP.
private void notifyChange(Acceptor acceptor) {
Long lastVersion = acceptor.getLastVersion();
NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
acceptor.getDataCenter(), lastVersion, getType());
List<String> targetDataIps = getTargetDataIp(acceptor.getDataInfoId());
for (String targetDataIp : targetDataIps) {
if (DataServerConfig.IP.equals(targetDataIp)) {
continue;
}
Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());
for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
try {
Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
if (connection == null) {
TimeUtil.randomDelay(1000);
continue;
}
syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
request, 1000);
break;
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}
}
}
The call chain here is: versionCheckExecutor.execute → localAcceptorStore.changeDataCheck → removeCache → notifyChange → NotifyDataSyncRequest.
As shown below:
+--------------------------+
| | +----------------------------------------------------------------------+
| versionCheckExecutor | | [AbstractAcceptorStore] |
| | | |
+--------+-----------------+ | |
| | |
| | |
| | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors |
| changeDataCheck | |
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +----------------------------------------------------------------------+
|
|
|
| NotifyDataSyncRequest
|
v
+------+-----------+
| Other DataServer |
+------------------+
5.5.3 checkAcceptorsChangAndExpired
checkAcceptorsChangAndExpired walks every acceptor in acceptors, checks whether it has expired, and handles it accordingly.
@Override
public void checkAcceptorsChangAndExpired() {
acceptors.forEach((dataCenter, acceptorMap) -> {
if (acceptorMap != null && !acceptorMap.isEmpty()) {
acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
}
});
}
At this point the logic looks like this:
+--------------------------+ +------------------------+
| | +----------------------------------------------------------------------+ | |
| versionCheckExecutor | | [AbstractAcceptorStore] | | expireCheckExecutor |
| | | | | |
+--------+-----------------+ | | +--------------+---------+
| | | |
| | | |
| | | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors <---------------------------------+
| changeDataCheck | | checkAcceptorsChangAndExpired
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +----------------------------------------------------------------------+
|
|
|
| NotifyDataSyncRequest
|
v
+------+-----------+
| Other DataServer |
+------------------+
0x06 Acceptor Log Operations
This is where the log is kept, i.e. where all Datum operations are recorded.
The operation log is stored as a queue. When fetching the log, the position of the requester's current version in the queue is located, and all operation logs after that version are sent over to be replayed.
public class Acceptor {
private final String dataInfoId;
private final String dataCenter;
private int maxBufferSize;
static final int DEFAULT_DURATION_SECS = 30;
private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();
private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
private final DatumCache datumCache;
}
The key fields are:
- logOperators: a map keyed by version number that stores all the Operators;
- logOperatorsOrder: since a map cannot preserve ordering, this queue stores the version numbers in order.
An Operator holds the Datum change corresponding to one operation.
public class Operator {
private Long version;
private Long sourceVersion;
private Datum datum;
private DataSourceTypeEnum sourceType;
}
6.1 appendOperator
This method appends one operation log entry:
- if the queue is full, the first element is removed to make room for the new one at the tail;
- if the Operator's source version is null, it is set to 0L;
- if the Operator's previous version does not match the version at the tail of the queue, the queue is inconsistent, so both the map and the queue are cleared;
- the Operator is put into the map;
- if this version was not already present, the version is appended to the queue.
For example, appending operators whose (sourceVersion → version) pairs are (v1 → v2) and then (v2 → v3) keeps the log intact, whereas appending (v5 → v6) while the tail is v3 clears the log first, so the peer will later fall back to a full sync.
The code is as follows:
/**
* append operator to queue,if queue is full poll the first element and append.
* Process will check version sequence,it must append with a consequent increase in version,
* otherwise queue will be clean
*
* @param operator
*/
public void appendOperator(Operator operator) {
write.lock();
try {
if (isFull()) {
logOperators.remove(logOperatorsOrder.poll());
}
if (operator.getSourceVersion() == null) {
operator.setSourceVersion(0L);
}
Long tailVersion = logOperatorsOrder.peekLast();
if (tailVersion != null) {
//operation add not by solid sequence
if (tailVersion.longValue() != operator.getSourceVersion().longValue()) {
clearBefore();
}
}
Operator previousOperator = logOperators.put(operator.getVersion(), operator);
if (previousOperator == null) {
logOperatorsOrder.add(operator.getVersion());
}
} finally {
write.unlock();
}
}
Who calls appendOperator? The Notifiers do, for example:
public class BackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
and:
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
6.2 checkExpired
This method removes expired log entries. The version is a timestamp, so it can be checked periodically and removed once it has expired.
public void checkExpired(int durationSEC) {
write.lock();
try {
//check all expired
Long peekVersion = logOperatorsOrder.peek();
if (peekVersion != null && isExpired(durationSEC, peekVersion)) {
logOperators.remove(logOperatorsOrder.poll());
checkExpired(durationSEC);
}
} finally {
write.unlock();
}
}
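isExpired itself is not shown in the excerpt. Since the article states that the version is a timestamp, and DEFAULT_DURATION_SECS (30) is the only duration constant in the class, a plausible reconstruction — an assumption, not the verified source — would look like this:
// Hypothetical reconstruction of the expiry test used by checkExpired above.
// Assumes version is a millisecond timestamp and that durationSEC <= 0 falls back
// to the DEFAULT_DURATION_SECS (30s) retention window.
private boolean isExpired(int durationSEC, long version) {
    long windowMillis = (durationSEC <= 0 ? DEFAULT_DURATION_SECS : durationSEC) * 1000L;
    return System.currentTimeMillis() - version > windowMillis;
}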
0x07 NotifyDataSyncRequest: Notifying Data Synchronization
This request tells the receiving side to synchronize data.
Recall the call chain on the sending side: versionCheckExecutor.execute → localAcceptorStore.changeDataCheck → removeCache → notifyChange → NotifyDataSyncRequest.
7.1 NotifyDataSyncHandler
On the receiving DataServer the request is handled by NotifyDataSyncHandler.
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements
AfterWorkingProcess {
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private GetSyncDataHandler getSyncDataHandler;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
private Executor executor = ExecutorFactory
.newFixedThreadPool(
10,
NotifyDataSyncHandler.class
.getSimpleName());
private ThreadPoolExecutor notifyExecutor;
@Autowired
private DataNodeStatus dataNodeStatus;
@Autowired
private DatumCache datumCache;
}
7.1.1 doHandle
The doHandle method continues the processing.
@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
final Connection connection = ((BoltChannel) channel).getConnection();
if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
return CommonResponse.buildSuccessResponse();
}
executorRequest(connection, request);
return CommonResponse.buildSuccessResponse();
}
7.1.2 executorRequest
Having received the NotifyDataSyncRequest from the initiating DataServer, the receiving DataServer actively pulls the data to synchronize: it uses GetSyncDataHandler to send a SyncDataRequest. For example, if the local datum for that dataInfoId is at version 100 and the notification carries version 105 (or the local datum does not exist at all), a SyncDataRequest carrying the local version is sent.
private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
executor.execute(() -> {
fetchSyncData(connection, request);
});
}
protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
String dataInfoId = request.getDataInfoId();
String dataCenter = request.getDataCenter();
Datum datum = datumCache.get(dataCenter, dataInfoId);
Long version = (datum == null) ? null : datum.getVersion();
Long requestVersion = request.getVersion();
if (version == null || requestVersion == 0L || version < requestVersion) {
getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
dataChangeEventCenter));
}
}
7.1.3 GetSyncDataHandler
GetSyncDataHandler works together with SyncDataCallback:
GetSyncDataHandler sends the SyncDataRequest, and SyncDataCallback receives the result of the synchronization.
├── remoting
│ ├── dataserver
│ │ ├── DataServerConnectionFactory.java
│ │ ├── DataServerNodeFactory.java
│ │ ├── GetSyncDataHandler.java
│ │ ├── SyncDataCallback.java
│ │ ├── handler
│ │ └── task
The location of the GetSyncDataHandler and SyncDataCallback helper classes is a bit odd; presumably, being utility classes, they were placed directly under the dataserver package. Personally I think a dedicated sub-package would have been cleaner.
public class GetSyncDataHandler {
@Autowired
private DataNodeExchanger dataNodeExchanger;
public void syncData(SyncDataCallback callback) {
int tryCount = callback.getRetryCount();
if (tryCount > 0) {
try {
callback.setRetryCount(--tryCount);
dataNodeExchanger.request(new Request() {
@Override
public Object getRequestBody() {
return callback.getRequest();
}
@Override
public URL getRequestUrl() {
return new URL(callback.getConnection().getRemoteIP(), callback
.getConnection().getRemotePort());
}
@Override
public CallbackHandler getCallBackHandler() {
return new CallbackHandler() {
@Override
public void onCallback(Channel channel, Object message) {
callback.onResponse(message);
}
@Override
public void onException(Channel channel, Throwable exception) {
callback.onException(exception);
}
@Override
public Executor getExecutor() {
return callback.getExecutor();
}
};
}
});
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}
}
}
7.1.4 SyncDataCallback
This is where the synchronization result is received.
public class SyncDataCallback implements InvokeCallback {
private static final Executor EXECUTOR = ExecutorFactory.newFixedThreadPool(5,
SyncDataCallback.class.getSimpleName());
private static final int RETRY_COUNT = 3;
private Connection connection;
private SyncDataRequest request;
private GetSyncDataHandler getSyncDataHandler;
private int retryCount;
private DataChangeEventCenter dataChangeEventCenter;
@Override
public void onResponse(Object obj) {
GenericResponse<SyncData> response = (GenericResponse) obj;
if (!response.isSuccess()) {
getSyncDataHandler.syncData(this);
} else {
SyncData syncData = response.getData();
Collection<Datum> datums = syncData.getDatums();
DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
.getDataSourceType());
if (syncData.getWholeDataTag()) {
//handle all data, replace cache with these datum directly
for (Datum datum : datums) {
if (datum == null) {
datum = new Datum();
datum.setDataInfoId(syncData.getDataInfoId());
datum.setDataCenter(syncData.getDataCenter());
}
Datum.internDatum(datum);
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
break;
}
} else {
//handle incremental data one by one
if (!CollectionUtils.isEmpty(datums)) {
for (Datum datum : datums) {
if (datum != null) {
Datum.internDatum(datum);
dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
dataSourceTypeEnum, datum);
}
}
}
}
}
}
}
The logic at this point:
[Sender DataServer]
+--------------------------+ +------------------------+
| | +----------------------------------------------------------------------+ | |
| versionCheckExecutor | | [AbstractAcceptorStore] | | expireCheckExecutor |
| | | | | |
+--------+-----------------+ | | +--------------+---------+
| | | |
| | | |
| | | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors <---------------------------------+
| changeDataCheck | | checkAcceptorsChangAndExpired
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +----------------------------------------------------------------------+
|
NotifyDataSyncRequest| 1 ^ 2
| |
+-------------------------------------------------------------------------------------------------------------------------------------------+
| | SyncDataRequest
v |
+-------+-----------------------------------+
|[Other DataServer] | |
| | |
| | |
| + |
| GetSyncDataHandler SyncDataCallback |
| |
| |
| |
| |
+-------------------------------------------+
0x08 SyncDataRequest: The Sync Request Sent Back
The SyncDataRequest is sent back to the original notifier, i.e. from the other DataServer back to the sender DataServer.
8.1 SyncDataRequest
public class SyncDataRequest implements Serializable {
private String dataInfoId;
private String dataCenter;
private String dataSourceType;
/**
* be null when dataInfoId not exist in local datumCache
*/
private Long version;
}
8.1.1 Where SyncDataRequest Comes From
Recall where the SyncDataRequest comes from: it is created in the handler of NotifyDataSyncHandler. Based on the incoming request, the version of the dataInfoId is looked up in the local cache, and the request is then sent.
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess {
protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
String dataInfoId = request.getDataInfoId();
String dataCenter = request.getDataCenter();
Datum datum = datumCache.get(dataCenter, dataInfoId);
Long version = (datum == null) ? null : datum.getVersion();
Long requestVersion = request.getVersion();
if (version == null || requestVersion == 0L || version < requestVersion) {
getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
dataChangeEventCenter));
}
}
}
And, correspondingly, in AbstractAcceptorStore:
private void notifyChange(Acceptor acceptor) {
Long lastVersion = acceptor.getLastVersion();
//may be delete by expired
if (lastVersion == null) {
lastVersion = 0L;
}
NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
acceptor.getDataCenter(), lastVersion, getType());
syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
request, 1000);
}
8.2 SyncDataHandler
The original notifier handles the request with SyncDataHandler.
- SyncDataHandler: the handler for data synchronization between nodes. When it is triggered, version numbers are compared; if the data version stored on the current DataServer includes the requested version, all data with versions greater than the requested one is returned, so that the nodes can synchronize.
public class SyncDataHandler extends AbstractServerHandler<SyncDataRequest> {
@Autowired
private SyncDataService syncDataService;
@Override
public Object doHandle(Channel channel, SyncDataRequest request) {
SyncData syncData = syncDataService.getSyncDataChange(request);
return new GenericResponse<SyncData>().fillSucceed(syncData);
}
@Override
public HandlerType getType() {
return HandlerType.PROCESSER;
}
@Override
public Class interest() {
return SyncDataRequest.class;
}
@Override
protected Node.NodeType getConnectNodeType() {
return Node.NodeType.DATA;
}
}
8.3 SyncDataServiceImpl
The concrete service is SyncDataServiceImpl, whose getSyncDataChange method fetches the data from the acceptorStore.
public class SyncDataServiceImpl implements SyncDataService {
@Override
public void appendOperator(Operator operator) {
AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(operator.getSourceType()
.toString());
if (acceptorStore != null) {
acceptorStore.addOperator(operator);
}
}
@Override
public SyncData getSyncDataChange(SyncDataRequest syncDataRequest) {
AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(syncDataRequest
.getDataSourceType());
if (acceptorStore != null) {
return acceptorStore.getSyncData(syncDataRequest);
        }
        throw new RuntimeException("AcceptorStore not found for type: "
            + syncDataRequest.getDataSourceType());
    }
}
How appendOperator gets called was described earlier.
SyncDataServiceImpl then calls into AbstractAcceptorStore.
8.4 AbstractAcceptorStore
It looks up the Acceptor by dataCenter and dataInfoId and returns the result of that Acceptor's process method.
@Override
public SyncData getSyncData(SyncDataRequest syncDataRequest) {
String dataCenter = syncDataRequest.getDataCenter();
String dataInfoId = syncDataRequest.getDataInfoId();
Long currentVersion = syncDataRequest.getVersion();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
Acceptor existAcceptor = acceptorMap.get(dataInfoId);
return existAcceptor.process(currentVersion);
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
        throw new RuntimeException(e);
    }
}
8.5 Acceptor
Then the Acceptor does the processing.
It works with the requester's current version: if that version exists in the operation log, all Operators whose version is greater than it are returned; otherwise (or when there is nothing newer to replay) the full Datum from DatumCache is returned, marked with wholeDataTag.
public SyncData process(Long currentVersion) {
read.lock();
try {
Collection<Operator> operators = acceptOperator(currentVersion);
List<Datum> retList = new LinkedList<>();
SyncData syncData;
boolean wholeDataTag = false;
if (operators != null) {
//first get all data
if (operators.isEmpty()) {
wholeDataTag = true;
retList.add(datumCache.get(dataCenter, dataInfoId));
} else {
for (Operator operator : operators) {
retList.add(operator.getDatum());
}
}
syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
} else {
//no match get all data
wholeDataTag = true;
retList.add(datumCache.get(dataCenter, dataInfoId));
syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
}
return syncData;
} finally {
read.unlock();
}
}
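acceptOperator, called at the top of process, is not shown in the excerpt. Based on the behaviour described above, a hypothetical reconstruction — living inside Acceptor and relying on java.util.List/ArrayList — would return null when the requester's version is unknown to this log (so process falls back to the whole Datum), and otherwise return every Operator newer than that version.
// Hypothetical reconstruction of acceptOperator; the real implementation is not shown here.
private Collection<Operator> acceptOperator(Long currentVersion) {
    if (currentVersion == null || !logOperators.containsKey(currentVersion)) {
        // version unknown to this log: process() above falls back to sending the whole Datum
        return null;
    }
    List<Operator> newerOps = new ArrayList<>();
    for (Long version : logOperatorsOrder) {
        if (version > currentVersion) {
            newerOps.add(logOperators.get(version));
        }
    }
    // may be empty, in which case process() above also falls back to the whole Datum
    return newerOps;
}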
The structure used to carry the synchronized data is:
public class SyncData implements Serializable {
private String dataInfoId;
private String dataCenter;
private Collection<Datum> datums;
private boolean wholeDataTag;
}
The picture at this point:
[Sender DataServer]
+--------------------------+ +------------------------+
| | +----------------------------------------------------------------------+ | |
| versionCheckExecutor | | [AbstractAcceptorStore] | | expireCheckExecutor |
| | | | | |
+--------+-----------------+ | | +--------------+---------+
| | | |
| | | |
| | | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors <---------------------------------+
| changeDataCheck | | checkAcceptorsChangAndExpired
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +------------------------------------------------+-----+---------------+
| ^ |
NotifyDataSyncRequest| 1 +-----------------+ 3 +--------------------+ 4 | |
| | syncDataHandler +------> | SyncDataServiceImpl+------+ |
| +-----+-----------+ +--------------------+ |
| ^ 2 |
| | | 5
| | |
+-------------------------------------------------------------------------------------------------------------------------------------------+
| | SyncDataRequest |
v | |
+-------+-----------------------------------+ |
|[Other DataServer] | | |
| | | |
| | | |
| + | |
| GetSyncDataHandler SyncDataCallback | <---------------------------+
| |
| |
| |
| |
+-------------------------------------------+
0x09 SyncDataCallback: Callback on the Receiver
Back on the receiving side, the callback iterates over all received Datums and, for each of them:
- if the response carries the whole data, calls dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
- otherwise calls dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE, dataSourceTypeEnum, datum).
The code is as follows:
public class SyncDataCallback implements InvokeCallback {
private static final Executor EXECUTOR = ExecutorFactory.newFixedThreadPool(5,
SyncDataCallback.class.getSimpleName());
private static final int RETRY_COUNT = 3;
private Connection connection;
private SyncDataRequest request;
private GetSyncDataHandler getSyncDataHandler;
private int retryCount;
private DataChangeEventCenter dataChangeEventCenter;
@Override
public void onResponse(Object obj) {
GenericResponse<SyncData> response = (GenericResponse) obj;
if (!response.isSuccess()) {
getSyncDataHandler.syncData(this);
} else {
SyncData syncData = response.getData();
Collection<Datum> datums = syncData.getDatums();
DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
.getDataSourceType());
if (syncData.getWholeDataTag()) {
//handle all data, replace cache with these datum directly
for (Datum datum : datums) {
if (datum == null) {
datum = new Datum();
datum.setDataInfoId(syncData.getDataInfoId());
datum.setDataCenter(syncData.getDataCenter());
}
Datum.internDatum(datum);
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
break;
}
} else {
//handle incremental data one by one
if (!CollectionUtils.isEmpty(datums)) {
for (Datum datum : datums) {
if (datum != null) {
Datum.internDatum(datum);
dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
dataSourceTypeEnum, datum);
}
}
}
}
}
}
}
DataChangeEventCenter's sync method is as follows:
public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
int idx = hash(datum.getDataInfoId());
DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
dataChangeEventQueues[idx].onChange(event);
}
DataChangeEventQueue then processes the event in handleDatum; that part has been covered in other articles of this series, so the code is only listed here.
@Override
public void run() {
if (changeData instanceof SnapshotData) {
SnapshotData snapshotData = (SnapshotData) changeData;
String dataInfoId = snapshotData.getDataInfoId();
Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
Datum oldDatum = datumCache.get(dataServerConfig.getLocalDataCenter(), dataInfoId);
long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0l;
Datum datum = datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
long version = datum != null ? datum.getVersion() : 0l;
notify(datum, changeData.getSourceType(), null);
} else {
Datum datum = changeData.getDatum();
String dataCenter = datum.getDataCenter();
String dataInfoId = datum.getDataInfoId();
DataSourceTypeEnum sourceType = changeData.getSourceType();
DataChangeTypeEnum changeType = changeData.getChangeType();
if (changeType == DataChangeTypeEnum.MERGE
&& sourceType != DataSourceTypeEnum.BACKUP
&& sourceType != DataSourceTypeEnum.SYNC) {
//update version for pub or unPub merge to cache
//if the version product before merge to cache,it may be cause small version override big one
datum.updateVersion();
}
long version = datum.getVersion();
try {
if (sourceType == DataSourceTypeEnum.CLEAN) {
if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
}
} else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
notifyTempPub(datum, sourceType, changeType);
} else {
MergeResult mergeResult = datumCache.putDatum(changeType, datum);
Long lastVersion = mergeResult.getLastVersion();
if (lastVersion != null
&& lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
return;
}
//lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
if (mergeResult.isChangeFlag()) {
notify(datum, sourceType, lastVersion);
}
}
}
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}
}
9.1 DataChangeHandler
DataChangeHandler periodically pulls messages out of DataChangeEventCenter and processes them.
ChangeNotifier stores the Datum. Since the version number has already been updated by this point, no further notification is sent, and the flow ends here.
MergeResult mergeResult = datumCache.putDatum(changeType, datum);
//lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
if (mergeResult.isChangeFlag()) {
notify(datum, sourceType, lastVersion);
}
}
The final logic is:
[Sender DataServer]
+--------------------------+ +------------------------+
| | +----------------------------------------------------------------------+ | |
| versionCheckExecutor | | [AbstractAcceptorStore] | | expireCheckExecutor |
| | | | | |
+--------+-----------------+ | | +--------------+---------+
| | | |
| | | |
| | | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors <---------------------------------+
| changeDataCheck | | checkAcceptorsChangAndExpired
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +------------------------------------------------+-----+---------------+
| ^ |
NotifyDataSyncRequest| 1 +-----------------+ 3 +--------------------+ 4 | |
| | syncDataHandler +------> | SyncDataServiceImpl+------+ |
| +-----+-----------+ +--------------------+ |
| ^ 2 |
| | | 5
| | |
+-------------------------------------------------------------------------------------------------------------------------------------------+
| | SyncDataRequest |
[Other DataServer] | | |
| | |
| | |
| | +---------------------------------------+
| | |
| | |
v | v
+------+-----------++ +-----------+-------+ 6 +-----------------------+ 7 +--------------------+ 8 +-----------------+
| GetSyncDataHandler| | SyncDataCallback +-----> | DataChangeEventCenter | +--> |DataChangeEventQueue| +--> |DataChangeHandler|
+-------------------+ +-------------------+ +-----------------------+ +--------------------+ +-----------------+
0x10 Summary
Let's review once more how service data flows internally during "one service registration":
- The Client calls publisher.register to register a service with the SessionServer.
- After receiving the service data (a PublisherRegister), the SessionServer writes it into memory (so it can later be checked periodically against the DataServer), then locates the corresponding DataServer via consistent hashing on dataInfoId and forwards the PublisherRegister to it.
- When the DataServer receives the PublisherRegister, it first writes the data into memory, aggregating all PublisherRegisters by dataInfoId. At the same time it notifies all SessionServers of the change event for that dataInfoId, carrying the dataInfoId and the version.
- In parallel and asynchronously, the DataServer incrementally synchronizes the data, per dataInfoId, to the other replicas, since each shard of the consistent-hash ring is kept in multiple replicas (three by default).
- When a SessionServer receives the change notification, it compares the stored version of that dataInfoId; if it is lower than the one from the DataServer, it actively fetches the complete data for that dataInfoId, i.e. the full list of PublisherRegisters.
- Finally, the SessionServer pushes the data to the relevant Clients, which then receive the latest service list after this registration.
For reasons of length, the previous article covered the first two points and this article covered the third and fourth. If time allows, a later article will cover the remaining two.
0xFF References
Eureka系列(六) TimedSupervisorTask类解析
Eureka的TimedSupervisorTask类(自动调节间隔的周期性任务)
java线程池ThreadPoolExecutor类使用详解
Java线程池ThreadPoolExecutor实现原理剖析
深入理解Java线程池:ThreadPoolExecutor
Java中线程池ThreadPoolExecutor原理探究
蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容
蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路
服务注册中心 Session 存储策略 | SOFARegistry 解析
海量数据下的注册中心 – SOFARegistry 架构介绍
服务注册中心数据分片和同步方案详解 | SOFARegistry 解析
蚂蚁金服开源通信框架SOFABolt解析之连接管理剖析
蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制
蚂蚁金服开源通信框架 SOFABolt 协议框架解析
蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析
蚂蚁通信框架实践
sofa-bolt 远程调用
sofa-bolt学习
SOFABolt 设计总结 – 优雅简洁的设计之道
SofaBolt源码分析-服务启动到消息处理
SOFABolt 源码分析
SOFABolt 源码分析9 – UserProcessor 自定义处理器的设计
SOFARegistry 介绍
SOFABolt 源码分析13 – Connection 事件处理机制的设计