锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

通用接口开放平台设计与实现——(19)消息服务端之消息验证

时间:2022-08-20 19:00:01 abs轮速传感器s102端盖

概述

之前已经完成了整体处理过程,接下来的几篇文章从一些专题的角度进行了一些补充。有些地方之前提到过,但没有具体说明。今天我们来谈谈新闻验证。

为了保证系统的稳定可靠运行,必须严格验证输入的数据,防止系统后续处理过程中因一些非法异常数据而出现错误甚至崩溃。同时,对于验证失败,需要输出清晰友好的错误信息,以便对接方开发调试,上线后运行异常调查。

验证主要包括以下内容:
1.验证消息对象的属性,如是否空,格式是否正确,属于基本验证工作
2.验证消息是否已收到。如果是这样,就不会进行后续处理。这一点是确保权力和其他性质,重新处理消息的关键
3.验证消息主题编码是否存在和可用
4.验证应用编码是否存在和可用
5.验证权限,确保新闻主题的功能权限应用
6.验证时效性,约定收到消息的时间不得超过10分钟,否则拒绝服务

在我们设计的时候,以上六条分为三类,处理方法不同。

基本验证

第一条是基本验证,需要请求信息和响应信息,也需要客户端和服务端。我们专门实现了自定义处理器来实现代码重用的目的。

/** * 信息验证处理器 * @author wqliu * @date 2022-1-19 13:50 **/ @Slf4j public class ValidateMessageHandler  extends ChannelInboundHandlerAdapter { 
               @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
                 TextWebSocketFrame textWebSocketFrame=(TextWebSocketFrame)msg;         String content = textWebSocketFrame.text();         log.info("收到消息:{}",content);         BaseMessage message = JSON.parseObject(content, BaseMessage.class);         if(message!=null) { 
                     ///验证消息属性             validateProperty(message);             ctx.fireChannelRead(textWebSocketFrame);         }else { 
                    // 收到未按约定格式发送的无法分析的消息时,只记录错误日志            log.error("收到无法分析的消息:{}",content);         }      }      /** * 验证新闻属性 * * @param message 消息 */     protected void validateProperty(BaseMessage message) { 
        

        String errorCode;
        String errorMessage;
        // id
        if (StringUtils.isBlank(message.getId())){ 
        

            errorCode = "S001";
            errorMessage = "消息标识不能为空";
            throw new MessageException(errorCode, errorMessage);
        }
        // 主题
        if (StringUtils.isBlank(message.getTopic())) { 
        

            errorCode = "S002";
            errorMessage = "消息主题不能为空";
            throw new MessageException(errorCode, errorMessage);
        }
        // 发布者
        if (StringUtils.isBlank(message.getPublishAppCode())) { 
        

            errorCode = "S003";
            errorMessage = "消息发布者不能为空";
            throw new MessageException(errorCode, errorMessage);
        }

        // 发布时间
        String publishTimeString = message.getPublishTime();
        if (StringUtils.isBlank(publishTimeString)) { 
        
            errorCode = "S004";
            errorMessage = "发布时间不能为空";
            throw new MessageException(errorCode, errorMessage);

        } else if (!ValidateUtil.dateIsFormat(publishTimeString))
        { 
        
            errorCode = "S005";
            errorMessage = "发布时间格式不正确";
            throw new MessageException(errorCode, errorMessage);

        }
        // 消息类型
        if (StringUtils.isBlank(message.getMessageType())) { 
        

            errorCode = "S006";
            errorMessage = "消息类型不能为空";
            throw new MessageException(errorCode, errorMessage);
        }else{ 
        
            if(EnumUtils.isValidEnum(MessageTypeEnum.class,message.getMessageType())==false){ 
        
                errorCode = "S007";
                errorMessage = "消息类型无效";
                throw new MessageException(errorCode, errorMessage);
            }
        }
    }

}

幂等性验证

第二条验证消息是否曾经接收过,如是,则不进行后续处理,该点是保证幂等性,对消息进行去重的关键处理。

对于请求消息的数据验证,还有个特殊点影响到流程处理,即上文提到过的,请求消息的接收方,无论是客户端还是服务端,收到重复消息时,除了要停止后续处理外,还需要发送一个响应给请求方,有两种方案,一种是抛出异常,告知请求方该消息曾经接收过;二是告知请求方,该消息已处理成功。前一种方案会造成消息日志表中该请求消息的响应结果是异常,而真实情况是该消息已经处理成功,因此采用后一种方案更佳。

对于该验证,我们同样实现了一个自定义的消息处理器

/** * 去重处理器 * @author wqliu * @date 2022-1-19 13:50 **/
public class DistinctMessageHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { 
        

    private ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);


    private ApiMessageTopicService apiMessageTopicService = SpringUtil.getBean(ApiMessageTopicService.class);


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception { 
        
        String content = textWebSocketFrame.text();
        BaseMessage message = JSON.parseObject(content, BaseMessage.class);
        String messageType = message.getMessageType();
        String messageId = message.getId();
        String topic=message.getTopic();
        boolean hasReceived=false;
        if(messageType.equals(MessageTypeEnum.REQUEST.name())){ 
        
            hasReceived = apiMessageLogService.checkRequestMessageExist(messageId);
            if(hasReceived){ 
        
                //发送响应,终止流程
                String responseTopic = apiMessageTopicService.getResponseTopicCodeByCode(topic);
                RequestMessageSender sender = (RequestMessageSender) MessageSenderFactory.createSender(responseTopic);
                ApiMessageLog log = apiMessageLogService.getByRequestMessageId(messageId);
                sender.sendMessage(log.getRequestAppCode(),log.getResponseData(), log.getResponseId());


            }else{ 
        
                //继续往下传递
                ReferenceCountUtil.retain(textWebSocketFrame);
                ctx.fireChannelRead(textWebSocketFrame);
            }

        }else if(messageType.equals(MessageTypeEnum.RESPONSE.name())){ 
        
            hasReceived =apiMessageLogService.checkResponseMessageExist(messageId);
            if(hasReceived){ 
        
               //不做处理,终止流程
            }else{ 
        
                //继续往下传递
                ReferenceCountUtil.retain(textWebSocketFrame);
                ctx.fireChannelRead(textWebSocketFrame);
            }
        }
    }


}

框架验证

第3-6条,实际是框架平台层面的验证,放在消息处理器基类MessageHandler中。

 /** * 验证框架 */
    protected  void validateFramework(BaseMessage message){ 
        

        // 消息主题验证(是否存在及是否可用)
        validateTopic(message.getTopic());
        // 应用验证(是否存在及是否可用)
        validateAppCode(message.getPublishAppCode());
        // 权限验证
        validatePermission(message.getPublishAppCode(),message.getTopic());
        // 时效性验证
        validatePublishTimeValid(message.getPublishTime());
    }


    /** * 验证权限 * * @param publishAppCode 应用程序编码 * @param topicCode 主题编码 */
    protected void validatePermission(String publishAppCode, String topicCode) { 
        

        boolean hasPermission = apiMessagePermissionService.checkPermission(publishAppCode, topicCode);
        if(hasPermission==false){ 
        
            throw new MessageException("301", "应用无权限");
        }

    }

    /** * 验证时效性 * * @param publishTimeString 发布时间字符串 */
    protected void validatePublishTimeValid(String publishTimeString) { 
        


        // 数据验证环节已验证可转换,此处不再处理转换异常
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date publishTime = null;
        try { 
        
            publishTime = dateFormat.parse(publishTimeString);
        } catch (ParseException e) { 
        
            // 前序环节已验证过日期格式,此处不会抛出异常,仅为编译通过
        }

        // 获取系统当前时间
        Date currentTime = new Date();
        // 比较时间
        long diff = Math.abs(currentTime.getTime() - publishTime.getTime());
        //允许最大的时间差,单位毫秒
        int maxTimeSpan = 10*60*1000;
        if (diff  > maxTimeSpan)
        { 
        
            // 请求时间超出合理范围(10分钟)
            throw new MessageException("S401", "发布时间超出合理范围");
        }
    }

    /** * 验证应用 * * @param publishAppCode 应用编码 */
    protected void validateAppCode(String publishAppCode) { 
        
        try { 
        
            ApiApp app =apiAppService.getByCode(publishAppCode);
            if(app.getStatus().equals(StatusEnum.DEAD.name())){ 
        
                throw new MessageException("S202", "应用被停用");

            }
        }catch (Exception ex){ 
        
            throw new MessageException("S201", "应用标识无效");
        }
    }

    /** * 验证主题编码 * * @param topicCode 主题编码 */
    protected void validateTopic(String topicCode) { 
        
        try { 
        
            ApiMessageTopic messageTopic = apiMessageTopicService.getByCode(topicCode);
            if(messageTopic.getStatus().equals(StatusEnum.DEAD.name())){ 
        
                throw new MessageException("S102", "消息主题不可用");
            }
        }catch (Exception ex){ 
        
            throw new MessageException("S101", "消息主题不存在");
        }

    }

会被子类RequestMessageHandler和ResponseMessageHandler在逻辑处理的开始进行处理。

    /** * 消息处理 * * @param message 消息 * @param channel 通道 */
    public void handleMessage(RequestMessage requestMessage, Channel channel) { 
        

        // 记录消息请求日志
        apiMessageLogService.createRequestPart(requestMessage);

        //验证框架
        validateFramework(requestMessage);

        //将请求消息状态默认设置为无需发送
        apiMessageLogService.updateStatus(MessageStatusEnum.NOT_TO_REQUEST.name(),requestMessage.getId());


        //特殊处理
        messageOperation(requestMessage, channel);

        //发送响应至消息发送者
        sendResponse(requestMessage, channel);


        //消息处理(复制及转发)
        if(isNeedRepost()){ 
        
            repostMessage(requestMessage.getTopic(),requestMessage.getContent());
        }

    }

    /** * 消息处理 * * @param message 消息 * @param channel 通道 */
    public void handleMessage(ResponseMessage responseMessage, Channel channel) { 
        


        //验证框架
        validateFramework(responseMessage);

        // 更新消息日志
        apiMessageLogService.updateResponsePart(responseMessage);

        //特殊处理
        messageOperation(responseMessage, channel);
    }

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章