tkcashgame_v4/pkg/tkasynq/asynq.go

229 lines
5.9 KiB
Go

package tkasynq
import (
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/hibiken/asynq"
)
// TaskInfo describes a task by its identifier, queue, type name and payload.
type TaskInfo struct {
	ID      string // identifier of the task
	Queue   string // name of the queue the task belongs to
	Type    string // type name of the task
	Payload []byte // payload data of the task
}
// AsynqClient bundles an asynq client with an asynq inspector so that tasks
// can be both enqueued and managed through a single value.
type AsynqClient struct {
	client    *asynq.Client    // used to enqueue tasks
	inspector *asynq.Inspector // used to run and delete tasks by queue and ID
}
// Package-level defaults for task options.
const (
	// defaultMaxRetry is the max retry count used when none is specified.
	defaultMaxRetry = 25

	// defaultTimeout is the timeout meant for tasks that specify neither a
	// timeout nor a deadline.
	defaultTimeout = 30 * time.Minute
)
// OptionType identifies the kind of an Option.
type OptionType int

// Option kinds, numbered consecutively from zero in declaration order.
const (
	MaxRetryOpt  = OptionType(iota) // kind of the MaxRetry option
	QueueOpt                        // kind of the Queue option
	TimeoutOpt                      // kind of the Timeout option
	ProcessAtOpt                    // kind of the ProcessAt option
	ProcessInOpt                    // kind of the ProcessIn option
	TaskIDOpt                       // kind of the TaskID option
)
// retryOption carries the value of a MaxRetry option.
type retryOption int

// queueOption carries the value of a Queue option.
type queueOption string

// taskIDOption carries the value of a TaskID option.
type taskIDOption string

// timeoutOption carries the value of a Timeout option.
type timeoutOption time.Duration

// processAtOption carries the value of a ProcessAt option.
type processAtOption time.Time

// processInOption carries the value of a ProcessIn option.
type processInOption time.Duration
// Option describes one aspect of how a task should be processed.
type Option interface {
	// String gives a textual form of the option.
	String() string

	// Type reports which kind of option this is.
	Type() OptionType

	// Value yields the value the option was created from.
	Value() interface{}
}
// MaxRetry builds an Option that sets the maximum number of times a task
// will be retried.
//
// A negative n is clamped to zero.
func MaxRetry(n int) Option {
	if n >= 0 {
		return retryOption(n)
	}
	return retryOption(0)
}

// String renders the option as "MaxRetry(<n>)".
func (r retryOption) String() string {
	count := int(r)
	return fmt.Sprintf("MaxRetry(%d)", count)
}

// Type reports MaxRetryOpt.
func (r retryOption) Type() OptionType {
	return MaxRetryOpt
}

// Value returns the retry count as an int.
func (r retryOption) Value() interface{} {
	return int(r)
}
// Timeout builds an Option that limits how long a task may run.
//
// The duration is stored exactly as given; this function applies no
// validation or clamping.
//
// NOTE(review): the previous comment stated that an elapsed timeout causes a
// retry, that a zero duration means no limit, and that a conflicting Deadline
// option resolves to whichever comes earliest. None of that is established in
// this package (it defines no Deadline option) — confirm against the asynq
// documentation before relying on it.
func Timeout(d time.Duration) Option {
	return timeoutOption(d)
}

// String renders the option as "Timeout(<duration>)".
func (t timeoutOption) String() string {
	limit := time.Duration(t)
	return fmt.Sprintf("Timeout(%v)", limit)
}

// Type reports TimeoutOpt.
func (t timeoutOption) Type() OptionType {
	return TimeoutOpt
}

// Value returns the limit as a time.Duration.
func (t timeoutOption) Value() interface{} {
	return time.Duration(t)
}
// Queue builds an Option that carries the name of the queue a task should be
// enqueued into.
func Queue(name string) Option {
	return queueOption(name)
}

// String renders the option as "Queue(<quoted name>)".
func (q queueOption) String() string {
	queueName := string(q)
	return fmt.Sprintf("Queue(%q)", queueName)
}

// Type reports QueueOpt.
func (q queueOption) Type() OptionType {
	return QueueOpt
}

// Value returns the queue name as a string.
func (q queueOption) Value() interface{} {
	return string(q)
}
// ProcessAt builds an Option that fixes the point in time at which a task
// should be processed.
//
// When combined with ProcessIn, whichever of the two appears last in the
// option list takes effect.
func ProcessAt(t time.Time) Option {
	return processAtOption(t)
}

// String renders the option as "ProcessAt(<time>)", with the time formatted
// using the time.UnixDate layout.
func (p processAtOption) String() string {
	when := time.Time(p)
	return fmt.Sprintf("ProcessAt(%v)", when.Format(time.UnixDate))
}

// Type reports ProcessAtOpt.
func (p processAtOption) Type() OptionType {
	return ProcessAtOpt
}

// Value returns the scheduled time as a time.Time.
func (p processAtOption) Value() interface{} {
	return time.Time(p)
}
// ProcessIn builds an Option that schedules a task to be processed after the
// given delay, measured from the current time.
//
// When combined with ProcessAt, whichever of the two appears last in the
// option list takes effect.
func ProcessIn(d time.Duration) Option {
	return processInOption(d)
}

// String renders the option as "ProcessIn(<duration>)".
func (p processInOption) String() string {
	delay := time.Duration(p)
	return fmt.Sprintf("ProcessIn(%v)", delay)
}

// Type reports ProcessInOpt.
func (p processInOption) Type() OptionType {
	return ProcessInOpt
}

// Value returns the delay as a time.Duration.
func (p processInOption) Value() interface{} {
	return time.Duration(p)
}
// TaskID builds an Option that sets an explicit ID for a task.
func TaskID(id string) Option {
	return taskIDOption(id)
}

// String renders the option as "TaskID(<quoted id>)".
func (t taskIDOption) String() string {
	taskID := string(t)
	return fmt.Sprintf("TaskID(%q)", taskID)
}

// Type reports TaskIDOpt.
func (t taskIDOption) Type() OptionType {
	return TaskIDOpt
}

// Value returns the task ID as a string.
func (t taskIDOption) Value() interface{} {
	return string(t)
}
// NewClient initializes an AsynqClient from an existing asynq client and a
// Redis client option; the option is used to construct the inspector.
func NewClient(client *asynq.Client, clientOpt *asynq.RedisClientOpt) *AsynqClient {
	insp := asynq.NewInspector(clientOpt)

	wrapped := new(AsynqClient)
	wrapped.client = client
	wrapped.inspector = insp
	return wrapped
}
// option is the resolved set of task settings after all Option values have
// been applied.
type option struct {
	retry     int           // max retry count
	queue     string        // queue name
	timeout   time.Duration // task timeout
	processAt time.Time     // time at which the task should be processed
	taskId    string        // task identifier
}
// composeOptions folds opts into a single resolved option value.
//
// Starting values are: retry = defaultMaxRetry, queue = "default",
// timeout = 0, processAt = the current time, and taskId = a freshly
// generated UUID. Options are applied in the order given, so a later option
// overrides an earlier one of the same kind. ProcessAt and ProcessIn both
// write processAt, so whichever of them comes last wins. Option values of
// an unrecognized concrete type are ignored.
func (y *AsynqClient) composeOptions(opts ...Option) option {
	// Take a single snapshot so the default processAt and any ProcessIn
	// offset are measured from the same instant.
	now := time.Now()

	res := option{
		retry:     defaultMaxRetry,
		queue:     "default",
		timeout:   0, // do not set to defaultTimeout here
		processAt: now,
		taskId:    uuid.NewString(),
	}
	for _, opt := range opts {
		switch opt := opt.(type) {
		case retryOption:
			res.retry = int(opt)
		case queueOption:
			res.queue = string(opt)
		case timeoutOption:
			res.timeout = time.Duration(opt)
		case processAtOption:
			res.processAt = time.Time(opt)
		case processInOption:
			res.processAt = now.Add(time.Duration(opt))
		case taskIDOption:
			res.taskId = string(opt)
		}
	}
	return res
}
// EnqueueTask JSON-encodes v and enqueues it as a task of type jobTag.
//
// It supports immediate tasks, tasks scheduled for a point in time
// (ProcessAt) and tasks scheduled after a delay (ProcessIn).
//
// queue names the target queue. When queue is empty, the queue resolved from
// opts is used instead (the Queue option if given, otherwise "default"); a
// non-empty queue argument always takes precedence over a Queue option.
//
// The remaining settings (max retry, timeout, processing time, task ID) are
// resolved from opts by composeOptions.
//
// It returns the ID, queue, type and payload reported by asynq for the
// enqueued task, or the error from JSON encoding or from the asynq client.
func (y *AsynqClient) EnqueueTask(jobTag string, v interface{}, queue string, opts ...Option) (*TaskInfo, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}

	opt := y.composeOptions(opts...)
	if queue == "" {
		// asynq rejects an empty queue name; fall back to the composed one.
		queue = opt.queue
	}

	info, err := y.client.Enqueue(
		asynq.NewTask(jobTag, payload),
		asynq.Queue(queue),
		asynq.MaxRetry(opt.retry),
		// opt.timeout is already a time.Duration; it must not be scaled by
		// time.Second again, which would overflow for ordinary values.
		asynq.Timeout(opt.timeout),
		asynq.ProcessAt(opt.processAt),
		asynq.TaskID(opt.taskId),
	)
	if err != nil {
		return nil, err
	}

	return &TaskInfo{
		ID:      info.ID,
		Queue:   info.Queue,
		Type:    info.Type,
		Payload: info.Payload,
	}, nil
}
// RunTask asks the inspector to run the task with the given ID in the given
// queue and returns the inspector's error, if any.
func (y *AsynqClient) RunTask(queue, id string) error {
	err := y.inspector.RunTask(queue, id)
	return err
}
// DeleteTask asks the inspector to delete the task with the given ID from the
// given queue and returns the inspector's error, if any.
func (y *AsynqClient) DeleteTask(queue, id string) error {
	err := y.inspector.DeleteTask(queue, id)
	return err
}