diff --git a/user/main.go b/user/main.go index 4075427..a929446 100644 --- a/user/main.go +++ b/user/main.go @@ -1,61 +1,22 @@ package main import ( - "time" - "github.com/micro/micro/v3/service" - "github.com/micro/micro/v3/service/config" "github.com/micro/micro/v3/service/logger" "github.com/micro/micro/v3/service/store" - "gorm.io/driver/postgres" - "gorm.io/gorm" otp "github.com/micro/services/otp/proto" "github.com/micro/services/pkg/tracing" "github.com/micro/services/user/handler" - "github.com/micro/services/user/migrate" proto "github.com/micro/services/user/proto" ) -var pgxDsn = "postgresql://postgres:postgres@localhost:5432/db?sslmode=disable" - -func migrateData() { - startTime := time.Now() - logger.Info("start migrate ...") - defer func() { - logger.Infof("all migrations are finished, use time: %v", time.Since(startTime)) - }() - - // Connect to the database - cfg, err := config.Get("micro.db.database") - if err != nil { - logger.Fatalf("Error loading config: %v", err) - } - - dsn := cfg.String(pgxDsn) - gormDb, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) - - if err != nil { - logger.Fatal("Failed to connect to ") - } - - migration := migrate.NewMigration(gormDb) - if err := migration.Do(); err != nil { - logger.Fatal("migrate error: ", err) - } - - return -} - func main() { srv := service.New( service.Name("user"), ) srv.Init() - // migration work - go migrateData() - hd := handler.NewUser( store.DefaultStore, otp.NewOtpService("otp", srv.Client()), diff --git a/user/migrate/context.go b/user/migrate/context.go deleted file mode 100644 index 3d34e8c..0000000 --- a/user/migrate/context.go +++ /dev/null @@ -1,54 +0,0 @@ -package migrate - -import ( - "github.com/micro/micro/v3/service/logger" - "gorm.io/gorm" - - "github.com/micro/services/user/migrate/entity" -) - -type Migration interface { - Migrate([]*entity.Row) error -} - -type context struct { - db *gorm.DB - strategy Migration -} - -func NewContext(db *gorm.DB, strg Migration) *context { - return &context{ - db: db, - strategy: strg, - } -} - -func (c *context) Migrate(tableName string) error { - var count int64 - - db := c.db.Table(tableName) - - if err := db.Count(&count).Error; err != nil { - return err - } - - var offset, limit = 0, 1000 - - for offset = 0; offset < int(count); offset = offset + limit { - rows := make([]*entity.Row, 0) - - if err := db.Offset(offset).Limit(limit).Find(&rows).Error; err != nil { - logger.Errorf("migrate error, table:%v offset:%v limit:%v error:%v", tableName, offset, limit, err) - continue - } - - if err := c.strategy.Migrate(rows); err != nil { - logger.Errorf("migrate error, table:%v offset:%v limit:%v error:%v", tableName, offset, limit, err) - continue - } - } - - logger.Infof("migrate done, table: %v, rows count: %v", tableName, count) - - return nil -} diff --git a/user/migrate/entity/entity.go b/user/migrate/entity/entity.go deleted file mode 100644 index 511752a..0000000 --- a/user/migrate/entity/entity.go +++ /dev/null @@ -1,17 +0,0 @@ -package entity - -import ( - "fmt" - "time" -) - -type Row struct { - Id string - Data string - CreatedAt time.Time - UpdatedAt time.Time -} - -func KeyPrefix(tenantId string) string { - return fmt.Sprintf("user/%s/", tenantId) -} diff --git a/user/migrate/migrate.go b/user/migrate/migrate.go deleted file mode 100644 index c514511..0000000 --- a/user/migrate/migrate.go +++ /dev/null @@ -1,87 +0,0 @@ -package migrate - -import ( - "strings" - "sync" - - "github.com/micro/micro/v3/service/logger" - "github.com/micro/micro/v3/service/store" - "github.com/pkg/errors" - "gorm.io/gorm" - - pwdMgr "github.com/micro/services/user/migrate/password" - resetMgr "github.com/micro/services/user/migrate/password_reset_code" - sessionMgr "github.com/micro/services/user/migrate/session" - tokenMgr "github.com/micro/services/user/migrate/token" - userMgr "github.com/micro/services/user/migrate/user" -) - -type migration struct { - db *gorm.DB - store store.Store -} - -func NewMigration(db *gorm.DB) *migration { - return &migration{ - db: db, - store: store.DefaultStore, - } -} - -func (m *migration) Do() error { - // get all tables - var tables []string - if err := m.db.Table("information_schema.tables"). - Where("table_schema = ?", "public"). - Pluck("table_name", &tables).Error; err != nil { - return errors.Wrap(err, "get pgx tables error") - } - - // migrate all data in concurrency - wg := sync.WaitGroup{} - // max concurrency is 5 - concurrencyChan := make(chan struct{}, 5) - - for _, t := range tables { - - wg.Add(1) - concurrencyChan <- struct{}{} - - go func(tableName string) { - defer func() { - wg.Done() - <-concurrencyChan - }() - var strg Migration - - if strings.HasSuffix(tableName, "_users") { - strg = userMgr.New(m.store, strings.TrimSuffix(tableName, "_users")) - } else if strings.HasSuffix(tableName, "_passwords") { - strg = pwdMgr.New(m.store, strings.TrimSuffix(tableName, "_passwords")) - } else if strings.HasSuffix(tableName, "_sessions") { - strg = sessionMgr.New(m.store, strings.TrimSuffix(tableName, "_sessions")) - } else if strings.HasSuffix(tableName, "_tokens") { - strg = tokenMgr.New(m.store, strings.TrimSuffix(tableName, "_tokens")) - } else if strings.HasSuffix(tableName, "_password_reset_codes") { - strg = resetMgr.New(m.store, strings.TrimSuffix(tableName, "_password_reset_codes")) - } else { - logger.Infof("ignore table: %s", tableName) - } - - if strg == nil { - return - } - - ctx := NewContext(m.db, strg) - if err := ctx.Migrate(tableName); err != nil { - logger.Errorf("migrate table:%s error", tableName) - } - - }(t) - } - - wg.Wait() - - return nil - -} diff --git a/user/migrate/password/password.go b/user/migrate/password/password.go deleted file mode 100644 index 583126f..0000000 --- a/user/migrate/password/password.go +++ /dev/null @@ -1,50 +0,0 @@ -package password - -import ( - "fmt" - - "github.com/micro/micro/v3/service/logger" - "github.com/micro/micro/v3/service/store" - "github.com/tidwall/gjson" - - "github.com/micro/services/user/migrate/entity" -) - -func generatePasswordStoreKey(tenantId string, id string) string { - return fmt.Sprintf("%spassword/%s", entity.KeyPrefix(tenantId), id) -} - -type password struct { - to store.Store - tenantId string -} - -func New(to store.Store, tenantId string) *password { - return &password{ - to: to, - tenantId: tenantId, - } -} - -func (u *password) migrate(rows []*entity.Row) error { - for _, rec := range rows { - id := gjson.Get(rec.Data, "id").String() - - key := generatePasswordStoreKey(u.tenantId, id) - err := u.to.Write(&store.Record{ - Key: key, - Value: []byte(rec.Data), - }) - - if err != nil { - logger.Errorf("migrate password write error: %v, %+v", err, key) - continue - } - } - - return nil -} - -func (u *password) Migrate(rows []*entity.Row) error { - return u.migrate(rows) -} diff --git a/user/migrate/password_reset_code/password_reset_code.go b/user/migrate/password_reset_code/password_reset_code.go deleted file mode 100644 index f878910..0000000 --- a/user/migrate/password_reset_code/password_reset_code.go +++ /dev/null @@ -1,50 +0,0 @@ -package password_reset_code - -import ( - "fmt" - - "github.com/micro/micro/v3/service/logger" - "github.com/micro/micro/v3/service/store" - "github.com/tidwall/gjson" - - "github.com/micro/services/user/migrate/entity" -) - -func generatePasswordStoreKey(tenantId string, id string) string { - return fmt.Sprintf("%spassword-reset-codes/%s", entity.KeyPrefix(tenantId), id) -} - -type resetCode struct { - to store.Store - tenantId string -} - -func New(to store.Store, tenantId string) *resetCode { - return &resetCode{ - to: to, - tenantId: tenantId, - } -} - -func (u *resetCode) migrate(rows []*entity.Row) error { - for _, rec := range rows { - id := gjson.Get(rec.Data, "id").String() - - key := generatePasswordStoreKey(u.tenantId, id) - err := u.to.Write(&store.Record{ - Key: key, - Value: []byte(rec.Data), - }) - - if err != nil { - logger.Errorf("migrate password-reset-code write error: %v, %+v", err, key) - continue - } - } - - return nil -} - -func (u *resetCode) Migrate(rows []*entity.Row) error { - return u.migrate(rows) -} diff --git a/user/migrate/session/session.go b/user/migrate/session/session.go deleted file mode 100644 index 0f0f2ed..0000000 --- a/user/migrate/session/session.go +++ /dev/null @@ -1,50 +0,0 @@ -package session - -import ( - "fmt" - - "github.com/micro/micro/v3/service/logger" - "github.com/micro/micro/v3/service/store" - "github.com/tidwall/gjson" - - "github.com/micro/services/user/migrate/entity" -) - -func generateStoreKey(tenantId string, id string) string { - return fmt.Sprintf("%ssession/%s", entity.KeyPrefix(tenantId), id) -} - -type session struct { - to store.Store - tenantId string -} - -func New(to store.Store, tenantId string) *session { - return &session{ - to: to, - tenantId: tenantId, - } -} - -func (s *session) migrate(rows []*entity.Row) error { - for _, rec := range rows { - id := gjson.Get(rec.Data, "id").String() - - key := generateStoreKey(s.tenantId, id) - err := s.to.Write(&store.Record{ - Key: key, - Value: []byte(rec.Data), - }) - - if err != nil { - logger.Errorf("migrate session write error: %v, %+v", err, key) - continue - } - } - - return nil -} - -func (s *session) Migrate(rows []*entity.Row) error { - return s.migrate(rows) -} diff --git a/user/migrate/token/token.go b/user/migrate/token/token.go deleted file mode 100644 index dc266df..0000000 --- a/user/migrate/token/token.go +++ /dev/null @@ -1,50 +0,0 @@ -package token - -import ( - "fmt" - - "github.com/micro/micro/v3/service/logger" - "github.com/micro/micro/v3/service/store" - "github.com/tidwall/gjson" - - "github.com/micro/services/user/migrate/entity" -) - -func generateStoreKey(tenantId string, id string) string { - return fmt.Sprintf("%sverification-token/%s", entity.KeyPrefix(tenantId), id) -} - -type token struct { - to store.Store - tenantId string -} - -func New(to store.Store, tenantId string) *token { - return &token{ - to: to, - tenantId: tenantId, - } -} - -func (s *token) migrate(rows []*entity.Row) error { - for _, rec := range rows { - id := gjson.Get(rec.Data, "id").String() - - key := generateStoreKey(s.tenantId, id) - err := s.to.Write(&store.Record{ - Key: key, - Value: []byte(rec.Data), - }) - - if err != nil { - logger.Errorf("migrate token write error: %v, %+v", err, key) - continue - } - } - - return nil -} - -func (s *token) Migrate(rows []*entity.Row) error { - return s.migrate(rows) -} diff --git a/user/migrate/user/user.go b/user/migrate/user/user.go deleted file mode 100644 index 31fd9fd..0000000 --- a/user/migrate/user/user.go +++ /dev/null @@ -1,84 +0,0 @@ -package user - -import ( - "fmt" - "strings" - - "github.com/micro/micro/v3/service/logger" - "github.com/micro/micro/v3/service/store" - "github.com/pkg/errors" - "github.com/tidwall/gjson" - - "github.com/micro/services/user/migrate/entity" -) - -func generateAccountStoreKey(tenantId, userId string) string { - return fmt.Sprintf("%saccount/id/%s", entity.KeyPrefix(tenantId), userId) -} - -func generateAccountEmailStoreKey(tenantId, email string) string { - return fmt.Sprintf("%sacccount/email/%s", entity.KeyPrefix(tenantId), email) -} - -func generateAccountUsernameStoreKey(tenantId, username string) string { - return fmt.Sprintf("%saccount/username/%s", entity.KeyPrefix(tenantId), username) -} - -type user struct { - to store.Store - tenantId string -} - -func New(to store.Store, tenantId string) *user { - return &user{ - to: to, - tenantId: tenantId, - } -} - -func (u *user) batchWrite(keys []string, val []byte) error { - errs := make([]string, 0) - - for _, key := range keys { - err := u.to.Write(&store.Record{ - Key: key, - Value: val, - }) - - if err != nil { - errs = append(errs, err.Error()) - } - - } - - if len(errs) > 0 { - return errors.New(strings.Join(errs, ";")) - } - - return nil -} - -func (u *user) migrate(rows []*entity.Row) error { - for _, rec := range rows { - id := gjson.Get(rec.Data, "id").String() - email := gjson.Get(rec.Data, "email").String() - username := gjson.Get(rec.Data, "username").String() - - keys := []string{ - generateAccountStoreKey(u.tenantId, id), - generateAccountEmailStoreKey(u.tenantId, email), - generateAccountUsernameStoreKey(u.tenantId, username), - } - - if err := u.batchWrite(keys, []byte(rec.Data)); err != nil { - logger.Errorf("migrate users batch write error: %v, %+v", err, keys) - continue - } - } - - return nil -} - -func (u *user) Migrate(rows []*entity.Row) error { - return u.migrate(rows) -}