From 5ad499c8a0d7fe5fb38b7c64bb709972f5c5bd52 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 3 Nov 2021 16:03:20 +0000 Subject: [PATCH] add create channels to stream --- stream/domain/domain.go | 46 +++-- stream/examples.json | 11 ++ stream/handler/stream.go | 27 ++- stream/proto/stream.pb.go | 333 +++++++++++++++++++++++--------- stream/proto/stream.pb.micro.go | 17 ++ stream/proto/stream.proto | 16 +- 6 files changed, 340 insertions(+), 110 deletions(-) diff --git a/stream/domain/domain.go b/stream/domain/domain.go index 9ef0377..42b886d 100644 --- a/stream/domain/domain.go +++ b/stream/domain/domain.go @@ -32,6 +32,7 @@ type Metadata struct { type Stream struct { Id string + Description string Messages []*Message Updated int64 } @@ -50,7 +51,7 @@ type Store struct { mtx sync.RWMutex Streams *lru.Cache - streams map[string]int64 + streams map[string]*Stream metadatas map[string]*Metadata } @@ -63,14 +64,15 @@ func newStore() *Store { Created: time.Now().UnixNano(), Streams: lru.New(maxStreams), Updates: make(chan *Message, 100), - streams: make(map[string]int64), + streams: make(map[string]*Stream), metadatas: make(map[string]*Metadata), } } -func newStream(id string) *Stream { +func newStream(id, desc string) *Stream { return &Stream{ Id: id, + Description: desc, Updated: time.Now().UnixNano(), } } @@ -140,6 +142,18 @@ func getMetadata(uri string) *Metadata { return g } +func (c *Store) CreateStream(name, description string) { + c.mtx.Lock() + ch, ok := c.streams[name] + if ok { + ch.Description = description + } else { + ch = newStream(name, description) + } + c.streams[name] = ch + c.mtx.Unlock() +} + func (c *Store) Metadata(t *Message) { parts := strings.Split(t.Text, " ") for _, part := range parts { @@ -154,7 +168,7 @@ func (c *Store) Metadata(t *Message) { } } -func (c *Store) List() map[string]int64 { +func (c *Store) ListStreams() map[string]*Stream { c.mtx.RLock() streams := c.streams c.mtx.RUnlock() @@ -170,7 +184,7 @@ func (c *Store) Save(message *Message) { if obj, ok := c.Streams.Get(message.Stream); ok { stream = obj.(*Stream) } else { - stream = newStream(message.Stream) + stream = newStream(message.Stream, "") c.Streams.Add(message.Stream, stream) } @@ -270,18 +284,24 @@ func (c *Store) Retrieve(message string, streem string, direction, last, limit i func (c *Store) Run() { t1 := time.NewTicker(time.Hour) t2 := time.NewTicker(time.Minute) - streams := make(map[string]int64) + streams := make(map[string]*Stream) for { select { case message := <-c.Updates: c.Save(message) - streams[message.Stream] = time.Now().UnixNano() + ch, ok := streams[message.Stream] + if !ok { + ch = newStream(message.Stream, "") + streams[message.Stream] = ch + } + ch.Updated = time.Now().UnixNano() + streams[message.Stream] = ch go c.Metadata(message) case <-t1.C: now := time.Now().UnixNano() - for stream, u := range streams { - if d := now - u; d > streamTTL { + for stream, ch := range streams { + if d := now - ch.Updated; d > streamTTL { c.Streams.Remove(stream) delete(streams, stream) } @@ -301,8 +321,12 @@ func (c *Store) Run() { } } -func ListChannels() map[string]int64 { - return C.List() +func CreateChannel(name, description string) { + C.CreateStream(name, description) +} + +func ListChannels() map[string]*Stream { + return C.ListStreams() } func ListMessages(channel string, limit int64) []*Message { diff --git a/stream/examples.json b/stream/examples.json index 4d5fc22..499a2e2 100644 --- a/stream/examples.json +++ b/stream/examples.json @@ -1,5 +1,15 @@ { + "createChannel": [{ + "title": "Create Channel", + "description": "Create a channel with name and description", + "run_check": true, + "request": { + "name": "general", + "description": "The channel for all things" + }, + "response": {} + }], "sendMessage": [{ "title": "Send message", "description": "Send a message to a channel", @@ -38,6 +48,7 @@ "channels": [ { "name": "general", + "description": "The channel for all things", "last_active": "2021-11-03T14:35:07.594972213Z" } ] diff --git a/stream/handler/stream.go b/stream/handler/stream.go index acf56d8..5aab90e 100644 --- a/stream/handler/stream.go +++ b/stream/handler/stream.go @@ -19,6 +19,22 @@ func New() *Stream { return &Stream{} } +func (s *Stream) CreateChannel(ctx context.Context, req *pb.CreateChannelRequest, rsp *pb.CreateChannelResponse) error { + // get the tenant + id, ok := tenant.FromContext(ctx) + if !ok { + id = "default" + } + + if len(req.Name) == 0 { + return errors.BadRequest("stream.createchannel", "name is blank") + } + + domain.CreateChannel(path.Join(id, req.Name), req.Description) + + return nil +} + func (s *Stream) SendMessage(ctx context.Context, req *pb.SendMessageRequest, rsp *pb.SendMessageResponse) error { if len(req.Channel) == 0 { return errors.BadRequest("stream.sendmessage", "channel is blank") @@ -93,16 +109,17 @@ func (s *Stream) ListChannels(ctx context.Context, req *pb.ListChannelsRequest, id = "default" } - for channel, active := range domain.ListChannels() { - if !strings.HasPrefix(channel, id+"/") { + for _, channel := range domain.ListChannels() { + if !strings.HasPrefix(channel.Id, id+"/") { continue } - channel = strings.TrimPrefix(channel, id+"/") + name := strings.TrimPrefix(channel.Id, id+"/") rsp.Channels = append(rsp.Channels, &pb.Channel{ - Name: channel, - LastActive: time.Unix(0, active).Format(time.RFC3339Nano), + Name: name, + Description: channel.Description, + LastActive: time.Unix(0, channel.Updated).Format(time.RFC3339Nano), }) } diff --git a/stream/proto/stream.pb.go b/stream/proto/stream.pb.go index 5c01f56..582138e 100644 --- a/stream/proto/stream.pb.go +++ b/stream/proto/stream.pb.go @@ -111,8 +111,10 @@ type Channel struct { // name of the channel Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // description for the channel + Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"` // last activity time - LastActive string `protobuf:"bytes,2,opt,name=last_active,json=lastActive,proto3" json:"last_active,omitempty"` + LastActive string `protobuf:"bytes,3,opt,name=last_active,json=lastActive,proto3" json:"last_active,omitempty"` } func (x *Channel) Reset() { @@ -154,6 +156,13 @@ func (x *Channel) GetName() string { return "" } +func (x *Channel) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + func (x *Channel) GetLastActive() string { if x != nil { return x.LastActive @@ -161,7 +170,104 @@ func (x *Channel) GetLastActive() string { return "" } -// SendMessage a message to the stream. +// Create a channel with a given name and description. Channels are created automatically but +// this allows you to specify a description that's persisted for the lifetime of the channel. +type CreateChannelRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // name of the channel + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // description for the channel + Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"` +} + +func (x *CreateChannelRequest) Reset() { + *x = CreateChannelRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stream_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateChannelRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateChannelRequest) ProtoMessage() {} + +func (x *CreateChannelRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_stream_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateChannelRequest.ProtoReflect.Descriptor instead. +func (*CreateChannelRequest) Descriptor() ([]byte, []int) { + return file_proto_stream_proto_rawDescGZIP(), []int{2} +} + +func (x *CreateChannelRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *CreateChannelRequest) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + +type CreateChannelResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CreateChannelResponse) Reset() { + *x = CreateChannelResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stream_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateChannelResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateChannelResponse) ProtoMessage() {} + +func (x *CreateChannelResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_stream_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateChannelResponse.ProtoReflect.Descriptor instead. +func (*CreateChannelResponse) Descriptor() ([]byte, []int) { + return file_proto_stream_proto_rawDescGZIP(), []int{3} +} + +// Send a message to the stream. type SendMessageRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -176,7 +282,7 @@ type SendMessageRequest struct { func (x *SendMessageRequest) Reset() { *x = SendMessageRequest{} if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[2] + mi := &file_proto_stream_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -189,7 +295,7 @@ func (x *SendMessageRequest) String() string { func (*SendMessageRequest) ProtoMessage() {} func (x *SendMessageRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[2] + mi := &file_proto_stream_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -202,7 +308,7 @@ func (x *SendMessageRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SendMessageRequest.ProtoReflect.Descriptor instead. func (*SendMessageRequest) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{2} + return file_proto_stream_proto_rawDescGZIP(), []int{4} } func (x *SendMessageRequest) GetChannel() string { @@ -228,7 +334,7 @@ type SendMessageResponse struct { func (x *SendMessageResponse) Reset() { *x = SendMessageResponse{} if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[3] + mi := &file_proto_stream_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -241,7 +347,7 @@ func (x *SendMessageResponse) String() string { func (*SendMessageResponse) ProtoMessage() {} func (x *SendMessageResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[3] + mi := &file_proto_stream_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -254,7 +360,7 @@ func (x *SendMessageResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SendMessageResponse.ProtoReflect.Descriptor instead. func (*SendMessageResponse) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{3} + return file_proto_stream_proto_rawDescGZIP(), []int{5} } // List all the active channels @@ -267,7 +373,7 @@ type ListChannelsRequest struct { func (x *ListChannelsRequest) Reset() { *x = ListChannelsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[4] + mi := &file_proto_stream_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -280,7 +386,7 @@ func (x *ListChannelsRequest) String() string { func (*ListChannelsRequest) ProtoMessage() {} func (x *ListChannelsRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[4] + mi := &file_proto_stream_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -293,7 +399,7 @@ func (x *ListChannelsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ListChannelsRequest.ProtoReflect.Descriptor instead. func (*ListChannelsRequest) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{4} + return file_proto_stream_proto_rawDescGZIP(), []int{6} } type ListChannelsResponse struct { @@ -307,7 +413,7 @@ type ListChannelsResponse struct { func (x *ListChannelsResponse) Reset() { *x = ListChannelsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[5] + mi := &file_proto_stream_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -320,7 +426,7 @@ func (x *ListChannelsResponse) String() string { func (*ListChannelsResponse) ProtoMessage() {} func (x *ListChannelsResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[5] + mi := &file_proto_stream_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -333,7 +439,7 @@ func (x *ListChannelsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListChannelsResponse.ProtoReflect.Descriptor instead. func (*ListChannelsResponse) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{5} + return file_proto_stream_proto_rawDescGZIP(), []int{7} } func (x *ListChannelsResponse) GetChannels() []*Channel { @@ -358,7 +464,7 @@ type ListMessagesRequest struct { func (x *ListMessagesRequest) Reset() { *x = ListMessagesRequest{} if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[6] + mi := &file_proto_stream_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -371,7 +477,7 @@ func (x *ListMessagesRequest) String() string { func (*ListMessagesRequest) ProtoMessage() {} func (x *ListMessagesRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[6] + mi := &file_proto_stream_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -384,7 +490,7 @@ func (x *ListMessagesRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ListMessagesRequest.ProtoReflect.Descriptor instead. func (*ListMessagesRequest) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{6} + return file_proto_stream_proto_rawDescGZIP(), []int{8} } func (x *ListMessagesRequest) GetChannel() string { @@ -408,14 +514,14 @@ type ListMessagesResponse struct { // The channel subscribed to Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"` - // Messages are returned in reverse order; latest first + // Messages are chronological order Messages []*Message `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"` } func (x *ListMessagesResponse) Reset() { *x = ListMessagesResponse{} if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[7] + mi := &file_proto_stream_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -428,7 +534,7 @@ func (x *ListMessagesResponse) String() string { func (*ListMessagesResponse) ProtoMessage() {} func (x *ListMessagesResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[7] + mi := &file_proto_stream_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -441,7 +547,7 @@ func (x *ListMessagesResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListMessagesResponse.ProtoReflect.Descriptor instead. func (*ListMessagesResponse) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{7} + return file_proto_stream_proto_rawDescGZIP(), []int{9} } func (x *ListMessagesResponse) GetChannel() string { @@ -476,50 +582,63 @@ var file_proto_stream_proto_rawDesc = []byte{ 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x3e, 0x0a, 0x07, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x60, 0x0a, 0x07, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x6c, - 0x61, 0x73, 0x74, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x42, 0x0a, 0x12, - 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04, - 0x74, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, - 0x22, 0x15, 0x0a, 0x13, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x43, - 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x43, - 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2b, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, - 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x08, 0x63, 0x68, 0x61, 0x6e, 0x6e, - 0x65, 0x6c, 0x73, 0x22, 0x45, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, - 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, - 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x5d, 0x0a, 0x14, 0x4c, 0x69, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, + 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, + 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x4c, + 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, + 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x17, 0x0a, 0x15, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x42, 0x0a, 0x12, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x22, 0x15, 0x0a, 0x13, 0x53, 0x65, 0x6e, + 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x15, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x43, 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x43, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x2b, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x0f, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, + 0x65, 0x6c, 0x52, 0x08, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x22, 0x45, 0x0a, 0x13, + 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x14, 0x0a, + 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, + 0x6d, 0x69, 0x74, 0x22, 0x5d, 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x2b, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x73, 0x32, 0xbc, 0x02, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x4e, 0x0a, + 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x1c, + 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, + 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x48, 0x0a, + 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x1b, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x2b, 0x0a, 0x08, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, - 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, - 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x32, 0xec, 0x01, 0x0a, 0x06, 0x53, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x12, 0x48, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x65, 0x6e, - 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1b, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, - 0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x1b, - 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x4c, - 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x12, 0x1b, 0x2e, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x10, 0x5a, 0x0e, 0x2e, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, + 0x6e, 0x65, 0x6c, 0x73, 0x12, 0x1b, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, + 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x42, 0x10, 0x5a, 0x0e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -534,33 +653,37 @@ func file_proto_stream_proto_rawDescGZIP() []byte { return file_proto_stream_proto_rawDescData } -var file_proto_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_proto_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_proto_stream_proto_goTypes = []interface{}{ - (*Message)(nil), // 0: stream.Message - (*Channel)(nil), // 1: stream.Channel - (*SendMessageRequest)(nil), // 2: stream.SendMessageRequest - (*SendMessageResponse)(nil), // 3: stream.SendMessageResponse - (*ListChannelsRequest)(nil), // 4: stream.ListChannelsRequest - (*ListChannelsResponse)(nil), // 5: stream.ListChannelsResponse - (*ListMessagesRequest)(nil), // 6: stream.ListMessagesRequest - (*ListMessagesResponse)(nil), // 7: stream.ListMessagesResponse - nil, // 8: stream.Message.MetadataEntry + (*Message)(nil), // 0: stream.Message + (*Channel)(nil), // 1: stream.Channel + (*CreateChannelRequest)(nil), // 2: stream.CreateChannelRequest + (*CreateChannelResponse)(nil), // 3: stream.CreateChannelResponse + (*SendMessageRequest)(nil), // 4: stream.SendMessageRequest + (*SendMessageResponse)(nil), // 5: stream.SendMessageResponse + (*ListChannelsRequest)(nil), // 6: stream.ListChannelsRequest + (*ListChannelsResponse)(nil), // 7: stream.ListChannelsResponse + (*ListMessagesRequest)(nil), // 8: stream.ListMessagesRequest + (*ListMessagesResponse)(nil), // 9: stream.ListMessagesResponse + nil, // 10: stream.Message.MetadataEntry } var file_proto_stream_proto_depIdxs = []int32{ - 8, // 0: stream.Message.metadata:type_name -> stream.Message.MetadataEntry - 1, // 1: stream.ListChannelsResponse.channels:type_name -> stream.Channel - 0, // 2: stream.ListMessagesResponse.messages:type_name -> stream.Message - 2, // 3: stream.Stream.SendMessage:input_type -> stream.SendMessageRequest - 6, // 4: stream.Stream.ListMessages:input_type -> stream.ListMessagesRequest - 4, // 5: stream.Stream.ListChannels:input_type -> stream.ListChannelsRequest - 3, // 6: stream.Stream.SendMessage:output_type -> stream.SendMessageResponse - 7, // 7: stream.Stream.ListMessages:output_type -> stream.ListMessagesResponse - 5, // 8: stream.Stream.ListChannels:output_type -> stream.ListChannelsResponse - 6, // [6:9] is the sub-list for method output_type - 3, // [3:6] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 10, // 0: stream.Message.metadata:type_name -> stream.Message.MetadataEntry + 1, // 1: stream.ListChannelsResponse.channels:type_name -> stream.Channel + 0, // 2: stream.ListMessagesResponse.messages:type_name -> stream.Message + 2, // 3: stream.Stream.CreateChannel:input_type -> stream.CreateChannelRequest + 4, // 4: stream.Stream.SendMessage:input_type -> stream.SendMessageRequest + 8, // 5: stream.Stream.ListMessages:input_type -> stream.ListMessagesRequest + 6, // 6: stream.Stream.ListChannels:input_type -> stream.ListChannelsRequest + 3, // 7: stream.Stream.CreateChannel:output_type -> stream.CreateChannelResponse + 5, // 8: stream.Stream.SendMessage:output_type -> stream.SendMessageResponse + 9, // 9: stream.Stream.ListMessages:output_type -> stream.ListMessagesResponse + 7, // 10: stream.Stream.ListChannels:output_type -> stream.ListChannelsResponse + 7, // [7:11] is the sub-list for method output_type + 3, // [3:7] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_proto_stream_proto_init() } @@ -594,7 +717,7 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SendMessageRequest); i { + switch v := v.(*CreateChannelRequest); i { case 0: return &v.state case 1: @@ -606,7 +729,7 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SendMessageResponse); i { + switch v := v.(*CreateChannelResponse); i { case 0: return &v.state case 1: @@ -618,7 +741,7 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListChannelsRequest); i { + switch v := v.(*SendMessageRequest); i { case 0: return &v.state case 1: @@ -630,7 +753,7 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListChannelsResponse); i { + switch v := v.(*SendMessageResponse); i { case 0: return &v.state case 1: @@ -642,7 +765,7 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListMessagesRequest); i { + switch v := v.(*ListChannelsRequest); i { case 0: return &v.state case 1: @@ -654,6 +777,30 @@ func file_proto_stream_proto_init() { } } file_proto_stream_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListChannelsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_stream_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListMessagesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_stream_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ListMessagesResponse); i { case 0: return &v.state @@ -672,7 +819,7 @@ func file_proto_stream_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_stream_proto_rawDesc, NumEnums: 0, - NumMessages: 9, + NumMessages: 11, NumExtensions: 0, NumServices: 1, }, diff --git a/stream/proto/stream.pb.micro.go b/stream/proto/stream.pb.micro.go index 40ab904..08a1bd4 100644 --- a/stream/proto/stream.pb.micro.go +++ b/stream/proto/stream.pb.micro.go @@ -42,6 +42,7 @@ func NewStreamEndpoints() []*api.Endpoint { // Client API for Stream service type StreamService interface { + CreateChannel(ctx context.Context, in *CreateChannelRequest, opts ...client.CallOption) (*CreateChannelResponse, error) SendMessage(ctx context.Context, in *SendMessageRequest, opts ...client.CallOption) (*SendMessageResponse, error) ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error) ListChannels(ctx context.Context, in *ListChannelsRequest, opts ...client.CallOption) (*ListChannelsResponse, error) @@ -59,6 +60,16 @@ func NewStreamService(name string, c client.Client) StreamService { } } +func (c *streamService) CreateChannel(ctx context.Context, in *CreateChannelRequest, opts ...client.CallOption) (*CreateChannelResponse, error) { + req := c.c.NewRequest(c.name, "Stream.CreateChannel", in) + out := new(CreateChannelResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *streamService) SendMessage(ctx context.Context, in *SendMessageRequest, opts ...client.CallOption) (*SendMessageResponse, error) { req := c.c.NewRequest(c.name, "Stream.SendMessage", in) out := new(SendMessageResponse) @@ -92,6 +103,7 @@ func (c *streamService) ListChannels(ctx context.Context, in *ListChannelsReques // Server API for Stream service type StreamHandler interface { + CreateChannel(context.Context, *CreateChannelRequest, *CreateChannelResponse) error SendMessage(context.Context, *SendMessageRequest, *SendMessageResponse) error ListMessages(context.Context, *ListMessagesRequest, *ListMessagesResponse) error ListChannels(context.Context, *ListChannelsRequest, *ListChannelsResponse) error @@ -99,6 +111,7 @@ type StreamHandler interface { func RegisterStreamHandler(s server.Server, hdlr StreamHandler, opts ...server.HandlerOption) error { type stream interface { + CreateChannel(ctx context.Context, in *CreateChannelRequest, out *CreateChannelResponse) error SendMessage(ctx context.Context, in *SendMessageRequest, out *SendMessageResponse) error ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error ListChannels(ctx context.Context, in *ListChannelsRequest, out *ListChannelsResponse) error @@ -114,6 +127,10 @@ type streamHandler struct { StreamHandler } +func (h *streamHandler) CreateChannel(ctx context.Context, in *CreateChannelRequest, out *CreateChannelResponse) error { + return h.StreamHandler.CreateChannel(ctx, in, out) +} + func (h *streamHandler) SendMessage(ctx context.Context, in *SendMessageRequest, out *SendMessageResponse) error { return h.StreamHandler.SendMessage(ctx, in, out) } diff --git a/stream/proto/stream.proto b/stream/proto/stream.proto index b6b1827..ed6a52d 100644 --- a/stream/proto/stream.proto +++ b/stream/proto/stream.proto @@ -5,6 +5,7 @@ package stream; option go_package = "./proto;stream"; service Stream { + rpc CreateChannel(CreateChannelRequest) returns (CreateChannelResponse) {} rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {} rpc ListMessages(ListMessagesRequest) returns (ListMessagesResponse) {} rpc ListChannels(ListChannelsRequest) returns (ListChannelsResponse) {} @@ -26,10 +27,23 @@ message Message { message Channel { // name of the channel string name = 1; + // description for the channel + string description = 2; // last activity time - string last_active = 2; + string last_active = 3; } +// Create a channel with a given name and description. Channels are created automatically but +// this allows you to specify a description that's persisted for the lifetime of the channel. +message CreateChannelRequest { + // name of the channel + string name = 1; + // description for the channel + string description = 2; +} + +message CreateChannelResponse {} + // Send a message to the stream. message SendMessageRequest { // The channel to send to