diff --git a/stream/README.md b/stream/README.md index bb83a34..d704ee7 100644 --- a/stream/README.md +++ b/stream/README.md @@ -1,6 +1,10 @@ -Publish and subscribe to messages +Ephemeral messaging service # Stream Service -Send messages to a stream and broadcast to multiple subscribers. Group by topic -and rapidly blast fire-and-forget messages to anyone listening. +The stream service provides ephemeral message streams for building chat applications, +feeds and timelines. Simply send messages to a channel and know they'll expire +after 24 hours. Streams are limited to 1000 messages and 1000 streams in total. +Max message size is 512 characters. + +Messages with links are automatically populated with metadata related to site. diff --git a/stream/domain/domain.go b/stream/domain/domain.go new file mode 100644 index 0000000..9ef0377 --- /dev/null +++ b/stream/domain/domain.go @@ -0,0 +1,348 @@ +package domain + +import ( + "errors" + "net/url" + "strings" + "sync" + "time" + + "github.com/PuerkitoBio/goquery" + "github.com/golang/groupcache/lru" + "github.com/google/uuid" +) + +const ( + defaultStream = "_" + maxMessageSize = 512 + maxMessages = 1000 + maxStreams = 1000 + streamTTL = 8.64e13 +) + +type Metadata struct { + Created int64 + Title string + Description string + Type string + Image string + Url string + Site string +} + +type Stream struct { + Id string + Messages []*Message + Updated int64 +} + +type Message struct { + Id string + Text string + Created int64 `json:",string"` + Stream string + Metadata *Metadata +} + +type Store struct { + Created int64 + Updates chan *Message + + mtx sync.RWMutex + Streams *lru.Cache + streams map[string]int64 + metadatas map[string]*Metadata +} + +var ( + C = newStore() +) + +func newStore() *Store { + return &Store{ + Created: time.Now().UnixNano(), + Streams: lru.New(maxStreams), + Updates: make(chan *Message, 100), + streams: make(map[string]int64), + metadatas: make(map[string]*Metadata), + } +} + +func newStream(id string) *Stream { + return &Stream{ + Id: id, + Updated: time.Now().UnixNano(), + } +} + +func newMessage(text, stream string) *Message { + return &Message{ + Id: uuid.New().String(), + Text: text, + Created: time.Now().UnixNano(), + Stream: stream, + } +} + +func getMetadata(uri string) *Metadata { + u, err := url.Parse(uri) + if err != nil { + return nil + } + + d, err := goquery.NewDocument(u.String()) + if err != nil { + return nil + } + + g := &Metadata{ + Created: time.Now().UnixNano(), + } + + for _, node := range d.Find("meta").Nodes { + if len(node.Attr) < 2 { + continue + } + + p := strings.Split(node.Attr[0].Val, ":") + if len(p) < 2 || (p[0] != "twitter" && p[0] != "og") { + continue + } + + switch p[1] { + case "site_name": + g.Site = node.Attr[1].Val + case "site": + if len(g.Site) == 0 { + g.Site = node.Attr[1].Val + } + case "title": + g.Title = node.Attr[1].Val + case "description": + g.Description = node.Attr[1].Val + case "card", "type": + g.Type = node.Attr[1].Val + case "url": + g.Url = node.Attr[1].Val + case "image": + if len(p) > 2 && p[2] == "src" { + g.Image = node.Attr[1].Val + } else if len(g.Image) == 0 { + g.Image = node.Attr[1].Val + } + } + } + + if len(g.Type) == 0 || len(g.Image) == 0 || len(g.Title) == 0 || len(g.Url) == 0 { + return nil + } + + return g +} + +func (c *Store) Metadata(t *Message) { + parts := strings.Split(t.Text, " ") + for _, part := range parts { + g := getMetadata(part) + if g == nil { + continue + } + c.mtx.Lock() + c.metadatas[t.Id] = g + c.mtx.Unlock() + return + } +} + +func (c *Store) List() map[string]int64 { + c.mtx.RLock() + streams := c.streams + c.mtx.RUnlock() + return streams +} + +func (c *Store) Save(message *Message) { + c.mtx.Lock() + defer c.mtx.Unlock() + + var stream *Stream + + if obj, ok := c.Streams.Get(message.Stream); ok { + stream = obj.(*Stream) + } else { + stream = newStream(message.Stream) + c.Streams.Add(message.Stream, stream) + } + + stream.Messages = append(stream.Messages, message) + if len(stream.Messages) > maxMessages { + stream.Messages = stream.Messages[1:] + } + stream.Updated = time.Now().UnixNano() +} + +func (c *Store) Retrieve(message string, streem string, direction, last, limit int64) []*Message { + c.mtx.RLock() + defer c.mtx.RUnlock() + + var stream *Stream + + if message, ok := c.Streams.Get(streem); ok { + stream = message.(*Stream) + } else { + return []*Message{} + } + + if len(message) == 0 { + var messages []*Message + + if limit <= 0 { + return messages + } + + li := int(limit) + + // go back in time + if direction < 0 { + for i := len(stream.Messages) - 1; i >= 0; i-- { + if len(messages) >= li { + return messages + } + + message := stream.Messages[i] + + if message.Created < last { + if g, ok := c.metadatas[message.Id]; ok { + tc := *message + tc.Metadata = g + messages = append(messages, &tc) + } else { + messages = append(messages, message) + } + } + } + return messages + } + + start := 0 + if len(stream.Messages) > li { + start = len(stream.Messages) - li + } + + for i := start; i < len(stream.Messages); i++ { + if len(messages) >= li { + return messages + } + + message := stream.Messages[i] + + if message.Created > last { + if g, ok := c.metadatas[message.Id]; ok { + tc := *message + tc.Metadata = g + messages = append(messages, &tc) + } else { + messages = append(messages, message) + } + } + } + return messages + } + + // retrieve one + for _, t := range stream.Messages { + var messages []*Message + if message == t.Id { + if g, ok := c.metadatas[t.Id]; ok { + tc := *t + tc.Metadata = g + messages = append(messages, &tc) + } else { + messages = append(messages, t) + } + return messages + } + } + + return []*Message{} +} + +func (c *Store) Run() { + t1 := time.NewTicker(time.Hour) + t2 := time.NewTicker(time.Minute) + streams := make(map[string]int64) + + for { + select { + case message := <-c.Updates: + c.Save(message) + streams[message.Stream] = time.Now().UnixNano() + go c.Metadata(message) + case <-t1.C: + now := time.Now().UnixNano() + for stream, u := range streams { + if d := now - u; d > streamTTL { + c.Streams.Remove(stream) + delete(streams, stream) + } + } + c.mtx.Lock() + for metadata, g := range c.metadatas { + if d := now - g.Created; d > streamTTL { + delete(c.metadatas, metadata) + } + } + c.mtx.Unlock() + case <-t2.C: + c.mtx.Lock() + c.streams = streams + c.mtx.Unlock() + } + } +} + +func ListChannels() map[string]int64 { + return C.List() +} + +func ListMessages(channel string, limit int64) []*Message { + message := "" + last := int64(0) + direction := int64(1) + + if limit <= 0 { + limit = 25 + } + + // default stream + if len(channel) == 0 { + channel = defaultStream + } + + return C.Retrieve(message, channel, direction, last, limit) +} + +func SendMessage(channel, message string) error { + // default stream + if len(channel) == 0 { + channel = defaultStream + } + + // default length + if len(message) > maxMessageSize { + message = message[:maxMessageSize] + } + + select { + case C.Updates <- newMessage(message, channel): + case <-time.After(time.Second): + return errors.New("timed out creating message") + } + + return nil +} + +// TODO: streams per user +func Setup() { + go C.Run() +} diff --git a/stream/examples.json b/stream/examples.json index e2e8e7e..1202e32 100644 --- a/stream/examples.json +++ b/stream/examples.json @@ -1,25 +1,46 @@ { - "publish": [{ - "title": "Publish a message", - "description": "Publish a message to a topic on the stream", + "sendMessage": [{ + "title": "Send a message", + "description": "Send a message to a channel", "run_check": true, "request": { - "topic": "events", - "message": {"id": "1", "type": "signup", "user": "john"} + "channel": "general", + "text": "Hey checkout this tweet https://twitter.com/m3oservices/status/1455291054295498752" }, "response": {} }], - "subscribe": [{ - "title": "Subscribe to a topic", - "description": "Subscribe to messages on a given topic from the stream", + "listMessages": [{ + "title": "List messages", + "description": "List messages for a channel", "run_check": false, "request": { - "topic": "events" + "channel": "general" }, "response": { - "topic": "events", - "message": {"id": "1", "type": "signup", "user": "john"} + "messages": [ + { + "id": "e6099dca-22af-440e-bdbf-e14525af9824", + "text": "Hey checkout this tweet https://twitter.com/m3oservices/status/1455291054295498752", + "timestamp": "2021-11-03T14:34:40.333401738Z", + "channel": "general", + "metadata": {} + } + ] + } + }], + "listChannels": [{ + "title": "List channels", + "description": "List all the channels", + "run_check": true, + "request": {}, + "response": { + "channels": [ + { + "name": "general", + "last_active": "2021-11-03T14:35:07.594972213Z" + } + ] } }] } diff --git a/stream/handler/stream.go b/stream/handler/stream.go index 6299b5f..acf56d8 100644 --- a/stream/handler/stream.go +++ b/stream/handler/stream.go @@ -2,23 +2,29 @@ package handler import ( "context" - "encoding/json" - "fmt" "path" + "strings" + "time" - "github.com/asim/mq/broker" "github.com/micro/micro/v3/service/errors" - log "github.com/micro/micro/v3/service/logger" "github.com/micro/services/pkg/tenant" + "github.com/micro/services/stream/domain" pb "github.com/micro/services/stream/proto" - "google.golang.org/protobuf/types/known/structpb" ) type Stream struct{} -func (s *Stream) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.PublishResponse) error { - if len(req.Topic) == 0 { - return errors.BadRequest("stream.publish", "topic is blank") +func New() *Stream { + domain.Setup() + return &Stream{} +} + +func (s *Stream) SendMessage(ctx context.Context, req *pb.SendMessageRequest, rsp *pb.SendMessageResponse) error { + if len(req.Channel) == 0 { + return errors.BadRequest("stream.sendmessage", "channel is blank") + } + if len(req.Text) == 0 { + return errors.BadRequest("stream.sendmessage", "message is blank") } // get the tenant @@ -27,23 +33,23 @@ func (s *Stream) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Pu id = "default" } - // create tenant based topics - topic := path.Join("stream", id, req.Topic) + // create tenant based channels + channel := path.Join(id, req.Channel) - // marshal the data - b, _ := json.Marshal(req.Message.AsMap()) - - log.Infof("Tenant %v publishing to %v\n", id, req.Topic) - - // publish the message - broker.Publish(topic, b) + // sendmessage the message + if err := domain.SendMessage(channel, req.Text); err != nil { + return errors.InternalServerError("stream.sendmessage", err.Error()) + } return nil } -func (s *Stream) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Stream_SubscribeStream) error { - if len(req.Topic) == 0 { - return errors.BadRequest("stream.publish", "topic is blank") +func (s *Stream) ListMessages(ctx context.Context, req *pb.ListMessagesRequest, rsp *pb.ListMessagesResponse) error { + if len(req.Channel) == 0 { + return errors.BadRequest("stream.sendmessage", "channel is blank") + } + if req.Limit <= 0 { + req.Limit = 25 } id, ok := tenant.FromContext(ctx) @@ -51,30 +57,53 @@ func (s *Stream) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream id = "default" } - // create tenant based topics - topic := path.Join("stream", id, req.Topic) + // create tenant based channels + channel := path.Join(id, req.Channel) + rsp.Channel = req.Channel - log.Infof("Tenant %v subscribing to %v\n", id, req.Topic) + for _, message := range domain.ListMessages(channel, int64(req.Limit)) { + metadata := map[string]string{} - sub, err := broker.Subscribe(topic) - if err != nil { - return errors.InternalServerError("stream.subscribe", "failed to subscribe to stream") - } - defer broker.Unsubscribe(req.Topic, sub) - - // range over the messages until the subscriber is closed - for msg := range sub { - fmt.Println("got message, sending") - // unmarshal the message into a struct - d := &structpb.Struct{} - d.UnmarshalJSON(msg) - - if err := stream.Send(&pb.SubscribeResponse{ - Topic: req.Topic, - Message: d, - }); err != nil { - return err + if message.Metadata != nil { + metadata["created"] = time.Unix(0, message.Metadata.Created).Format(time.RFC3339Nano) + metadata["title"] = message.Metadata.Title + metadata["description"] = message.Metadata.Description + metadata["type"] = message.Metadata.Type + metadata["image"] = message.Metadata.Image + metadata["url"] = message.Metadata.Url + metadata["site"] = message.Metadata.Site } + + rsp.Messages = append(rsp.Messages, &pb.Message{ + Id: message.Id, + Text: message.Text, + Timestamp: time.Unix(0, message.Created).Format(time.RFC3339Nano), + Channel: req.Channel, + Metadata: metadata, + }) + } + + return nil +} + +func (s *Stream) ListChannels(ctx context.Context, req *pb.ListChannelsRequest, rsp *pb.ListChannelsResponse) error { + // get the tenant + id, ok := tenant.FromContext(ctx) + if !ok { + id = "default" + } + + for channel, active := range domain.ListChannels() { + if !strings.HasPrefix(channel, id+"/") { + continue + } + + channel = strings.TrimPrefix(channel, id+"/") + + rsp.Channels = append(rsp.Channels, &pb.Channel{ + Name: channel, + LastActive: time.Unix(0, active).Format(time.RFC3339Nano), + }) } return nil diff --git a/stream/main.go b/stream/main.go index 1c8e28b..f854e38 100644 --- a/stream/main.go +++ b/stream/main.go @@ -15,7 +15,7 @@ func main() { ) // Register handler - pb.RegisterStreamHandler(srv.Server(), new(handler.Stream)) + pb.RegisterStreamHandler(srv.Server(), handler.New()) // Run service if err := srv.Run(); err != nil { diff --git a/stream/proto/stream.pb.go b/stream/proto/stream.pb.go index 0dd9f11..5c01f56 100644 --- a/stream/proto/stream.pb.go +++ b/stream/proto/stream.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 +// protoc-gen-go v1.27.1 // protoc v3.15.6 // source: proto/stream.proto @@ -9,7 +9,6 @@ package stream import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - structpb "google.golang.org/protobuf/types/known/structpb" reflect "reflect" sync "sync" ) @@ -21,20 +20,25 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// Publish a message to the stream. Specify a topic to group messages for a specific topic. -type PublishRequest struct { +type Message struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // The topic to publish to - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - // The json message to publish - Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + // id of the message + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // text of the message + Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"` + // time of message creation + Timestamp string `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // the channel name + Channel string `protobuf:"bytes,4,opt,name=channel,proto3" json:"channel,omitempty"` + // the associated metadata + Metadata map[string]string `protobuf:"bytes,5,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } -func (x *PublishRequest) Reset() { - *x = PublishRequest{} +func (x *Message) Reset() { + *x = Message{} if protoimpl.UnsafeEnabled { mi := &file_proto_stream_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -42,13 +46,13 @@ func (x *PublishRequest) Reset() { } } -func (x *PublishRequest) String() string { +func (x *Message) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PublishRequest) ProtoMessage() {} +func (*Message) ProtoMessage() {} -func (x *PublishRequest) ProtoReflect() protoreflect.Message { +func (x *Message) ProtoReflect() protoreflect.Message { mi := &file_proto_stream_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -60,33 +64,59 @@ func (x *PublishRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. -func (*PublishRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { return file_proto_stream_proto_rawDescGZIP(), []int{0} } -func (x *PublishRequest) GetTopic() string { +func (x *Message) GetId() string { if x != nil { - return x.Topic + return x.Id } return "" } -func (x *PublishRequest) GetMessage() *structpb.Struct { +func (x *Message) GetText() string { if x != nil { - return x.Message + return x.Text + } + return "" +} + +func (x *Message) GetTimestamp() string { + if x != nil { + return x.Timestamp + } + return "" +} + +func (x *Message) GetChannel() string { + if x != nil { + return x.Channel + } + return "" +} + +func (x *Message) GetMetadata() map[string]string { + if x != nil { + return x.Metadata } return nil } -type PublishResponse struct { +type Channel struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + // name of the channel + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // last activity time + LastActive string `protobuf:"bytes,2,opt,name=last_active,json=lastActive,proto3" json:"last_active,omitempty"` } -func (x *PublishResponse) Reset() { - *x = PublishResponse{} +func (x *Channel) Reset() { + *x = Channel{} if protoimpl.UnsafeEnabled { mi := &file_proto_stream_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -94,13 +124,13 @@ func (x *PublishResponse) Reset() { } } -func (x *PublishResponse) String() string { +func (x *Channel) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PublishResponse) ProtoMessage() {} +func (*Channel) ProtoMessage() {} -func (x *PublishResponse) ProtoReflect() protoreflect.Message { +func (x *Channel) ProtoReflect() protoreflect.Message { mi := &file_proto_stream_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -112,23 +142,39 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. -func (*PublishResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use Channel.ProtoReflect.Descriptor instead. +func (*Channel) Descriptor() ([]byte, []int) { return file_proto_stream_proto_rawDescGZIP(), []int{1} } -// Subscribe to messages for a given topic. -type SubscribeRequest struct { +func (x *Channel) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Channel) GetLastActive() string { + if x != nil { + return x.LastActive + } + return "" +} + +// SendMessage a message to the stream. +type SendMessageRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // The topic to subscribe to - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // The channel to send to + Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"` + // The message text to send + Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"` } -func (x *SubscribeRequest) Reset() { - *x = SubscribeRequest{} +func (x *SendMessageRequest) Reset() { + *x = SendMessageRequest{} if protoimpl.UnsafeEnabled { mi := &file_proto_stream_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -136,13 +182,13 @@ func (x *SubscribeRequest) Reset() { } } -func (x *SubscribeRequest) String() string { +func (x *SendMessageRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SubscribeRequest) ProtoMessage() {} +func (*SendMessageRequest) ProtoMessage() {} -func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { +func (x *SendMessageRequest) ProtoReflect() protoreflect.Message { mi := &file_proto_stream_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -154,32 +200,33 @@ func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. -func (*SubscribeRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use SendMessageRequest.ProtoReflect.Descriptor instead. +func (*SendMessageRequest) Descriptor() ([]byte, []int) { return file_proto_stream_proto_rawDescGZIP(), []int{2} } -func (x *SubscribeRequest) GetTopic() string { +func (x *SendMessageRequest) GetChannel() string { if x != nil { - return x.Topic + return x.Channel } return "" } -// A blocking stream will be returned in response. -type SubscribeResponse struct { +func (x *SendMessageRequest) GetText() string { + if x != nil { + return x.Text + } + return "" +} + +type SendMessageResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - - // The topic subscribed to - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - // The next json message on the topic - Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` } -func (x *SubscribeResponse) Reset() { - *x = SubscribeResponse{} +func (x *SendMessageResponse) Reset() { + *x = SendMessageResponse{} if protoimpl.UnsafeEnabled { mi := &file_proto_stream_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -187,13 +234,13 @@ func (x *SubscribeResponse) Reset() { } } -func (x *SubscribeResponse) String() string { +func (x *SendMessageResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SubscribeResponse) ProtoMessage() {} +func (*SendMessageResponse) ProtoMessage() {} -func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { +func (x *SendMessageResponse) ProtoReflect() protoreflect.Message { mi := &file_proto_stream_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -205,21 +252,208 @@ func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. -func (*SubscribeResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use SendMessageResponse.ProtoReflect.Descriptor instead. +func (*SendMessageResponse) Descriptor() ([]byte, []int) { return file_proto_stream_proto_rawDescGZIP(), []int{3} } -func (x *SubscribeResponse) GetTopic() string { +// List all the active channels +type ListChannelsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ListChannelsRequest) Reset() { + *x = ListChannelsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stream_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListChannelsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListChannelsRequest) ProtoMessage() {} + +func (x *ListChannelsRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_stream_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListChannelsRequest.ProtoReflect.Descriptor instead. +func (*ListChannelsRequest) Descriptor() ([]byte, []int) { + return file_proto_stream_proto_rawDescGZIP(), []int{4} +} + +type ListChannelsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Channels []*Channel `protobuf:"bytes,1,rep,name=channels,proto3" json:"channels,omitempty"` +} + +func (x *ListChannelsResponse) Reset() { + *x = ListChannelsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stream_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListChannelsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListChannelsResponse) ProtoMessage() {} + +func (x *ListChannelsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_stream_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListChannelsResponse.ProtoReflect.Descriptor instead. +func (*ListChannelsResponse) Descriptor() ([]byte, []int) { + return file_proto_stream_proto_rawDescGZIP(), []int{5} +} + +func (x *ListChannelsResponse) GetChannels() []*Channel { if x != nil { - return x.Topic + return x.Channels + } + return nil +} + +// List messages for a given channel +type ListMessagesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The channel to subscribe to + Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"` + // number of message to return + Limit int32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` +} + +func (x *ListMessagesRequest) Reset() { + *x = ListMessagesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stream_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListMessagesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListMessagesRequest) ProtoMessage() {} + +func (x *ListMessagesRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_stream_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListMessagesRequest.ProtoReflect.Descriptor instead. +func (*ListMessagesRequest) Descriptor() ([]byte, []int) { + return file_proto_stream_proto_rawDescGZIP(), []int{6} +} + +func (x *ListMessagesRequest) GetChannel() string { + if x != nil { + return x.Channel } return "" } -func (x *SubscribeResponse) GetMessage() *structpb.Struct { +func (x *ListMessagesRequest) GetLimit() int32 { if x != nil { - return x.Message + return x.Limit + } + return 0 +} + +type ListMessagesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The channel subscribed to + Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"` + // Messages are returned in reverse order; latest first + Messages []*Message `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"` +} + +func (x *ListMessagesResponse) Reset() { + *x = ListMessagesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stream_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListMessagesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListMessagesResponse) ProtoMessage() {} + +func (x *ListMessagesResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_stream_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListMessagesResponse.ProtoReflect.Descriptor instead. +func (*ListMessagesResponse) Descriptor() ([]byte, []int) { + return file_proto_stream_proto_rawDescGZIP(), []int{7} +} + +func (x *ListMessagesResponse) GetChannel() string { + if x != nil { + return x.Channel + } + return "" +} + +func (x *ListMessagesResponse) GetMessages() []*Message { + if x != nil { + return x.Messages } return nil } @@ -228,35 +462,64 @@ var File_proto_stream_proto protoreflect.FileDescriptor var file_proto_stream_proto_rawDesc = []byte{ 0x0a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x1a, 0x1c, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, - 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x0e, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, - 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, - 0x69, 0x63, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x28, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, - 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, - 0x69, 0x63, 0x22, 0x5c, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x31, 0x0a, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x32, 0x8c, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x3c, 0x0a, 0x07, 0x50, - 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x16, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, - 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, - 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x44, 0x0a, 0x09, 0x53, 0x75, 0x62, - 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x18, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x19, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, - 0x10, 0x5a, 0x0e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x22, 0xdd, 0x01, 0x0a, + 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x1c, 0x0a, 0x09, + 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x39, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a, + 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x3e, 0x0a, 0x07, + 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x6c, + 0x61, 0x73, 0x74, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x42, 0x0a, 0x12, + 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04, + 0x74, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, + 0x22, 0x15, 0x0a, 0x13, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x43, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x43, + 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2b, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, + 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x08, 0x63, 0x68, 0x61, 0x6e, 0x6e, + 0x65, 0x6c, 0x73, 0x22, 0x45, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x5d, 0x0a, 0x14, 0x4c, 0x69, + 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x2b, 0x0a, 0x08, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, + 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, + 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x32, 0xec, 0x01, 0x0a, 0x06, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x12, 0x48, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x65, 0x6e, + 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x1b, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, + 0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x1b, + 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x4c, + 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x12, 0x1b, 0x2e, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x10, 0x5a, 0x0e, 0x2e, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -271,26 +534,33 @@ func file_proto_stream_proto_rawDescGZIP() []byte { return file_proto_stream_proto_rawDescData } -var file_proto_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_proto_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_proto_stream_proto_goTypes = []interface{}{ - (*PublishRequest)(nil), // 0: stream.PublishRequest - (*PublishResponse)(nil), // 1: stream.PublishResponse - (*SubscribeRequest)(nil), // 2: stream.SubscribeRequest - (*SubscribeResponse)(nil), // 3: stream.SubscribeResponse - (*structpb.Struct)(nil), // 4: google.protobuf.Struct + (*Message)(nil), // 0: stream.Message + (*Channel)(nil), // 1: stream.Channel + (*SendMessageRequest)(nil), // 2: stream.SendMessageRequest + (*SendMessageResponse)(nil), // 3: stream.SendMessageResponse + (*ListChannelsRequest)(nil), // 4: stream.ListChannelsRequest + (*ListChannelsResponse)(nil), // 5: stream.ListChannelsResponse + (*ListMessagesRequest)(nil), // 6: stream.ListMessagesRequest + (*ListMessagesResponse)(nil), // 7: stream.ListMessagesResponse + nil, // 8: stream.Message.MetadataEntry } var file_proto_stream_proto_depIdxs = []int32{ - 4, // 0: stream.PublishRequest.message:type_name -> google.protobuf.Struct - 4, // 1: stream.SubscribeResponse.message:type_name -> google.protobuf.Struct - 0, // 2: stream.Stream.Publish:input_type -> stream.PublishRequest - 2, // 3: stream.Stream.Subscribe:input_type -> stream.SubscribeRequest - 1, // 4: stream.Stream.Publish:output_type -> stream.PublishResponse - 3, // 5: stream.Stream.Subscribe:output_type -> stream.SubscribeResponse - 4, // [4:6] is the sub-list for method output_type - 2, // [2:4] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 8, // 0: stream.Message.metadata:type_name -> stream.Message.MetadataEntry + 1, // 1: stream.ListChannelsResponse.channels:type_name -> stream.Channel + 0, // 2: stream.ListMessagesResponse.messages:type_name -> stream.Message + 2, // 3: stream.Stream.SendMessage:input_type -> stream.SendMessageRequest + 6, // 4: stream.Stream.ListMessages:input_type -> stream.ListMessagesRequest + 4, // 5: stream.Stream.ListChannels:input_type -> stream.ListChannelsRequest + 3, // 6: stream.Stream.SendMessage:output_type -> stream.SendMessageResponse + 7, // 7: stream.Stream.ListMessages:output_type -> stream.ListMessagesResponse + 5, // 8: stream.Stream.ListChannels:output_type -> stream.ListChannelsResponse + 6, // [6:9] is the sub-list for method output_type + 3, // [3:6] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_proto_stream_proto_init() } @@ -300,7 +570,7 @@ func file_proto_stream_proto_init() { } if !protoimpl.UnsafeEnabled { file_proto_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest); i { + switch v := v.(*Message); i { case 0: return &v.state case 1: @@ -312,7 +582,7 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishResponse); i { + switch v := v.(*Channel); i { case 0: return &v.state case 1: @@ -324,7 +594,7 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeRequest); i { + switch v := v.(*SendMessageRequest); i { case 0: return &v.state case 1: @@ -336,7 +606,55 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeResponse); i { + switch v := v.(*SendMessageResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_stream_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListChannelsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_stream_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListChannelsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_stream_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListMessagesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_stream_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListMessagesResponse); i { case 0: return &v.state case 1: @@ -354,7 +672,7 @@ func file_proto_stream_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_stream_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/stream/proto/stream.pb.micro.go b/stream/proto/stream.pb.micro.go index fbb0fe8..40ab904 100644 --- a/stream/proto/stream.pb.micro.go +++ b/stream/proto/stream.pb.micro.go @@ -6,7 +6,6 @@ package stream import ( fmt "fmt" proto "github.com/golang/protobuf/proto" - _ "google.golang.org/protobuf/types/known/structpb" math "math" ) @@ -43,8 +42,9 @@ func NewStreamEndpoints() []*api.Endpoint { // Client API for Stream service type StreamService interface { - Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) - Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Stream_SubscribeService, error) + SendMessage(ctx context.Context, in *SendMessageRequest, opts ...client.CallOption) (*SendMessageResponse, error) + ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error) + ListChannels(ctx context.Context, in *ListChannelsRequest, opts ...client.CallOption) (*ListChannelsResponse, error) } type streamService struct { @@ -59,9 +59,9 @@ func NewStreamService(name string, c client.Client) StreamService { } } -func (c *streamService) Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) { - req := c.c.NewRequest(c.name, "Stream.Publish", in) - out := new(PublishResponse) +func (c *streamService) SendMessage(ctx context.Context, in *SendMessageRequest, opts ...client.CallOption) (*SendMessageResponse, error) { + req := c.c.NewRequest(c.name, "Stream.SendMessage", in) + out := new(SendMessageResponse) err := c.c.Call(ctx, req, out, opts...) if err != nil { return nil, err @@ -69,66 +69,39 @@ func (c *streamService) Publish(ctx context.Context, in *PublishRequest, opts .. return out, nil } -func (c *streamService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Stream_SubscribeService, error) { - req := c.c.NewRequest(c.name, "Stream.Subscribe", &SubscribeRequest{}) - stream, err := c.c.Stream(ctx, req, opts...) +func (c *streamService) ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error) { + req := c.c.NewRequest(c.name, "Stream.ListMessages", in) + out := new(ListMessagesResponse) + err := c.c.Call(ctx, req, out, opts...) if err != nil { return nil, err } - if err := stream.Send(in); err != nil { - return nil, err - } - return &streamServiceSubscribe{stream}, nil + return out, nil } -type Stream_SubscribeService interface { - Context() context.Context - SendMsg(interface{}) error - RecvMsg(interface{}) error - Close() error - Recv() (*SubscribeResponse, error) -} - -type streamServiceSubscribe struct { - stream client.Stream -} - -func (x *streamServiceSubscribe) Close() error { - return x.stream.Close() -} - -func (x *streamServiceSubscribe) Context() context.Context { - return x.stream.Context() -} - -func (x *streamServiceSubscribe) SendMsg(m interface{}) error { - return x.stream.Send(m) -} - -func (x *streamServiceSubscribe) RecvMsg(m interface{}) error { - return x.stream.Recv(m) -} - -func (x *streamServiceSubscribe) Recv() (*SubscribeResponse, error) { - m := new(SubscribeResponse) - err := x.stream.Recv(m) +func (c *streamService) ListChannels(ctx context.Context, in *ListChannelsRequest, opts ...client.CallOption) (*ListChannelsResponse, error) { + req := c.c.NewRequest(c.name, "Stream.ListChannels", in) + out := new(ListChannelsResponse) + err := c.c.Call(ctx, req, out, opts...) if err != nil { return nil, err } - return m, nil + return out, nil } // Server API for Stream service type StreamHandler interface { - Publish(context.Context, *PublishRequest, *PublishResponse) error - Subscribe(context.Context, *SubscribeRequest, Stream_SubscribeStream) error + SendMessage(context.Context, *SendMessageRequest, *SendMessageResponse) error + ListMessages(context.Context, *ListMessagesRequest, *ListMessagesResponse) error + ListChannels(context.Context, *ListChannelsRequest, *ListChannelsResponse) error } func RegisterStreamHandler(s server.Server, hdlr StreamHandler, opts ...server.HandlerOption) error { type stream interface { - Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error - Subscribe(ctx context.Context, stream server.Stream) error + SendMessage(ctx context.Context, in *SendMessageRequest, out *SendMessageResponse) error + ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error + ListChannels(ctx context.Context, in *ListChannelsRequest, out *ListChannelsResponse) error } type Stream struct { stream @@ -141,46 +114,14 @@ type streamHandler struct { StreamHandler } -func (h *streamHandler) Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error { - return h.StreamHandler.Publish(ctx, in, out) +func (h *streamHandler) SendMessage(ctx context.Context, in *SendMessageRequest, out *SendMessageResponse) error { + return h.StreamHandler.SendMessage(ctx, in, out) } -func (h *streamHandler) Subscribe(ctx context.Context, stream server.Stream) error { - m := new(SubscribeRequest) - if err := stream.Recv(m); err != nil { - return err - } - return h.StreamHandler.Subscribe(ctx, m, &streamSubscribeStream{stream}) +func (h *streamHandler) ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error { + return h.StreamHandler.ListMessages(ctx, in, out) } -type Stream_SubscribeStream interface { - Context() context.Context - SendMsg(interface{}) error - RecvMsg(interface{}) error - Close() error - Send(*SubscribeResponse) error -} - -type streamSubscribeStream struct { - stream server.Stream -} - -func (x *streamSubscribeStream) Close() error { - return x.stream.Close() -} - -func (x *streamSubscribeStream) Context() context.Context { - return x.stream.Context() -} - -func (x *streamSubscribeStream) SendMsg(m interface{}) error { - return x.stream.Send(m) -} - -func (x *streamSubscribeStream) RecvMsg(m interface{}) error { - return x.stream.Recv(m) -} - -func (x *streamSubscribeStream) Send(m *SubscribeResponse) error { - return x.stream.Send(m) +func (h *streamHandler) ListChannels(ctx context.Context, in *ListChannelsRequest, out *ListChannelsResponse) error { + return h.StreamHandler.ListChannels(ctx, in, out) } diff --git a/stream/proto/stream.proto b/stream/proto/stream.proto index 06b36e1..a031db0 100644 --- a/stream/proto/stream.proto +++ b/stream/proto/stream.proto @@ -3,33 +3,62 @@ syntax = "proto3"; package stream; option go_package = "./proto;stream"; -import "google/protobuf/struct.proto"; service Stream { - rpc Publish(PublishRequest) returns (PublishResponse) {} - rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {} + rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {} + rpc ListMessages(ListMessagesRequest) returns (ListMessagesResponse) {} + rpc ListChannels(ListChannelsRequest) returns (ListChannelsResponse) {} } -// Publish a message to the stream. Specify a topic to group messages for a specific topic. -message PublishRequest { - // The topic to publish to - string topic = 1; - // The json message to publish - google.protobuf.Struct message = 2; +message Message { + // id of the message + string id = 1; + // text of the message + string text = 2; + // time of message creation + string timestamp = 3; + // the channel name + string channel = 4; + // the associated metadata + map metadata = 5; } -message PublishResponse {} - -// Subscribe to messages for a given topic. -message SubscribeRequest { - // The topic to subscribe to - string topic = 1; +message Channel { + // name of the channel + string name = 1; + // last activity time + string last_active = 2; } -// A blocking stream will be returned in response. -message SubscribeResponse { - // The topic subscribed to - string topic = 1; - // The next json message on the topic - google.protobuf.Struct message = 2; +// SendMessage a message to the stream. +message SendMessageRequest { + // The channel to send to + string channel = 1; + // The message text to send + string text = 2; +} + +message SendMessageResponse {} + +// List all the active channels +message ListChannelsRequest { +} + +message ListChannelsResponse { + repeated Channel channels = 1; +} + +// List messages for a given channel +message ListMessagesRequest { + // The channel to subscribe to + string channel = 1; + // number of message to return + int32 limit = 2; +} + +message ListMessagesResponse { + // The channel subscribed to + string channel = 1; + // Messages are chronological order + repeated Message messages = 2; } diff --git a/stream/publicapi.json b/stream/publicapi.json index f0f05f0..30337b8 100644 --- a/stream/publicapi.json +++ b/stream/publicapi.json @@ -1,6 +1,6 @@ { "name": "stream", "icon": "🌊", - "category": "messaging", + "category": "social", "display_name": "Stream" }