通用接口开放平台设计与实现——(19)消息服务端之消息验证
时间:2022-08-20 19:00:01
概述
之前已经完成了整体处理过程,接下来的几篇文章从一些专题的角度进行了一些补充。有些地方之前提到过,但没有具体说明。今天我们来谈谈新闻验证。
为了保证系统的稳定可靠运行,必须严格验证输入的数据,防止系统后续处理过程中因一些非法异常数据而出现错误甚至崩溃。同时,对于验证失败,需要输出清晰友好的错误信息,以便对接方开发调试,上线后运行异常调查。
验证主要包括以下内容:
1.验证消息对象的属性,如是否空,格式是否正确,属于基本验证工作
2.验证消息是否已收到。如果是这样,就不会进行后续处理。这一点是确保权力和其他性质,重新处理消息的关键
3.验证消息主题编码是否存在和可用
4.验证应用编码是否存在和可用
5.验证权限,确保新闻主题的功能权限应用
6.验证时效性,约定收到消息的时间不得超过10分钟,否则拒绝服务
在我们设计的时候,以上六条分为三类,处理方法不同。
基本验证
第一条是基本验证,需要请求信息和响应信息,也需要客户端和服务端。我们专门实现了自定义处理器来实现代码重用的目的。
/** * 信息验证处理器 * @author wqliu * @date 2022-1-19 13:50 **/ @Slf4j public class ValidateMessageHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
TextWebSocketFrame textWebSocketFrame=(TextWebSocketFrame)msg; String content = textWebSocketFrame.text(); log.info("收到消息:{}",content); BaseMessage message = JSON.parseObject(content, BaseMessage.class); if(message!=null) {
///验证消息属性 validateProperty(message); ctx.fireChannelRead(textWebSocketFrame); }else {
// 收到未按约定格式发送的无法分析的消息时,只记录错误日志 log.error("收到无法分析的消息:{}",content); } } /** * 验证新闻属性 * * @param message 消息 */ protected void validateProperty(BaseMessage message) {
String errorCode;
String errorMessage;
// id
if (StringUtils.isBlank(message.getId())){
errorCode = "S001";
errorMessage = "消息标识不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 主题
if (StringUtils.isBlank(message.getTopic())) {
errorCode = "S002";
errorMessage = "消息主题不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 发布者
if (StringUtils.isBlank(message.getPublishAppCode())) {
errorCode = "S003";
errorMessage = "消息发布者不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 发布时间
String publishTimeString = message.getPublishTime();
if (StringUtils.isBlank(publishTimeString)) {
errorCode = "S004";
errorMessage = "发布时间不能为空";
throw new MessageException(errorCode, errorMessage);
} else if (!ValidateUtil.dateIsFormat(publishTimeString))
{
errorCode = "S005";
errorMessage = "发布时间格式不正确";
throw new MessageException(errorCode, errorMessage);
}
// 消息类型
if (StringUtils.isBlank(message.getMessageType())) {
errorCode = "S006";
errorMessage = "消息类型不能为空";
throw new MessageException(errorCode, errorMessage);
}else{
if(EnumUtils.isValidEnum(MessageTypeEnum.class,message.getMessageType())==false){
errorCode = "S007";
errorMessage = "消息类型无效";
throw new MessageException(errorCode, errorMessage);
}
}
}
}
幂等性验证
第二条验证消息是否曾经接收过,如是,则不进行后续处理,该点是保证幂等性,对消息进行去重的关键处理。
对于请求消息的数据验证,还有个特殊点影响到流程处理,即上文提到过的,请求消息的接收方,无论是客户端还是服务端,收到重复消息时,除了要停止后续处理外,还需要发送一个响应给请求方,有两种方案,一种是抛出异常,告知请求方该消息曾经接收过;二是告知请求方,该消息已处理成功。前一种方案会造成消息日志表中该请求消息的响应结果是异常,而真实情况是该消息已经处理成功,因此采用后一种方案更佳。
对于该验证,我们同样实现了一个自定义的消息处理器
/** * 去重处理器 * @author wqliu * @date 2022-1-19 13:50 **/
public class DistinctMessageHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);
private ApiMessageTopicService apiMessageTopicService = SpringUtil.getBean(ApiMessageTopicService.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
String content = textWebSocketFrame.text();
BaseMessage message = JSON.parseObject(content, BaseMessage.class);
String messageType = message.getMessageType();
String messageId = message.getId();
String topic=message.getTopic();
boolean hasReceived=false;
if(messageType.equals(MessageTypeEnum.REQUEST.name())){
hasReceived = apiMessageLogService.checkRequestMessageExist(messageId);
if(hasReceived){
//发送响应,终止流程
String responseTopic = apiMessageTopicService.getResponseTopicCodeByCode(topic);
RequestMessageSender sender = (RequestMessageSender) MessageSenderFactory.createSender(responseTopic);
ApiMessageLog log = apiMessageLogService.getByRequestMessageId(messageId);
sender.sendMessage(log.getRequestAppCode(),log.getResponseData(), log.getResponseId());
}else{
//继续往下传递
ReferenceCountUtil.retain(textWebSocketFrame);
ctx.fireChannelRead(textWebSocketFrame);
}
}else if(messageType.equals(MessageTypeEnum.RESPONSE.name())){
hasReceived =apiMessageLogService.checkResponseMessageExist(messageId);
if(hasReceived){
//不做处理,终止流程
}else{
//继续往下传递
ReferenceCountUtil.retain(textWebSocketFrame);
ctx.fireChannelRead(textWebSocketFrame);
}
}
}
}
框架验证
第3-6条,实际是框架平台层面的验证,放在消息处理器基类MessageHandler中。
/** * 验证框架 */
protected void validateFramework(BaseMessage message){
// 消息主题验证(是否存在及是否可用)
validateTopic(message.getTopic());
// 应用验证(是否存在及是否可用)
validateAppCode(message.getPublishAppCode());
// 权限验证
validatePermission(message.getPublishAppCode(),message.getTopic());
// 时效性验证
validatePublishTimeValid(message.getPublishTime());
}
/** * 验证权限 * * @param publishAppCode 应用程序编码 * @param topicCode 主题编码 */
protected void validatePermission(String publishAppCode, String topicCode) {
boolean hasPermission = apiMessagePermissionService.checkPermission(publishAppCode, topicCode);
if(hasPermission==false){
throw new MessageException("301", "应用无权限");
}
}
/** * 验证时效性 * * @param publishTimeString 发布时间字符串 */
protected void validatePublishTimeValid(String publishTimeString) {
// 数据验证环节已验证可转换,此处不再处理转换异常
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date publishTime = null;
try {
publishTime = dateFormat.parse(publishTimeString);
} catch (ParseException e) {
// 前序环节已验证过日期格式,此处不会抛出异常,仅为编译通过
}
// 获取系统当前时间
Date currentTime = new Date();
// 比较时间
long diff = Math.abs(currentTime.getTime() - publishTime.getTime());
//允许最大的时间差,单位毫秒
int maxTimeSpan = 10*60*1000;
if (diff > maxTimeSpan)
{
// 请求时间超出合理范围(10分钟)
throw new MessageException("S401", "发布时间超出合理范围");
}
}
/** * 验证应用 * * @param publishAppCode 应用编码 */
protected void validateAppCode(String publishAppCode) {
try {
ApiApp app =apiAppService.getByCode(publishAppCode);
if(app.getStatus().equals(StatusEnum.DEAD.name())){
throw new MessageException("S202", "应用被停用");
}
}catch (Exception ex){
throw new MessageException("S201", "应用标识无效");
}
}
/** * 验证主题编码 * * @param topicCode 主题编码 */
protected void validateTopic(String topicCode) {
try {
ApiMessageTopic messageTopic = apiMessageTopicService.getByCode(topicCode);
if(messageTopic.getStatus().equals(StatusEnum.DEAD.name())){
throw new MessageException("S102", "消息主题不可用");
}
}catch (Exception ex){
throw new MessageException("S101", "消息主题不存在");
}
}
会被子类RequestMessageHandler和ResponseMessageHandler在逻辑处理的开始进行处理。
/** * 消息处理 * * @param message 消息 * @param channel 通道 */
public void handleMessage(RequestMessage requestMessage, Channel channel) {
// 记录消息请求日志
apiMessageLogService.createRequestPart(requestMessage);
//验证框架
validateFramework(requestMessage);
//将请求消息状态默认设置为无需发送
apiMessageLogService.updateStatus(MessageStatusEnum.NOT_TO_REQUEST.name(),requestMessage.getId());
//特殊处理
messageOperation(requestMessage, channel);
//发送响应至消息发送者
sendResponse(requestMessage, channel);
//消息处理(复制及转发)
if(isNeedRepost()){
repostMessage(requestMessage.getTopic(),requestMessage.getContent());
}
}
/** * 消息处理 * * @param message 消息 * @param channel 通道 */
public void handleMessage(ResponseMessage responseMessage, Channel channel) {
//验证框架
validateFramework(responseMessage);
// 更新消息日志
apiMessageLogService.updateResponsePart(responseMessage);
//特殊处理
messageOperation(responseMessage, channel);
}