tkcashgame_v4/app/eonline/internal/data/data.go

263 lines
6.5 KiB
Go

package data
import (
"context"
"encoding/json"
"fmt"
"time"
"sandc/app/eonline/internal/biz"
"sandc/app/eonline/internal/conf"
"sandc/pkg/utils/snowflake"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-redis/redis/extra/redisotel/v8"
"github.com/go-redis/redis/v8"
"github.com/google/wire"
"github.com/hibiken/asynq"
"github.com/tx7do/kratos-transport/broker"
"github.com/tx7do/kratos-transport/broker/kafka"
"github.com/uptrace/opentelemetry-go-extra/otelgorm"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
ggrpc "google.golang.org/grpc"
driver "gorm.io/driver/mysql"
"gorm.io/gorm"
)
// ProviderSet is the wire provider set of the data package. It groups the
// constructors NewData, NewTransaction, NewEonlineRepo, NewAsynqClient and
// NewCache into a single set via wire.NewSet.
var ProviderSet = wire.NewSet(
NewData,
NewTransaction,
NewEonlineRepo,
NewAsynqClient,
NewCache,
)
// Data bundles the infrastructure handles used by this package: a log helper,
// a Redis client, a GORM database handle, a snowflake node and an asynq client.
// It is populated by NewData.
type Data struct {
// log is the helper built from the logger passed to NewData.
log *log.Helper
// redis is the client used by the cache and lock methods defined on Data.
redis *redis.Client
// db is the base GORM handle; InTx and DB derive their sessions from it.
db *gorm.DB
// snowflake is the node returned by snowflake.GetSnowflakeNode in NewData.
snowflake *snowflake.Node
// asynqCli is the client NewAsynqTask enqueues tasks through.
asynqCli *asynq.Client
// TODO just for example, don't use dispatch self in production
// ec v1.EonlineClient
}
// contextTxKey is the context key under which InTx stores the transaction
// handle and under which DB looks it up.
type contextTxKey struct{}
// InTx runs fn inside a GORM transaction. The transaction handle is stored in
// the context handed to fn under contextTxKey, so DB calls made with that
// context use the transaction. The value returned is whatever GORM's
// Transaction returns for the callback.
func (d *Data) InTx(ctx context.Context, fn func(ctx context.Context) error) error {
	session := d.db.WithContext(ctx)
	return session.Transaction(func(tx *gorm.DB) error {
		txCtx := context.WithValue(ctx, contextTxKey{}, tx)
		return fn(txCtx)
	})
}
// DB returns the GORM handle to use for ctx: the transaction stored under
// contextTxKey when ctx carries one, otherwise the base handle held by Data.
// Debug() is applied to the returned handle in both cases.
func (d *Data) DB(ctx context.Context) *gorm.DB {
	handle := d.db
	if tx, inTx := ctx.Value(contextTxKey{}).(*gorm.DB); inTx {
		handle = tx
	}
	return handle.Debug()
}
// NewAsynqTask builds an asynq task named taskName carrying payload and
// enqueues it through the client held by Data. The task info returned by the
// client is discarded; only the enqueue error is reported.
func (d *Data) NewAsynqTask(taskName string, payload []byte) error {
	task := asynq.NewTask(taskName, payload)
	if _, err := d.asynqCli.Enqueue(task); err != nil {
		return err
	}
	return nil
}
// NewTransaction exposes d through the biz.Transaction interface.
func NewTransaction(d *Data) biz.Transaction {
	var tx biz.Transaction = d
	return tx
}
// NewData opens the database and Redis connections described by c, assembles
// the Data value and calls biz.InitSvrData with bootstrap.
//
// It returns the Data value, a cleanup function and an error that is always
// nil in the current implementation (a database connection failure is handled
// inside newDB via log.Fatal).
//
// The cleanup function calls biz.SaveSvrDataShutDown and biz.CloseReport and
// then closes the Redis client and the underlying *sql.DB. Close failures are
// logged rather than discarded, and the *sql.DB is only closed when it could
// be obtained from GORM.
func NewData(c *conf.Data, bootstrap *conf.Bootstrap, asynqCli *asynq.Client, logger log.Logger) (*Data, func(), error) {
	helper := log.NewHelper(logger)
	db := newDB(c.Database.Source)
	rdb := newRedis(c.Redis)

	cleanup := func() {
		// The biz-level shutdown hooks run before any connection is closed.
		biz.SaveSvrDataShutDown()
		biz.CloseReport()
		helper.Info("closing the data resources")

		if err := rdb.Close(); err != nil {
			helper.Errorf("closing redis client failed: %v", err)
		}

		// db.DB() can fail; calling Close on the nil *sql.DB it returns in
		// that case would panic during shutdown.
		sqlDB, err := db.DB()
		if err != nil {
			helper.Errorf("getting sql.DB for close failed: %v", err)
			return
		}
		if err := sqlDB.Close(); err != nil {
			helper.Errorf("closing database failed: %v", err)
		}
	}

	data := &Data{
		log:       helper,
		redis:     rdb,
		db:        db,
		snowflake: snowflake.GetSnowflakeNode(rdb),
		asynqCli:  asynqCli,
	}

	biz.InitSvrData(bootstrap)
	// biz.InitReport(bootstrap)

	return data, cleanup, nil
}
// InitData forwards the bootstrap configuration to biz.InitReport. The same
// call is commented out in NewData, so within this file InitData is the only
// place that invokes it.
func InitData(bootstrap *conf.Bootstrap) {
biz.InitReport(bootstrap)
}
// newRedis creates a Redis client from the network, address, password,
// database index and pool size in c, and attaches an OpenTelemetry tracing
// hook tagged with the configured address under semconv.NetPeerNameKey.
func newRedis(c *conf.Data_Redis) *redis.Client {
	opts := &redis.Options{
		Network:  c.Network,
		Addr:     c.Addr,
		Password: c.Password,
		DB:       int(c.Db),
		PoolSize: int(c.Pool),
	}
	client := redis.NewClient(opts)

	peerAttr := semconv.NetPeerNameKey.String(c.Addr)
	client.AddHook(redisotel.NewTracingHook(redisotel.WithAttributes(peerAttr)))
	return client
}
// newDB opens a GORM connection through the MySQL driver for dsn and registers
// the otelgorm plugin (configured without query variables) on it.
//
// An empty dsn is replaced by a hard-coded local default. A connection failure
// is reported through log.Fatal. A failure to register the plugin is logged
// and otherwise tolerated, so the returned handle still works without it.
func newDB(dsn string) *gorm.DB {
	if dsn == "" {
		dsn = "root:@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=UTC"
	}

	db, err := gorm.Open(driver.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatal("connection database failed: ", err)
	}

	// Plugin registration stays best-effort, but the failure is no longer
	// silently dropped.
	if err := db.Use(otelgorm.NewPlugin(otelgorm.WithoutQueryVariables())); err != nil {
		log.Errorf("registering otelgorm plugin failed: %v", err)
	}
	return db
}
// NewCache exposes d through the biz.Cache interface.
func NewCache(d *Data) biz.Cache {
	var cache biz.Cache = d
	return cache
}
// Remember returns the cached bytes stored under key, computing and caching
// them with fn when nothing usable is cached.
//
// Behaviour:
//   - A non-empty cached value is returned as-is.
//   - On a cache miss (GetValue yields an empty string), fn is called, its
//     result is encoded, written back with a TTL of `second` seconds, and
//     returned.
//   - When the cache lookup itself fails, fn is called and its encoded result
//     is returned without attempting a write.
//
// Encoding is the same on every path: a string result is returned as its raw
// bytes, any other value is JSON-encoded. An error from fn is returned
// unchanged; a JSON encoding failure is returned wrapped. A failed cache write
// is logged and does not fail the call, since the value was computed
// successfully.
func (r *Data) Remember(ctx context.Context, key string, second int32, fn func(ctx context.Context) (interface{}, error)) ([]byte, error) {
	cached, getErr := r.GetValue(ctx, key)
	if getErr == nil && cached != "" {
		return []byte(cached), nil
	}

	value, err := fn(ctx)
	if err != nil {
		return nil, err
	}

	var encoded []byte
	if s, ok := value.(string); ok {
		// Strings are stored verbatim so that a later cache hit returns the
		// same bytes as this call.
		encoded = []byte(s)
	} else if encoded, err = json.Marshal(value); err != nil {
		if getErr != nil {
			return nil, fmt.Errorf("json Marshal cache failed, err: %w ", err)
		}
		return nil, fmt.Errorf("json Marshal cache null, err: %w ", err)
	}

	if getErr != nil {
		// The cache lookup failed, so no write-back is attempted.
		return encoded, nil
	}

	if err := r.WriteValue(ctx, key, encoded, second); err != nil {
		r.log.Warnf("remember: writing cache key %q failed: %v", key, err)
	}
	return encoded, nil
}
// GetValue reads the string stored under key. A missing key (redis.Nil) is
// reported as a nil error together with the value returned by the client; any
// other error is returned alongside that value.
func (r *Data) GetValue(ctx context.Context, key string) (string, error) {
	val, err := r.redis.Get(ctx, key).Result()
	if err != nil && err != redis.Nil {
		return val, err
	}
	return val, nil
}
// WriteValue stores value under key with an expiration of timeout seconds and
// returns the error reported by the Redis SET command.
func (r *Data) WriteValue(ctx context.Context, key string, value interface{}, timeout int32) error {
	ttl := time.Duration(timeout) * time.Second
	cmd := r.redis.Set(ctx, key, value, ttl)
	return cmd.Err()
}
// DelValue issues a Redis DEL for the given keys and returns the command's
// error.
func (r *Data) DelValue(ctx context.Context, keys ...string) error {
	cmd := r.redis.Del(ctx, keys...)
	return cmd.Err()
}
// RedisLock issues SETNX for key with the given value and an expiration of
// timeout seconds, returning the command's boolean result and error.
func (r *Data) RedisLock(ctx context.Context, key string, value interface{}, timeout int32) (bool, error) {
	ttl := time.Duration(timeout) * time.Second
	cmd := r.redis.SetNX(ctx, key, value, ttl)
	return cmd.Result()
}
// RedisUnLock deletes key and returns true when the DEL command succeeds, or
// false together with the error when it fails.
//
// NOTE(review): the number of keys actually deleted is ignored, so true is
// returned even when key did not exist. The delete is also unconditional: it
// does not compare against the value passed to RedisLock.
func (r *Data) RedisUnLock(ctx context.Context, key string) (bool, error) {
	if err := r.redis.Del(ctx, key).Err(); err != nil {
		return false, err
	}
	return true, nil
}
// IncrValue issues a Redis INCR on key. The resulting counter value is
// discarded; only the command's error is returned.
func (r *Data) IncrValue(ctx context.Context, key string) error {
	cmd := r.redis.Incr(ctx, key)
	return cmd.Err()
}
// NewEonlineClient new eonline rpc client
// func NewEonlineClient(r registry.Discovery) v1.EonlineClient {
// return NewRPCClient[v1.EonlineClient](r, "eonline.rpc", v1.NewEonlineClient)
// }
// NewRPCClient dials the service registered as rpcName through the discovery
// r and wraps the resulting connection with fn to build a client of type T.
// The connection is insecure, uses a background context, and carries the
// tracing and recovery client middleware. A dial failure is reported through
// log.Fatal.
func NewRPCClient[T any](r registry.Discovery, rpcName string, fn func(cc ggrpc.ClientConnInterface) T) T {
	endpoint := "discovery:///" + rpcName
	dialOpts := []grpc.ClientOption{
		grpc.WithEndpoint(endpoint),
		grpc.WithDiscovery(r),
		grpc.WithMiddleware(
			tracing.Client(),
			recovery.Recovery(),
		),
	}

	conn, err := grpc.DialInsecure(context.Background(), dialOpts...)
	if err != nil {
		log.Fatal(err)
	}
	return fn(conn)
}
// NewKafkaBroker builds a Kafka broker from confQueue.Kafka: it uses the
// configured addresses, the "json" codec and the global tracer provider, and
// adds the PLAIN mechanism when both username and password are non-empty.
//
// The broker is initialised before being returned. An initialisation error is
// logged and the broker is still returned, because the signature has no error
// result.
func NewKafkaBroker(confQueue *conf.Queue) broker.Broker {
	opts := []broker.Option{
		broker.WithAddress(confQueue.Kafka.Addrs...),
		broker.WithCodec("json"),
		broker.WithGlobalTracerProvider(),
	}
	if confQueue.Kafka.Username != "" && confQueue.Kafka.Password != "" {
		opts = append(opts, kafka.WithPlainMechanism(confQueue.Kafka.Username, confQueue.Kafka.Password))
	}

	kafkaBroker := kafka.NewBroker(opts...)
	// Surface initialisation problems instead of discarding them.
	if err := kafkaBroker.Init(); err != nil {
		log.Errorf("init kafka broker failed: %v", err)
	}
	return kafkaBroker
}
// NewAsynqClient creates an asynq client whose Redis connection options
// (address, database index, password, read/write timeouts and pool size) are
// taken from confQueue.Asynq.
func NewAsynqClient(confQueue *conf.Queue) *asynq.Client {
	redisOpt := asynq.RedisClientOpt{
		Addr:         confQueue.Asynq.Addr,
		DB:           int(confQueue.Asynq.Db),
		Password:     confQueue.Asynq.Password,
		ReadTimeout:  confQueue.Asynq.ReadTimeout.AsDuration(),
		WriteTimeout: confQueue.Asynq.WriteTimeout.AsDuration(),
		PoolSize:     int(confQueue.Asynq.Pool),
	}
	return asynq.NewClient(redisOpt)
}