自学内容网 自学内容网

go的实践

用goroutine来替代mq做异步的应用

方法在创建ai任务接口中用协程的方式异步调用go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle),来更新ai任务的状态

心跳

心跳机制是一种用于检测连接或任务是否仍处于活动状态的方法。在分布式系统或网络通信中,心跳机制可以用来监视节点和服务的可用性、性能和故障。心跳通常是通过定期发送小的数据包或信号来实现的,然后接收方会对这些信号进行响应,以表示它们仍然在线并正常运行。
在这段代码中,心跳机制用于监控AI任务的进行情况。每隔2秒,会将当前时间戳写入Redis缓存,作为心跳信号。这样,外部监控系统可以检查Redis缓存中的心跳信号来判断任务是否仍在进行。如果在一定时间内没有收到新的心跳信号,那么可以认为任务已经中止或出现故障。

contenx的超时设置

context没有默认的超时时间,如果要设置超时时间的话,记得调用

ctxTimeout, timeoutCancel := context.WithTimeout(ctx, time.Hour)
defer timeoutCancel() // 操作完成时立即释放资源

定时器

在这段代码中,time.NewTicker(time.Millisecond)创建的定时器最初每隔1毫秒触发一次,ticker.Reset(time.Second * 2)的作用是重置定时器(ticker)的时间间隔为2s,这样能每2s从通道中取到信息case <-ticker.C:

break label

breakFor是一个标签,它用于在嵌套循环或选择语句中提供更精确的控制。在这个例子中,breakFor标签用于跳出外层的for循环。 在Go语言中,break语句默认只跳出当前层次的循环或选择语句。在这个例子中,当ctxTimeout.Done()通道接收到一个值或者满足其他条件时,我们希望跳出整个for循环,而不仅仅是select语句。通过在for循环前添加breakFor标签,并在break`语句中指定该标签,我们可以实现这个功能。

核心代码

// handleCreateAsync 启动协程处理结果
func (s *aiHandle) handleCreateAsync(ctx context.Context, algorithm string, taskId string, data interface{}, iAiHandle IAiHandle) {
// 上报神策
properties := map[string]interface{}{
"uid": util.GetCtx(ctx).User.WsId,
"alg": algorithm,
}
sensorsdata.Track(gtrace.GetTraceID(ctx), "SeAlgCreate", properties, util.GetCtx(ctx).User.WsId != "", "")

// 创建任务
s.createDbSave(ctx, taskId, algorithm, data)

// 任务数量+1
s.handleCreateActive(ctx, algorithm, true)

// 异步处理结果
go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle)
}
// handleCreateAsync 启动协程处理结果
func (s *aiHandle) handleCreateAsync(ctx context.Context, algorithm string, taskId string, data interface{}, iAiHandle IAiHandle) {
   // 上报神策
   properties := map[string]interface{}{
      "uid": util.GetCtx(ctx).User.WsId,
      "alg": algorithm,
   }
   sensorsdata.Track(gtrace.GetTraceID(ctx), "SeAlgCreate", properties, util.GetCtx(ctx).User.WsId != "", "")

   // 创建任务
   s.createDbSave(ctx, taskId, algorithm, data)

   // 任务数量+1
   s.handleCreateActive(ctx, algorithm, true)

   // 异步处理结果
   go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle)
}

// handleResultPolling 轮询结果
func (s *aiHandle) handleResultPolling(ctx context.Context, algorithm, taskId string, iAiHandle IAiHandle) {
   // 60分钟超时
   // 1. 首先,设置一个60分钟的超时上下文,确保处理不会无限期地进行下去。
   ctxTimeout, timeoutCancel := context.WithTimeout(ctx, time.Hour)
   defer timeoutCancel() // 超时前调用这个,用来释放资源;不调用这个方法的话,会在超时时间释放资源

   // 2. 创建一个心跳goroutine,每隔2秒更新一次心跳缓存。这样可以在外部监控任务的进行情况。
   // 启动心跳,2秒一次
   go func(ctx context.Context, taskId string) {
      ticker := time.NewTicker(time.Millisecond)
      defer ticker.Stop()

      heartbeatKey := aiCache.GetAiTaskHeartBeat(taskId)
      for {
         select {
         case <-ctx.Done():
            return
         case <-ticker.C:
            ticker.Reset(time.Second * 2)
            if cache.RedisExists(ctx, heartbeatKey) == false {
               _ = cache.RedisSet(ctx, heartbeatKey, time.Now().Unix(), time.Hour*24)
            }
            _ = cache.RedisIncBy(ctx, heartbeatKey, 2)
         }
      }
   }(ctxTimeout, taskId)

   // 3. 初始化任务结果的状态和数据变量。
   // 任务结果
   taskEnd := false
   taskStatus := consts.TaskStatusProcessing
   taskMsg := ""
   var taskData any

   // 4. 设置一个定时器,每隔3秒执行一次轮询查询结果。
   // 每3秒重试一次
   step := time.NewTimer(time.Millisecond)
   defer step.Stop()

   // 查询失败时,重试5次
   retry := 1

   //5. 使用`breakFor`标签和`select`语句进行轮询操作,当超时或查询到结果时跳出循环。
   //- 调用`iAiHandle.ResultHandle`方法查询任务结果。
   //- 如果查询失败,重试5次。
   //- 如果任务状态不是处理中或等待中,则更新任务状态和数据,设置`taskEnd`为`true`,跳出循环。

   // 轮询结果
breakFor:
   for {
      select {
      case <-ctxTimeout.Done():
         break breakFor
      case <-step.C:
         step.Reset(time.Second * 3)
         // 查结果
         taskResult, err := iAiHandle.ResultHandle(ctx, &v1.ResultReq{
            Algorithm: algorithm,
            TaskId:    taskId,
         })
         taskData = taskResult
         if err != nil {
            if retry > 5 {
               taskStatus = consts.TaskStatusServerFail
               taskMsg = "handle result err retry than 5 times"
               break breakFor
            }
            retry++
         } else {
            // 状态不一致则返回
            if taskResult != nil && taskResult.Status != consts.TaskStatusProcessing && taskResult.Status != consts.TaskStatusWaiting {
               taskStatus = taskResult.Status
               taskMsg = taskResult.Reason
               taskEnd = true
               break breakFor
            }
         }
      }
   }

   // 程序处理超时
   if taskEnd == false {
      taskStatus = consts.TaskStatusServerOutTime
      taskMsg = "handle result than 60 minutes"
   }
   // 7. 更新任务计数器。
   // 任务数量-1
   s.handleCreateActive(ctx, algorithm, false)

   // 8. 保存任务结果到数据库。
   // 更数任务
   s.resultDbSave(ctx, taskId, taskStatus, taskMsg, taskData)

}

原文地址:https://blog.csdn.net/Mengbabe_2018/article/details/137968365

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!