自学内容网 自学内容网

webflux版定时任务实现方案

通常定时任务我们一般采用spring注解@EnableScheduling来启动,但如何与webflux响应式代码结合实现定时任务呢?下面给出了一个企业内使用的真实案例,希望能帮到你。

@Component
@EnableScheduling
@Slf4j
public class TestTask {
 
    @Resource
    private ReactiveStringRedisTemplate redisTemplate; 

    private final AtomicBoolean isRunning=new AtomicBoolean(false); 
    private final Sinks.Many<MapRecord<String, Object, Object>> sinks = Sinks.many().unicast().onBackpressureBuffer();

    @PostConstruct
    public void init(){
        //初始化时订阅sinks,等待有数据流进来,这里并发执行流中数据
        sinks.asFlux().onErrorResume(err->{
            log.error("出现异常",err);
            return Mono.empty();
        }).parallel().runOn(Schedulers.boundedElastic()).flatMap(record->{
            String orderNo=(String) record.getValue().getOrDefault("orderNo","");
            if(orderNo.isEmpty()){
                return Mono.empty();
            }else{
                //添加业务处理逻辑,处理成功后删除队列中数据即可完成任务调度逻辑
            } 
            }).onErrorResume(err->{
                log.error("orderNo={} 异常",orderNo,err);
                return Mono.empty();
            });
        }).subscribe();
    }
 
    @Scheduled(cron = "0/1 * * * * ?")
    public void timer() {
        //扫描要处理的数据发送到sinks,sinks连接下游处理器源源不断进行处理。
        //定时任务每秒执行一次,如果遇到前一个调度任务没有完成则等待,防止重复调度
        if (!isRunning.getAndSet(true)) {
            doTask().doFinally((v)->{
                isRunning.set(false);
            }).subscribe();
        }
    }
    public Mono<Void> doTask() {
        //每次从redis队列中读取100条要处理的数据发送给sinks
        Consumer consumer = Consumer.from("mygroup","myconsumer");
        return redisTemplate.opsForStream().read(consumer, StreamReadOptions.empty().count(100),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()))
                .flatMap(record->{
            try {
                 sinks.tryEmitNext(record);
            }catch (Exception e){
                log.error("发送数据到sinks异常:{}",record.getValue(),e);
            }
            return Mono.empty();
        }).then();
    }
}


原文地址:https://blog.csdn.net/zhuguoli200851/article/details/144772955

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!