229 lines
5.9 KiB
Go
229 lines
5.9 KiB
Go
|
|
package tkasynq
|
||
|
|
|
||
|
|
import (
|
||
|
|
"encoding/json"
|
||
|
|
"fmt"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/google/uuid"
|
||
|
|
"github.com/hibiken/asynq"
|
||
|
|
)
|
||
|
|
|
||
|
|
type TaskInfo struct {
|
||
|
|
// ID is the identifier of the task.
|
||
|
|
ID string
|
||
|
|
|
||
|
|
// Queue is the name of the queue in which the task belongs.
|
||
|
|
Queue string
|
||
|
|
|
||
|
|
// Type is the type name of the task.
|
||
|
|
Type string
|
||
|
|
|
||
|
|
// Payload is the payload data of the task.
|
||
|
|
Payload []byte
|
||
|
|
}
|
||
|
|
type AsynqClient struct {
|
||
|
|
client *asynq.Client
|
||
|
|
inspector *asynq.Inspector
|
||
|
|
}
|
||
|
|
|
||
|
|
const (
|
||
|
|
// Default max retry count used if nothing is specified.
|
||
|
|
defaultMaxRetry = 25
|
||
|
|
|
||
|
|
// Default timeout used if both timeout and deadline are not specified.
|
||
|
|
defaultTimeout = 30 * time.Minute
|
||
|
|
)
|
||
|
|
|
||
|
|
type OptionType int
|
||
|
|
|
||
|
|
const (
|
||
|
|
MaxRetryOpt OptionType = iota
|
||
|
|
QueueOpt
|
||
|
|
TimeoutOpt
|
||
|
|
ProcessAtOpt
|
||
|
|
ProcessInOpt
|
||
|
|
TaskIDOpt
|
||
|
|
)
|
||
|
|
|
||
|
|
type (
|
||
|
|
retryOption int
|
||
|
|
queueOption string
|
||
|
|
taskIDOption string
|
||
|
|
timeoutOption time.Duration
|
||
|
|
processAtOption time.Time
|
||
|
|
processInOption time.Duration
|
||
|
|
)
|
||
|
|
|
||
|
|
// Option specifies the task processing behavior.
|
||
|
|
type Option interface {
|
||
|
|
// String returns a string representation of the option.
|
||
|
|
String() string
|
||
|
|
|
||
|
|
// Type describes the type of the option.
|
||
|
|
Type() OptionType
|
||
|
|
|
||
|
|
// Value returns a value used to create this option.
|
||
|
|
Value() interface{}
|
||
|
|
}
|
||
|
|
|
||
|
|
// MaxRetry returns an option to specify the max number of times
|
||
|
|
// the task will be retried.
|
||
|
|
//
|
||
|
|
// Negative retry count is treated as zero retry.
|
||
|
|
func MaxRetry(n int) Option {
|
||
|
|
if n < 0 {
|
||
|
|
n = 0
|
||
|
|
}
|
||
|
|
return retryOption(n)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (n retryOption) String() string { return fmt.Sprintf("MaxRetry(%d)", int(n)) }
|
||
|
|
func (n retryOption) Type() OptionType { return MaxRetryOpt }
|
||
|
|
func (n retryOption) Value() interface{} { return int(n) }
|
||
|
|
|
||
|
|
// Timeout returns an option to specify how long a task may run.
|
||
|
|
// If the timeout elapses before the Handler returns, then the task
|
||
|
|
// will be retried.
|
||
|
|
//
|
||
|
|
// Zero duration means no limit.
|
||
|
|
//
|
||
|
|
// If there's a conflicting Deadline option, whichever comes earliest
|
||
|
|
// will be used.
|
||
|
|
func Timeout(d time.Duration) Option {
|
||
|
|
return timeoutOption(d)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (d timeoutOption) String() string { return fmt.Sprintf("Timeout(%v)", time.Duration(d)) }
|
||
|
|
func (d timeoutOption) Type() OptionType { return TimeoutOpt }
|
||
|
|
func (d timeoutOption) Value() interface{} { return time.Duration(d) }
|
||
|
|
|
||
|
|
// Queue returns an option to specify the queue to enqueue the task into.
|
||
|
|
func Queue(name string) Option {
|
||
|
|
return queueOption(name)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (name queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(name)) }
|
||
|
|
func (name queueOption) Type() OptionType { return QueueOpt }
|
||
|
|
func (name queueOption) Value() interface{} { return string(name) }
|
||
|
|
|
||
|
|
// ProcessAt returns an option to specify when to process the given task.
|
||
|
|
//
|
||
|
|
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
|
||
|
|
func ProcessAt(t time.Time) Option {
|
||
|
|
return processAtOption(t)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t processAtOption) String() string {
|
||
|
|
return fmt.Sprintf("ProcessAt(%v)", time.Time(t).Format(time.UnixDate))
|
||
|
|
}
|
||
|
|
func (t processAtOption) Type() OptionType { return ProcessAtOpt }
|
||
|
|
func (t processAtOption) Value() interface{} { return time.Time(t) }
|
||
|
|
|
||
|
|
// ProcessIn returns an option to specify when to process the given task relative to the current time.
|
||
|
|
//
|
||
|
|
// If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others.
|
||
|
|
func ProcessIn(d time.Duration) Option {
|
||
|
|
return processInOption(d)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)", time.Duration(d)) }
|
||
|
|
func (d processInOption) Type() OptionType { return ProcessInOpt }
|
||
|
|
func (d processInOption) Value() interface{} { return time.Duration(d) }
|
||
|
|
|
||
|
|
// TaskID returns an option to specify the task ID.
|
||
|
|
func TaskID(id string) Option {
|
||
|
|
return taskIDOption(id)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (id taskIDOption) String() string { return fmt.Sprintf("TaskID(%q)", string(id)) }
|
||
|
|
func (id taskIDOption) Type() OptionType { return TaskIDOpt }
|
||
|
|
func (id taskIDOption) Value() interface{} { return string(id) }
|
||
|
|
|
||
|
|
// NewClient 初始化client
|
||
|
|
func NewClient(client *asynq.Client, clientOpt *asynq.RedisClientOpt) *AsynqClient {
|
||
|
|
return &AsynqClient{
|
||
|
|
client: client,
|
||
|
|
inspector: asynq.NewInspector(clientOpt),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
type option struct {
|
||
|
|
retry int
|
||
|
|
queue string
|
||
|
|
timeout time.Duration
|
||
|
|
processAt time.Time
|
||
|
|
taskId string
|
||
|
|
}
|
||
|
|
|
||
|
|
func (y *AsynqClient) composeOptions(opts ...Option) option {
|
||
|
|
res := option{
|
||
|
|
retry: defaultMaxRetry,
|
||
|
|
queue: "default",
|
||
|
|
timeout: 0, // do not set to deafultTimeout here
|
||
|
|
processAt: time.Now(),
|
||
|
|
taskId: uuid.NewString(),
|
||
|
|
}
|
||
|
|
|
||
|
|
for _, opt := range opts {
|
||
|
|
switch opt := opt.(type) {
|
||
|
|
case retryOption:
|
||
|
|
res.retry = int(opt)
|
||
|
|
case queueOption:
|
||
|
|
res.queue = string(opt)
|
||
|
|
case timeoutOption:
|
||
|
|
res.timeout = time.Duration(opt)
|
||
|
|
case processAtOption:
|
||
|
|
res.processAt = time.Time(opt)
|
||
|
|
case processInOption:
|
||
|
|
res.processAt = time.Now().Add(time.Duration(opt))
|
||
|
|
case taskIDOption:
|
||
|
|
res.taskId = string(opt)
|
||
|
|
fmt.Println("here-taksId: ", res.taskId)
|
||
|
|
}
|
||
|
|
|
||
|
|
}
|
||
|
|
return res
|
||
|
|
|
||
|
|
}
|
||
|
|
|
||
|
|
// EnqueueTask 支持及时任务,支持时间点任务,多少时间后任务
|
||
|
|
func (y *AsynqClient) EnqueueTask(jobTag string, v interface{}, queue string, opts ...Option) (*TaskInfo, error) {
|
||
|
|
payload, err := json.Marshal(v)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
|
||
|
|
task := asynq.NewTask(jobTag, payload)
|
||
|
|
opt := y.composeOptions(opts...)
|
||
|
|
info, err := y.client.Enqueue(
|
||
|
|
task,
|
||
|
|
asynq.Queue(queue),
|
||
|
|
asynq.MaxRetry(opt.retry),
|
||
|
|
asynq.Timeout(time.Duration(opt.timeout)*time.Second),
|
||
|
|
asynq.ProcessAt(opt.processAt),
|
||
|
|
asynq.TaskID(opt.taskId),
|
||
|
|
)
|
||
|
|
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
|
||
|
|
return &TaskInfo{
|
||
|
|
ID: info.ID,
|
||
|
|
Queue: info.Queue,
|
||
|
|
Type: info.Type,
|
||
|
|
Payload: info.Payload,
|
||
|
|
}, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// RunTask 执行对应taskId任务
|
||
|
|
func (y *AsynqClient) RunTask(queue, id string) error {
|
||
|
|
return y.inspector.RunTask(queue, id)
|
||
|
|
}
|
||
|
|
|
||
|
|
// DeleteTask 删除对应taskId任务
|
||
|
|
func (y *AsynqClient) DeleteTask(queue, id string) error {
|
||
|
|
return y.inspector.DeleteTask(queue, id)
|
||
|
|
}
|