package data import ( "context" "encoding/json" "fmt" "time" "sandc/app/eonline/internal/biz" "sandc/app/eonline/internal/conf" "sandc/pkg/utils/snowflake" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/middleware/recovery" "github.com/go-kratos/kratos/v2/middleware/tracing" "github.com/go-kratos/kratos/v2/registry" "github.com/go-kratos/kratos/v2/transport/grpc" "github.com/go-redis/redis/extra/redisotel/v8" "github.com/go-redis/redis/v8" "github.com/google/wire" "github.com/hibiken/asynq" "github.com/tx7do/kratos-transport/broker" "github.com/tx7do/kratos-transport/broker/kafka" "github.com/uptrace/opentelemetry-go-extra/otelgorm" semconv "go.opentelemetry.io/otel/semconv/v1.7.0" ggrpc "google.golang.org/grpc" driver "gorm.io/driver/mysql" "gorm.io/gorm" ) // ProviderSet is data providers. var ProviderSet = wire.NewSet( NewData, NewTransaction, NewEonlineRepo, NewAsynqClient, NewCache, ) // Data . type Data struct { log *log.Helper redis *redis.Client db *gorm.DB snowflake *snowflake.Node asynqCli *asynq.Client // TODO just for example, don't use dispatch self in production // ec v1.EonlineClient } type contextTxKey struct{} func (d *Data) InTx(ctx context.Context, fn func(ctx context.Context) error) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { ctx = context.WithValue(ctx, contextTxKey{}, tx) return fn(ctx) }) } func (d *Data) DB(ctx context.Context) *gorm.DB { tx, ok := ctx.Value(contextTxKey{}).(*gorm.DB) if ok { return tx.Debug() } return d.db.Debug() } // NewAsynqTask new asynq task func (d *Data) NewAsynqTask(taskName string, payload []byte) error { _, err := d.asynqCli.Enqueue(asynq.NewTask(taskName, payload)) return err } // NewTransaction . func NewTransaction(d *Data) biz.Transaction { return d } // NewData . func NewData(c *conf.Data, bootstrap *conf.Bootstrap, asynqCli *asynq.Client, logger log.Logger) (*Data, func(), error) { db := newDB(c.Database.Source) rdb := newRedis(c.Redis) cleanup := func() { biz.SaveSvrDataShutDown() biz.CloseReport() log.NewHelper(logger).Info("closing the data resources") _ = rdb.Close() sqlDB, _ := db.DB() _ = sqlDB.Close() } data := &Data{ log: log.NewHelper(logger), redis: rdb, db: db, snowflake: snowflake.GetSnowflakeNode(rdb), asynqCli: asynqCli, } biz.InitSvrData(bootstrap) // biz.InitReport(bootstrap) return data, cleanup, nil } func InitData(bootstrap *conf.Bootstrap) { biz.InitReport(bootstrap) } func newRedis(c *conf.Data_Redis) *redis.Client { rdb := redis.NewClient(&redis.Options{ Network: c.Network, Addr: c.Addr, Password: c.Password, DB: int(c.Db), PoolSize: int(c.Pool), }) rdb.AddHook(redisotel.NewTracingHook(redisotel.WithAttributes(semconv.NetPeerNameKey.String(c.Addr)))) return rdb } func newDB(dsn string) *gorm.DB { if dsn == "" { dsn = "root:@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=UTC" } db, err := gorm.Open(driver.Open(dsn), &gorm.Config{}) if err != nil { log.Fatal("connection database failed: ", err) } _ = db.Use(otelgorm.NewPlugin(otelgorm.WithoutQueryVariables())) return db } // NewCache . func NewCache(d *Data) biz.Cache { return d } func (r *Data) Remember(ctx context.Context, key string, second int32, fn func(ctx context.Context) (interface{}, error)) ([]byte, error) { str, err := r.GetValue(ctx, key) if err != nil { value, err := fn(ctx) if err != nil { return nil, err } mByte, err := json.Marshal(value) if err != nil { return nil, fmt.Errorf("json Marshal cache failed, err: %w ", err) } return mByte, nil } if str == "" { value, err := fn(ctx) if err != nil { return nil, err } var mByte []byte if str, ok := value.(string); ok { mByte = []byte(str) } else { mByte, err = json.Marshal(value) } if err != nil { return nil, fmt.Errorf("json Marshal cache null, err: %w ", err) } r.WriteValue(ctx, key, mByte, second) return mByte, nil } return []byte(str), nil } func (r *Data) GetValue(ctx context.Context, key string) (string, error) { v, err := r.redis.Get(ctx, key).Result() if err == redis.Nil { return v, nil } else if err != nil { return v, err } else { return v, nil } } func (r *Data) WriteValue(ctx context.Context, key string, value interface{}, timeout int32) error { return r.redis.Set(ctx, key, value, time.Duration(timeout)*time.Second).Err() } func (r *Data) DelValue(ctx context.Context, keys ...string) error { return r.redis.Del(ctx, keys...).Err() } func (r *Data) RedisLock(ctx context.Context, key string, value interface{}, timeout int32) (bool, error) { return r.redis.SetNX(ctx, key, value, time.Duration(timeout)*time.Second).Result() } func (r *Data) RedisUnLock(ctx context.Context, key string) (bool, error) { _, err := r.redis.Del(ctx, key).Result() if err != nil { return false, err } return true, nil } func (r *Data) IncrValue(ctx context.Context, key string) error { return r.redis.Incr(ctx, key).Err() } // NewEonlineClient new eonline rpc client // func NewEonlineClient(r registry.Discovery) v1.EonlineClient { // return NewRPCClient[v1.EonlineClient](r, "eonline.rpc", v1.NewEonlineClient) // } // NewRPCClient new one rpc client by discovery. func NewRPCClient[T any](r registry.Discovery, rpcName string, fn func(cc ggrpc.ClientConnInterface) T) T { conn, err := grpc.DialInsecure( context.Background(), grpc.WithEndpoint("discovery:///"+rpcName), grpc.WithDiscovery(r), grpc.WithMiddleware( tracing.Client(), recovery.Recovery(), ), ) if err != nil { log.Fatal(err) } return fn(conn) } // NewKafkaBroker new kafka broker. func NewKafkaBroker(confQueue *conf.Queue) broker.Broker { opts := []broker.Option{ broker.WithAddress(confQueue.Kafka.Addrs...), broker.WithCodec("json"), broker.WithGlobalTracerProvider(), } if confQueue.Kafka.Username != "" && confQueue.Kafka.Password != "" { opts = append(opts, kafka.WithPlainMechanism(confQueue.Kafka.Username, confQueue.Kafka.Password)) } kafkaBroker := kafka.NewBroker(opts...) _ = kafkaBroker.Init() return kafkaBroker } // NewAsynqClient new asynq client. func NewAsynqClient(confQueue *conf.Queue) *asynq.Client { return asynq.NewClient(asynq.RedisClientOpt{ Addr: confQueue.Asynq.Addr, DB: int(confQueue.Asynq.Db), Password: confQueue.Asynq.Password, ReadTimeout: confQueue.Asynq.ReadTimeout.AsDuration(), WriteTimeout: confQueue.Asynq.WriteTimeout.AsDuration(), PoolSize: int(confQueue.Asynq.Pool), }) }