我有一些悟了。
这是我依托 chatgpt 生成的目录结构。
这样就可以实现,从 service 和 task 内部对任务的调用。
下一个环节就是对调用的抽离,支持所有的 task 。( HandleTask )主要是这个方法。
package main
import (
"context"
"fmt"
"log"
"time"
"
github.com/hibiken/asynq"
)
// 定义一个任务类型
const TaskType = "task:example"
// 定义一个通用的 TaskEnqueuer 结构体
type TaskEnqueuer struct {
Client *asynq.Client
}
// 公共的 EnqueueTask 方法
func (te *TaskEnqueuer) EnqueueTask(taskType string, payload interface{}, delay time.Duration) error {
task := asynq.NewTask(taskType, asynq.PayloadFrom(payload))
_, err := te.Client.Enqueue(task, asynq.ProcessIn(delay))
return err
}
// TaskHandler 结构体,现在包含一个 TaskEnqueuer
type TaskHandler struct {
Enqueuer *TaskEnqueuer
}
// HandleTask 方法,用于处理任务
func (h *TaskHandler) HandleTask(ctx context.Context, task *asynq.Task) error {
var depth int
err := task.Payload().Unmarshal(&depth)
if err != nil {
return err
}
fmt.Printf("Executing task, Depth: %d\n", depth)
if depth > 0 {
// 调用公共的 EnqueueTask 方法,递归调用自身
return h.Enqueuer.EnqueueTask(TaskType, depth-1, 1*time.Second)
}
return nil
}
// NewTaskHandler 工厂函数,用于初始化 TaskHandler 和 TaskEnqueuer
func NewTaskHandler(redisAddr string) (*TaskHandler, *asynq.Server) {
r := asynq.RedisClientOpt{Addr: redisAddr}
client := asynq.NewClient(r)
enqueuer := &TaskEnqueuer{Client: client}
server := asynq.NewServer(r, asynq.Config{
Concurrency: 10,
})
return &TaskHandler{Enqueuer: enqueuer}, server
}
// SetupAndRunServer 函数用于设置和启动服务器
func SetupAndRunServer(server *asynq.Server, handler *TaskHandler) {
mux := asynq.NewServeMux()
mux.Handle(TaskType, asynq.HandlerFunc(handler.HandleTask))
if err :=
server.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
// main 函数作为程序入口
func main() {
redisAddr := "127.0.0.1:6379"
handler, server := NewTaskHandler(redisAddr)
defer handler.Enqueuer.Client.Close()
// 初始化任务并加入队列
err := handler.Enqueuer.EnqueueTask(TaskType, 3, 0) // 递归深度为 3 ,立即执行
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
// 启动服务器处理任务
SetupAndRunServer(server, handler)
}