diff --git a/.gitignore b/.gitignore index f8bbd2d..efd97c6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ api-protobuf.json api-*.json redoc-static.html -!docs +!docs \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..33e628b --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +run: + find . -name "main.go" | xargs -L 1 dirname | grep -v -E 'test|cmd|vendor' | sort | xargs -L 1 micro run + +kill: + find . -name "main.go" | xargs -L 1 dirname | grep -v -E 'test|cmd|vendor' | xargs basename | xargs -L 1 micro kill \ No newline at end of file diff --git a/chats/main.go b/chats/main.go index 88173b9..a9b60ec 100644 --- a/chats/main.go +++ b/chats/main.go @@ -13,7 +13,7 @@ import ( "gorm.io/gorm" ) -var dbAddress = "postgresql://postgres@localhost:5432/chats?sslmode=disable" +var dbAddress = "postgresql://postgres:postgres@localhost:5432/chats?sslmode=disable" func main() { // Create service diff --git a/groups/main.go b/groups/main.go index 7675716..170caac 100644 --- a/groups/main.go +++ b/groups/main.go @@ -11,7 +11,7 @@ import ( "github.com/micro/micro/v3/service/logger" ) -var dbAddress = "postgresql://postgres@localhost:5432/groups?sslmode=disable" +var dbAddress = "postgresql://postgres:postgres@localhost:5432/groups?sslmode=disable" func main() { // Create service diff --git a/invites/main.go b/invites/main.go index 40c914e..893c4d0 100644 --- a/invites/main.go +++ b/invites/main.go @@ -11,7 +11,7 @@ import ( "gorm.io/gorm" ) -var dbAddress = "postgresql://postgres@localhost:5432/invites?sslmode=disable" +var dbAddress = "postgresql://postgres:postgres@localhost:5432/invites?sslmode=disable" func main() { // Create service diff --git a/places/main.go b/places/main.go index 9e8fc1b..76c1288 100644 --- a/places/main.go +++ b/places/main.go @@ -14,7 +14,7 @@ import ( "github.com/micro/micro/v3/service/logger" ) -var dbAddress = "postgresql://postgres@localhost:5432/places?sslmode=disable" +var dbAddress = "postgresql://postgres:postgres@localhost:5432/places?sslmode=disable" func main() { // Create service diff --git a/seen/main.go b/seen/main.go index e737c71..ec7468f 100644 --- a/seen/main.go +++ b/seen/main.go @@ -11,7 +11,7 @@ import ( "github.com/micro/micro/v3/service/logger" ) -var dbAddress = "postgresql://postgres@localhost:5432/seen?sslmode=disable" +var dbAddress = "postgresql://postgres:postgres@localhost:5432/seen?sslmode=disable" func main() { // Create service diff --git a/streams/handler/publish.go b/streams/handler/publish.go index 6163823..514be3d 100644 --- a/streams/handler/publish.go +++ b/streams/handler/publish.go @@ -3,6 +3,7 @@ package handler import ( "context" + "github.com/micro/micro/v3/service/logger" pb "github.com/micro/services/streams/proto" ) @@ -16,5 +17,6 @@ func (s *Streams) Publish(ctx context.Context, req *pb.Message, rsp *pb.PublishR } // publish the message + logger.Infof("Publishing message to topic: %v", req.Topic) return s.Events.Publish(req.Topic, req.Message) } diff --git a/streams/handler/subscribe.go b/streams/handler/subscribe.go index 09ba6b3..981f62d 100644 --- a/streams/handler/subscribe.go +++ b/streams/handler/subscribe.go @@ -2,6 +2,7 @@ package handler import ( "context" + "io" "github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/events" @@ -12,6 +13,8 @@ import ( ) func (s *Streams) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Streams_SubscribeStream) error { + logger.Infof("Recieved subscribe request. Topic: '%v', Token: '%v'", req.Topic, req.Token) + // validate the request if len(req.Token) == 0 { return ErrMissingToken @@ -38,27 +41,31 @@ func (s *Streams) Subscribe(ctx context.Context, req *pb.SubscribeRequest, strea } // start the subscription + logger.Infof("Subscribing to %v via queue %v", req.Topic, token.Token) evChan, err := s.Events.Consume(req.Topic, events.WithGroup(token.Token)) if err != nil { logger.Errorf("Error connecting to events stream: %v", err) return errors.InternalServerError("EVENTS_ERROR", "Error connecting to events stream") } - go func() { - defer stream.Close() - for { - msg, ok := <-evChan - if !ok { - return - } - if err := stream.Send(&pb.Message{ - Topic: msg.Topic, - Message: string(msg.Payload), - SentAt: timestamppb.New(msg.Timestamp), - }); err != nil { - return - } - } - }() + defer stream.Close() - return nil + for { + msg, ok := <-evChan + if !ok { + return nil + } + + logger.Infof("Sending message to subscriber %v", token.Topic) + pbMsg := &pb.Message{ + Topic: msg.Topic, + Message: string(msg.Payload), + SentAt: timestamppb.New(msg.Timestamp), + } + + if err := stream.Send(pbMsg); err == io.EOF { + return nil + } else if err != nil { + return err + } + } } diff --git a/streams/main.go b/streams/main.go index fed3bfe..1d20c92 100644 --- a/streams/main.go +++ b/streams/main.go @@ -14,7 +14,7 @@ import ( "github.com/micro/micro/v3/service/logger" ) -var dbAddress = "postgresql://postgres@localhost:5432/streams?sslmode=disable" +var dbAddress = "postgresql://postgres:postgres@localhost:5432/streams?sslmode=disable" func main() { // Create service diff --git a/threads/main.go b/threads/main.go index 59d1d28..d8b98be 100644 --- a/threads/main.go +++ b/threads/main.go @@ -13,7 +13,7 @@ import ( "gorm.io/gorm" ) -var dbAddress = "postgresql://postgres@localhost:5432/threads?sslmode=disable" +var dbAddress = "postgresql://postgres:postgres@localhost:5432/threads?sslmode=disable" func main() { // Create service diff --git a/users/main.go b/users/main.go index da34440..9a78dd0 100644 --- a/users/main.go +++ b/users/main.go @@ -13,7 +13,7 @@ import ( "gorm.io/gorm" ) -var dbAddress = "postgresql://postgres@localhost:5432/users?sslmode=disable" +var dbAddress = "postgresql://postgres:postgres@localhost:5432/users?sslmode=disable" func main() { // Create service