[从源码学设计]蚂蚁金服SOFARegistry之配置信息
目录
- [从源码学设计]蚂蚁金服SOFARegistry之配置信息
- 0x00 摘要
- 0x01 业务范畴
- 1.1 配置作用
- 1.2 学习方向
- 0x02 数据结构
- 2.1 目录结构
- 2.2 数据结构定义
- 0x03 Meta Server 内流程
- 3.1 Admin请求响应
- 3.2 DBService
- 3.3 Bean
- 3.4 Listener
- 3.5 Task
- 3.6 服务
- 0x04 调用路径 in Data Server
- 4.1 Bean
- 4.2 网络交互
- 4.3 Handler 定义
- 4.4 调用 Handler
- 4.5 获取 ProvideData
- 0x05 回到了MetaServer
- 5.1 ata Server请求响应
- 5.2 Session Server对应处理
- 0x06 DataServer
- 6.1 处理 ProvideData
- 6.1.1 Bean
- 6.1.2 处理引擎 ProvideDataProcessorManager
- 6.1.3 处理Handler
- 6.1 处理 ProvideData
- 0xFF 参考
0x00 摘要
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第十七篇,介绍SOFARegistry网络操作之配置信息如何处理。
0x01 业务范畴
1.1 配置作用
比如某些系统相关的服务,需要由控制台来设定。所以Meta Server对控制台提供了接口,当Meta Server 接受到控制台请求后,会和 Data Server,Session Server 进行交互,比如Meta Server 提供了如下接口:
@Bean
@ConditionalOnMissingBean
public StopPushDataResource stopPushDataResource() {
return new StopPushDataResource();
}
@Bean
public BlacklistDataResource blacklistDataResource() {
return new BlacklistDataResource();
}
@Bean
public RenewSwitchResource renewSwitchResource() {
return new RenewSwitchResource();
}
对外提供http接口,是因为这是正常基本操作。但是Server之间依然是Bolt协议操作。
1.2 学习方向
此处推导如下:在DataServer端,如何把配置信息单独摘出来。
0x02 数据结构
2.1 目录结构
DataServer之中,配置相关目录如下,可以看到有Handler,服务,task以及provideData。
│ ├── metaserver
│ │ ├── DefaultMetaServiceImpl.java
│ │ ├── IMetaServerService.java
│ │ ├── MetaServerConnectionFactory.java
│ │ ├── handler
│ │ │ ├── NotifyProvideDataChangeHandler.java
│ │ │ ├── ServerChangeHandler.java
│ │ │ └── StatusConfirmHandler.java
│ │ ├── provideData
│ │ │ ├── ProvideDataProcessor.java
│ │ │ ├── ProvideDataProcessorManager.java
│ │ │ └── processor
│ │ │ └── DatumExpireProvideDataProcessor.java
│ │ └── task
2.2 数据结构定义
配置相关数据结构如下:
ProvideData是对外的交互接口,里面是版本号和服务标示dataInfoId。
public class ProvideData implements Serializable {
private ServerDataBox provideData;
private String dataInfoId;
private Long version;
}
ServerDataBox是具体业务,其定义如下
public class ServerDataBox implements Serializable {
/** Null for locally instantiated, otherwise for internalized */
private byte[] bytes;
/** Only available if bytes != null */
private int serialization;
/** Actual object, lazy deserialized */
private Object object;
}
关于ServerDataBox,目前在Data Server只有一处使用。使用的是boolean类型,也就是控制开关配置。
public void changeDataProcess(ProvideData provideData) {
boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
.getObject());
datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}
0x03 Meta Server 内流程
这里为了打通流程,需要先提一下 meta server 内部与 metaServerService.fetchData(dataInfoId) 相关的流程。
处于解耦的目的,Meta Server 把某些业务功能分割成四个层次,基本逻辑是:
Http Resource ———> TaskListener ———> Task ————> Service
首先给出流程图如下,下文会逐步介绍流程:
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+ 7
+-------+ 1 | | +---------------------------------------+
| Admin | +---> | +--------------------- | | Data Server |
+-------+ | |fireDataChangeNotify| | | |
| +--------------------+ | 6 | +-----------------------------------+ |
+------------------------+ | | metaClientHandlers | |
| +---------------------+ dataNodeExchanger.request | | +-------------------------------+ | |
| 3 | DataNodeServiceImpl | +----------------------------->+ | | notifyProvideDataChangeHandler| | |
| +----------+----------+ NotifyProvideDataChange | | +-------------------------------+ | |
| NotifyProvideDataChange ^ | | | |
| | | +-----------------------------------+ |
| 5 | notifyProvideDataChange +---------------------------------------+
v |
+---------+-----------------------------------+ |
| DefaultTaskListenerManager | |
| | +----+----------------------------+
| +-----------------------------------------+ | 4 | |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | | | | |
| | receiveStatusConfirmNotifyTaskListener | | +---------------------------------+
| | | |
| | dataNodeChangePushTaskListener | |
| | | |
| | sessionNodeChangePushTaskListener | |
| +-----------------------------------------+ |
+---------------------------------------------+
手机图示如下 :
3.1 Admin请求响应
前面提到,Meta Server 通过 Http协议给Admin提供了一些控制接口,下面我们就以 BlacklistDataResource 为例研究下。
可以看到,blacklistPush 函数中会先存储在 persistenceDataDBService 中,然后 fireDataChangeNotify 间接发送 NotifyProvideDataChange。
@Path("blacklist")
public class BlacklistDataResource {
@RaftReference
private DBService persistenceDataDBService;
@Autowired
private TaskListenerManager taskListenerManager;
/**
* update blacklist
* e.g. curl -d '{"FORBIDDEN_PUB":{"IP_FULL":["1.1.1.1","10.15.233.150"]},"FORBIDDEN_SUB_BY_PREFIX":{"IP_FULL":["1.1.1.1"]}}' -H "Content-Type: application/json" -X POST http://localhost:9615/blacklist/update
*/
@POST
@Path("update")
@Produces(MediaType.APPLICATION_JSON)
public Result blacklistPush(String config) {
PersistenceData persistenceData = createDataInfo();
persistenceData.setData(config);
boolean ret = persistenceDataDBService.update(ValueConstants.BLACK_LIST_DATA_ID,
persistenceData);
fireDataChangeNotify(persistenceData.getVersion(), ValueConstants.BLACK_LIST_DATA_ID,
DataOperator.UPDATE);
Result result = new Result();
result.setSuccess(true);
return result;
}
private PersistenceData createDataInfo() {
DataInfo dataInfo = DataInfo.valueOf(ValueConstants.BLACK_LIST_DATA_ID);
PersistenceData persistenceData = new PersistenceData();
persistenceData.setDataId(dataInfo.getDataId());
persistenceData.setGroup(dataInfo.getDataType());
persistenceData.setInstanceId(dataInfo.getInstanceId());
persistenceData.setVersion(System.currentTimeMillis());
return persistenceData;
}
private void fireDataChangeNotify(Long version, String dataInfoId, DataOperator dataOperator) {
NotifyProvideDataChange notifyProvideDataChange = new NotifyProvideDataChange(dataInfoId,
version, dataOperator);
TaskEvent taskEvent = new TaskEvent(notifyProvideDataChange,
TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK);
taskListenerManager.sendTaskEvent(taskEvent);
}
}
这里对应上图的:
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
3.2 DBService
可以看到,DBService也是基于 Raft,这说明在MetaServer集群内部自己维护了一致性。
@RaftReference
private DBService persistenceDataDBService;
PersistenceDataDBService 类精简版定义如下:
@RaftService
public class PersistenceDataDBService extends AbstractSnapshotProcess implements DBService {
private ConcurrentHashMap<String, Object> serviceMap = new ConcurrentHashMap<>();
@Override
public boolean put(String key, Object value) {
Object ret = serviceMap.put(key, value);
return true;
}
@Override
public DBResponse get(String key) {
Object ret = serviceMap.get(key);
return ret != null ? DBResponse.ok(ret).build() : DBResponse.notfound().build();
}
@Override
public boolean update(String key, Object value) {
Object ret = serviceMap.put(key, value);
return true;
}
@Override
public Set<String> getSnapshotFileNames() {
if (!snapShotFileNames.isEmpty()) {
return snapShotFileNames;
}
snapShotFileNames.add(this.getClass().getSimpleName());
return snapShotFileNames;
}
}
可以看出来,主要采用了ConcurrentHashMap来进行存储,Raft机制则用文件系统完成快照备份。
3.3 Bean
如前所述,为了解耦,Meta Server 把一些消息处理转发等功能封装为TaskListener,由 TaskListenerManager 在逻辑上负责统一执行。这里就以ProvideData相关功能为例,对应的Bean是。
@Configuration
public static class MetaServerTaskConfiguration {
......
@Bean
public TaskListener persistenceDataChangeNotifyTaskListener(TaskListenerManager taskListenerManager) {
TaskListener taskListener = new PersistenceDataChangeNotifyTaskListener(
sessionNodeSingleTaskProcessor());
taskListenerManager.addTaskListener(taskListener);
return taskListener;
}
@Bean
public TaskListenerManager taskListenerManager() {
return new DefaultTaskListenerManager();
}
}
3.4 Listener
Listener的执行引擎如下,可以看出来是遍历listener列表进行处理,如果某listener可以处理,就执行。
public class DefaultTaskListenerManager implements TaskListenerManager {
private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();
@Override
public Multimap<TaskType, TaskListener> getTaskListeners() {
return taskListeners;
}
@Override
public void addTaskListener(TaskListener taskListener) {
taskListeners.put(taskListener.support(), taskListener);
}
@Override
public void sendTaskEvent(TaskEvent taskEvent) {
Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
for (TaskListener taskListener : taskListeners) {
taskListener.handleEvent(taskEvent);
}
}
}
对应业务Listener如下:
public class PersistenceDataChangeNotifyTaskListener implements TaskListener {
@Autowired
private MetaServerConfig metaServerConfig;
private TaskDispatcher<String, MetaServerTask> singleTaskDispatcher;
public PersistenceDataChangeNotifyTaskListener(TaskProcessor sessionNodeSingleTaskProcessor) {
singleTaskDispatcher = TaskDispatchers.createDefaultSingleTaskDispatcher(
TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK.getName(), sessionNodeSingleTaskProcessor);
}
@Override
public TaskType support() {
return TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK;
}
@Override
public void handleEvent(TaskEvent event) {
MetaServerTask persistenceDataChangeNotifyTask = new PersistenceDataChangeNotifyTask(
metaServerConfig);
persistenceDataChangeNotifyTask.setTaskEvent(event);
singleTaskDispatcher.dispatch(persistenceDataChangeNotifyTask.getTaskId(),
persistenceDataChangeNotifyTask, persistenceDataChangeNotifyTask.getExpiryTime());
}
}
这里对应了如下:
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
|
| 3
|
| NotifyProvideDataChange
|
|
v
+---------+-----------------------------------+
| DefaultTaskListenerManager |
| |
| +-----------------------------------------+ |
| | persistenceDataChangeNotifyTaskListener | |
| | | |
| | receiveStatusConfirmNotifyTaskListener | |
| | | |
| | dataNodeChangePushTaskListener | |
| | | |
| | sessionNodeChangePushTaskListener | |
| +-----------------------------------------+ |
+---------------------------------------------+
3.5 Task
Listener会调用到Task。
处理Task如下,需要区分根据NoteType不同,来调用不同的服务:
public class PersistenceDataChangeNotifyTask extends AbstractMetaServerTask {
private final SessionNodeService sessionNodeService;
private final DataNodeService dataNodeService;
final private MetaServerConfig metaServerConfig;
private NotifyProvideDataChange notifyProvideDataChange;
@Override
public void execute() {
Set<NodeType> nodeTypes = notifyProvideDataChange.getNodeTypes();
if (nodeTypes.contains(NodeType.DATA)) {
dataNodeService.notifyProvideDataChange(notifyProvideDataChange);
}
if (nodeTypes.contains(NodeType.SESSION)) {
sessionNodeService.notifyProvideDataChange(notifyProvideDataChange);
}
}
@Override
public void setTaskEvent(TaskEvent taskEvent) {
Object obj = taskEvent.getEventObj();
if (obj instanceof NotifyProvideDataChange) {
this.notifyProvideDataChange = (NotifyProvideDataChange) obj;
}
}
}
这里对应
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
|
| 3
|
| NotifyProvideDataChange
|
|
v
+-------------------+-------------------------+
| DefaultTaskListenerManager |
| | +---------------------------------+
| +-----------------------------------------+ | 4 | |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | | | | |
| | receiveStatusConfirmNotifyTaskListener | | +---------------------------------+
| | | |
| | dataNodeChangePushTaskListener | |
| | | |
| | sessionNodeChangePushTaskListener | |
| +-----------------------------------------+ |
+---------------------------------------------+
3.6 服务
task会调用服务来执行具体业务,具体业务服务如下,这里会向DataServer或者SessionServer发送推送。
public class DataNodeServiceImpl implements DataNodeService {
@Autowired
private NodeExchanger dataNodeExchanger;
@Autowired
private StoreService dataStoreService;
@Autowired
private AbstractServerHandler dataConnectionHandler;
@Override
public NodeType getNodeType() {
return NodeType.DATA;
}
@Override
public void notifyProvideDataChange(NotifyProvideDataChange notifyProvideDataChange) {
NodeConnectManager nodeConnectManager = getNodeConnectManager();
Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);
// add register confirm
StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
Map<String, DataNode> dataNodes = storeService.getNodes();
for (InetSocketAddress connection : connections) {
if (!dataNodes.keySet().contains(connection.getAddress().getHostAddress())) {
continue;
}
try {
Request<NotifyProvideDataChange> request = new Request<NotifyProvideDataChange>() {
@Override
public NotifyProvideDataChange getRequestBody() {
return notifyProvideDataChange;
}
@Override
public URL getRequestUrl() {
return new URL(connection);
}
};
dataNodeExchanger.request(request);
}
}
}
}
这里对应
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
| +---------------------+
| 3 | DataNodeServiceImpl |
| +----------+----------+
| NotifyProvideDataChange ^
| |
| 5 | notifyProvideDataChange
v |
+---------+-----------------------------------+ |
| DefaultTaskListenerManager | |
| | +----+----------------------------+
| +-----------------------------------------+ | 4 | |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | | | | |
| | receiveStatusConfirmNotifyTaskListener | | +---------------------------------+
| | | |
| | dataNodeChangePushTaskListener | |
| | | |
| | sessionNodeChangePushTaskListener | |
| +-----------------------------------------+ |
+---------------------------------------------+
发送之后,就是
+------------------------+
| | 2 +-------------------------+
| BlacklistDataResource +------>-+PersistenceDataDBService |
| | update +-------------------------+
+-------+ 1 | |
| Admin | +---> | +--------------------+ |
+-------+ | |fireDataChangeNotify| |
| +--------------------+ |
+------------------------+
| 3
| NotifyProvideDataChange
v
+-------------------+-------------------------+
| DefaultTaskListenerManager |
| | +---------------------------------+
| +-----------------------------------------+ | 4 | |
| | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |
| | receiveStatusConfirmNotifyTaskListener | | | |
| | dataNodeChangePushTaskListener | | +----+----------------------------+
| | sessionNodeChangePushTaskListener | | |
| +-----------------------------------------+ | |
+---------------------------------------------+ 5 | notifyProvideDataChange
|
+-------------------------------------------+
|
v
+---v---------+-------+
| DataNodeServiceImpl | +---------------------------------------+
+-------------+-------+ | Data Server 7 |
| 6 | |
| dataNodeExchanger.request | +-----------------------------------+ |
+->------------------------------>+ | metaClientHandlers | |
NotifyProvideDataChange | | +-------------------------------+ | |
| | | notifyPro|ideDataChangeHandler| | |
| | +-------------------------------+ | |
| +-----------------------------------+ |
+---------------------------------------+
现在我们知道了,在Meta Server 之中,DataNodeServiceImpl.notifyProvideDataChange 函数会通知 Data Server,现在有一个NotifyProvideDataChange 消息。
0x04 调用路径 in Data Server
执行序列来到了DataServer。我们先要做一些前提准备。
4.1 Bean
Bean metaClientHandlers是 MetaNodeExchanger 的响应函数。而 notifyProvideDataChangeHandler 是 metaClientHandlers 的一部分。
@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
Collection<AbstractClientHandler> list = new ArrayList<>();
list.add(serverChangeHandler());
list.add(statusConfirmHandler());
list.add(notifyProvideDataChangeHandler());
return list;
}
4.2 网络交互
MetaNodeExchanger 在 DefaultMetaServiceImpl.getMetaServerMap 调用 metaNodeExchanger.connect 的时候,会设置这个 metaClientHandlers。这样就把notifyProvideDataChangeHandler同MetaServer以Bolt方式联系了起来。
public class DefaultMetaServiceImpl implements IMetaServerService {
@Override
public Map<String, Set<String>> getMetaServerMap() {
Connection connection = null;
connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
.next(), dataServerConfig.getMetaServerPort()))).getConnection();
}
}
MetaNodeExchanger定义如下,其作用是统一处理DataServer内部关于MetaServer的交互。
public class MetaNodeExchanger implements NodeExchanger {
@Autowired
private Exchange boltExchange;
@Autowired
private IMetaServerService metaServerService;
@Resource(name = "metaClientHandlers")
private Collection<AbstractClientHandler> metaClientHandlers;
public Channel connect(URL url) {
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
if (client == null) {
synchronized (this) {
client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
if (client == null) {
client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
}
}
}
//try to connect data
Channel channel = client.getChannel(url);
if (channel == null) {
synchronized (this) {
channel = client.getChannel(url);
if (channel == null) {
channel = client.connect(url);
}
}
}
return channel;
}
}
4.3 Handler 定义
NotifyProvideDataChangeHandler 在 interest 函数中,设定了自己可以处理 NotifyProvideDataChange 类型消息。这样当 MetaServer 通知有 NotifyProvideDataChange 的时候,就会调用 metaServerService.fetchData(dataInfoId); 获取 ProvideData,进行后续处理。
public class NotifyProvideDataChangeHandler extends AbstractClientHandler {
@Autowired
private IMetaServerService metaServerService;
@Autowired
private ProvideDataProcessor provideDataProcessorManager;
@Override
public Object doHandle(Channel channel, Object request) {
NotifyProvideDataChange notifyProvideDataChange = (NotifyProvideDataChange) request;
String dataInfoId = notifyProvideDataChange.getDataInfoId();
if (notifyProvideDataChange.getDataOperator() != DataOperator.REMOVE) {
ProvideData provideData = metaServerService.fetchData(dataInfoId);
provideDataProcessorManager.changeDataProcess(provideData);
}
return null;
}
@Override
public Class interest() {
return NotifyProvideDataChange.class;
}
}
4.4 调用 Handler
在Meta Server 之中,DataNodeServiceImpl.notifyProvideDataChange 函数会通知 Data Server,现在有一个NotifyProvideDataChange 消息。
于是NotifyProvideDataChangeHandler将作出响应。
4.5 获取 ProvideData
在 NotifyProvideDataChangeHandler 之中 ,有如下
ProvideData provideData = metaServerService.fetchData(dataInfoId);
然后调用 DefaultMetaServiceImpl 中 fetchData 来去 Meta Server 获取 ProvideData。
@Override
public ProvideData fetchData(String dataInfoId) {
Map<String, Connection> connectionMap = metaServerConnectionFactory
.getConnections(dataServerConfig.getLocalDataCenter());
String leader = getLeader().getIp();
if (connectionMap.containsKey(leader)) {
Connection connection = connectionMap.get(leader);
if (connection.isFine()) {
try {
Request<FetchProvideDataRequest> request = new Request<FetchProvideDataRequest>() {
@Override
public FetchProvideDataRequest getRequestBody() {
return new FetchProvideDataRequest(dataInfoId);
}
@Override
public URL getRequestUrl() {
return new URL(connection.getRemoteIP(), connection.getRemotePort());
}
};
Response response = metaNodeExchanger.request(request);
Object result = response.getResult();
if (result instanceof ProvideData) {
return (ProvideData) result;
}
}
}
}
String newip = refreshLeader().getIp();
return null;
}
现在图示如下:
+---------------------------+ +--------------------------------------------------+ +---------------------------------+
| DefaultMetaServiceImpl | | MetaNodeExchanger | | Meta Server |
| | | | 1 | +-----------------------------+ |
| getMetaServerMap +---------->-+boltExchange.connect(metaClientHandlers.toArray) +-------> | | DataNodeServiceImpl | |
| | +-----------------------------------+--------------+ | | | |
+---------------------------+ ^ | | | |
| | | notifyProvideDataChange | |
| | | + | |
+------------------------------------------+ | | | | | |
| metaClientHandlers +--------------------------+ | +-----------------------------+ |
| | +---------------------------------+
| +------------------------------------+ | | ^
| | serverChangeHandler | | | |
| | | | | |
| | statusConfirmHandler | | NotifyProvideDataChange | |
| | | | | |
| | +--------------------------------+ | | 2 | 3 |
| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+ |
| | | | | | |
| | | | | | |
| | | | | | |
| | | ProvideData provideData = +--------------------------------------------------------------------------------------+
| | | | | | get ProvideData from Meta Server FetchProvideDataRequest
| | | metaServerService.fetchData | | |
| | | | | |
| | | | | |
| | | | | |
| | | changeDataProcess(provideData) | | |
| | | | | |
| | +--------------------------------+ | |
| +------------------------------------+ |
+------------------------------------------+
手机上如下:
0x05 回到了MetaServer
执行序列回到了MetaServer,它收到了FetchProvideDataRequest。
5.1 ata Server请求响应
FetchProvideDataRequestHandler是响应函数。函数逻辑相对简单,就是从DBService之中根据DataInfoId获取数据,返回给调用者。
public class FetchProvideDataRequestHandler extends AbstractServerHandler<FetchProvideDataRequest> {
@RaftReference
private DBService persistenceDataDBService;
@Override
public Object reply(Channel channel, FetchProvideDataRequest fetchProvideDataRequest) {
DBResponse ret = persistenceDataDBService.get(fetchProvideDataRequest.getDataInfoId());
if (ret.getOperationStatus() == OperationStatus.SUCCESS) {
PersistenceData data = (PersistenceData) ret.getEntity();
ProvideData provideData = new ProvideData(new ServerDataBox(data.getData()),
fetchProvideDataRequest.getDataInfoId(), data.getVersion());
return provideData;
} else if (ret.getOperationStatus() == OperationStatus.NOTFOUND) {
ProvideData provideData = new ProvideData(null,
fetchProvideDataRequest.getDataInfoId(), null);
return provideData;
}
}
}
@Override
public HandlerType getType() {
return HandlerType.PROCESSER;
}
@Override
public Class interest() {
return FetchProvideDataRequest.class;
}
}
由此可见,这里的关键是 DBService。
于是从MetaServer角度看,流程如下:
+----------------------------------------------+
| Data Server |
| |
| +---------------------------------------+ |
| | NotifyProvideDataChangeHandler | |
| | | |
| | | |
| |metaSer^erSer^ice.fetchData(dataInfoId)| |
| +---------------------------------------+ |
+----------------------------------------------+
| ^
| |
1 | |
FetchProvideDataRequest | | ProvideData
| |
| | 4
+-----------------------------------------+
| Meta Server | | |
| | | |
| +--------------------v---+-------+ |
| | FetchProvideDataRequestHandler | |
| +--------------+---+-------------+ |
| 2 | ^ |
| | | DBResponse |
| get(DataInfoId) | | 3 |
| v | |
| +---------+---+------------+ |
| | PersistenceDataDBService | |
| +--------------------------+ |
+-----------------------------------------+
5.2 Session Server对应处理
Session Server 也会发起 FetchProvideDataRequest。在 SessionServerBootstrap 中有如下函数,都会发起请求,获取配置信息。
private void fetchStopPushSwitch(URL leaderUrl) {
FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
ValueConstants.STOP_PUSH_DATA_SWITCH_DATA_ID);
Object ret = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
if (ret instanceof ProvideData) {
ProvideData provideData = (ProvideData) ret;
provideDataProcessorManager.fetchDataProcess(provideData);
}
}
private void fetchEnableDataRenewSnapshot(URL leaderUrl) {
FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
ValueConstants.ENABLE_DATA_RENEW_SNAPSHOT);
Object data = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
if (data instanceof ProvideData) {
ProvideData provideData = (ProvideData) data;
provideDataProcessorManager.fetchDataProcess(provideData);
}
}
private void fetchBlackList() {
blacklistManager.load();
}
0x06 DataServer
6.1 处理 ProvideData
在 NotifyProvideDataChangeHandler 之中,如下语句用来处理ProvideData。就是在fetchData之中。
在请求响应处理中
Response response = metaNodeExchanger.request(request);
Object result = response.getResult();
if (result instanceof ProvideData) {
return (ProvideData) result;
}
就是如下:
+---------------------------+ +--------------------------------------------------+ +---------------------------------+
| DefaultMetaServiceImpl | | MetaNodeExchanger | | Meta Server |
| | | | 1 | +-----------------------------+ |
| getMetaServerMap +---------->-+boltExchange.connect(metaClientHandlers.toArray) +-------> | | DataNodeServiceImpl | |
| | +-----------------------------------+--------------+ | | | |
+---------------------------+ ^ | | | |
| | | notifyProvideDataChange | |
| | | + | |
+------------------------------------------+ | | | | | |
| metaClientHandlers +--------------------------+ | +-----------------------------+ |
| | +---------------------------------+
| +------------------------------------+ | | ^ |
| | serverChangeHandler | | | | |
| | | | | | |
| | statusConfirmHandler | | NotifyProvideDataChange | | |
| | | | | | |
| | +--------------------------------+ | | 2 | 3 | | 4
| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+ | |
| | | | | | | |
| | | | | | get ProvideData from Meta Server | |
| | | | | | | |
| | | ProvideData provideData = +--------------------------------------------------------------------------------------+ |
| | | | | | |
| | | metaServerService.fetchData <-----------------------------------------------------------------------------------------+
| | | | | | ProvideData
| | | | | |
| | | | | |
| | | changeDataProcess(provideData) | | |
| | | | | |
| | +--------------------------------+ | |
| +------------------------------------+ |
+------------------------------------------+
手机如下:
继续处理是如下:
provideDataProcessorManager.changeDataProcess(provideData);
这就牵扯了如何用引擎处理。
6.1.1 Bean
这里生成了处理引擎 ProvideDataProcessorManager,添加了一个处理handler DatumExpireProvideDataProcessor。
@Configuration
public static class DataProvideDataConfiguration {
@Bean
public ProvideDataProcessor provideDataProcessorManager() {
return new ProvideDataProcessorManager();
}
@Bean
public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
ProvideDataProcessor datumExpireProvideDataProcessor = new DatumExpireProvideDataProcessor();
((ProvideDataProcessorManager) provideDataProcessorManager)
.addProvideDataProcessor(datumExpireProvideDataProcessor);
return datumExpireProvideDataProcessor;
}
}
6.1.2 处理引擎 ProvideDataProcessorManager
这里的套路依然很熟悉,即ProvideDataProcessor引擎,也就是ProvideDataProcessorManager也继承了ProvideDataProcessor,但是在support之中设置了 return false
,这样引擎遍历执行时候,就不会执行自己了。
public class ProvideDataProcessorManager implements ProvideDataProcessor {
private Collection<ProvideDataProcessor> provideDataProcessors = new ArrayList<>();
public void addProvideDataProcessor(ProvideDataProcessor provideDataProcessor) {
provideDataProcessors.add(provideDataProcessor);
}
@Override
public void changeDataProcess(ProvideData provideData) {
for (ProvideDataProcessor provideDataProcessor : provideDataProcessors) {
if (provideDataProcessor.support(provideData)) {
provideDataProcessor.changeDataProcess(provideData);
}
}
}
@Override
public boolean support(ProvideData provideData) {
return false;
}
}
6.1.3 处理Handler
这里的 DatumLeaseManager 就可以对应到前面讲的 AfterWorkingProcess。
Handler之中调用DatumLeaseManager完成配置数据的部署。
public class DatumExpireProvideDataProcessor implements ProvideDataProcessor {
@Autowired
private DatumLeaseManager datumLeaseManager;
@Override
public void changeDataProcess(ProvideData provideData) {
if (checkInvalid(provideData)) {
return;
}
boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
.getObject());
datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}
private boolean checkInvalid(ProvideData provideData) {
boolean invalid = provideData == null || provideData.getProvideData() == null
|| provideData.getProvideData().getObject() == null;
return invalid;
}
@Override
public boolean support(ProvideData provideData) {
return ValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(provideData.getDataInfoId());
}
}
最终,图示如下:
+---------------------------+ +--------------------------------------------------+ +---------------------------------+
| DefaultMetaServiceImpl | | MetaNodeExchanger | | Meta Server |
| | | | 1 | +-----------------------------+ |
| getMetaServerMap +---------->--boltExchange.connect(metaClientHandlers.toArray) +-------> | | DataNodeServiceImpl | |
| | +-----------------------------------+--------------+ | | | |
+---------------------------+ ^ | | | |
| | | notifyProvideDataChange | |
| | | + | |
+------------------------------------------+ | | | | | |
| metaClientHandlers +--------------------------+ | +-----------------------------+ |
| | +---------------------------------+
| +------------------------------------+ | | ^ |
| | serverChangeHandler | | | | |
| | | | | | |
| | statusConfirmHandler | | NotifyProvideDataChange | | |
| | | | | | |
| | +--------------------------------+ | | 2 | 3 | | 4
| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+ | |
| | | | | | | |
| | | | | | get ProvideData from Meta Server | |
| | | | | | | |
| | | ProvideData provideData = +--------------------------------------------------------------------------------------+ |
| | | | | | |
| | | metaServerService.fetchData <-----------------------------------------------------------------------------------------+
| | | | | | ProvideData
| | | | | |
| | | | | | 5 +---------------------------------------------+
| | | changeDataProcess(provideData)+--------------+ | ProvideDataProcessor |
| | | | | | | | |
| | +--------------------------------+ | | +-------> | changeDataProcess(ProvideData provideData) |
| +------------------------------------+ | | |
+------------------------------------------+ +---------------------------------------------+
手机图例如下:
0xFF 参考
蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容
蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路
服务注册中心 Session 存储策略 | SOFARegistry 解析
海量数据下的注册中心 – SOFARegistry 架构介绍
服务注册中心数据分片和同步方案详解 | SOFARegistry 解析
蚂蚁金服开源通信框架SOFABolt解析之连接管理剖析
蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制
蚂蚁金服开源通信框架 SOFABolt 协议框架解析
蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析
蚂蚁通信框架实践
sofa-bolt 远程调用
sofa-bolt学习
SOFABolt 设计总结 – 优雅简洁的设计之道
SofaBolt源码分析-服务启动到消息处理
SOFABolt 源码分析
SOFABolt 源码分析9 – UserProcessor 自定义处理器的设计
SOFARegistry 介绍
SOFABolt 源码分析13 – Connection 事件处理机制的设计