From eb82bebd03beb9e8cb0bf128e01daf4e4664cd33 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 2 May 2021 14:24:05 +0100 Subject: [PATCH] convert seen service to use store --- seen/handler/handler.go | 123 +++++++++++++++++++++++------------ seen/handler/handler_test.go | 24 ++----- seen/main.go | 27 +------- 3 files changed, 88 insertions(+), 86 deletions(-) diff --git a/seen/handler/handler.go b/seen/handler/handler.go index 4789c87..98c4e82 100644 --- a/seen/handler/handler.go +++ b/seen/handler/handler.go @@ -2,15 +2,16 @@ package handler import ( "context" + "encoding/json" + "fmt" + "strings" "time" - "github.com/micro/micro/v3/service/auth" - gorm2 "github.com/micro/services/pkg/gorm" - "gorm.io/gorm" - "github.com/google/uuid" + "github.com/micro/micro/v3/service/auth" "github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/logger" + "github.com/micro/micro/v3/service/store" pb "github.com/micro/services/seen/proto" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -23,16 +24,27 @@ var ( ErrStore = errors.InternalServerError("STORE_ERROR", "Error connecting to the store") ) -type Seen struct { - gorm2.Helper +type Seen struct{} + +type Record struct { + ID string + UserID string + ResourceID string + ResourceType string + Timestamp time.Time } -type SeenInstance struct { - ID string - UserID string `gorm:"uniqueIndex:user_resource"` - ResourceID string `gorm:"uniqueIndex:user_resource"` - ResourceType string `gorm:"uniqueIndex:user_resource"` - Timestamp time.Time +func (r *Record) Key() string { + return fmt.Sprintf("%s:%s:%s", r.UserID, r.ResourceType, r.ResourceID) +} + +func (r *Record) Marshal() []byte { + b, _ := json.Marshal(r) + return b +} + +func (r *Record) Unmarshal(b []byte) error { + return json.Unmarshal(b, &r) } // Set a resource as seen by a user. If no timestamp is provided, the current time is used. @@ -58,17 +70,14 @@ func (s *Seen) Set(ctx context.Context, req *pb.SetRequest, rsp *pb.SetResponse) } // find the resource - instance := SeenInstance{ + instance := &Record{ UserID: req.UserId, ResourceID: req.ResourceId, ResourceType: req.ResourceType, } - db, err := s.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") - } - if err := db.Where(&instance).First(&instance).Error; err == gorm.ErrRecordNotFound { + + _, err := store.Read(instance.Key(), store.ReadLimit(1)) + if err == store.ErrNotFound { instance.ID = uuid.New().String() } else if err != nil { logger.Errorf("Error with store: %v", err) @@ -77,7 +86,11 @@ func (s *Seen) Set(ctx context.Context, req *pb.SetRequest, rsp *pb.SetResponse) // update the resource instance.Timestamp = req.Timestamp.AsTime() - if err := db.Save(&instance).Error; err != nil { + + if err := store.Write(&store.Record{ + Key: instance.Key(), + Value: instance.Marshal(), + }); err != nil { logger.Errorf("Error with store: %v", err) return ErrStore } @@ -103,18 +116,14 @@ func (s *Seen) Unset(ctx context.Context, req *pb.UnsetRequest, rsp *pb.UnsetRes return ErrMissingResourceType } - db, err := s.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") - } - // delete the object from the store - err = db.Delete(SeenInstance{}, SeenInstance{ + instance := &Record{ UserID: req.UserId, ResourceID: req.ResourceId, ResourceType: req.ResourceType, - }).Error - if err != nil { + } + + // delete the object from the store + if err := store.Delete(instance.Key()); err != nil { logger.Errorf("Error with store: %v", err) return ErrStore } @@ -141,24 +150,54 @@ func (s *Seen) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadRespon return ErrMissingResourceType } - db, err := s.GetDBConn(ctx) - if err != nil { - logger.Errorf("Error connecting to DB: %v", err) - return errors.InternalServerError("DB_ERROR", "Error connecting to DB") + // create a key prefix + key := fmt.Sprintf("%s:%s:", req.UserId, req.ResourceType) + + var recs []*store.Record + var err error + + // get the records for the resource type + if len(req.ResourceIds) == 1 { + // read the key itself + key = key + req.ResourceIds[0] + recs, err = store.Read(key, store.ReadLimit(1)) + } else { + // otherwise read the prefix + recs, err = store.Read(key, store.ReadPrefix()) } - // query the store - q := db.Where(SeenInstance{UserID: req.UserId, ResourceType: req.ResourceType}) - q = q.Where("resource_id IN (?)", req.ResourceIds) - var data []SeenInstance - if err := q.Find(&data).Error; err != nil { + + if err != nil { logger.Errorf("Error with store: %v", err) return ErrStore } - // serialize the response - rsp.Timestamps = make(map[string]*timestamppb.Timestamp, len(data)) - for _, i := range data { - rsp.Timestamps[i.ResourceID] = timestamppb.New(i.Timestamp) + // make an id map + ids := make(map[string]bool) + + for _, id := range req.ResourceIds { + ids[id] = true + } + + // make the map + rsp.Timestamps = make(map[string]*timestamppb.Timestamp) + + // range over records for the user/resource type + // TODO: add some sort of filter query in store + for _, rec := range recs { + // get id + parts := strings.Split(rec.Key, ":") + id := parts[2] + + fmt.Println("checking record", rec.Key, id) + + if ok := ids[id]; !ok { + continue + } + + // add the timestamp for the record + r := new(Record) + r.Unmarshal(rec.Value) + rsp.Timestamps[id] = timestamppb.New(r.Timestamp) } return nil diff --git a/seen/handler/handler_test.go b/seen/handler/handler_test.go index 93d0448..cab8273 100644 --- a/seen/handler/handler_test.go +++ b/seen/handler/handler_test.go @@ -2,13 +2,13 @@ package handler_test import ( "context" - "database/sql" - "os" "testing" "time" "github.com/google/uuid" "github.com/micro/micro/v3/service/auth" + "github.com/micro/micro/v3/service/store" + "github.com/micro/micro/v3/service/store/memory" "github.com/micro/services/seen/handler" pb "github.com/micro/services/seen/proto" "github.com/stretchr/testify/assert" @@ -16,24 +16,8 @@ import ( ) func testHandler(t *testing.T) *handler.Seen { - // connect to the database - addr := os.Getenv("POSTGRES_URL") - if len(addr) == 0 { - addr = "postgresql://postgres@localhost:5432/postgres?sslmode=disable" - } - sqlDB, err := sql.Open("pgx", addr) - if err != nil { - t.Fatalf("Failed to open connection to DB %s", err) - } - - // clean any data from a previous run - if _, err := sqlDB.Exec("DROP TABLE IF EXISTS micro_seen_instances CASCADE"); err != nil { - t.Fatalf("Error cleaning database: %v", err) - } - - h := &handler.Seen{} - h.DBConn(sqlDB).Migrations(&handler.SeenInstance{}) - return h + store.DefaultStore = memory.NewStore() + return &handler.Seen{} } func TestSet(t *testing.T) { diff --git a/seen/main.go b/seen/main.go index 6e52d62..41d9f46 100644 --- a/seen/main.go +++ b/seen/main.go @@ -1,20 +1,12 @@ package main import ( - "database/sql" - + "github.com/micro/micro/v3/service" + "github.com/micro/micro/v3/service/logger" "github.com/micro/services/seen/handler" pb "github.com/micro/services/seen/proto" - - "github.com/micro/micro/v3/service" - "github.com/micro/micro/v3/service/config" - "github.com/micro/micro/v3/service/logger" - - _ "github.com/jackc/pgx/v4/stdlib" ) -var dbAddress = "postgresql://postgres:postgres@localhost:5432/seen?sslmode=disable" - func main() { // Create service srv := service.New( @@ -22,21 +14,8 @@ func main() { service.Version("latest"), ) - // Connect to the database - cfg, err := config.Get("seen.database") - if err != nil { - logger.Fatalf("Error loading config: %v", err) - } - addr := cfg.String(dbAddress) - sqlDB, err := sql.Open("pgx", addr) - if err != nil { - logger.Fatalf("Failed to open connection to DB %s", err) - } - - h := &handler.Seen{} - h.DBConn(sqlDB).Migrations(&handler.SeenInstance{}) // Register handler - pb.RegisterSeenHandler(srv.Server(), h) + pb.RegisterSeenHandler(srv.Server(), new(handler.Seen)) // Run service if err := srv.Run(); err != nil {