diff --git a/groups/handler/handler.go b/groups/handler/handler.go index df4f55a..d3c8201 100644 --- a/groups/handler/handler.go +++ b/groups/handler/handler.go @@ -2,16 +2,14 @@ package handler import ( "context" - "strings" + "fmt" "github.com/google/uuid" "github.com/micro/micro/v3/service/auth" "github.com/micro/micro/v3/service/errors" - "github.com/micro/micro/v3/service/logger" + "github.com/micro/micro/v3/service/store" pb "github.com/micro/services/groups/proto" - gorm2 "github.com/micro/services/pkg/gorm" - - "gorm.io/gorm" + "github.com/micro/services/pkg/tenant" ) var ( @@ -25,28 +23,48 @@ var ( ) type Group struct { - ID string - Name string - Memberships []Membership + ID string + Name string + Members []string } -type Membership struct { - MemberID string `gorm:"uniqueIndex:idx_membership"` - GroupID string `gorm:"uniqueIndex:idx_membership"` - Group Group +type Member struct { + ID string + Group string +} + +func (g *Group) Key(ctx context.Context) string { + key := fmt.Sprintf("group:%s", g.ID) + + t, ok := tenant.FromContext(ctx) + if !ok { + return key + } + + return fmt.Sprintf("%s/%s", t, key) +} + +func (m *Member) Key(ctx context.Context) string { + key := fmt.Sprintf("member:%s:%s", m.ID, m.Group) + + t, ok := tenant.FromContext(ctx) + if !ok { + return key + } + + return fmt.Sprintf("%s/%s", t, key) + } func (g *Group) Serialize() *pb.Group { - memberIDs := make([]string, len(g.Memberships)) - for i, m := range g.Memberships { - memberIDs[i] = m.MemberID + memberIDs := make([]string, len(g.Members)) + for i, m := range g.Members { + memberIDs[i] = m } return &pb.Group{Id: g.ID, Name: g.Name, MemberIds: memberIDs} } -type Groups struct { - gorm2.Helper -} +type Groups struct{} func (g *Groups) Create(ctx context.Context, req *pb.CreateRequest, rsp *pb.CreateResponse) error { _, ok := auth.AccountFromContext(ctx) @@ -60,18 +78,15 @@ func (g *Groups) Create(ctx context.Context, req *pb.CreateRequest, rsp *pb.Crea // create the group object group := &Group{ID: uuid.New().String(), Name: req.Name} - db, err := g.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") - } - if err := db.Create(group).Error; err != nil { + // write the group record + if err := store.Write(store.NewRecord(group.Key(ctx), group)); err != nil { return ErrStore } // return the group rsp.Group = group.Serialize() + return nil } @@ -85,21 +100,24 @@ func (g *Groups) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResp return ErrMissingIDs } - db, err := g.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") - } - // query the database - var groups []Group - if err := db.Model(&Group{}).Preload("Memberships").Where("id IN (?)", req.Ids).Find(&groups).Error; err != nil { - return ErrStore - } - // serialize the response - rsp.Groups = make(map[string]*pb.Group, len(groups)) - for _, g := range groups { - rsp.Groups[g.ID] = g.Serialize() + rsp.Groups = make(map[string]*pb.Group) + + for _, id := range req.Ids { + group := &Group{ + ID: id, + } + recs, err := store.Read(group.Key(ctx), store.ReadLimit(1)) + if err != nil { + return ErrStore + } + if len(recs) == 0 { + continue + } + if err := recs[0].Decode(&group); err != nil { + continue + } + rsp.Groups[group.ID] = group.Serialize() } return nil @@ -117,31 +135,31 @@ func (g *Groups) Update(ctx context.Context, req *pb.UpdateRequest, rsp *pb.Upda if len(req.Name) == 0 { return ErrMissingName } - db, err := g.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") + + group := &Group{ID: req.Id} + + recs, err := store.Read(group.Key(ctx), store.ReadLimit(1)) + if err == store.ErrNotFound { + return ErrNotFound + } else if err != nil { + return ErrStore } - return db.Transaction(func(tx *gorm.DB) error { - // find the group - var group Group - if err := tx.Where(&Group{ID: req.Id}).First(&group).Error; err == gorm.ErrRecordNotFound { - return ErrNotFound - } else if err != nil { - return ErrStore - } + // decode the record + recs[0].Decode(&group) - // update the group - group.Name = req.Name - if err := tx.Save(&group).Error; err != nil { - return ErrStore - } + // set the name + group.Name = req.Name - // serialize the response - rsp.Group = group.Serialize() - return nil - }) + // save the record + if err := store.Write(store.NewRecord(group.Key(ctx), group)); err != nil { + return ErrStore + } + + // serialize the response + rsp.Group = group.Serialize() + + return nil } func (g *Groups) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error { @@ -154,18 +172,35 @@ func (g *Groups) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.Dele return ErrMissingID } - db, err := g.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") - } - // delete from the database - if err := db.Delete(&Group{ID: req.Id}).Error; err == gorm.ErrRecordNotFound { + group := &Group{ID: req.Id} + + // get the group + recs, err := store.Read(group.Key(ctx), store.ReadLimit(1)) + if err == store.ErrNotFound { return nil } else if err != nil { return ErrStore } + // decode the record + recs[0].Decode(&group) + + // delete the record + if err := store.Delete(group.Key(ctx)); err == store.ErrNotFound { + return nil + } else if err != nil { + return ErrStore + } + + // delete all the members + for _, memberId := range group.Members { + m := &Member{ + ID: memberId, + } + // delete the member + store.Delete(m.Key(ctx)) + } + return nil } @@ -174,35 +209,47 @@ func (g *Groups) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResp if !ok { errors.Unauthorized("UNAUTHORIZED", "Unauthorized") } - db, err := g.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") - } + if len(req.MemberId) > 0 { // only list groups the user is a member of - var ms []Membership - q := db.Where(&Membership{MemberID: req.MemberId}).Preload("Group.Memberships") - if err := q.Find(&ms).Error; err != nil { - return err + m := &Member{ID: req.MemberId} + recs, err := store.Read(m.Key(ctx), store.ReadPrefix()) + if err != nil { + return ErrStore } - rsp.Groups = make([]*pb.Group, len(ms)) - for i, m := range ms { - rsp.Groups[i] = m.Group.Serialize() + + for _, rec := range recs { + m := &Member{ID: req.MemberId} + rec.Decode(&m) + + // get the group + group := &Group{ID: m.Group} + + grecs, err := store.Read(group.Key(ctx), store.ReadLimit(1)) + if err != nil { + return ErrStore + } + grecs[0].Decode(&group) + + rsp.Groups = append(rsp.Groups, group.Serialize()) } + return nil } - // load all groups - var groups []Group - if err := db.Model(&Group{}).Preload("Memberships").Find(&groups).Error; err != nil { + group := &Group{} + + // read all the prefixes + recs, err := store.Read(group.Key(ctx), store.ReadPrefix()) + if err != nil { return ErrStore } - // serialize the response - rsp.Groups = make([]*pb.Group, len(groups)) - for i, g := range groups { - rsp.Groups[i] = g.Serialize() + // serialize and return response + for _, rec := range recs { + group := new(Group) + rec.Decode(&group) + rsp.Groups = append(rsp.Groups, group.Serialize()) } return nil @@ -220,33 +267,54 @@ func (g *Groups) AddMember(ctx context.Context, req *pb.AddMemberRequest, rsp *p if len(req.MemberId) == 0 { return ErrMissingMemberID } - db, err := g.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") + + // read the group + group := &Group{ID: req.GroupId} + + recs, err := store.Read(group.Key(ctx), store.ReadLimit(1)) + if err == store.ErrNotFound { + return ErrNotFound + } else if err != nil { + return ErrStore } - return db.Transaction(func(tx *gorm.DB) error { - // check the group exists - var group Group - if err := tx.Where(&Group{ID: req.GroupId}).First(&group).Error; err == gorm.ErrRecordNotFound { - return ErrNotFound - } else if err != nil { - return err - } + // decode the record + recs[0].Decode(group) - // create the membership - m := &Membership{MemberID: req.MemberId, GroupID: req.GroupId} - err := tx.Create(m).Error - // check for membership already existing (unique index violation) - if err != nil && strings.Contains(err.Error(), "fk_groups_memberships") { - return nil - } else if err != nil { - return ErrStore + var seen bool + for _, member := range group.Members { + if member == req.MemberId { + seen = true + break } + } + // already a member + if seen { return nil - }) + } + + // add the member + group.Members = append(group.Members, req.MemberId) + + // save the record + if err := store.Write(store.NewRecord(group.Key(ctx), group)); err != nil { + return ErrStore + } + + // add the member record + + m := &Member{ + ID: req.MemberId, + Group: group.ID, + } + + // write the record + if err := store.Write(store.NewRecord(m.Key(ctx), m)); err != nil { + return ErrStore + } + + return nil } func (g *Groups) RemoveMember(ctx context.Context, req *pb.RemoveMemberRequest, rsp *pb.RemoveMemberResponse) error { @@ -262,14 +330,44 @@ func (g *Groups) RemoveMember(ctx context.Context, req *pb.RemoveMemberRequest, return ErrMissingMemberID } - db, err := g.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") + // read the group + group := &Group{ID: req.GroupId} + + // read the gruop + recs, err := store.Read(group.Key(ctx), store.ReadLimit(1)) + if err == store.ErrNotFound { + return ErrNotFound + } else if err != nil { + return ErrStore } - // delete the membership - m := &Membership{MemberID: req.MemberId, GroupID: req.GroupId} - if err := db.Where(m).Delete(m).Error; err != nil { + + // decode the record + recs[0].Decode(&group) + + // new member id list + var members []string + + for _, member := range group.Members { + if member == req.MemberId { + continue + } + members = append(members, member) + } + + // update the member + group.Members = members + + // save the record + if err := store.Write(store.NewRecord(group.Key(ctx), group)); err != nil { + return ErrStore + } + + // delete the member + m := &Member{ + ID: req.MemberId, + Group: group.ID, + } + if err := store.Delete(m.Key(ctx)); err != nil { return ErrStore } diff --git a/groups/handler/handler_test.go b/groups/handler/handler_test.go index aa249c8..9916234 100644 --- a/groups/handler/handler_test.go +++ b/groups/handler/handler_test.go @@ -2,37 +2,21 @@ package handler_test import ( "context" - "database/sql" - "os" "sort" "testing" "github.com/google/uuid" "github.com/micro/micro/v3/service/auth" + "github.com/micro/micro/v3/service/store" + "github.com/micro/micro/v3/service/store/memory" "github.com/micro/services/groups/handler" pb "github.com/micro/services/groups/proto" "github.com/stretchr/testify/assert" ) func testHandler(t *testing.T) *handler.Groups { - // connect to the database - addr := os.Getenv("POSTGRES_URL") - if len(addr) == 0 { - addr = "postgresql://postgres@localhost:5432/postgres?sslmode=disable" - } - sqlDB, err := sql.Open("pgx", addr) - if err != nil { - t.Fatalf("Failed to open connection to DB %s", err) - } - - // clean any data from a previous run - if _, err := sqlDB.Exec(`DROP TABLE IF EXISTS "micro_someID_groups", "micro_someID_memberships" CASCADE`); err != nil { - t.Fatalf("Error cleaning database: %v", err) - } - - h := &handler.Groups{} - h.DBConn(sqlDB).Migrations(&handler.Group{}, &handler.Membership{}) - return h + store.DefaultStore = memory.NewStore() + return &handler.Groups{} } func TestCreate(t *testing.T) { h := testHandler(t) diff --git a/groups/main.go b/groups/main.go index b910377..02c2156 100644 --- a/groups/main.go +++ b/groups/main.go @@ -1,19 +1,12 @@ package main import ( - "database/sql" - "github.com/micro/micro/v3/service" - "github.com/micro/micro/v3/service/config" "github.com/micro/micro/v3/service/logger" "github.com/micro/services/groups/handler" pb "github.com/micro/services/groups/proto" - - _ "github.com/jackc/pgx/v4/stdlib" ) -var dbAddress = "postgresql://postgres:postgres@localhost:5432/groups?sslmode=disable" - func main() { // Create service srv := service.New( @@ -21,20 +14,8 @@ func main() { service.Version("latest"), ) - // Connect to the database - cfg, err := config.Get("groups.database") - if err != nil { - logger.Fatalf("Error loading config: %v", err) - } - addr := cfg.String(dbAddress) - sqlDB, err := sql.Open("pgx", addr) - if err != nil { - logger.Fatalf("Failed to open connection to DB %s", err) - } - h := &handler.Groups{} - h.DBConn(sqlDB).Migrations(&handler.Group{}, &handler.Membership{}) // Register handler - pb.RegisterGroupsHandler(srv.Server(), h) + pb.RegisterGroupsHandler(srv.Server(), new(handler.Groups)) // Run service if err := srv.Run(); err != nil {