
Notes on a gRPC streaming operation (Jedis version)

Background: messages are sent to and consumed from a Redis queue. (Message consumption is implemented with gRPC streaming.)

gRPC protocol definition

Service method definition
service MQDataService {
    rpc sendFacebookAndroidMsg(google.protobuf.StringValue) returns (ResultProto);
    rpc receiveFacebookAndroidMsg(empty) returns (stream google.protobuf.StringValue);
}
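The two rpc declarations above differ in call shape: the first is unary (one response), the second is server streaming (any number of responses). The following is a minimal sketch of that difference. `SketchObserver` is a hypothetical stand-in for `io.grpc.stub.StreamObserver` so that the example needs no gRPC dependency, and `RecordingObserver` and `CallShapes` are illustrative names that do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
interface SketchObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Records every callback so the call shape can be inspected afterwards.
final class RecordingObserver<T> implements SketchObserver<T> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(T value) { events.add("next:" + value); }
    @Override public void onError(Throwable t) { events.add("error"); }
    @Override public void onCompleted() { events.add("completed"); }
}

final class CallShapes {
    // Unary shape (like sendFacebookAndroidMsg): exactly one onNext, then onCompleted.
    static void unary(String request, SketchObserver<String> out) {
        out.onNext("ack:" + request);
        out.onCompleted();
    }

    // Server-streaming shape (like receiveFacebookAndroidMsg):
    // zero or more onNext calls, then a single onCompleted.
    static void serverStreaming(List<String> messages, SketchObserver<String> out) {
        for (String m : messages) {
            out.onNext(m);
        }
        out.onCompleted();
    }
}
```

In both shapes the server signals the end of the call through the same observer; only the number of onNext calls differs.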

Server-side implementation


    @Override
    public void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) {
        // Build the Redis key by filling the platform and app type into the key template.
        CacheKey cacheKey = AppKey.appReport;
        String key = cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC)
                .replace("{APPTYPE}", "0");
        // Push the message payload to Redis.
        RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(), cacheKey.get_expireSecondTime());
        // Unary response: send one result, then complete the call.
        ResultProto.Builder builder = ResultProto.newBuilder();
        builder.setCode(ResultType.SUCCESS);
        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }
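Both server methods derive the Redis key from the same template by replacing the `{PLATFORM}` and `{APPTYPE}` placeholders. A small helper, sketched below, would keep the two call sites in sync. The template string `mq:{PLATFORM}:{APPTYPE}` is purely hypothetical, since the article does not show what `cacheKey.get_keyName()` returns, and `KeyTemplateSketch` is an illustrative name.

```java
final class KeyTemplateSketch {
    // Hypothetical template; the real value of cacheKey.get_keyName() is not shown in the article.
    static final String TEMPLATE = "mq:{PLATFORM}:{APPTYPE}";

    // Fills both placeholders using literal (non-regex) replacement.
    static String buildKey(String template, String platform, String appType) {
        return template
                .replace("{PLATFORM}", platform)
                .replace("{APPTYPE}", appType);
    }
}
```

`String.replace` treats its arguments as literal text, so the braces need no escaping, unlike `replaceAll`, which interprets its first argument as a regular expression.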
    
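    @Override
    public void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) {
        MQListener mqListener = new MQListener(responseObserver);
        try {
            CacheKey cacheKey = AppKey.appReport;
            String key = cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC)
                    .replace("{APPTYPE}", "0");
            // With Jedis, subscribe() blocks this thread until the listener unsubscribes.
            RedissonFactory.getRedis().subscribe(mqListener, key);
        } catch (Exception e) {
            // Report the failure to the client instead of swallowing it. onError is terminal,
            // so onCompleted must not be called afterwards.
            responseObserver.onError(io.grpc.Status.INTERNAL
                    .withDescription("Redis subscription failed")
                    .withCause(e)
                    .asRuntimeException());
            return;
        }
        // The subscription ended normally: close the stream.
        responseObserver.onCompleted();
    }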
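The streaming method above bridges a blocking subscription to a response stream. One point worth making explicit is that a stream should end with exactly one terminal signal: `onCompleted` after a normal end, or `onError` after a failure, never both. The following is a minimal sketch of that rule. `TerminalObserver` is a hypothetical stand-in for `io.grpc.stub.StreamObserver`, `BlockingSource` stands in for a blocking subscribe call, and all names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for io.grpc.stub.StreamObserver.
interface TerminalObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Records callbacks so the order of signals can be checked.
final class TerminalLog implements TerminalObserver<String> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(String value) { events.add("next:" + value); }
    @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
    @Override public void onCompleted() { events.add("completed"); }
}

final class StreamBridgeSketch {
    // Hypothetical stand-in for a blocking subscribe call: it pushes messages
    // into the sink and returns only when the subscription ends.
    interface BlockingSource {
        void run(Consumer<String> sink) throws Exception;
    }

    static void serve(BlockingSource source, TerminalObserver<String> out) {
        try {
            source.run(out::onNext); // blocks until the subscription ends
        } catch (Exception e) {
            out.onError(e);          // failure: report it and stop
            return;
        }
        out.onCompleted();           // normal end: close the stream
    }
}
```

Placing `onCompleted` after the try block, rather than in a `finally`, is what keeps the error path from emitting two terminal signals.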

// Listener that forwards Redis pub/sub messages to the gRPC response stream
public class MQListener extends JedisPubSub {

    private final StreamObserver<StringValue> _responseObserver;

    public MQListener(StreamObserver<StringValue> responseObserver) {
        _responseObserver = responseObserver;
    }

    // Called when a message arrives on a subscribed channel
    @Override
    public void onMessage(String channel, String message) {
        // Skip empty payloads; forward everything else to the client.
        if (!StringUtil.isNullOrEmpty(message)) {
            StringValue.Builder builder = StringValue.newBuilder();
            builder.setValue(message);
            _responseObserver.onNext(builder.build());
        }
    }

    // Called when a channel subscription is established
    public void onSubscribe(String channel, int subscribedChannels) {
    ...
    }

    // Called when a channel subscription is cancelled
    public void onUnsubscribe(String channel, int subscribedChannels) {
     ...
    }

    // Called when a pattern subscription is established
    public void onPSubscribe(String pattern, int subscribedChannels) {
       ...
    }

    // Called when a pattern subscription is cancelled
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
       ...
    }

    // Called when a message arrives through a pattern subscription
    public void onPMessage(String pattern, String channel, String message) {
     ...
    }
}
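The listener above keeps forwarding messages for as long as the subscription lives, even if the client has already disconnected. One possible refinement, sketched below with plain JDK types, is to check a cancellation flag before forwarding and to unsubscribe once when the flag is set. In a real service the flag could come from `ServerCallStreamObserver.isCancelled()` and the unsubscribe action from `JedisPubSub.unsubscribe()`; wiring them in this way is an assumption and not part of the original code, and `ForwardingListenerSketch` is an illustrative name.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

final class ForwardingListenerSketch {
    private final Consumer<String> downstream;  // where accepted messages go
    private final BooleanSupplier cancelled;    // true once the client is gone
    private final Runnable unsubscribe;         // ends the underlying subscription
    private boolean unsubscribed;

    ForwardingListenerSketch(Consumer<String> downstream,
                             BooleanSupplier cancelled,
                             Runnable unsubscribe) {
        this.downstream = downstream;
        this.cancelled = cancelled;
        this.unsubscribe = unsubscribe;
    }

    // Mirrors the shape of JedisPubSub.onMessage(channel, message).
    void onMessage(String channel, String message) {
        if (cancelled.getAsBoolean()) {
            // The client is gone: stop the subscription once and drop the message.
            if (!unsubscribed) {
                unsubscribed = true;
                unsubscribe.run();
            }
            return;
        }
        if (message == null || message.isEmpty()) {
            return; // skip empty payloads, as the original listener does
        }
        downstream.accept(message);
    }
}
```

Unsubscribing lets the blocking subscribe call on the server thread return, so the thread is not held by a stream that nobody is reading.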

Client-side implementation

public static void receiveFacebookAndroidMsg() {
    try {
        log.info("facebook android msg");
        // Observer that receives the streamed messages
        StreamObserver<StringValue> responseObserver = new StreamObserver<StringValue>() {
            @Override
            public void onNext(StringValue msgProto) {
                try {
                    log.info("facebook android msg received: {}", msgProto.getValue());
                    JSONObject jsonObject = JSONObject.parseObject(msgProto.getValue());
                    ...
                } catch (Exception e) {
                    // Pass the exception as the last argument so the stack trace is logged too.
                    log.error("facebook android msg consume failed: {}", e.getMessage(), e);
                    // Send the message back to the MQ to be consumed again
                   ...
                }
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Error occurred: " + throwable.getMessage());
                log.error("facebook android Error occurred: {}", throwable.getMessage(), throwable);
            }

            @Override
            public void onCompleted() {
                System.out.println("Stream completed.");
                log.info("facebook android Stream completed.");
            }
        };
        log.info("receive fb android msg: start");
        // Assuming getMqDataServiceStub() returns the async stub, this call returns immediately;
        // messages arrive later through responseObserver.
        ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);
        log.info("receive fb android msg: request sent");
    } catch (Exception e) {
        log.error("receive fb android msg failed", e);
    }
}
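Because the client call takes a response observer, it is presumably made on an async stub, which returns before any message has arrived. If the caller needs to wait for the stream to finish (for example in a test or a short-lived process), a `CountDownLatch` released from `onCompleted` and `onError` is one common approach. The sketch below simulates the async call with a background thread; `startStream`, `Observer`, and `AsyncStreamWaitSketch` are hypothetical stand-ins, not part of gRPC or the original code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class AsyncStreamWaitSketch {
    // Hypothetical stand-in for io.grpc.stub.StreamObserver<String>.
    interface Observer {
        void onNext(String value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical stand-in for an async stub call: returns at once and
    // delivers messages on another thread.
    static void startStream(List<String> messages, Observer observer) {
        Thread t = new Thread(() -> {
            for (String m : messages) {
                observer.onNext(m);
            }
            observer.onCompleted();
        });
        t.setDaemon(true);
        t.start();
    }

    // Starts the stream and blocks until it terminates or the timeout expires.
    static List<String> receiveAll(List<String> messages, long timeoutSeconds) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        startStream(messages, new Observer() {
            @Override public void onNext(String value) { received.add(value); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onCompleted() { done.countDown(); }
        });
        try {
            if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }
}
```

For a long-running consumer such as the one in this article, waiting is usually unnecessary; the point is only that a log line written right after the call reflects that the request was sent, not that messages were received.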



Original article: https://blog.csdn.net/laozengsky/article/details/142656685
