From 500acefe4738583c433a76d781e79fa62d93f66a Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 30 Apr 2021 20:48:42 +0100 Subject: [PATCH] fixup streams stuff (#90) --- streams/handler/handler.go | 11 ++++++++--- streams/handler/publish.go | 11 +++++++---- streams/handler/subscribe.go | 22 ++++++++++++++++------ streams/handler/token.go | 6 +++++- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/streams/handler/handler.go b/streams/handler/handler.go index a9d33c2..63fecd0 100644 --- a/streams/handler/handler.go +++ b/streams/handler/handler.go @@ -26,6 +26,7 @@ var ( type Token struct { Token string `gorm:"primaryKey"` Topic string + Account string ExpiresAt time.Time } @@ -35,13 +36,17 @@ type Streams struct { Time func() time.Time } -// fmtTopic returns a topic string with namespace prefix -func fmtTopic(acc *auth.Account, topic string) string { +func getAccount(acc *auth.Account) string { owner := acc.Metadata["apikey_owner"] if len(owner) == 0 { owner = acc.ID } - return fmt.Sprintf("%s.%s.%s", acc.Issuer, owner, topic) + return fmt.Sprintf("%s.%s", acc.Issuer, owner) +} + +// fmtTopic returns a topic string with namespace prefix +func fmtTopic(acc *auth.Account, topic string) string { + return fmt.Sprintf("%s.%s", getAccount(acc), topic) } // validateTopicInput validates that topic is alphanumeric diff --git a/streams/handler/publish.go b/streams/handler/publish.go index a24f354..f013014 100644 --- a/streams/handler/publish.go +++ b/streams/handler/publish.go @@ -4,7 +4,6 @@ import ( "context" "github.com/micro/micro/v3/service/auth" - "github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/logger" pb "github.com/micro/services/streams/proto" ) @@ -20,12 +19,16 @@ func (s *Streams) Publish(ctx context.Context, req *pb.Message, rsp *pb.PublishR if len(req.Message) == 0 { return ErrMissingMessage } + + topic := req.Topic + + // in the event we have an account we use multi-tenancy acc, ok := auth.AccountFromContext(ctx) - if !ok { - return errors.Unauthorized("UNAUTHORIZED", "Unauthorized") + if ok { + topic = fmtTopic(acc, req.Topic) } // publish the message logger.Infof("Publishing message to topic: %v", req.Topic) - return s.Events.Publish(fmtTopic(acc, req.Topic), req.Message) + return s.Events.Publish(topic, req.Message) } diff --git a/streams/handler/subscribe.go b/streams/handler/subscribe.go index d463cd1..291020e 100644 --- a/streams/handler/subscribe.go +++ b/streams/handler/subscribe.go @@ -2,6 +2,7 @@ package handler import ( "context" + "fmt" "io" "github.com/micro/micro/v3/service/auth" @@ -27,11 +28,6 @@ func (s *Streams) Subscribe(ctx context.Context, req *pb.SubscribeRequest, strea return err } - acc, ok := auth.AccountFromContext(ctx) - if !ok { - return errors.Unauthorized("UNAUTHORIZED", "Unauthorized") - } - // find the token and check to see if it has expired var token Token dbConn, err := s.GetDBConn(ctx) @@ -54,9 +50,23 @@ func (s *Streams) Subscribe(ctx context.Context, req *pb.SubscribeRequest, strea return ErrForbiddenTopic } + var topic string + + // attempt to create a unique topic for the account + acc, ok := auth.AccountFromContext(ctx) + if ok { + topic = fmtTopic(acc, req.Topic) + } else if len(token.Account) > 0 { + // use the account in the token if present + topic = fmt.Sprintf("%s.%s", token.Account, token.Topic) + } else { + topic = req.Topic + } + // start the subscription logger.Infof("Subscribing to %v via queue %v", req.Topic, token.Token) - evChan, err := s.Events.Consume(fmtTopic(acc, req.Topic), events.WithGroup(token.Token)) + + evChan, err := s.Events.Consume(topic, events.WithGroup(token.Token)) if err != nil { logger.Errorf("Error connecting to events stream: %v", err) return errors.InternalServerError("EVENTS_ERROR", "Error connecting to events stream") diff --git a/streams/handler/token.go b/streams/handler/token.go index 26fd929..49e3f7f 100644 --- a/streams/handler/token.go +++ b/streams/handler/token.go @@ -11,10 +11,11 @@ import ( ) func (s *Streams) Token(ctx context.Context, req *pb.TokenRequest, rsp *pb.TokenResponse) error { - _, ok := auth.AccountFromContext(ctx) + acc, ok := auth.AccountFromContext(ctx) if !ok { return errors.Unauthorized("UNAUTHORIZED", "Unauthorized") } + if len(req.Topic) > 0 { if err := validateTopicInput(req.Topic); err != nil { return err @@ -26,12 +27,15 @@ func (s *Streams) Token(ctx context.Context, req *pb.TokenRequest, rsp *pb.Token Token: uuid.New().String(), ExpiresAt: s.Time().Add(TokenTTL), Topic: req.Topic, + Account: getAccount(acc), } + dbConn, err := s.GetDBConn(ctx) if err != nil { logger.Errorf("Error creating token in store: %v", err) return errors.InternalServerError("DATABASE_ERROR", "Error writing token to database") } + if err := dbConn.Create(&t).Error; err != nil { logger.Errorf("Error creating token in store: %v", err) return errors.InternalServerError("DATABASE_ERROR", "Error writing token to database")