Replace DB with Store in service/user (#321)

This commit is contained in:
zhaoyang
2022-01-28 19:03:34 +08:00
committed by GitHub
parent 8246d8f8a7
commit cd215a8392
17 changed files with 885 additions and 284 deletions

View File

@@ -8,13 +8,14 @@ import (
"net/url"
"path"
"strings"
"sync"
"time"
_struct "github.com/golang/protobuf/ptypes/struct"
"github.com/micro/micro/v3/service/config"
microerr "github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
db "github.com/micro/services/db/proto"
"github.com/micro/micro/v3/service/store"
"github.com/micro/services/pkg/cache"
user "github.com/micro/services/user/proto"
@@ -27,26 +28,23 @@ var (
)
type pw struct {
ID string `json:"id"`
Password string `json:"password"`
Salt string `json:"salt"`
}
type verificationToken struct {
ID string `json:"id"`
UserID string `json:"userId"`
Token string `json:"token"`
}
type passwordResetCode struct {
ID string `json:"id"`
Expires time.Time `json:"expires"`
UserID string `json:"userId"`
Code string `json:"code"`
}
type Domain struct {
db db.DbService
store store.Store
sengridKey string
fromEmail string
}
@@ -56,7 +54,7 @@ var (
defaultSender = "noreply@email.m3ocontent.com"
)
func New(db db.DbService) *Domain {
func New(st store.Store) *Domain {
var key, email string
cfg, err := config.Get("micro.user.sendgrid.api_key")
if err == nil {
@@ -73,7 +71,7 @@ func New(db db.DbService) *Domain {
}
return &Domain{
sengridKey: key,
db: db,
store: st,
fromEmail: email,
}
}
@@ -97,66 +95,50 @@ func (domain *Domain) SendEmail(fromName, toAddress, toUsername, subject, textCo
return err
}
func (domain *Domain) SavePasswordResetCode(ctx context.Context, userID, code string) (*passwordResetCode, error) {
func (domain *Domain) SavePasswordResetCode(ctx context.Context, userId, code string) (*passwordResetCode, error) {
pwcode := passwordResetCode{
ID: userID + "-" + code,
Expires: time.Now().Add(24 * time.Hour),
UserID: userID,
UserID: userId,
Code: code,
}
s := &_struct.Struct{}
jso, _ := json.Marshal(pwcode)
err := s.UnmarshalJSON(jso)
val, err := json.Marshal(pwcode)
if err != nil {
return nil, err
}
_, err = domain.db.Create(ctx, &db.CreateRequest{
Table: "password-reset-codes",
Record: s,
})
record := store.NewRecord(generatePasswordResetCodeStoreKey(ctx, userId, code), val)
err = domain.store.Write(record)
return &pwcode, err
}
func (domain *Domain) DeletePasswordResetCode(ctx context.Context, userId, code string) error {
_, err := domain.db.Delete(ctx, &db.DeleteRequest{
Table: "password-reset-codes",
Id: userId + "-" + code,
})
return err
return domain.store.Delete(generatePasswordResetCodeStoreKey(ctx, userId, code))
}
// ReadToken returns the user id
// ReadPasswordResetCode returns the user reset code
func (domain *Domain) ReadPasswordResetCode(ctx context.Context, userId, code string) (*passwordResetCode, error) {
// generate the key
id := userId + "-" + code
if id == "" {
return nil, errors.New("password reset code id is empty")
}
token := &passwordResetCode{}
rsp, err := domain.db.Read(ctx, &db.ReadRequest{
Table: "password-reset-codes",
Query: fmt.Sprintf("id == '%v'", id),
})
records, err := domain.store.Read(generatePasswordResetCodeStoreKey(ctx, userId, code))
if err != nil {
return nil, err
}
if len(rsp.Records) == 0 {
if len(records) == 0 {
return nil, errors.New("password reset code not found")
}
m, _ := rsp.Records[0].MarshalJSON()
json.Unmarshal(m, token)
resetCode := &passwordResetCode{}
if err := json.Unmarshal(records[0].Value, resetCode); err != nil {
return nil, err
}
// check the expiry
if token.Expires.Before(time.Now()) {
if resetCode.Expires.Before(time.Now()) {
return nil, errors.New("password reset code expired")
}
return token, nil
return resetCode, nil
}
func (domain *Domain) SendPasswordResetEmail(ctx context.Context, userId, codeStr, fromName, toAddress, toUsername, subject, textContent string) error {
@@ -196,274 +178,336 @@ func (domain *Domain) CreateSession(ctx context.Context, sess *user.Session) err
sess.Expires = time.Now().Add(time.Hour * 24 * 7).Unix()
}
s := &_struct.Struct{}
jso, _ := json.Marshal(sess)
err := s.UnmarshalJSON(jso)
val, err := json.Marshal(sess)
if err != nil {
return err
}
_, err = domain.db.Create(ctx, &db.CreateRequest{
Table: "sessions",
Record: s,
})
return err
record := &store.Record{
Key: generateSessionStoreKey(ctx, sess.Id),
Value: val,
}
return domain.store.Write(record)
}
func (domain *Domain) DeleteSession(ctx context.Context, id string) error {
_, err := domain.db.Delete(ctx, &db.DeleteRequest{
Table: "sessions",
Id: id,
})
return err
return domain.store.Delete(generateSessionStoreKey(ctx, id))
}
// ReadToken returns the user id
func (domain *Domain) ReadToken(ctx context.Context, userId, token string) (string, error) {
id := userId + "-" + token
func (domain *Domain) ReadToken(ctx context.Context, email, token string) (string, error) {
if token == "" {
return "", errors.New("token id empty")
}
tk := &verificationToken{}
rsp, err := domain.db.Read(ctx, &db.ReadRequest{
Table: "tokens",
Query: fmt.Sprintf("id == '%v'", id),
})
records, err := domain.store.Read(generateVerificationsTokenStoreKey(ctx, email, token))
if err != nil {
return "", err
}
if len(rsp.Records) == 0 {
if len(records) == 0 {
return "", errors.New("token not found")
}
m, _ := rsp.Records[0].MarshalJSON()
json.Unmarshal(m, tk)
tk := &verificationToken{}
err = json.Unmarshal(records[0].Value, tk)
if err != nil {
return "", err
}
return tk.UserID, nil
}
// CreateToken returns the created and saved token
func (domain *Domain) CreateToken(ctx context.Context, userId, token string) (string, error) {
s := &_struct.Struct{}
jso, _ := json.Marshal(verificationToken{
ID: userId + "-" + token,
UserID: userId,
func (domain *Domain) CreateToken(ctx context.Context, email, token string) (string, error) {
tk, err := json.Marshal(verificationToken{
UserID: email,
Token: token,
})
err := s.UnmarshalJSON(jso)
if err != nil {
return "", err
}
_, err = domain.db.Create(ctx, &db.CreateRequest{
Table: "tokens",
Record: s,
})
record := &store.Record{
Key: generateVerificationsTokenStoreKey(ctx, email, token),
Value: tk,
}
err = domain.store.Write(record)
if err != nil {
return "", err
}
return token, err
}
func (domain *Domain) ReadSession(ctx context.Context, id string) (*user.Session, error) {
sess := &user.Session{}
if len(id) == 0 {
return nil, fmt.Errorf("no id provided")
}
q := fmt.Sprintf("id == '%v'", id)
logger.Infof("Running query: %v", q)
rsp, err := domain.db.Read(ctx, &db.ReadRequest{
Table: "sessions",
Query: q,
})
records, err := domain.store.Read(generateSessionStoreKey(ctx, id))
if err != nil {
return nil, err
}
if len(rsp.Records) == 0 {
if len(records) == 0 {
return nil, ErrNotFound
}
m, _ := rsp.Records[0].MarshalJSON()
json.Unmarshal(m, sess)
sess := &user.Session{}
err = json.Unmarshal(records[0].Value, sess)
if err != nil {
return nil, err
}
return sess, nil
}
// batchWrite write multiple records in batches
func (domain *Domain) batchWrite(records []*store.Record) error {
if len(records) == 0 {
return nil
}
wg := sync.WaitGroup{}
lock := sync.Mutex{}
errs := make([]string, 0)
for _, v := range records {
wg.Add(1)
go func(r *store.Record) {
defer wg.Done()
if err := domain.store.Write(r); err != nil {
lock.Lock()
errs = append(errs, err.Error())
lock.Unlock()
}
}(v)
}
wg.Wait()
if len(errs) != 0 {
return errors.New(strings.Join(errs, ";"))
}
return nil
}
func (domain *Domain) Create(ctx context.Context, user *user.Account, salt string, password string) error {
user.Created = time.Now().Unix()
user.Updated = time.Now().Unix()
s := &_struct.Struct{}
jso, _ := json.Marshal(user)
err := s.UnmarshalJSON(jso)
if err != nil {
return err
}
_, err = domain.db.Create(ctx, &db.CreateRequest{
Table: "users",
Record: s,
})
// user account record
accountVal, err := json.Marshal(user)
if err != nil {
return err
}
pass := pw{
ID: user.Id,
// password record
passwordVal, err := json.Marshal(pw{
Password: password,
Salt: salt,
}
s = &_struct.Struct{}
jso, _ = json.Marshal(pass)
err = s.UnmarshalJSON(jso)
})
if err != nil {
return err
}
_, err = domain.db.Create(ctx, &db.CreateRequest{
Table: "passwords",
Record: s,
})
return err
records := []*store.Record{
{Key: generateAccountStoreKey(ctx, user.Id), Value: accountVal},
{Key: generateAccountUsernameStoreKey(ctx, user.Username), Value: accountVal},
{Key: generateAccountEmailStoreKey(ctx, user.Email), Value: accountVal},
{Key: generatePasswordStoreKey(ctx, user.Id), Value: passwordVal},
}
return domain.batchWrite(records)
}
func (domain *Domain) Delete(ctx context.Context, id string) error {
_, err := domain.db.Delete(ctx, &db.DeleteRequest{
Table: "users",
Id: id,
})
return err
// batchDelete deletes the keys in batches
func (domain *Domain) batchDelete(keys []string) error {
if len(keys) == 0 {
return nil
}
wg := sync.WaitGroup{}
lock := sync.Mutex{}
errs := make([]string, 0)
for _, key := range keys {
wg.Add(1)
go func(keyToDel string) {
defer wg.Done()
if err := domain.store.Delete(keyToDel); err != nil {
lock.Lock()
errs = append(errs, err.Error())
lock.Unlock()
}
}(key)
}
wg.Wait()
if len(errs) != 0 {
return errors.New(strings.Join(errs, ";"))
}
return nil
}
func (domain *Domain) Delete(ctx context.Context, userId string) error {
account, err := domain.Read(ctx, userId)
if err != nil {
return err
}
keys := []string{
generateAccountStoreKey(ctx, userId),
generateAccountEmailStoreKey(ctx, account.Email),
generateAccountUsernameStoreKey(ctx, account.Username),
}
return domain.batchDelete(keys)
}
func (domain *Domain) Update(ctx context.Context, user *user.Account) error {
user.Updated = time.Now().Unix()
s := &_struct.Struct{}
jso, _ := json.Marshal(user)
err := s.UnmarshalJSON(jso)
// get old information of the user
old, err := domain.Read(ctx, user.Id)
if err != nil {
return err
}
_, err = domain.db.Update(ctx, &db.UpdateRequest{
Table: "users",
Record: s,
})
return err
keysToDelete := make([]string, 0)
if old.Email != user.Email {
keysToDelete = append(keysToDelete, generateAccountEmailStoreKey(ctx, old.Email))
}
if old.Username != user.Username {
keysToDelete = append(keysToDelete, generateAccountUsernameStoreKey(ctx, old.Username))
}
// update user
user.Created = old.Created
user.Updated = time.Now().Unix()
val, err := json.Marshal(user)
if err != nil {
return err
}
records := []*store.Record{
{Key: generateAccountStoreKey(ctx, user.Id), Value: val},
{Key: generateAccountUsernameStoreKey(ctx, user.Username), Value: val},
{Key: generateAccountEmailStoreKey(ctx, user.Email), Value: val},
}
// update
if err := domain.batchWrite(records); err != nil {
return err
}
// delete
if err := domain.batchDelete(keysToDelete); err != nil {
return err
}
return nil
}
// readUserByKey read user account in store by key
func (domain *Domain) readUserByKey(ctx context.Context, key string) (*user.Account, error) {
var result = &user.Account{}
records, err := domain.store.Read(key)
if err != nil {
return result, err
}
if len(records) == 0 {
return result, ErrNotFound
}
err = json.Unmarshal(records[0].Value, result)
return result, err
}
func (domain *Domain) Read(ctx context.Context, userId string) (*user.Account, error) {
user := &user.Account{}
if len(userId) == 0 {
return nil, fmt.Errorf("no id provided")
}
q := fmt.Sprintf("id == '%v'", userId)
logger.Infof("Running query: %v", q)
rsp, err := domain.db.Read(ctx, &db.ReadRequest{
Table: "users",
Query: q,
})
if err != nil {
return nil, err
}
if len(rsp.Records) == 0 {
return nil, ErrNotFound
}
m, _ := rsp.Records[0].MarshalJSON()
json.Unmarshal(m, user)
return user, nil
return domain.readUserByKey(ctx, generateAccountStoreKey(ctx, userId))
}
func (domain *Domain) SearchByUsername(ctx context.Context, username string) (*user.Account, error) {
return domain.readUserByKey(ctx, generateAccountUsernameStoreKey(ctx, username))
}
func (domain *Domain) SearchByEmail(ctx context.Context, email string) (*user.Account, error) {
return domain.readUserByKey(ctx, generateAccountEmailStoreKey(ctx, email))
}
func (domain *Domain) Search(ctx context.Context, username, email string) ([]*user.Account, error) {
var query string
if len(username) > 0 {
query = fmt.Sprintf("username == '%v'", username)
} else if len(email) > 0 {
query = fmt.Sprintf("email == '%v'", email)
} else {
return nil, errors.New("username and email cannot be blank")
var account = &user.Account{}
var err error
switch {
case username != "":
account, err = domain.SearchByUsername(ctx, username)
case email != "":
account, err = domain.SearchByEmail(ctx, email)
}
usr := &user.Account{}
rsp, err := domain.db.Read(ctx, &db.ReadRequest{
Table: "users",
Query: query,
})
if err != nil {
return nil, err
return []*user.Account{}, err
}
if len(rsp.Records) == 0 {
return nil, ErrNotFound
}
m, _ := rsp.Records[0].MarshalJSON()
json.Unmarshal(m, usr)
return []*user.Account{usr}, nil
return []*user.Account{account}, nil
}
func (domain *Domain) UpdatePassword(ctx context.Context, id string, salt string, password string) error {
pass := pw{
ID: id,
func (domain *Domain) UpdatePassword(ctx context.Context, userId string, salt string, password string) error {
val, err := json.Marshal(pw{
Password: password,
Salt: salt,
}
s := &_struct.Struct{}
jso, _ := json.Marshal(pass)
err := s.UnmarshalJSON(jso)
})
if err != nil {
return err
}
_, err = domain.db.Update(ctx, &db.UpdateRequest{
Table: "passwords",
Record: s,
})
return err
record := &store.Record{
Key: generatePasswordStoreKey(ctx, userId),
Value: val,
}
return domain.store.Write(record)
}
func (domain *Domain) SaltAndPassword(ctx context.Context, userId string) (string, string, error) {
password := &pw{}
rsp, err := domain.db.Read(ctx, &db.ReadRequest{
Table: "passwords",
Query: fmt.Sprintf("id == '%v'", userId),
})
records, err := domain.store.Read(generatePasswordStoreKey(ctx, userId))
if err != nil {
return "", "", err
}
if len(rsp.Records) == 0 {
if len(records) == 0 {
return "", "", ErrNotFound
}
m, _ := rsp.Records[0].MarshalJSON()
json.Unmarshal(m, password)
password := &pw{}
if err := json.Unmarshal(records[0].Value, password); err != nil {
return "", "", err
}
return password.Salt, password.Password, nil
}
func (domain *Domain) List(ctx context.Context, o, l int32) ([]*user.Account, error) {
var limit int32 = 25
var offset int32 = 0
if l > 0 {
limit = l
}
if o > 0 {
offset = o
}
rsp, err := domain.db.Read(ctx, &db.ReadRequest{
Table: "users",
Limit: limit,
Offset: offset,
})
func (domain *Domain) List(ctx context.Context, o, l uint32) (result []*user.Account, err error) {
records, err := domain.store.Read(generateAccountStoreKey(ctx, ""),
store.ReadPrefix(),
store.ReadLimit(uint(l)),
store.ReadLimit(uint(o)))
if err != nil {
return nil, err
}
if len(rsp.Records) == 0 {
return nil, ErrNotFound
if len(records) == 0 {
return result, ErrNotFound
}
ret := make([]*user.Account, len(rsp.Records))
for i, v := range rsp.Records {
m, _ := v.MarshalJSON()
var user user.Account
json.Unmarshal(m, &user)
ret[i] = &user
ret := make([]*user.Account, len(records))
for i, v := range records {
account := user.Account{}
json.Unmarshal(v.Value, &account)
ret[i] = &account
}
return ret, nil
}
@@ -492,7 +536,6 @@ func (domain *Domain) SendMLE(fromName, toAddress, toUsername, subject, textCont
}
func (domain *Domain) CacheReadToken(ctx context.Context, token string) (string, error) {
if token == "" {
return "", errors.New("token empty")
}

48
user/domain/store_key.go Normal file
View File

@@ -0,0 +1,48 @@
package domain
import (
"context"
"fmt"
"strings"
"github.com/micro/services/pkg/tenant"
)
func getStoreKeyPrefix(ctx context.Context) string {
tenantId, ok := tenant.FromContext(ctx)
if !ok {
tenantId = "micro"
}
tenantId = strings.Replace(strings.Replace(tenantId, "/", "_", -1), "-", "_", -1)
return fmt.Sprintf("user/%s/", tenantId)
}
func generateAccountStoreKey(ctx context.Context, userId string) string {
return fmt.Sprintf("%saccount/id/%s", getStoreKeyPrefix(ctx), userId)
}
func generateAccountEmailStoreKey(ctx context.Context, email string) string {
return fmt.Sprintf("%sacccount/email/%s", getStoreKeyPrefix(ctx), email)
}
func generateAccountUsernameStoreKey(ctx context.Context, username string) string {
return fmt.Sprintf("%saccount/username/%s", getStoreKeyPrefix(ctx), username)
}
func generatePasswordStoreKey(ctx context.Context, userId string) string {
return fmt.Sprintf("%spassword/%s", getStoreKeyPrefix(ctx), userId)
}
func generatePasswordResetCodeStoreKey(ctx context.Context, userId, code string) string {
return fmt.Sprintf("%spassword-reset-codes/%s-%s", getStoreKeyPrefix(ctx), userId, code)
}
func generateSessionStoreKey(ctx context.Context, sessionId string) string {
return fmt.Sprintf("%ssession/%s", getStoreKeyPrefix(ctx), sessionId)
}
func generateVerificationsTokenStoreKey(ctx context.Context, userId, token string) string {
return fmt.Sprintf("%sverification-token/%s-%s", getStoreKeyPrefix(ctx), userId, token)
}