自学内容网 自学内容网

RocketMQ复杂过滤尝试

需求

消息实体,根据实体中的一个字段,决定推给多个业务系统。

例:一个点位信息Bean,这个点位信息,设备、能源、安全都有用,那么点位信息表中有适用模块标识。

点位新增 需要通知所有勾选业务系统  tag - add

点位编辑 需要新增勾选业务系统标识 tag - add ,移除勾选 tag - delete ,不变 tag - update

点位删除 通知所有勾选的系统 tag - delete

分析

1、MQ不支持同个消息,一下子发送到不同的topic。发多次也可以实现,但是多少有点抵触

2、那么发送到同一个topic下,让各个业务系统来取,那么必定需要去过滤,不然拿到不属于本业务系统的点位信息了,仅仅靠tag明显是不够的,服务端过滤可以采用SQL92方式

3、那么我随之就想到也可以在各个业务系统中过滤了,不是本业务系统的标识,直接返回。不执行相关逻辑。

本人更倾向于第二种实现

编码

尝试一:SQL92过滤实现

再broker.conf 新增配置 enablePropertyFilter=true

生产者

public class AddProducer {
 
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("a-group");
        producer.setNamesrvAddr("192.168.0.211:9876");
        producer.start();

        Point point0 = new Point(0, "point0");
        Point point1 = new Point(1, "point1");
        Point point2 = new Point(2, "point2");
        Point point3 = new Point(3, "point3");
        Point point4 = new Point(4, "point4");
        Point point5 = new Point(5, "point5");
        Point point6 = new Point(6, "point6");
        Point point7 = new Point(7, "point7");
        Point point8 = new Point(8, "point8");
        Point point9 = new Point(9, "point9");

        ArrayList<Point> list = new ArrayList<>();
        list.add(point0);
        list.add(point1);
        list.add(point2);
        list.add(point3);
        list.add(point4);
        list.add(point5);
        list.add(point6);
        list.add(point7);
        list.add(point8);
        list.add(point9);
 
 
 
 
        try {
 
            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "add", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                msg.putUserProperty("testValue",String.valueOf(bean.getId()));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }

            Thread.sleep(20000);

            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "update", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                msg.putUserProperty("testValue",String.valueOf(bean.getId()));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者1 - 模拟业务系统1

    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("calc_rules_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue between 0 and 3)"));
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new CustomMessageListenerOrderly());
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }

消费者2 - 业务系统2

public class consumer2 {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("a_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue between 4 and 6)"));
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new CustomMessageListenerOrderly());
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

测试结果

消费者1

消费者2

sql92 确实可以实现我的需求。

那么我把testValue换成各业务系统唯一标识,逗号拼接

把生产者调整一下

msg.putUserProperty("testValue","aaaa,bbbb,cccc");

客户端试了 in 不行

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and 'aaaa' in (testValue))"));

like

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue LIKE '%aaaa%')"));

不行,只支持这些关键字

in 关键字 只能变量in 特定字符集合这样表达。

思路转变一下,也不是说不能实现

msg.putUserProperty("aaaa","aaaa");
msg.putUserProperty("bbbb","bbbb");
msg.putUserProperty("cccc","cccc");

生产者 setProperty 每个业务系统一个区分开来。

消费者即可实现

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("aaaa is not null"));

尝试二:消费端逻辑处理

修改生产者

测试的点位信息加上适用系统标识(逗号分隔)

public class AddProducer {
 
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("a-group");
        producer.setNamesrvAddr("192.168.0.211:9876");
        producer.start();

        Point point1 = new Point(1, "point1","1,2");
        Point point2 = new Point(2, "point2","1,2");
        Point point3 = new Point(3, "point3","1,4");
        Point point4 = new Point(4, "point4","1,4");
        Point point5 = new Point(5, "point5","1,5");

        ArrayList<Point> list = new ArrayList<>();
        list.add(point1);
        list.add(point2);
        list.add(point3);
        list.add(point4);
        list.add(point5);

 
 
 
        try {
 
            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "add", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }

            Thread.sleep(20000);

            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "update", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

模拟消费者一:业务系统标识为1

public class consumer {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("calc_rules_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", "*");
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                try {
                    for (MessageExt msg : list) {
                        String data = new String(msg.getBody());
                        Point p = JSONUtil.toBean(data, Point.class);

                        List<String> ids = Stream.of(p.getApplyModuleIds().split(","))
                                .map(String::trim)
                                .collect(Collectors.toList());
                        if(!ids.contains("1")){
                            //非本业务系统 直接返回
                            return ConsumeOrderlyStatus.SUCCESS;
                        }
                        if(msg.getTags().equals("add")){
                            System.out.println("新增消费:" + p + msg.getQueueId());
                        }else if(msg.getTags().equals("update")){
                            System.out.println("修改消费:" + p + msg.getQueueId());
                        }

                    }

                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    MessageExt msg = list.get(0);
                    log.error("consumer news error " + new String(msg.getBody()));
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

模拟消费者二:业务系统标识2

public class consumer2 {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("a_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", "*");
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                try {
                    for (MessageExt msg : list) {
                        String data = new String(msg.getBody());
                        Point p = JSONUtil.toBean(data, Point.class);

                        List<String> ids = Stream.of(p.getApplyModuleIds().split(","))
                                .map(String::trim)
                                .collect(Collectors.toList());
                        if(!ids.contains("2")){
                            //非本业务系统 直接返回
                            return ConsumeOrderlyStatus.SUCCESS;
                        }
                        if(msg.getTags().equals("add")){
                            System.out.println("新增消费:" + p + msg.getQueueId());
                        }else if(msg.getTags().equals("update")){
                            System.out.println("修改消费:" + p + msg.getQueueId());
                        }

                    }

                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    MessageExt msg = list.get(0);
                    log.error("consumer news error " + new String(msg.getBody()));
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

测试结果 - 消费者1

测试结果 - 消费者2

结论

SQL92方式:过滤在服务端,但是功能还是局限的。如果服务器性能好压力不大,且过滤方式能满足。个人任务还是可用的

消费端逻辑处理方式:略费带宽,服务器压力小。过滤在消费端


原文地址:https://blog.csdn.net/zjy660358/article/details/140158245

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!