个人技术分享

背景

有一个需求是这样的,前端需要通过http请求的form-data上传图片文件,后端接收图片后调用AI接口执行命令,由于命令执行时间较长,需要持续返回当前任务在全局任务列表中的位置,以便前端即时更新排队信息。

思考

如果直接在gin的请求处理函数中开goroutine,并在goroutine中通过类似c.JSON的方法来返回响应,前端会无法收到响应。原因是当处理函数返回时,Gin 会自动关闭 HTTP 请求的响应,这意味着在处理函数返回后,后台的协程无法再向响应中写入数据,从而导致客户端收不到信息。(不用goroutine也不行,因为持续返回响应涉及到使用for循环,会阻塞通道,阻塞主协程,仍然无法返回响应),使用gin配合websocket解决。

于是解决方案就是,使用gin的处理函数接收图片后,把当前任务的id作为响应返回给前端。然后前端再用当前任务的id建立websocket请求,在websocket请求中,通常不会自动关闭,除非显式地由客户端或服务器关闭,所以就可以在ws的处理函数中开goroutine来持续返回响应。

func WsHandleConnect(s *melody.Session) {
	TaskId, _ := s.Get("task_id")
	// 将 TaskId 转换为字符串并解析为整数
	taskIdStr, _ := TaskId.(string)
	taskId, err := strconv.Atoi(taskIdStr)
	if err != nil {
		sendJSONResponse(s, Res{
			Code: 4000,
			Msg:  "Invalid Task ID",
			Data: nil,
		})
		s.Close()
		return
	}
	var task *model.Task
	// 遍历 TaskQueue 查找对应的任务
	for _, t := range global.TaskQueue {
		if t.ID == taskId {
			task = t
			break
		}
	}
	if task == nil {
		sendJSONResponse(s, Res{
			Code: 4000,
			Msg:  "Task ID not provided",
			Data: nil,
		})
		s.Close()
		return
	}
	logger.Log.Error("task:", task.ID)
	go func() {
		for {
			select {
			case result := <-task.Response:

				sendJSONResponse(s, Res{
					Code: global.SuccessCode,
					Msg:  result,
					Data: nil,
				})
				s.Close()
				return
			default:
				time.Sleep(3 * time.Second)
				if len(global.TaskQueue) == 0 {
					sendJSONResponse(s, Res{
						Code: global.SuccessCode,
						Msg:  "Task failed",
						Data: nil,
					})
					s.Close()
					return
				}
				position := service.FindTaskPosition(task.ID)
				sendJSONResponse(s, Res{
					Code: global.SuccessCode,
					Data: position,
				})

			}
		}
	}()
}