diff --git a/streams/.gitignore b/streams/.gitignore new file mode 100644 index 0000000..4a47005 --- /dev/null +++ b/streams/.gitignore @@ -0,0 +1,2 @@ + +streams diff --git a/streams/Makefile b/streams/Makefile new file mode 100644 index 0000000..b9a9ab8 --- /dev/null +++ b/streams/Makefile @@ -0,0 +1,22 @@ + +GOPATH:=$(shell go env GOPATH) +.PHONY: init +init: + go get -u github.com/golang/protobuf/proto + go get -u github.com/golang/protobuf/protoc-gen-go + go get github.com/micro/micro/v3/cmd/protoc-gen-micro +.PHONY: proto +proto: + protoc --proto_path=. --micro_out=. --go_out=:. proto/streams.proto + +.PHONY: build +build: + go build -o streams *.go + +.PHONY: test +test: + go test -v ./... -cover + +.PHONY: docker +docker: + docker build . -t streams:latest diff --git a/streams/README.md b/streams/README.md new file mode 100644 index 0000000..3ff166d --- /dev/null +++ b/streams/README.md @@ -0,0 +1,5 @@ +# Streams Service + +The streams service provides an event stream, designed for sending messages from a server to mutliple +clients connecting via Websockets. The Token RPC should be called to generate a token for each client, +the clients should then subscribe using the Subscribe RPC. diff --git a/streams/handler/handler.go b/streams/handler/handler.go new file mode 100644 index 0000000..7856318 --- /dev/null +++ b/streams/handler/handler.go @@ -0,0 +1,31 @@ +package handler + +import ( + "time" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/events" + "gorm.io/gorm" +) + +var ( + TokenTTL = time.Minute + ErrMissingTopic = errors.BadRequest("MISSING_TOPIC", "Missing topic") + ErrMissingToken = errors.BadRequest("MISSING_TOKEN", "Missing token") + ErrMissingMessage = errors.BadRequest("MISSING_MESSAGE", "Missing message") + ErrInvalidToken = errors.Forbidden("INVALID_TOKEN", "Invalid token") + ErrExpiredToken = errors.Forbidden("EXPIRED_TOKEN", "Token expired") + ErrForbiddenTopic = errors.Forbidden("FORBIDDEN_TOPIC", "Token has not have permission to subscribe to this topic") +) + +type Token struct { + Token string `gorm:"primaryKey"` + Topic string + ExpiresAt time.Time +} + +type Streams struct { + DB *gorm.DB + Events events.Stream + Time func() time.Time +} diff --git a/streams/handler/handler_test.go b/streams/handler/handler_test.go new file mode 100644 index 0000000..c581c1c --- /dev/null +++ b/streams/handler/handler_test.go @@ -0,0 +1,58 @@ +package handler_test + +import ( + "testing" + "time" + + "github.com/micro/micro/v3/service/events" + "github.com/micro/services/streams/handler" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +func testHandler(t *testing.T) *handler.Streams { + // connect to the database + db, err := gorm.Open(postgres.Open("postgresql://postgres@localhost:5432/postgres?sslmode=disable"), &gorm.Config{}) + if err != nil { + t.Fatalf("Error connecting to database: %v", err) + } + + // migrate the database + if err := db.AutoMigrate(&handler.Token{}); err != nil { + t.Fatalf("Error migrating database: %v", err) + } + + // clean any data from a previous run + if err := db.Exec("TRUNCATE TABLE tokens CASCADE").Error; err != nil { + t.Fatalf("Error cleaning database: %v", err) + } + + return &handler.Streams{ + DB: db, + Events: new(eventsMock), + Time: func() time.Time { + return time.Unix(1612787045, 0) + }, + } +} + +type eventsMock struct { + PublishCount int + PublishTopic string + PublishMessage interface{} + + ConsumeTopic string + ConsumeChan <-chan events.Event +} + +func (e *eventsMock) Publish(topic string, msg interface{}, opts ...events.PublishOption) error { + e.PublishCount++ + e.PublishTopic = topic + e.PublishMessage = msg + return nil +} + +func (e *eventsMock) Consume(topic string, opts ...events.ConsumeOption) (<-chan events.Event, error) { + e.ConsumeTopic = topic + return e.ConsumeChan, nil +} diff --git a/streams/handler/publish.go b/streams/handler/publish.go new file mode 100644 index 0000000..6163823 --- /dev/null +++ b/streams/handler/publish.go @@ -0,0 +1,20 @@ +package handler + +import ( + "context" + + pb "github.com/micro/services/streams/proto" +) + +func (s *Streams) Publish(ctx context.Context, req *pb.Message, rsp *pb.PublishResponse) error { + // validate the request + if len(req.Topic) == 0 { + return ErrMissingTopic + } + if len(req.Message) == 0 { + return ErrMissingMessage + } + + // publish the message + return s.Events.Publish(req.Topic, req.Message) +} diff --git a/streams/handler/publish_test.go b/streams/handler/publish_test.go new file mode 100644 index 0000000..4adb989 --- /dev/null +++ b/streams/handler/publish_test.go @@ -0,0 +1,41 @@ +package handler_test + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" +) + +func TestPublish(t *testing.T) { + msg := "{\"foo\":\"bar\"}" + topic := uuid.New().String() + + t.Run("MissingTopic", func(t *testing.T) { + h := testHandler(t) + err := h.Publish(context.TODO(), &pb.Message{Message: msg}, &pb.PublishResponse{}) + assert.Equal(t, handler.ErrMissingTopic, err) + assert.Zero(t, h.Events.(*eventsMock).PublishCount) + }) + + t.Run("MissingMessage", func(t *testing.T) { + h := testHandler(t) + err := h.Publish(context.TODO(), &pb.Message{Topic: topic}, &pb.PublishResponse{}) + assert.Equal(t, handler.ErrMissingMessage, err) + assert.Zero(t, h.Events.(*eventsMock).PublishCount) + }) + + t.Run("ValidMessage", func(t *testing.T) { + h := testHandler(t) + err := h.Publish(context.TODO(), &pb.Message{ + Topic: topic, Message: msg, + }, &pb.PublishResponse{}) + assert.NoError(t, err) + assert.Equal(t, 1, h.Events.(*eventsMock).PublishCount) + assert.Equal(t, msg, h.Events.(*eventsMock).PublishMessage) + assert.Equal(t, topic, h.Events.(*eventsMock).PublishTopic) + }) +} diff --git a/streams/handler/subscribe.go b/streams/handler/subscribe.go new file mode 100644 index 0000000..09ba6b3 --- /dev/null +++ b/streams/handler/subscribe.go @@ -0,0 +1,64 @@ +package handler + +import ( + "context" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/events" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" + "google.golang.org/protobuf/types/known/timestamppb" + "gorm.io/gorm" +) + +func (s *Streams) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Streams_SubscribeStream) error { + // validate the request + if len(req.Token) == 0 { + return ErrMissingToken + } + if len(req.Topic) == 0 { + return ErrMissingTopic + } + + // find the token and check to see if it has expired + var token Token + if err := s.DB.Where(&Token{Token: req.Token}).First(&token).Error; err == gorm.ErrRecordNotFound { + return ErrInvalidToken + } else if err != nil { + logger.Errorf("Error reading token from store: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error reading token from database") + } + if token.ExpiresAt.Before(s.Time()) { + return ErrExpiredToken + } + + // if the token was scoped to a channel, ensure the channel is the one being requested + if len(token.Topic) > 0 && token.Topic != req.Topic { + return ErrForbiddenTopic + } + + // start the subscription + evChan, err := s.Events.Consume(req.Topic, events.WithGroup(token.Token)) + if err != nil { + logger.Errorf("Error connecting to events stream: %v", err) + return errors.InternalServerError("EVENTS_ERROR", "Error connecting to events stream") + } + go func() { + defer stream.Close() + for { + msg, ok := <-evChan + if !ok { + return + } + if err := stream.Send(&pb.Message{ + Topic: msg.Topic, + Message: string(msg.Payload), + SentAt: timestamppb.New(msg.Timestamp), + }); err != nil { + return + } + } + }() + + return nil +} diff --git a/streams/handler/subscribe_test.go b/streams/handler/subscribe_test.go new file mode 100644 index 0000000..d9ba484 --- /dev/null +++ b/streams/handler/subscribe_test.go @@ -0,0 +1,164 @@ +package handler_test + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/micro/micro/v3/service/events" + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" +) + +func TestSubscribe(t *testing.T) { + t.Run("MissingToken", func(t *testing.T) { + h := testHandler(t) + s := new(streamMock) + + err := h.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: "helloworld", + }, s) + + assert.Equal(t, handler.ErrMissingToken, err) + assert.Empty(t, s.Messages) + }) + + t.Run("MissingTopic", func(t *testing.T) { + h := testHandler(t) + s := new(streamMock) + + err := h.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Token: uuid.New().String(), + }, s) + + assert.Equal(t, handler.ErrMissingTopic, err) + assert.Empty(t, s.Messages) + }) + + t.Run("InvalidToken", func(t *testing.T) { + h := testHandler(t) + s := new(streamMock) + + err := h.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: "helloworld", + Token: uuid.New().String(), + }, s) + + assert.Equal(t, handler.ErrInvalidToken, err) + assert.Empty(t, s.Messages) + }) + + t.Run("ExpiredToken", func(t *testing.T) { + h := testHandler(t) + + var tRsp pb.TokenResponse + err := h.Token(context.TODO(), &pb.TokenRequest{ + Topic: "helloworld", + }, &tRsp) + assert.NoError(t, err) + + ct := h.Time() + h.Time = func() time.Time { return ct.Add(handler.TokenTTL * 2) } + s := new(streamMock) + err = h.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: "helloworld", + Token: tRsp.Token, + }, s) + + assert.Equal(t, handler.ErrExpiredToken, err) + assert.Empty(t, s.Messages) + }) + + t.Run("ForbiddenTopic", func(t *testing.T) { + h := testHandler(t) + + var tRsp pb.TokenResponse + err := h.Token(context.TODO(), &pb.TokenRequest{ + Topic: "helloworldx", + }, &tRsp) + assert.NoError(t, err) + + s := new(streamMock) + err = h.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: "helloworld", + Token: tRsp.Token, + }, s) + + assert.Equal(t, handler.ErrForbiddenTopic, err) + assert.Empty(t, s.Messages) + }) + + t.Run("Valid", func(t *testing.T) { + h := testHandler(t) + c := make(chan events.Event) + h.Events.(*eventsMock).ConsumeChan = c + + var tRsp pb.TokenResponse + err := h.Token(context.TODO(), &pb.TokenRequest{ + Topic: "helloworld", + }, &tRsp) + assert.NoError(t, err) + + s := &streamMock{Messages: []*pb.Message{}} + err = h.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: "helloworld", + Token: tRsp.Token, + }, s) + assert.NoError(t, err) + assert.Equal(t, "helloworld", h.Events.(*eventsMock).ConsumeTopic) + + e1 := events.Event{ + ID: uuid.New().String(), + Topic: "helloworld", + Timestamp: h.Time().Add(time.Second * -2), + Payload: []byte("abc"), + } + e2 := events.Event{ + ID: uuid.New().String(), + Topic: "helloworld", + Timestamp: h.Time().Add(time.Second * -1), + Payload: []byte("123"), + } + + timeout := time.NewTimer(time.Millisecond * 100).C + select { + case <-timeout: + t.Fatal("Events not consumed from stream") + return + case c <- e1: + t.Log("Event1 consumed") + } + select { + case <-timeout: + t.Fatal("Events not consumed from stream") + return + case c <- e2: + t.Log("Event2 consumed") + } + + if len(s.Messages) != 2 { + t.Fatalf("Expected 2 messages, got %v", len(s.Messages)) + return + } + + assert.Equal(t, e1.Topic, s.Messages[0].Topic) + assert.Equal(t, string(e1.Payload), s.Messages[0].Message) + assert.True(t, e1.Timestamp.Equal(s.Messages[0].SentAt.AsTime())) + + assert.Equal(t, e2.Topic, s.Messages[1].Topic) + assert.Equal(t, string(e2.Payload), s.Messages[1].Message) + assert.True(t, e2.Timestamp.Equal(s.Messages[1].SentAt.AsTime())) + }) +} + +type streamMock struct { + Messages []*pb.Message + pb.Streams_SubscribeStream +} + +func (x *streamMock) Send(m *pb.Message) error { + x.Messages = append(x.Messages, m) + return nil +} diff --git a/streams/handler/token.go b/streams/handler/token.go new file mode 100644 index 0000000..8ea5992 --- /dev/null +++ b/streams/handler/token.go @@ -0,0 +1,27 @@ +package handler + +import ( + "context" + + "github.com/google/uuid" + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" +) + +func (s *Streams) Token(ctx context.Context, req *pb.TokenRequest, rsp *pb.TokenResponse) error { + // construct the token and write it to the database + t := Token{ + Token: uuid.New().String(), + ExpiresAt: s.Time().Add(TokenTTL), + Topic: req.Topic, + } + if err := s.DB.Create(&t).Error; err != nil { + logger.Errorf("Error creating token in store: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error writing token to database") + } + + // return the token in the response + rsp.Token = t.Token + return nil +} diff --git a/streams/handler/token_test.go b/streams/handler/token_test.go new file mode 100644 index 0000000..62f6c0c --- /dev/null +++ b/streams/handler/token_test.go @@ -0,0 +1,27 @@ +package handler_test + +import ( + "context" + "testing" + + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" +) + +func TestToken(t *testing.T) { + h := testHandler(t) + + t.Run("WithoutTopic", func(t *testing.T) { + var rsp pb.TokenResponse + err := h.Token(context.TODO(), &pb.TokenRequest{}, &rsp) + assert.NoError(t, err) + assert.NotEmpty(t, rsp.Token) + }) + + t.Run("WithTopic", func(t *testing.T) { + var rsp pb.TokenResponse + err := h.Token(context.TODO(), &pb.TokenRequest{Topic: "helloworld"}, &rsp) + assert.NoError(t, err) + assert.NotEmpty(t, rsp.Token) + }) +} diff --git a/streams/main.go b/streams/main.go new file mode 100644 index 0000000..fed3bfe --- /dev/null +++ b/streams/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "time" + + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "gorm.io/driver/postgres" + "gorm.io/gorm" + + "github.com/micro/micro/v3/service" + "github.com/micro/micro/v3/service/config" + "github.com/micro/micro/v3/service/events" + "github.com/micro/micro/v3/service/logger" +) + +var dbAddress = "postgresql://postgres@localhost:5432/streams?sslmode=disable" + +func main() { + // Create service + srv := service.New( + service.Name("streams"), + service.Version("latest"), + ) + + // Connect to the database + cfg, err := config.Get("streams.database") + if err != nil { + logger.Fatalf("Error loading config: %v", err) + } + addr := cfg.String(dbAddress) + db, err := gorm.Open(postgres.Open(addr), &gorm.Config{}) + if err != nil { + logger.Fatalf("Error connecting to database: %v", err) + } + if err := db.AutoMigrate(&handler.Token{}); err != nil { + logger.Fatalf("Error migrating database: %v", err) + } + + // Register handler + pb.RegisterStreamsHandler(srv.Server(), &handler.Streams{ + DB: db, + Events: events.DefaultStream, + Time: time.Now, + }) + + // Run service + if err := srv.Run(); err != nil { + logger.Fatal(err) + } +} diff --git a/streams/micro.mu b/streams/micro.mu new file mode 100644 index 0000000..c961f25 --- /dev/null +++ b/streams/micro.mu @@ -0,0 +1 @@ +service streams diff --git a/streams/proto/streams.pb.go b/streams/proto/streams.pb.go new file mode 100644 index 0000000..257289f --- /dev/null +++ b/streams/proto/streams.pb.go @@ -0,0 +1,448 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.23.0 +// protoc v3.13.0 +// source: proto/streams.proto + +package streams + +import ( + proto "github.com/golang/protobuf/proto" + timestamp "github.com/golang/protobuf/ptypes/timestamp" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type PublishResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *PublishResponse) Reset() { + *x = PublishResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishResponse) ProtoMessage() {} + +func (x *PublishResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. +func (*PublishResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{0} +} + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + SentAt *timestamp.Timestamp `protobuf:"bytes,3,opt,name=sent_at,json=sentAt,proto3" json:"sent_at,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{1} +} + +func (x *Message) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +func (x *Message) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *Message) GetSentAt() *timestamp.Timestamp { + if x != nil { + return x.SentAt + } + return nil +} + +type SubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // topic the user wishes to subscribe to, required + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // tokens should be provided if the user is not proving an API key on the request (e.g. in cases + // where the stream is being consumed directly from the frontend via websockets). tokens can be + // generated using the Token RPC + Token string `protobuf:"bytes,2,opt,name=token,proto3" json:"token,omitempty"` +} + +func (x *SubscribeRequest) Reset() { + *x = SubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRequest) ProtoMessage() {} + +func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{2} +} + +func (x *SubscribeRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +func (x *SubscribeRequest) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +type TokenRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // the topic the token should be restricted to, if no topic is required the token can be used to + // subscribe to any topic + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` +} + +func (x *TokenRequest) Reset() { + *x = TokenRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TokenRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TokenRequest) ProtoMessage() {} + +func (x *TokenRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TokenRequest.ProtoReflect.Descriptor instead. +func (*TokenRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{3} +} + +func (x *TokenRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +type TokenResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"` +} + +func (x *TokenResponse) Reset() { + *x = TokenResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TokenResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TokenResponse) ProtoMessage() {} + +func (x *TokenResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TokenResponse.ProtoReflect.Descriptor instead. +func (*TokenResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{4} +} + +func (x *TokenResponse) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +var File_proto_streams_proto protoreflect.FileDescriptor + +var file_proto_streams_proto_rawDesc = []byte{ + 0x0a, 0x13, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x1a, 0x1f, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, + 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0x11, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x6e, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, + 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x33, 0x0a, + 0x07, 0x73, 0x65, 0x6e, 0x74, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x06, 0x73, 0x65, 0x6e, 0x74, + 0x41, 0x74, 0x22, 0x3e, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x14, 0x0a, 0x05, + 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, + 0x65, 0x6e, 0x22, 0x24, 0x0a, 0x0c, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x22, 0x25, 0x0a, 0x0d, 0x54, 0x6f, 0x6b, 0x65, + 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, + 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x32, + 0xba, 0x01, 0x0a, 0x07, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x12, 0x37, 0x0a, 0x07, 0x50, + 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x10, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, + 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x18, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x73, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x12, 0x19, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x53, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, + 0x30, 0x01, 0x12, 0x38, 0x0a, 0x05, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x15, 0x2e, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x54, 0x6f, 0x6b, + 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x31, 0x5a, 0x2f, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x69, 0x63, 0x72, 0x6f, + 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_streams_proto_rawDescOnce sync.Once + file_proto_streams_proto_rawDescData = file_proto_streams_proto_rawDesc +) + +func file_proto_streams_proto_rawDescGZIP() []byte { + file_proto_streams_proto_rawDescOnce.Do(func() { + file_proto_streams_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_streams_proto_rawDescData) + }) + return file_proto_streams_proto_rawDescData +} + +var file_proto_streams_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_proto_streams_proto_goTypes = []interface{}{ + (*PublishResponse)(nil), // 0: streams.PublishResponse + (*Message)(nil), // 1: streams.Message + (*SubscribeRequest)(nil), // 2: streams.SubscribeRequest + (*TokenRequest)(nil), // 3: streams.TokenRequest + (*TokenResponse)(nil), // 4: streams.TokenResponse + (*timestamp.Timestamp)(nil), // 5: google.protobuf.Timestamp +} +var file_proto_streams_proto_depIdxs = []int32{ + 5, // 0: streams.Message.sent_at:type_name -> google.protobuf.Timestamp + 1, // 1: streams.Streams.Publish:input_type -> streams.Message + 2, // 2: streams.Streams.Subscribe:input_type -> streams.SubscribeRequest + 3, // 3: streams.Streams.Token:input_type -> streams.TokenRequest + 0, // 4: streams.Streams.Publish:output_type -> streams.PublishResponse + 1, // 5: streams.Streams.Subscribe:output_type -> streams.Message + 4, // 6: streams.Streams.Token:output_type -> streams.TokenResponse + 4, // [4:7] is the sub-list for method output_type + 1, // [1:4] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_streams_proto_init() } +func file_proto_streams_proto_init() { + if File_proto_streams_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_streams_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TokenRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TokenResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_streams_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_streams_proto_goTypes, + DependencyIndexes: file_proto_streams_proto_depIdxs, + MessageInfos: file_proto_streams_proto_msgTypes, + }.Build() + File_proto_streams_proto = out.File + file_proto_streams_proto_rawDesc = nil + file_proto_streams_proto_goTypes = nil + file_proto_streams_proto_depIdxs = nil +} diff --git a/streams/proto/streams.pb.micro.go b/streams/proto/streams.pb.micro.go new file mode 100644 index 0000000..a13fb71 --- /dev/null +++ b/streams/proto/streams.pb.micro.go @@ -0,0 +1,203 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: proto/streams.proto + +package streams + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + _ "github.com/golang/protobuf/ptypes/timestamp" + math "math" +) + +import ( + context "context" + api "github.com/micro/micro/v3/service/api" + client "github.com/micro/micro/v3/service/client" + server "github.com/micro/micro/v3/service/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ api.Endpoint +var _ context.Context +var _ client.Option +var _ server.Option + +// Api Endpoints for Streams service + +func NewStreamsEndpoints() []*api.Endpoint { + return []*api.Endpoint{} +} + +// Client API for Streams service + +type StreamsService interface { + Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*PublishResponse, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Streams_SubscribeService, error) + Token(ctx context.Context, in *TokenRequest, opts ...client.CallOption) (*TokenResponse, error) +} + +type streamsService struct { + c client.Client + name string +} + +func NewStreamsService(name string, c client.Client) StreamsService { + return &streamsService{ + c: c, + name: name, + } +} + +func (c *streamsService) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*PublishResponse, error) { + req := c.c.NewRequest(c.name, "Streams.Publish", in) + out := new(PublishResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamsService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Streams_SubscribeService, error) { + req := c.c.NewRequest(c.name, "Streams.Subscribe", &SubscribeRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return &streamsServiceSubscribe{stream}, nil +} + +type Streams_SubscribeService interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*Message, error) +} + +type streamsServiceSubscribe struct { + stream client.Stream +} + +func (x *streamsServiceSubscribe) Close() error { + return x.stream.Close() +} + +func (x *streamsServiceSubscribe) Context() context.Context { + return x.stream.Context() +} + +func (x *streamsServiceSubscribe) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *streamsServiceSubscribe) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *streamsServiceSubscribe) Recv() (*Message, error) { + m := new(Message) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +func (c *streamsService) Token(ctx context.Context, in *TokenRequest, opts ...client.CallOption) (*TokenResponse, error) { + req := c.c.NewRequest(c.name, "Streams.Token", in) + out := new(TokenResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Streams service + +type StreamsHandler interface { + Publish(context.Context, *Message, *PublishResponse) error + Subscribe(context.Context, *SubscribeRequest, Streams_SubscribeStream) error + Token(context.Context, *TokenRequest, *TokenResponse) error +} + +func RegisterStreamsHandler(s server.Server, hdlr StreamsHandler, opts ...server.HandlerOption) error { + type streams interface { + Publish(ctx context.Context, in *Message, out *PublishResponse) error + Subscribe(ctx context.Context, stream server.Stream) error + Token(ctx context.Context, in *TokenRequest, out *TokenResponse) error + } + type Streams struct { + streams + } + h := &streamsHandler{hdlr} + return s.Handle(s.NewHandler(&Streams{h}, opts...)) +} + +type streamsHandler struct { + StreamsHandler +} + +func (h *streamsHandler) Publish(ctx context.Context, in *Message, out *PublishResponse) error { + return h.StreamsHandler.Publish(ctx, in, out) +} + +func (h *streamsHandler) Subscribe(ctx context.Context, stream server.Stream) error { + m := new(SubscribeRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.StreamsHandler.Subscribe(ctx, m, &streamsSubscribeStream{stream}) +} + +type Streams_SubscribeStream interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error +} + +type streamsSubscribeStream struct { + stream server.Stream +} + +func (x *streamsSubscribeStream) Close() error { + return x.stream.Close() +} + +func (x *streamsSubscribeStream) Context() context.Context { + return x.stream.Context() +} + +func (x *streamsSubscribeStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *streamsSubscribeStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *streamsSubscribeStream) Send(m *Message) error { + return x.stream.Send(m) +} + +func (h *streamsHandler) Token(ctx context.Context, in *TokenRequest, out *TokenResponse) error { + return h.StreamsHandler.Token(ctx, in, out) +} diff --git a/streams/proto/streams.proto b/streams/proto/streams.proto new file mode 100644 index 0000000..0cb2b9f --- /dev/null +++ b/streams/proto/streams.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package streams; +option go_package = "github.com/micro/services/streams/proto;streams"; +import "google/protobuf/timestamp.proto"; + +service Streams { + rpc Publish(Message) returns (PublishResponse) {} + rpc Subscribe(SubscribeRequest) returns (stream Message) {} + rpc Token(TokenRequest) returns (TokenResponse) {} +} + +message PublishResponse {} + +message Message { + string topic = 1; + string message = 2; + google.protobuf.Timestamp sent_at = 3; +} + +message SubscribeRequest { + // topic the user wishes to subscribe to, required + string topic = 1; + // tokens should be provided if the user is not proving an API key on the request (e.g. in cases + // where the stream is being consumed directly from the frontend via websockets). tokens can be + // generated using the Token RPC + string token = 2; +} + +message TokenRequest { + // the topic the token should be restricted to, if no topic is required the token can be used to + // subscribe to any topic + string topic = 1; +} + +message TokenResponse { + string token = 1; +} \ No newline at end of file