自学内容网 自学内容网

消息队列中间件 RabbitMQ 的使用:利用枚举实现队列、交换机、RoutingKey 的声明

目录

1.声明队列和交换机以及RoutingKey

2.初始化循环绑定

3.监听队列


1.声明队列和交换机以及RoutingKey

package com.hykj.dpp.dcn.configure;

import lombok.Getter;


/**
 * Declarative table of RabbitMQ bindings.
 *
 * <p>Each constant ties one queue to one exchange through one routing key and
 * carries a flag telling the startup code whether the queue and binding should
 * be declared. The exchanges, queue names and routing keys themselves are
 * defined by the nested types below.
 *
 * <p>All fields are {@code final}: enum constants are shared singletons and
 * must not be mutated after construction. Accessors are generated by Lombok's
 * {@code @Getter}.
 */
@Getter
public enum RabbitmqBind {

    /**
     * Receives the data-cleaning requests pushed by the credit-review system.
     */
    DATA_CLEAN_PROCESS(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
            true),

    /**
     * Binds {@code Q_API_TO_DCN_SMS} to the direct exchange {@code E_DIRECT_RCP}.
     */
    SMS_CLEAN(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
            RabbitmqRoutingKey.K_API_TO_DCN_SMS,
            true),

    /**
     * Binds {@code PAY_TO_DPP_MODIFY_ACCOUNT} to the topic exchange {@code E_TOPIC_PAY}.
     */
    LOAN_ACCOUNT_MODIFY(
            RabbitMqExchangeEnum.E_TOPIC_PAY,
            RabbitMqQueueConstants.Q_MODIFY_ACCOUNT,
            RabbitmqRoutingKey.K_MODIFY_ACCOUNT,
            true);

    /**
     * Exchange the queue is bound to.
     */
    private final RabbitMqExchangeEnum exchange;

    /**
     * Queue name.
     */
    private final String queueName;

    /**
     * Routing key used for the binding.
     */
    private final RabbitmqRoutingKey routingKey;

    /**
     * Binding flag. Kept as a boxed {@code Boolean} so the generated accessor
     * remains {@code getIsBind()}; callers should treat {@code null} as "do not bind".
     */
    private final Boolean isBind;

    /**
     * @param exchange   exchange the queue is bound to
     * @param queueName  name of the queue
     * @param routingKey routing key of the binding
     * @param isBind     whether the queue and binding should be declared
     */
    RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind) {
        this.exchange = exchange;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.isBind = isBind;
    }

    /**
     * Exchange definitions: exchange type string plus exchange name.
     */
    @Getter
    public enum RabbitMqExchangeEnum {

        E_DIRECT_RCP("direct", "E_DIRECT_RCP"),

        E_TOPIC_RCP("topic", "E_TOPIC_RCP"),

        E_TOPIC_PAY("topic", "E_TOPIC_PAY");

        /**
         * Exchange type identifier ({@code "direct"} or {@code "topic"} for the constants above).
         */
        private final String exchangeType;

        /**
         * Exchange name as declared on the broker.
         */
        private final String exchangeName;

        RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
            this.exchangeType = exchangeType;
            this.exchangeName = exchangeName;
        }
    }

    /**
     * Queue name constants.
     *
     * <p>These are compile-time {@code String} constants so they can be used
     * inside annotation attributes such as {@code @RabbitListener(queues = ...)}.
     */
    public interface RabbitMqQueueConstants {

        /**
         * Queue receiving data-cleaning requests.
         */
        String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";

        /**
         * Queue for "cleaning finished" notifications.
         */
        String Q_DATA_CLEAN_FINISH = "RMPS_TO_RCP_DATA_CLEAN_FINISH";

        String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";

        String Q_MODIFY_ACCOUNT = "PAY_TO_DPP_MODIFY_ACCOUNT";
    }

    /**
     * Routing key definitions.
     */
    @Getter
    public enum RabbitmqRoutingKey {

        K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
        K_DATA_CLEAN_FINISH("K_DATA_CLEAN_FINISH"),
        K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),
        // Used with a topic exchange; the leading "#." is a topic wildcard prefix.
        K_MODIFY_ACCOUNT("#.K_MODIFY_ACCOUNT");

        /**
         * Routing key string passed to the broker.
         */
        private final String keyName;

        RabbitmqRoutingKey(String keyName) {
            this.keyName = keyName;
        }
    }

}

2.初始化循环绑定

package com.hykj.dpp.dcn.configure;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Arrays;

/**
 * RabbitMQ configuration: provides the listener container factory, transaction
 * manager and {@link RabbitAdmin} beans, and on startup declares every exchange,
 * queue and binding listed in {@link RabbitmqBind}.
 */
@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class RabbitConfiguration {

    /**
     * Consumer count used for both the initial and the maximum concurrency of
     * {@code customContainerFactory}.
     */
    public static final int DEFAULT_CONCURRENT = 10;

    @Autowired
    protected RabbitTemplate rabbitTemplate;

    // NOTE(review): this class injects a RabbitAdmin while also declaring the
    // rabbitAdmin bean below. On Spring Boot versions that forbid circular
    // references this self-dependency may fail at startup — confirm against the
    // Boot version in use.
    @Autowired
    protected RabbitAdmin rabbitAdmin;

    /**
     * Builds the listener container factory registered as {@code customContainerFactory}.
     *
     * @param configurer        Boot configurer applied to the factory
     * @param connectionFactory broker connection factory
     * @return the configured factory
     */
    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                 ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
        factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
        // The configurer runs after the defaults above are set.
        // NOTE(review): presumably explicit spring.rabbitmq.listener.* properties then take precedence — verify.
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * Transaction manager for RabbitMQ, created only when no other bean of this type exists.
     */
    @Bean
    @ConditionalOnMissingBean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    /**
     * {@link RabbitAdmin} used to declare exchanges, queues and bindings, created only
     * when no other bean of this type exists.
     */
    @Bean
    @ConditionalOnMissingBean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Startup hook: switches the template to transacted channels, declares all
     * exchanges, then declares the queue and binding for every enabled
     * {@link RabbitmqBind} entry.
     */
    @PostConstruct
    protected void init() {
        rabbitTemplate.setChannelTransacted(true);

        // Exchanges first, so every binding below refers to an existing exchange.
        Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values())
                .forEach(this::declareExchange);

        // Boolean.TRUE.equals avoids an unboxing NullPointerException if a flag is null.
        Arrays.stream(RabbitmqBind.values())
                .filter(bind -> Boolean.TRUE.equals(bind.getIsBind()))
                .forEach(this::declareQueueAndBinding);
    }

    /** Creates the exchange object for one definition and declares it on the broker. */
    private void declareExchange(RabbitmqBind.RabbitMqExchangeEnum exchangeDef) {
        Exchange exchange = RabbitmqExchange
                .getInstanceByType(exchangeDef.getExchangeType())
                .createExchange(exchangeDef.getExchangeName());
        rabbitAdmin.declareExchange(exchange);
    }

    /** Declares the queue of one binding entry and binds it to its exchange. */
    private void declareQueueAndBinding(RabbitmqBind bind) {
        String queueName = bind.getQueueName();
        // Queue(name, durable = true, exclusive = false, autoDelete = false, arguments = null)
        rabbitAdmin.declareQueue(new Queue(queueName, true, false, false, null));
        rabbitAdmin.declareBinding(new Binding(queueName,
                Binding.DestinationType.QUEUE,
                bind.getExchange().getExchangeName(),
                bind.getRoutingKey().getKeyName(),
                null));
    }
}

 绑定关系在枚举类中定义

3.监听队列

package com.hykj.dpp.dcn.mq.listener;

import com.alibaba.fastjson.JSON;
import com.hykj.dpp.common.constant.*;
import com.hykj.dpp.common.model.RiskProcessLog;
import com.hykj.dpp.common.service.RiskProcessLogService;
import com.hykj.dpp.dcn.configure.RabbitmqBind;
import com.hykj.dpp.dcn.mq.message.DataCleanMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * Listener for the data-cleaning queue.
 *
 * <p>Accepts messages delivered either as {@code String} or as raw bytes, parses
 * the JSON payload into a {@link DataCleanMessage} and stores a pending
 * {@link RiskProcessLog} entry for the {@code DATA_CLEAN} node.
 */
@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5", containerFactory = "customContainerFactory")
public class DataCleanListener {

    @Resource
    private RiskProcessLogService riskProcessLogService;

    /**
     * Handles a message whose payload was converted to a {@code String}.
     *
     * @param message JSON text of the message
     */
    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    /**
     * Handles a message delivered as raw bytes.
     *
     * @param message message body bytes
     */
    @RabbitHandler
    public void processMessage(byte[] message) {
        // Decode with an explicit charset rather than the JVM default, so the result
        // does not depend on the host's locale settings.
        // NOTE(review): assumes producers publish UTF-8 — confirm.
        String msg = new String(message, StandardCharsets.UTF_8);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }

    /**
     * Parses the pushed message and saves the process-log record for the next node.
     *
     * <p>Blank or incomplete messages are logged and dropped. Any exception is
     * caught and logged here rather than rethrown.
     *
     * @param message JSON text of the message
     */
    private void process(String message) {
        if (StringUtils.isBlank(message)) {
            log.error("process message is blank , message:{}" , message);
            return;
        }
        try {
            DataCleanMessage dataCleanMessage = JSON.parseObject(message, DataCleanMessage.class);
            // Guard against a null parse result as well as missing mandatory fields.
            if (dataCleanMessage == null
                    || StringUtils.isAnyBlank(dataCleanMessage.getBusinessId(), dataCleanMessage.getCountry())) {
                log.error("parse message is blank");
                return;
            }
            // Insert the record for the next node.
            RiskProcessLog processLog = new RiskProcessLog();
            processLog.setBusinessId(dataCleanMessage.getBusinessId());
            processLog.setCountry(Country.getCode(dataCleanMessage.getCountry()));
            processLog.setOrg(dataCleanMessage.getOrg());
            processLog.setProfileType(ProfileType.prod.getCode());
            processLog.setSysType(SysType.RMPS.getCode());
            processLog.setServiceType(ServiceType.APPROVE.getCode());
            processLog.setNodeId(NodeDef.DATA_CLEAN);
            processLog.setPostInd(PostInd.PENDING.name());
            riskProcessLogService.save(processLog);
        } catch (Exception e) {
            log.error("process message appear exception: {}" , message, e);
        }
    }

}

 监听并处理任务

 


原文地址:https://blog.csdn.net/qq_68135906/article/details/137562729

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!