diff --git a/streams/.gitignore b/streams/.gitignore new file mode 100644 index 0000000..4a47005 --- /dev/null +++ b/streams/.gitignore @@ -0,0 +1,2 @@ + +streams diff --git a/streams/Dockerfile b/streams/Dockerfile new file mode 100644 index 0000000..0289dae --- /dev/null +++ b/streams/Dockerfile @@ -0,0 +1,3 @@ +FROM alpine +ADD streams /streams +ENTRYPOINT [ "/streams" ] diff --git a/streams/Makefile b/streams/Makefile new file mode 100644 index 0000000..b9a9ab8 --- /dev/null +++ b/streams/Makefile @@ -0,0 +1,22 @@ + +GOPATH:=$(shell go env GOPATH) +.PHONY: init +init: + go get -u github.com/golang/protobuf/proto + go get -u github.com/golang/protobuf/protoc-gen-go + go get github.com/micro/micro/v3/cmd/protoc-gen-micro +.PHONY: proto +proto: + protoc --proto_path=. --micro_out=. --go_out=:. proto/streams.proto + +.PHONY: build +build: + go build -o streams *.go + +.PHONY: test +test: + go test -v ./... -cover + +.PHONY: docker +docker: + docker build . -t streams:latest diff --git a/streams/README.md b/streams/README.md new file mode 100644 index 0000000..10c3fd3 --- /dev/null +++ b/streams/README.md @@ -0,0 +1,23 @@ +# Streams Service + +This is the Streams service + +Generated with + +``` +micro new streams +``` + +## Usage + +Generate the proto code + +``` +make proto +``` + +Run the service + +``` +micro run . +``` \ No newline at end of file diff --git a/streams/generate.go b/streams/generate.go new file mode 100644 index 0000000..96f431a --- /dev/null +++ b/streams/generate.go @@ -0,0 +1,2 @@ +package main +//go:generate make proto diff --git a/streams/handler/create_conversation.go b/streams/handler/create_conversation.go new file mode 100644 index 0000000..9d449e3 --- /dev/null +++ b/streams/handler/create_conversation.go @@ -0,0 +1,37 @@ +package handler + +import ( + "context" + + "github.com/google/uuid" + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" +) + +// Create a conversation +func (s *Streams) CreateConversation(ctx context.Context, req *pb.CreateConversationRequest, rsp *pb.CreateConversationResponse) error { + // validate the request + if len(req.GroupId) == 0 { + return ErrMissingGroupID + } + if len(req.Topic) == 0 { + return ErrMissingTopic + } + + // write the conversation to the database + conv := &Conversation{ + ID: uuid.New().String(), + Topic: req.Topic, + GroupID: req.GroupId, + CreatedAt: s.Time(), + } + if err := s.DB.Create(conv).Error; err != nil { + logger.Errorf("Error creating conversation: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // serialize the response + rsp.Conversation = conv.Serialize() + return nil +} diff --git a/streams/handler/create_conversation_test.go b/streams/handler/create_conversation_test.go new file mode 100644 index 0000000..879276a --- /dev/null +++ b/streams/handler/create_conversation_test.go @@ -0,0 +1,60 @@ +package handler_test + +import ( + "context" + "testing" + + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestCreateConversation(t *testing.T) { + tt := []struct { + Name string + GroupID string + Topic string + Error error + }{ + { + Name: "MissingGroupID", + Topic: "HelloWorld", + Error: handler.ErrMissingGroupID, + }, + { + Name: "MissingTopic", + GroupID: uuid.New().String(), + Error: handler.ErrMissingTopic, + }, + { + Name: "Valid", + GroupID: uuid.New().String(), + Topic: "HelloWorld", + }, + } + + h := testHandler(t) + for _, tc := range tt { + t.Run(tc.Name, func(t *testing.T) { + var rsp pb.CreateConversationResponse + err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: tc.Topic, GroupId: tc.GroupID, + }, &rsp) + + assert.Equal(t, tc.Error, err) + if tc.Error != nil { + assert.Nil(t, rsp.Conversation) + return + } + + assertConversationsMatch(t, &pb.Conversation{ + CreatedAt: timestamppb.New(h.Time()), + GroupId: tc.GroupID, + Topic: tc.Topic, + }, rsp.Conversation) + }) + } +} diff --git a/streams/handler/create_message.go b/streams/handler/create_message.go new file mode 100644 index 0000000..9f42ab0 --- /dev/null +++ b/streams/handler/create_message.go @@ -0,0 +1,53 @@ +package handler + +import ( + "context" + + "github.com/google/uuid" + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" + "gorm.io/gorm" +) + +// Create a message within a conversation +func (s *Streams) CreateMessage(ctx context.Context, req *pb.CreateMessageRequest, rsp *pb.CreateMessageResponse) error { + // validate the request + if len(req.AuthorId) == 0 { + return ErrMissingAuthorID + } + if len(req.ConversationId) == 0 { + return ErrMissingConversationID + } + if len(req.Text) == 0 { + return ErrMissingText + } + + return s.DB.Transaction(func(tx *gorm.DB) error { + // lookup the conversation + var conv Conversation + if err := s.DB.Where(&Conversation{ID: req.ConversationId}).First(&conv).Error; err == gorm.ErrRecordNotFound { + return ErrNotFound + } else if err != nil { + logger.Errorf("Error reading conversation: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // create the message + msg := &Message{ + ID: uuid.New().String(), + SentAt: s.Time(), + Text: req.Text, + AuthorID: req.AuthorId, + ConversationID: req.ConversationId, + } + if err := s.DB.Create(msg).Error; err != nil { + logger.Errorf("Error creating message: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // serialize the response + rsp.Message = msg.Serialize() + return nil + }) +} diff --git a/streams/handler/create_message_test.go b/streams/handler/create_message_test.go new file mode 100644 index 0000000..e8a20a2 --- /dev/null +++ b/streams/handler/create_message_test.go @@ -0,0 +1,89 @@ +package handler_test + +import ( + "context" + "testing" + + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestCreateMessage(t *testing.T) { + h := testHandler(t) + + // seed some data + var cRsp pb.CreateConversationResponse + err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: "HelloWorld", GroupId: uuid.New().String(), + }, &cRsp) + if err != nil { + t.Fatalf("Error creating conversation: %v", err) + return + } + + tt := []struct { + Name string + AuthorID string + ConversationID string + Text string + Error error + }{ + { + Name: "MissingConversationID", + Text: "HelloWorld", + AuthorID: uuid.New().String(), + Error: handler.ErrMissingConversationID, + }, + { + Name: "MissingAuthorID", + ConversationID: uuid.New().String(), + Text: "HelloWorld", + Error: handler.ErrMissingAuthorID, + }, + { + Name: "MissingText", + ConversationID: uuid.New().String(), + AuthorID: uuid.New().String(), + Error: handler.ErrMissingText, + }, + { + Name: "ConversationNotFound", + ConversationID: uuid.New().String(), + AuthorID: uuid.New().String(), + Text: "HelloWorld", + Error: handler.ErrNotFound, + }, + { + Name: "Valid", + ConversationID: cRsp.Conversation.Id, + AuthorID: uuid.New().String(), + Text: "HelloWorld", + }, + } + + for _, tc := range tt { + t.Run(tc.Name, func(t *testing.T) { + var rsp pb.CreateMessageResponse + err := h.CreateMessage(context.TODO(), &pb.CreateMessageRequest{ + Text: tc.Text, ConversationId: tc.ConversationID, AuthorId: tc.AuthorID, + }, &rsp) + + assert.Equal(t, tc.Error, err) + if tc.Error != nil { + assert.Nil(t, rsp.Message) + return + } + + assertMessagesMatch(t, &pb.Message{ + AuthorId: tc.AuthorID, + ConversationId: tc.ConversationID, + SentAt: timestamppb.New(h.Time()), + Text: tc.Text, + }, rsp.Message) + }) + } +} diff --git a/streams/handler/delete_conversation.go b/streams/handler/delete_conversation.go new file mode 100644 index 0000000..0b0ca39 --- /dev/null +++ b/streams/handler/delete_conversation.go @@ -0,0 +1,34 @@ +package handler + +import ( + "context" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" + "gorm.io/gorm" +) + +// Delete a conversation and all the messages within +func (s *Streams) DeleteConversation(ctx context.Context, req *pb.DeleteConversationRequest, rsp *pb.DeleteConversationResponse) error { + // validate the request + if len(req.Id) == 0 { + return ErrMissingID + } + + return s.DB.Transaction(func(tx *gorm.DB) error { + // delete all the messages + if err := tx.Where(&Message{ConversationID: req.Id}).Delete(&Message{}).Error; err != nil { + logger.Errorf("Error deleting messages: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // delete the conversation + if err := tx.Where(&Conversation{ID: req.Id}).Delete(&Conversation{}).Error; err != nil { + logger.Errorf("Error deleting conversation: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + return nil + }) +} diff --git a/streams/handler/delete_conversation_test.go b/streams/handler/delete_conversation_test.go new file mode 100644 index 0000000..a6281d0 --- /dev/null +++ b/streams/handler/delete_conversation_test.go @@ -0,0 +1,50 @@ +package handler_test + +import ( + "context" + "testing" + + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestDeleteConversation(t *testing.T) { + h := testHandler(t) + + // seed some data + var cRsp pb.CreateConversationResponse + err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: "HelloWorld", GroupId: uuid.New().String(), + }, &cRsp) + if err != nil { + t.Fatalf("Error creating conversation: %v", err) + return + } + + t.Run("MissingID", func(t *testing.T) { + err := h.DeleteConversation(context.TODO(), &pb.DeleteConversationRequest{}, &pb.DeleteConversationResponse{}) + assert.Equal(t, handler.ErrMissingID, err) + }) + + t.Run("Valid", func(t *testing.T) { + err := h.DeleteConversation(context.TODO(), &pb.DeleteConversationRequest{ + Id: cRsp.Conversation.Id, + }, &pb.DeleteConversationResponse{}) + assert.NoError(t, err) + + err = h.ReadConversation(context.TODO(), &pb.ReadConversationRequest{ + Id: cRsp.Conversation.Id, + }, &pb.ReadConversationResponse{}) + assert.Equal(t, handler.ErrNotFound, err) + }) + + t.Run("Retry", func(t *testing.T) { + err := h.DeleteConversation(context.TODO(), &pb.DeleteConversationRequest{ + Id: cRsp.Conversation.Id, + }, &pb.DeleteConversationResponse{}) + assert.NoError(t, err) + }) +} diff --git a/streams/handler/list_conversations.go b/streams/handler/list_conversations.go new file mode 100644 index 0000000..d71de98 --- /dev/null +++ b/streams/handler/list_conversations.go @@ -0,0 +1,31 @@ +package handler + +import ( + "context" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" +) + +// List all the conversations for a group +func (s *Streams) ListConversations(ctx context.Context, req *pb.ListConversationsRequest, rsp *pb.ListConversationsResponse) error { + // validate the request + if len(req.GroupId) == 0 { + return ErrMissingGroupID + } + + // query the database + var convs []Conversation + if err := s.DB.Where(&Conversation{GroupID: req.GroupId}).Find(&convs).Error; err != nil { + logger.Errorf("Error reading conversation: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // serialize the response + rsp.Conversations = make([]*pb.Conversation, len(convs)) + for i, c := range convs { + rsp.Conversations[i] = c.Serialize() + } + return nil +} diff --git a/streams/handler/list_conversations_test.go b/streams/handler/list_conversations_test.go new file mode 100644 index 0000000..44189aa --- /dev/null +++ b/streams/handler/list_conversations_test.go @@ -0,0 +1,55 @@ +package handler_test + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" +) + +func TestListConversations(t *testing.T) { + h := testHandler(t) + + // seed some data + var cRsp1 pb.CreateConversationResponse + err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: "HelloWorld", GroupId: uuid.New().String(), + }, &cRsp1) + if err != nil { + t.Fatalf("Error creating conversation: %v", err) + return + } + var cRsp2 pb.CreateConversationResponse + err = h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: "FooBar", GroupId: uuid.New().String(), + }, &cRsp2) + if err != nil { + t.Fatalf("Error creating conversation: %v", err) + return + } + + t.Run("MissingGroupID", func(t *testing.T) { + var rsp pb.ListConversationsResponse + err := h.ListConversations(context.TODO(), &pb.ListConversationsRequest{}, &rsp) + assert.Equal(t, handler.ErrMissingGroupID, err) + assert.Nil(t, rsp.Conversations) + }) + + t.Run("Valid", func(t *testing.T) { + var rsp pb.ListConversationsResponse + err := h.ListConversations(context.TODO(), &pb.ListConversationsRequest{ + GroupId: cRsp1.Conversation.GroupId, + }, &rsp) + + assert.NoError(t, err) + if len(rsp.Conversations) != 1 { + t.Fatalf("Expected 1 conversation to be returned, got %v", len(rsp.Conversations)) + return + } + + assertConversationsMatch(t, cRsp1.Conversation, rsp.Conversations[0]) + }) +} diff --git a/streams/handler/list_messages.go b/streams/handler/list_messages.go new file mode 100644 index 0000000..adefab1 --- /dev/null +++ b/streams/handler/list_messages.go @@ -0,0 +1,45 @@ +package handler + +import ( + "context" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" +) + +const DefaultLimit = 25 + +// List the messages within a conversation in reverse chronological order, using sent_before to +// offset as older messages need to be loaded +func (s *Streams) ListMessages(ctx context.Context, req *pb.ListMessagesRequest, rsp *pb.ListMessagesResponse) error { + // validate the request + if len(req.ConversationId) == 0 { + return ErrMissingConversationID + } + + // construct the query + q := s.DB.Where(&Message{ConversationID: req.ConversationId}).Order("sent_at DESC") + if req.SentBefore != nil { + q = q.Where("sent_at < ?", req.SentBefore.AsTime()) + } + if req.Limit != nil { + q.Limit(int(req.Limit.Value)) + } else { + q.Limit(DefaultLimit) + } + + // execute the query + var msgs []Message + if err := q.Find(&msgs).Error; err != nil { + logger.Errorf("Error reading messages: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // serialize the response + rsp.Messages = make([]*pb.Message, len(msgs)) + for i, m := range msgs { + rsp.Messages[i] = m.Serialize() + } + return nil +} diff --git a/streams/handler/list_messages_test.go b/streams/handler/list_messages_test.go new file mode 100644 index 0000000..650b998 --- /dev/null +++ b/streams/handler/list_messages_test.go @@ -0,0 +1,116 @@ +package handler_test + +import ( + "context" + "sort" + "strconv" + "testing" + "time" + + "github.com/google/uuid" + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +func TestListMessages(t *testing.T) { + h := testHandler(t) + h.Time = time.Now + + // seed some data + var convRsp pb.CreateConversationResponse + err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: "TestListMessages", GroupId: uuid.New().String(), + }, &convRsp) + assert.NoError(t, err) + if err != nil { + return + } + + msgs := make([]*pb.Message, 50) + for i := 0; i < len(msgs); i++ { + var rsp pb.CreateMessageResponse + err := h.CreateMessage(context.TODO(), &pb.CreateMessageRequest{ + ConversationId: convRsp.Conversation.Id, + AuthorId: uuid.New().String(), + Text: strconv.Itoa(i), + }, &rsp) + assert.NoError(t, err) + msgs[i] = rsp.Message + } + + t.Run("MissingConversationID", func(t *testing.T) { + var rsp pb.ListMessagesResponse + err := h.ListMessages(context.TODO(), &pb.ListMessagesRequest{}, &rsp) + assert.Equal(t, handler.ErrMissingConversationID, err) + assert.Nil(t, rsp.Messages) + }) + + t.Run("NoOffset", func(t *testing.T) { + var rsp pb.ListMessagesResponse + err := h.ListMessages(context.TODO(), &pb.ListMessagesRequest{ + ConversationId: convRsp.Conversation.Id, + }, &rsp) + assert.NoError(t, err) + + if len(rsp.Messages) != handler.DefaultLimit { + t.Fatalf("Expected %v messages but got %v", handler.DefaultLimit, len(rsp.Messages)) + return + } + expected := msgs[25:] + sortMessages(rsp.Messages) + for i, msg := range rsp.Messages { + assertMessagesMatch(t, expected[i], msg) + } + }) + + t.Run("LimitSet", func(t *testing.T) { + var rsp pb.ListMessagesResponse + err := h.ListMessages(context.TODO(), &pb.ListMessagesRequest{ + ConversationId: convRsp.Conversation.Id, + Limit: &wrapperspb.Int32Value{Value: 10}, + }, &rsp) + assert.NoError(t, err) + + if len(rsp.Messages) != 10 { + t.Fatalf("Expected %v messages but got %v", 10, len(rsp.Messages)) + return + } + expected := msgs[40:] + sortMessages(rsp.Messages) + for i, msg := range rsp.Messages { + assertMessagesMatch(t, expected[i], msg) + } + }) + + t.Run("OffsetAndLimit", func(t *testing.T) { + var rsp pb.ListMessagesResponse + err := h.ListMessages(context.TODO(), &pb.ListMessagesRequest{ + ConversationId: convRsp.Conversation.Id, + Limit: &wrapperspb.Int32Value{Value: 5}, + SentBefore: msgs[20].SentAt, + }, &rsp) + assert.NoError(t, err) + + if len(rsp.Messages) != 5 { + t.Fatalf("Expected %v messages but got %v", 5, len(rsp.Messages)) + return + } + expected := msgs[15:20] + sortMessages(rsp.Messages) + for i, msg := range rsp.Messages { + assertMessagesMatch(t, expected[i], msg) + } + }) +} + +// sortMessages by the time they were sent +func sortMessages(msgs []*pb.Message) { + sort.Slice(msgs, func(i, j int) bool { + if msgs[i].SentAt == nil || msgs[j].SentAt == nil { + return true + } + return msgs[i].SentAt.AsTime().Before(msgs[j].SentAt.AsTime()) + }) +} diff --git a/streams/handler/read_conversation.go b/streams/handler/read_conversation.go new file mode 100644 index 0000000..449cafc --- /dev/null +++ b/streams/handler/read_conversation.go @@ -0,0 +1,38 @@ +package handler + +import ( + "context" + + "gorm.io/gorm" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" +) + +// Read a conversation using its ID, can filter using group ID if provided +func (s *Streams) ReadConversation(ctx context.Context, req *pb.ReadConversationRequest, rsp *pb.ReadConversationResponse) error { + // validate the request + if len(req.Id) == 0 { + return ErrMissingID + } + + // construct the query + q := Conversation{ID: req.Id} + if req.GroupId != nil { + q.GroupID = req.GroupId.Value + } + + // execute the query + var conv Conversation + if err := s.DB.Where(&q).First(&conv).Error; err == gorm.ErrRecordNotFound { + return ErrNotFound + } else if err != nil { + logger.Errorf("Error reading conversation: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // serialize the response + rsp.Conversation = conv.Serialize() + return nil +} diff --git a/streams/handler/read_conversation_test.go b/streams/handler/read_conversation_test.go new file mode 100644 index 0000000..542e542 --- /dev/null +++ b/streams/handler/read_conversation_test.go @@ -0,0 +1,70 @@ +package handler_test + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +func TestReadConversation(t *testing.T) { + h := testHandler(t) + + // seed some data + var cRsp pb.CreateConversationResponse + err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: "HelloWorld", GroupId: uuid.New().String(), + }, &cRsp) + if err != nil { + t.Fatalf("Error creating conversation: %v", err) + return + } + + tt := []struct { + Name string + ID string + GroupID *wrapperspb.StringValue + Error error + Result *pb.Conversation + }{ + { + Name: "MissingID", + Error: handler.ErrMissingID, + }, + { + Name: "IncorrectID", + ID: uuid.New().String(), + Error: handler.ErrNotFound, + }, + { + Name: "FoundUsingIDOnly", + ID: cRsp.Conversation.Id, + Result: cRsp.Conversation, + }, + { + Name: "IncorrectGroupID", + ID: cRsp.Conversation.Id, + Error: handler.ErrNotFound, + GroupID: &wrapperspb.StringValue{Value: uuid.New().String()}, + }, + } + for _, tc := range tt { + t.Run(tc.Name, func(t *testing.T) { + var rsp pb.ReadConversationResponse + err := h.ReadConversation(context.TODO(), &pb.ReadConversationRequest{ + Id: tc.ID, GroupId: tc.GroupID, + }, &rsp) + assert.Equal(t, tc.Error, err) + + if tc.Result == nil { + assert.Nil(t, rsp.Conversation) + } else { + assertConversationsMatch(t, tc.Result, rsp.Conversation) + } + }) + } +} diff --git a/streams/handler/recent_messages.go b/streams/handler/recent_messages.go new file mode 100644 index 0000000..b23782f --- /dev/null +++ b/streams/handler/recent_messages.go @@ -0,0 +1,49 @@ +package handler + +import ( + "context" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" + "gorm.io/gorm" +) + +// RecentMessages returns the most recent messages in a group of conversations. By default the +// most messages retrieved per conversation is 25, however this can be overriden using the +// limit_per_conversation option +func (s *Streams) RecentMessages(ctx context.Context, req *pb.RecentMessagesRequest, rsp *pb.RecentMessagesResponse) error { + // validate the request + if len(req.ConversationIds) == 0 { + return ErrMissingConversationIDs + } + + limit := DefaultLimit + if req.LimitPerConversation != nil { + limit = int(req.LimitPerConversation.Value) + } + + // query the database + var msgs []Message + err := s.DB.Transaction(func(tx *gorm.DB) error { + for _, id := range req.ConversationIds { + var cms []Message + if err := tx.Where(&Message{ConversationID: id}).Order("sent_at DESC").Limit(limit).Find(&cms).Error; err != nil { + return err + } + msgs = append(msgs, cms...) + } + return nil + }) + if err != nil { + logger.Errorf("Error reading messages: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // serialize the response + rsp.Messages = make([]*pb.Message, len(msgs)) + for i, m := range msgs { + rsp.Messages[i] = m.Serialize() + } + return nil +} diff --git a/streams/handler/recent_messages_test.go b/streams/handler/recent_messages_test.go new file mode 100644 index 0000000..b59ba5a --- /dev/null +++ b/streams/handler/recent_messages_test.go @@ -0,0 +1,101 @@ +package handler_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +func TestRecentMessages(t *testing.T) { + h := testHandler(t) + h.Time = time.Now + + // seed some data + ids := make([]string, 3) + convos := make(map[string][]*pb.Message, 3) + for i := 0; i < 3; i++ { + var convRsp pb.CreateConversationResponse + err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: "TestRecentMessages", GroupId: uuid.New().String(), + }, &convRsp) + assert.NoError(t, err) + if err != nil { + return + } + + convos[convRsp.Conversation.Id] = make([]*pb.Message, 50) + ids[i] = convRsp.Conversation.Id + + for j := 0; j < 50; j++ { + var rsp pb.CreateMessageResponse + err := h.CreateMessage(context.TODO(), &pb.CreateMessageRequest{ + ConversationId: convRsp.Conversation.Id, + AuthorId: uuid.New().String(), + Text: fmt.Sprintf("Conversation %v, Message %v", i, j), + }, &rsp) + assert.NoError(t, err) + convos[convRsp.Conversation.Id][j] = rsp.Message + } + } + + t.Run("MissingConversationIDs", func(t *testing.T) { + var rsp pb.RecentMessagesResponse + err := h.RecentMessages(context.TODO(), &pb.RecentMessagesRequest{}, &rsp) + assert.Equal(t, handler.ErrMissingConversationIDs, err) + assert.Nil(t, rsp.Messages) + }) + + t.Run("LimitSet", func(t *testing.T) { + var rsp pb.RecentMessagesResponse + err := h.RecentMessages(context.TODO(), &pb.RecentMessagesRequest{ + ConversationIds: ids, + LimitPerConversation: &wrapperspb.Int32Value{Value: 10}, + }, &rsp) + assert.NoError(t, err) + + if len(rsp.Messages) != 30 { + t.Fatalf("Expected %v messages but got %v", 30, len(rsp.Messages)) + return + } + var expected []*pb.Message + for _, msgs := range convos { + expected = append(expected, msgs[40:]...) + } + sortMessages(expected) + sortMessages(rsp.Messages) + for i, msg := range rsp.Messages { + assertMessagesMatch(t, expected[i], msg) + } + }) + + t.Run("NoLimitSet", func(t *testing.T) { + reducedIDs := ids[:2] + + var rsp pb.RecentMessagesResponse + err := h.RecentMessages(context.TODO(), &pb.RecentMessagesRequest{ + ConversationIds: reducedIDs, + }, &rsp) + assert.NoError(t, err) + + if len(rsp.Messages) != 50 { + t.Fatalf("Expected %v messages but got %v", 50, len(rsp.Messages)) + return + } + var expected []*pb.Message + for _, id := range reducedIDs { + expected = append(expected, convos[id][25:]...) + } + sortMessages(expected) + sortMessages(rsp.Messages) + for i, msg := range rsp.Messages { + assertMessagesMatch(t, expected[i], msg) + } + }) +} diff --git a/streams/handler/streams.go b/streams/handler/streams.go new file mode 100644 index 0000000..b896d81 --- /dev/null +++ b/streams/handler/streams.go @@ -0,0 +1,60 @@ +package handler + +import ( + "time" + + "github.com/micro/micro/v3/service/errors" + pb "github.com/micro/services/streams/proto" + "google.golang.org/protobuf/types/known/timestamppb" + "gorm.io/gorm" +) + +var ( + ErrMissingID = errors.BadRequest("MISSING_ID", "Missing ID") + ErrMissingGroupID = errors.BadRequest("MISSING_GROUP_ID", "Missing GroupID") + ErrMissingTopic = errors.BadRequest("MISSING_TOPIC", "Missing Topic") + ErrMissingAuthorID = errors.BadRequest("MISSING_AUTHOR_ID", "Missing Author ID") + ErrMissingText = errors.BadRequest("MISSING_TEXT", "Missing text") + ErrMissingConversationID = errors.BadRequest("MISSING_CONVERSATION_ID", "Missing Conversation ID") + ErrMissingConversationIDs = errors.BadRequest("MISSING_CONVERSATION_IDS", "One or more Conversation IDs are required") + ErrNotFound = errors.NotFound("NOT_FOUND", "Conversation not found") +) + +type Streams struct { + DB *gorm.DB + Time func() time.Time +} + +type Message struct { + ID string + AuthorID string + ConversationID string + Text string + SentAt time.Time +} + +func (m *Message) Serialize() *pb.Message { + return &pb.Message{ + Id: m.ID, + AuthorId: m.AuthorID, + ConversationId: m.ConversationID, + Text: m.Text, + SentAt: timestamppb.New(m.SentAt), + } +} + +type Conversation struct { + ID string + GroupID string + Topic string + CreatedAt time.Time +} + +func (c *Conversation) Serialize() *pb.Conversation { + return &pb.Conversation{ + Id: c.ID, + GroupId: c.GroupID, + Topic: c.Topic, + CreatedAt: timestamppb.New(c.CreatedAt), + } +} diff --git a/streams/handler/streams_test.go b/streams/handler/streams_test.go new file mode 100644 index 0000000..6f0856b --- /dev/null +++ b/streams/handler/streams_test.go @@ -0,0 +1,84 @@ +package handler_test + +import ( + "testing" + "time" + + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" + + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +func testHandler(t *testing.T) *handler.Streams { + // connect to the database + db, err := gorm.Open(postgres.Open("postgresql://postgres@localhost:5432/streams?sslmode=disable"), &gorm.Config{}) + if err != nil { + t.Fatalf("Error connecting to database: %v", err) + } + + // migrate the database + if err := db.AutoMigrate(&handler.Conversation{}, &handler.Message{}); err != nil { + t.Fatalf("Error migrating database: %v", err) + } + + // clean any data from a previous run + if err := db.Exec("TRUNCATE TABLE conversations, messages CASCADE").Error; err != nil { + t.Fatalf("Error cleaning database: %v", err) + } + + return &handler.Streams{DB: db, Time: func() time.Time { return time.Unix(1611327673, 0) }} +} + +func assertConversationsMatch(t *testing.T, exp, act *pb.Conversation) { + if act == nil { + t.Errorf("Conversation not returned") + return + } + + // adapt this check so we can reuse the func in testing create, where we don't know the exact id + // which will be generated + if len(exp.Id) > 0 { + assert.Equal(t, exp.Id, act.Id) + } else { + assert.NotEmpty(t, act.Id) + } + + assert.Equal(t, exp.Topic, act.Topic) + assert.Equal(t, exp.GroupId, act.GroupId) + + if act.CreatedAt == nil { + t.Errorf("CreatedAt not set") + return + } + + assert.True(t, exp.CreatedAt.AsTime().Equal(act.CreatedAt.AsTime())) +} + +func assertMessagesMatch(t *testing.T, exp, act *pb.Message) { + if act == nil { + t.Errorf("Message not returned") + return + } + + // adapt this check so we can reuse the func in testing create, where we don't know the exact id + // which will be generated + if len(exp.Id) > 0 { + assert.Equal(t, exp.Id, act.Id) + } else { + assert.NotEmpty(t, act.Id) + } + + assert.Equal(t, exp.Text, act.Text) + assert.Equal(t, exp.AuthorId, act.AuthorId) + assert.Equal(t, exp.ConversationId, act.ConversationId) + + if act.SentAt == nil { + t.Errorf("SentAt not set") + return + } + + assert.True(t, exp.SentAt.AsTime().Equal(act.SentAt.AsTime())) +} diff --git a/streams/handler/update_conversation.go b/streams/handler/update_conversation.go new file mode 100644 index 0000000..d46f754 --- /dev/null +++ b/streams/handler/update_conversation.go @@ -0,0 +1,41 @@ +package handler + +import ( + "context" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/streams/proto" + "gorm.io/gorm" +) + +// Update a conversations topic +func (s *Streams) UpdateConversation(ctx context.Context, req *pb.UpdateConversationRequest, rsp *pb.UpdateConversationResponse) error { + // validate the request + if len(req.Id) == 0 { + return ErrMissingID + } + if len(req.Topic) == 0 { + return ErrMissingTopic + } + + // lookup the conversation + var conv Conversation + if err := s.DB.Where(&Conversation{ID: req.Id}).First(&conv).Error; err == gorm.ErrRecordNotFound { + return ErrNotFound + } else if err != nil { + logger.Errorf("Error reading conversation: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // update the conversation + conv.Topic = req.Topic + if err := s.DB.Save(&conv).Error; err != nil { + logger.Errorf("Error updating conversation: %v", err) + return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") + } + + // serialize the result + rsp.Conversation = conv.Serialize() + return nil +} diff --git a/streams/handler/update_conversation_test.go b/streams/handler/update_conversation_test.go new file mode 100644 index 0000000..00d0945 --- /dev/null +++ b/streams/handler/update_conversation_test.go @@ -0,0 +1,66 @@ +package handler_test + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + "github.com/stretchr/testify/assert" +) + +func TestUpdateConversation(t *testing.T) { + h := testHandler(t) + + // seed some data + var cRsp pb.CreateConversationResponse + err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{ + Topic: "HelloWorld", GroupId: uuid.New().String(), + }, &cRsp) + if err != nil { + t.Fatalf("Error creating conversation: %v", err) + return + } + + t.Run("MissingID", func(t *testing.T) { + err := h.UpdateConversation(context.TODO(), &pb.UpdateConversationRequest{ + Topic: "NewTopic", + }, &pb.UpdateConversationResponse{}) + assert.Equal(t, handler.ErrMissingID, err) + }) + + t.Run("MissingTopic", func(t *testing.T) { + err := h.UpdateConversation(context.TODO(), &pb.UpdateConversationRequest{ + Id: uuid.New().String(), + }, &pb.UpdateConversationResponse{}) + assert.Equal(t, handler.ErrMissingTopic, err) + }) + + t.Run("InvalidID", func(t *testing.T) { + err := h.UpdateConversation(context.TODO(), &pb.UpdateConversationRequest{ + Id: uuid.New().String(), + Topic: "NewTopic", + }, &pb.UpdateConversationResponse{}) + assert.Equal(t, handler.ErrNotFound, err) + }) + + t.Run("Valid", func(t *testing.T) { + err := h.UpdateConversation(context.TODO(), &pb.UpdateConversationRequest{ + Id: cRsp.Conversation.Id, + Topic: "NewTopic", + }, &pb.UpdateConversationResponse{}) + assert.NoError(t, err) + + var rsp pb.ReadConversationResponse + err = h.ReadConversation(context.TODO(), &pb.ReadConversationRequest{ + Id: cRsp.Conversation.Id, + }, &rsp) + assert.NoError(t, err) + if rsp.Conversation == nil { + t.Fatal("No conversation returned") + return + } + assert.Equal(t, "NewTopic", rsp.Conversation.Topic) + }) +} diff --git a/streams/main.go b/streams/main.go new file mode 100644 index 0000000..916f21e --- /dev/null +++ b/streams/main.go @@ -0,0 +1,25 @@ +package main + +import ( + "github.com/micro/services/streams/handler" + pb "github.com/micro/services/streams/proto" + + "github.com/micro/micro/v3/service" + "github.com/micro/micro/v3/service/logger" +) + +func main() { + // Create service + srv := service.New( + service.Name("streams"), + service.Version("latest"), + ) + + // Register handler + pb.RegisterStreamsHandler(srv.Server(), new(handler.Streams)) + + // Run service + if err := srv.Run(); err != nil { + logger.Fatal(err) + } +} diff --git a/streams/micro.mu b/streams/micro.mu new file mode 100644 index 0000000..c961f25 --- /dev/null +++ b/streams/micro.mu @@ -0,0 +1 @@ +service streams diff --git a/streams/proto/streams.pb.go b/streams/proto/streams.pb.go new file mode 100644 index 0000000..9e56083 --- /dev/null +++ b/streams/proto/streams.pb.go @@ -0,0 +1,1465 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.23.0 +// protoc v3.13.0 +// source: proto/streams.proto + +package streams + +import ( + proto "github.com/golang/protobuf/proto" + timestamp "github.com/golang/protobuf/ptypes/timestamp" + wrappers "github.com/golang/protobuf/ptypes/wrappers" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type Conversation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + GroupId string `protobuf:"bytes,2,opt,name=group_id,json=groupId,proto3" json:"group_id,omitempty"` + Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` + CreatedAt *timestamp.Timestamp `protobuf:"bytes,4,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` +} + +func (x *Conversation) Reset() { + *x = Conversation{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Conversation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Conversation) ProtoMessage() {} + +func (x *Conversation) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Conversation.ProtoReflect.Descriptor instead. +func (*Conversation) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{0} +} + +func (x *Conversation) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Conversation) GetGroupId() string { + if x != nil { + return x.GroupId + } + return "" +} + +func (x *Conversation) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +func (x *Conversation) GetCreatedAt() *timestamp.Timestamp { + if x != nil { + return x.CreatedAt + } + return nil +} + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + AuthorId string `protobuf:"bytes,2,opt,name=author_id,json=authorId,proto3" json:"author_id,omitempty"` + ConversationId string `protobuf:"bytes,3,opt,name=conversation_id,json=conversationId,proto3" json:"conversation_id,omitempty"` + Text string `protobuf:"bytes,4,opt,name=text,proto3" json:"text,omitempty"` + SentAt *timestamp.Timestamp `protobuf:"bytes,5,opt,name=sent_at,json=sentAt,proto3" json:"sent_at,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{1} +} + +func (x *Message) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Message) GetAuthorId() string { + if x != nil { + return x.AuthorId + } + return "" +} + +func (x *Message) GetConversationId() string { + if x != nil { + return x.ConversationId + } + return "" +} + +func (x *Message) GetText() string { + if x != nil { + return x.Text + } + return "" +} + +func (x *Message) GetSentAt() *timestamp.Timestamp { + if x != nil { + return x.SentAt + } + return nil +} + +type CreateConversationRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + GroupId string `protobuf:"bytes,1,opt,name=group_id,json=groupId,proto3" json:"group_id,omitempty"` + Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` +} + +func (x *CreateConversationRequest) Reset() { + *x = CreateConversationRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateConversationRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateConversationRequest) ProtoMessage() {} + +func (x *CreateConversationRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateConversationRequest.ProtoReflect.Descriptor instead. +func (*CreateConversationRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{2} +} + +func (x *CreateConversationRequest) GetGroupId() string { + if x != nil { + return x.GroupId + } + return "" +} + +func (x *CreateConversationRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +type CreateConversationResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Conversation *Conversation `protobuf:"bytes,1,opt,name=conversation,proto3" json:"conversation,omitempty"` +} + +func (x *CreateConversationResponse) Reset() { + *x = CreateConversationResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateConversationResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateConversationResponse) ProtoMessage() {} + +func (x *CreateConversationResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateConversationResponse.ProtoReflect.Descriptor instead. +func (*CreateConversationResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{3} +} + +func (x *CreateConversationResponse) GetConversation() *Conversation { + if x != nil { + return x.Conversation + } + return nil +} + +type ReadConversationRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + GroupId *wrappers.StringValue `protobuf:"bytes,2,opt,name=group_id,json=groupId,proto3" json:"group_id,omitempty"` +} + +func (x *ReadConversationRequest) Reset() { + *x = ReadConversationRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadConversationRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadConversationRequest) ProtoMessage() {} + +func (x *ReadConversationRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadConversationRequest.ProtoReflect.Descriptor instead. +func (*ReadConversationRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{4} +} + +func (x *ReadConversationRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ReadConversationRequest) GetGroupId() *wrappers.StringValue { + if x != nil { + return x.GroupId + } + return nil +} + +type ReadConversationResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Conversation *Conversation `protobuf:"bytes,1,opt,name=conversation,proto3" json:"conversation,omitempty"` +} + +func (x *ReadConversationResponse) Reset() { + *x = ReadConversationResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadConversationResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadConversationResponse) ProtoMessage() {} + +func (x *ReadConversationResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadConversationResponse.ProtoReflect.Descriptor instead. +func (*ReadConversationResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{5} +} + +func (x *ReadConversationResponse) GetConversation() *Conversation { + if x != nil { + return x.Conversation + } + return nil +} + +type ListConversationsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + GroupId string `protobuf:"bytes,1,opt,name=group_id,json=groupId,proto3" json:"group_id,omitempty"` +} + +func (x *ListConversationsRequest) Reset() { + *x = ListConversationsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListConversationsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListConversationsRequest) ProtoMessage() {} + +func (x *ListConversationsRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListConversationsRequest.ProtoReflect.Descriptor instead. +func (*ListConversationsRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{6} +} + +func (x *ListConversationsRequest) GetGroupId() string { + if x != nil { + return x.GroupId + } + return "" +} + +type ListConversationsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Conversations []*Conversation `protobuf:"bytes,1,rep,name=conversations,proto3" json:"conversations,omitempty"` +} + +func (x *ListConversationsResponse) Reset() { + *x = ListConversationsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListConversationsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListConversationsResponse) ProtoMessage() {} + +func (x *ListConversationsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListConversationsResponse.ProtoReflect.Descriptor instead. +func (*ListConversationsResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{7} +} + +func (x *ListConversationsResponse) GetConversations() []*Conversation { + if x != nil { + return x.Conversations + } + return nil +} + +type UpdateConversationRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` +} + +func (x *UpdateConversationRequest) Reset() { + *x = UpdateConversationRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UpdateConversationRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateConversationRequest) ProtoMessage() {} + +func (x *UpdateConversationRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateConversationRequest.ProtoReflect.Descriptor instead. +func (*UpdateConversationRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{8} +} + +func (x *UpdateConversationRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *UpdateConversationRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +type UpdateConversationResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Conversation *Conversation `protobuf:"bytes,1,opt,name=conversation,proto3" json:"conversation,omitempty"` +} + +func (x *UpdateConversationResponse) Reset() { + *x = UpdateConversationResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UpdateConversationResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateConversationResponse) ProtoMessage() {} + +func (x *UpdateConversationResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateConversationResponse.ProtoReflect.Descriptor instead. +func (*UpdateConversationResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{9} +} + +func (x *UpdateConversationResponse) GetConversation() *Conversation { + if x != nil { + return x.Conversation + } + return nil +} + +type DeleteConversationRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *DeleteConversationRequest) Reset() { + *x = DeleteConversationRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DeleteConversationRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteConversationRequest) ProtoMessage() {} + +func (x *DeleteConversationRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteConversationRequest.ProtoReflect.Descriptor instead. +func (*DeleteConversationRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{10} +} + +func (x *DeleteConversationRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +type DeleteConversationResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *DeleteConversationResponse) Reset() { + *x = DeleteConversationResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DeleteConversationResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteConversationResponse) ProtoMessage() {} + +func (x *DeleteConversationResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteConversationResponse.ProtoReflect.Descriptor instead. +func (*DeleteConversationResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{11} +} + +type CreateMessageRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ConversationId string `protobuf:"bytes,1,opt,name=conversation_id,json=conversationId,proto3" json:"conversation_id,omitempty"` + AuthorId string `protobuf:"bytes,2,opt,name=author_id,json=authorId,proto3" json:"author_id,omitempty"` + Text string `protobuf:"bytes,3,opt,name=text,proto3" json:"text,omitempty"` +} + +func (x *CreateMessageRequest) Reset() { + *x = CreateMessageRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateMessageRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateMessageRequest) ProtoMessage() {} + +func (x *CreateMessageRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateMessageRequest.ProtoReflect.Descriptor instead. +func (*CreateMessageRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{12} +} + +func (x *CreateMessageRequest) GetConversationId() string { + if x != nil { + return x.ConversationId + } + return "" +} + +func (x *CreateMessageRequest) GetAuthorId() string { + if x != nil { + return x.AuthorId + } + return "" +} + +func (x *CreateMessageRequest) GetText() string { + if x != nil { + return x.Text + } + return "" +} + +type CreateMessageResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message *Message `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *CreateMessageResponse) Reset() { + *x = CreateMessageResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateMessageResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateMessageResponse) ProtoMessage() {} + +func (x *CreateMessageResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateMessageResponse.ProtoReflect.Descriptor instead. +func (*CreateMessageResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{13} +} + +func (x *CreateMessageResponse) GetMessage() *Message { + if x != nil { + return x.Message + } + return nil +} + +type ListMessagesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ConversationId string `protobuf:"bytes,1,opt,name=conversation_id,json=conversationId,proto3" json:"conversation_id,omitempty"` + SentBefore *timestamp.Timestamp `protobuf:"bytes,2,opt,name=sent_before,json=sentBefore,proto3" json:"sent_before,omitempty"` + Limit *wrappers.Int32Value `protobuf:"bytes,3,opt,name=limit,proto3" json:"limit,omitempty"` +} + +func (x *ListMessagesRequest) Reset() { + *x = ListMessagesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListMessagesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListMessagesRequest) ProtoMessage() {} + +func (x *ListMessagesRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[14] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListMessagesRequest.ProtoReflect.Descriptor instead. +func (*ListMessagesRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{14} +} + +func (x *ListMessagesRequest) GetConversationId() string { + if x != nil { + return x.ConversationId + } + return "" +} + +func (x *ListMessagesRequest) GetSentBefore() *timestamp.Timestamp { + if x != nil { + return x.SentBefore + } + return nil +} + +func (x *ListMessagesRequest) GetLimit() *wrappers.Int32Value { + if x != nil { + return x.Limit + } + return nil +} + +type ListMessagesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` +} + +func (x *ListMessagesResponse) Reset() { + *x = ListMessagesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListMessagesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListMessagesResponse) ProtoMessage() {} + +func (x *ListMessagesResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[15] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListMessagesResponse.ProtoReflect.Descriptor instead. +func (*ListMessagesResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{15} +} + +func (x *ListMessagesResponse) GetMessages() []*Message { + if x != nil { + return x.Messages + } + return nil +} + +type RecentMessagesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ConversationIds []string `protobuf:"bytes,1,rep,name=conversation_ids,json=conversationIds,proto3" json:"conversation_ids,omitempty"` + LimitPerConversation *wrappers.Int32Value `protobuf:"bytes,2,opt,name=limit_per_conversation,json=limitPerConversation,proto3" json:"limit_per_conversation,omitempty"` +} + +func (x *RecentMessagesRequest) Reset() { + *x = RecentMessagesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RecentMessagesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RecentMessagesRequest) ProtoMessage() {} + +func (x *RecentMessagesRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[16] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RecentMessagesRequest.ProtoReflect.Descriptor instead. +func (*RecentMessagesRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{16} +} + +func (x *RecentMessagesRequest) GetConversationIds() []string { + if x != nil { + return x.ConversationIds + } + return nil +} + +func (x *RecentMessagesRequest) GetLimitPerConversation() *wrappers.Int32Value { + if x != nil { + return x.LimitPerConversation + } + return nil +} + +type RecentMessagesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` +} + +func (x *RecentMessagesResponse) Reset() { + *x = RecentMessagesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RecentMessagesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RecentMessagesResponse) ProtoMessage() {} + +func (x *RecentMessagesResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_proto_msgTypes[17] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RecentMessagesResponse.ProtoReflect.Descriptor instead. +func (*RecentMessagesResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_proto_rawDescGZIP(), []int{17} +} + +func (x *RecentMessagesResponse) GetMessages() []*Message { + if x != nil { + return x.Messages + } + return nil +} + +var File_proto_streams_proto protoreflect.FileDescriptor + +var file_proto_streams_proto_rawDesc = []byte{ + 0x0a, 0x13, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x1a, 0x1f, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, + 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, + 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0x8a, 0x01, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x19, 0x0a, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x74, + 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, + 0x63, 0x12, 0x39, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0xa8, 0x01, 0x0a, + 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x61, 0x75, 0x74, 0x68, + 0x6f, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x75, 0x74, + 0x68, 0x6f, 0x72, 0x49, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, + 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x12, + 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, + 0x78, 0x74, 0x12, 0x33, 0x0a, 0x07, 0x73, 0x65, 0x6e, 0x74, 0x5f, 0x61, 0x74, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x06, 0x73, 0x65, 0x6e, 0x74, 0x41, 0x74, 0x22, 0x4c, 0x0a, 0x19, 0x43, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, 0x12, + 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x22, 0x57, 0x0a, 0x1a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x39, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x73, 0x2e, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x62, + 0x0a, 0x17, 0x52, 0x65, 0x61, 0x64, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x37, 0x0a, 0x08, 0x67, 0x72, 0x6f, + 0x75, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, + 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 0x67, 0x72, 0x6f, 0x75, 0x70, + 0x49, 0x64, 0x22, 0x55, 0x0a, 0x18, 0x52, 0x65, 0x61, 0x64, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, + 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x39, + 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0c, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x35, 0x0a, 0x18, 0x4c, 0x69, 0x73, + 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, + 0x22, 0x58, 0x0a, 0x19, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, + 0x0d, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0d, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x41, 0x0a, 0x19, 0x55, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x22, 0x57, 0x0a, + 0x1a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x39, 0x0a, 0x0c, 0x63, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x15, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x43, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, + 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x2b, 0x0a, 0x19, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, + 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x22, 0x1c, 0x0a, 0x1a, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x70, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x5f, 0x69, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x49, 0x64, 0x12, + 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, + 0x65, 0x78, 0x74, 0x22, 0x43, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xae, 0x01, 0x0a, 0x13, 0x4c, 0x69, 0x73, + 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x63, 0x6f, 0x6e, 0x76, 0x65, + 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x3b, 0x0a, 0x0b, 0x73, 0x65, 0x6e, + 0x74, 0x5f, 0x62, 0x65, 0x66, 0x6f, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x73, 0x65, 0x6e, 0x74, + 0x42, 0x65, 0x66, 0x6f, 0x72, 0x65, 0x12, 0x31, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x49, 0x6e, 0x74, 0x33, 0x32, 0x56, 0x61, 0x6c, + 0x75, 0x65, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x44, 0x0a, 0x14, 0x4c, 0x69, 0x73, + 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x2c, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, + 0x95, 0x01, 0x0a, 0x15, 0x52, 0x65, 0x63, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x10, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x0f, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x49, 0x64, 0x73, 0x12, 0x51, 0x0a, 0x16, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x5f, 0x70, 0x65, + 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x49, 0x6e, 0x74, 0x33, 0x32, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x52, 0x14, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x50, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x76, 0x65, + 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x46, 0x0a, 0x16, 0x52, 0x65, 0x63, 0x65, 0x6e, + 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x2c, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x32, + 0xcb, 0x05, 0x0a, 0x07, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x12, 0x5d, 0x0a, 0x12, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x22, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x43, 0x72, 0x65, 0x61, + 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x57, 0x0a, 0x10, 0x52, 0x65, + 0x61, 0x64, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x20, + 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x43, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x21, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x5d, 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x2e, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x73, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, + 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, + 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x5d, 0x0a, 0x12, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x73, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x5a, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x21, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, + 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x73, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4e, 0x0a, + 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d, + 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4b, 0x0a, + 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x1c, 0x2e, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x51, 0x0a, 0x0e, 0x52, 0x65, + 0x63, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x1e, 0x2e, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x52, 0x65, 0x63, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x52, 0x65, 0x63, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x0f, 0x5a, + 0x0d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_streams_proto_rawDescOnce sync.Once + file_proto_streams_proto_rawDescData = file_proto_streams_proto_rawDesc +) + +func file_proto_streams_proto_rawDescGZIP() []byte { + file_proto_streams_proto_rawDescOnce.Do(func() { + file_proto_streams_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_streams_proto_rawDescData) + }) + return file_proto_streams_proto_rawDescData +} + +var file_proto_streams_proto_msgTypes = make([]protoimpl.MessageInfo, 18) +var file_proto_streams_proto_goTypes = []interface{}{ + (*Conversation)(nil), // 0: streams.Conversation + (*Message)(nil), // 1: streams.Message + (*CreateConversationRequest)(nil), // 2: streams.CreateConversationRequest + (*CreateConversationResponse)(nil), // 3: streams.CreateConversationResponse + (*ReadConversationRequest)(nil), // 4: streams.ReadConversationRequest + (*ReadConversationResponse)(nil), // 5: streams.ReadConversationResponse + (*ListConversationsRequest)(nil), // 6: streams.ListConversationsRequest + (*ListConversationsResponse)(nil), // 7: streams.ListConversationsResponse + (*UpdateConversationRequest)(nil), // 8: streams.UpdateConversationRequest + (*UpdateConversationResponse)(nil), // 9: streams.UpdateConversationResponse + (*DeleteConversationRequest)(nil), // 10: streams.DeleteConversationRequest + (*DeleteConversationResponse)(nil), // 11: streams.DeleteConversationResponse + (*CreateMessageRequest)(nil), // 12: streams.CreateMessageRequest + (*CreateMessageResponse)(nil), // 13: streams.CreateMessageResponse + (*ListMessagesRequest)(nil), // 14: streams.ListMessagesRequest + (*ListMessagesResponse)(nil), // 15: streams.ListMessagesResponse + (*RecentMessagesRequest)(nil), // 16: streams.RecentMessagesRequest + (*RecentMessagesResponse)(nil), // 17: streams.RecentMessagesResponse + (*timestamp.Timestamp)(nil), // 18: google.protobuf.Timestamp + (*wrappers.StringValue)(nil), // 19: google.protobuf.StringValue + (*wrappers.Int32Value)(nil), // 20: google.protobuf.Int32Value +} +var file_proto_streams_proto_depIdxs = []int32{ + 18, // 0: streams.Conversation.created_at:type_name -> google.protobuf.Timestamp + 18, // 1: streams.Message.sent_at:type_name -> google.protobuf.Timestamp + 0, // 2: streams.CreateConversationResponse.conversation:type_name -> streams.Conversation + 19, // 3: streams.ReadConversationRequest.group_id:type_name -> google.protobuf.StringValue + 0, // 4: streams.ReadConversationResponse.conversation:type_name -> streams.Conversation + 0, // 5: streams.ListConversationsResponse.conversations:type_name -> streams.Conversation + 0, // 6: streams.UpdateConversationResponse.conversation:type_name -> streams.Conversation + 1, // 7: streams.CreateMessageResponse.message:type_name -> streams.Message + 18, // 8: streams.ListMessagesRequest.sent_before:type_name -> google.protobuf.Timestamp + 20, // 9: streams.ListMessagesRequest.limit:type_name -> google.protobuf.Int32Value + 1, // 10: streams.ListMessagesResponse.messages:type_name -> streams.Message + 20, // 11: streams.RecentMessagesRequest.limit_per_conversation:type_name -> google.protobuf.Int32Value + 1, // 12: streams.RecentMessagesResponse.messages:type_name -> streams.Message + 2, // 13: streams.Streams.CreateConversation:input_type -> streams.CreateConversationRequest + 4, // 14: streams.Streams.ReadConversation:input_type -> streams.ReadConversationRequest + 8, // 15: streams.Streams.UpdateConversation:input_type -> streams.UpdateConversationRequest + 10, // 16: streams.Streams.DeleteConversation:input_type -> streams.DeleteConversationRequest + 6, // 17: streams.Streams.ListConversations:input_type -> streams.ListConversationsRequest + 12, // 18: streams.Streams.CreateMessage:input_type -> streams.CreateMessageRequest + 14, // 19: streams.Streams.ListMessages:input_type -> streams.ListMessagesRequest + 16, // 20: streams.Streams.RecentMessages:input_type -> streams.RecentMessagesRequest + 3, // 21: streams.Streams.CreateConversation:output_type -> streams.CreateConversationResponse + 5, // 22: streams.Streams.ReadConversation:output_type -> streams.ReadConversationResponse + 9, // 23: streams.Streams.UpdateConversation:output_type -> streams.UpdateConversationResponse + 11, // 24: streams.Streams.DeleteConversation:output_type -> streams.DeleteConversationResponse + 7, // 25: streams.Streams.ListConversations:output_type -> streams.ListConversationsResponse + 13, // 26: streams.Streams.CreateMessage:output_type -> streams.CreateMessageResponse + 15, // 27: streams.Streams.ListMessages:output_type -> streams.ListMessagesResponse + 17, // 28: streams.Streams.RecentMessages:output_type -> streams.RecentMessagesResponse + 21, // [21:29] is the sub-list for method output_type + 13, // [13:21] is the sub-list for method input_type + 13, // [13:13] is the sub-list for extension type_name + 13, // [13:13] is the sub-list for extension extendee + 0, // [0:13] is the sub-list for field type_name +} + +func init() { file_proto_streams_proto_init() } +func file_proto_streams_proto_init() { + if File_proto_streams_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_streams_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Conversation); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateConversationRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateConversationResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadConversationRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadConversationResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListConversationsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListConversationsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UpdateConversationRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UpdateConversationResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DeleteConversationRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DeleteConversationResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateMessageRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateMessageResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListMessagesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListMessagesResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RecentMessagesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RecentMessagesResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_streams_proto_rawDesc, + NumEnums: 0, + NumMessages: 18, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_streams_proto_goTypes, + DependencyIndexes: file_proto_streams_proto_depIdxs, + MessageInfos: file_proto_streams_proto_msgTypes, + }.Build() + File_proto_streams_proto = out.File + file_proto_streams_proto_rawDesc = nil + file_proto_streams_proto_goTypes = nil + file_proto_streams_proto_depIdxs = nil +} diff --git a/streams/proto/streams.pb.micro.go b/streams/proto/streams.pb.micro.go new file mode 100644 index 0000000..f78bb41 --- /dev/null +++ b/streams/proto/streams.pb.micro.go @@ -0,0 +1,236 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: proto/streams.proto + +package streams + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + _ "github.com/golang/protobuf/ptypes/timestamp" + _ "github.com/golang/protobuf/ptypes/wrappers" + math "math" +) + +import ( + context "context" + api "github.com/micro/micro/v3/service/api" + client "github.com/micro/micro/v3/service/client" + server "github.com/micro/micro/v3/service/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ api.Endpoint +var _ context.Context +var _ client.Option +var _ server.Option + +// Api Endpoints for Streams service + +func NewStreamsEndpoints() []*api.Endpoint { + return []*api.Endpoint{} +} + +// Client API for Streams service + +type StreamsService interface { + // Create a conversation + CreateConversation(ctx context.Context, in *CreateConversationRequest, opts ...client.CallOption) (*CreateConversationResponse, error) + // Read a conversation using its ID, can filter using group ID if provided + ReadConversation(ctx context.Context, in *ReadConversationRequest, opts ...client.CallOption) (*ReadConversationResponse, error) + // Update a conversations topic + UpdateConversation(ctx context.Context, in *UpdateConversationRequest, opts ...client.CallOption) (*UpdateConversationResponse, error) + // Delete a conversation and all the messages within + DeleteConversation(ctx context.Context, in *DeleteConversationRequest, opts ...client.CallOption) (*DeleteConversationResponse, error) + // List all the conversations for a group + ListConversations(ctx context.Context, in *ListConversationsRequest, opts ...client.CallOption) (*ListConversationsResponse, error) + // Create a message within a conversation + CreateMessage(ctx context.Context, in *CreateMessageRequest, opts ...client.CallOption) (*CreateMessageResponse, error) + // List the messages within a conversation in reverse chronological order, using sent_before to + // offset as older messages need to be loaded + ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error) + // RecentMessages returns the most recent messages in a group of conversations. By default the + // most messages retrieved per conversation is 10, however this can be overriden using the + // limit_per_conversation option + RecentMessages(ctx context.Context, in *RecentMessagesRequest, opts ...client.CallOption) (*RecentMessagesResponse, error) +} + +type streamsService struct { + c client.Client + name string +} + +func NewStreamsService(name string, c client.Client) StreamsService { + return &streamsService{ + c: c, + name: name, + } +} + +func (c *streamsService) CreateConversation(ctx context.Context, in *CreateConversationRequest, opts ...client.CallOption) (*CreateConversationResponse, error) { + req := c.c.NewRequest(c.name, "Streams.CreateConversation", in) + out := new(CreateConversationResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamsService) ReadConversation(ctx context.Context, in *ReadConversationRequest, opts ...client.CallOption) (*ReadConversationResponse, error) { + req := c.c.NewRequest(c.name, "Streams.ReadConversation", in) + out := new(ReadConversationResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamsService) UpdateConversation(ctx context.Context, in *UpdateConversationRequest, opts ...client.CallOption) (*UpdateConversationResponse, error) { + req := c.c.NewRequest(c.name, "Streams.UpdateConversation", in) + out := new(UpdateConversationResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamsService) DeleteConversation(ctx context.Context, in *DeleteConversationRequest, opts ...client.CallOption) (*DeleteConversationResponse, error) { + req := c.c.NewRequest(c.name, "Streams.DeleteConversation", in) + out := new(DeleteConversationResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamsService) ListConversations(ctx context.Context, in *ListConversationsRequest, opts ...client.CallOption) (*ListConversationsResponse, error) { + req := c.c.NewRequest(c.name, "Streams.ListConversations", in) + out := new(ListConversationsResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamsService) CreateMessage(ctx context.Context, in *CreateMessageRequest, opts ...client.CallOption) (*CreateMessageResponse, error) { + req := c.c.NewRequest(c.name, "Streams.CreateMessage", in) + out := new(CreateMessageResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamsService) ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error) { + req := c.c.NewRequest(c.name, "Streams.ListMessages", in) + out := new(ListMessagesResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *streamsService) RecentMessages(ctx context.Context, in *RecentMessagesRequest, opts ...client.CallOption) (*RecentMessagesResponse, error) { + req := c.c.NewRequest(c.name, "Streams.RecentMessages", in) + out := new(RecentMessagesResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Streams service + +type StreamsHandler interface { + // Create a conversation + CreateConversation(context.Context, *CreateConversationRequest, *CreateConversationResponse) error + // Read a conversation using its ID, can filter using group ID if provided + ReadConversation(context.Context, *ReadConversationRequest, *ReadConversationResponse) error + // Update a conversations topic + UpdateConversation(context.Context, *UpdateConversationRequest, *UpdateConversationResponse) error + // Delete a conversation and all the messages within + DeleteConversation(context.Context, *DeleteConversationRequest, *DeleteConversationResponse) error + // List all the conversations for a group + ListConversations(context.Context, *ListConversationsRequest, *ListConversationsResponse) error + // Create a message within a conversation + CreateMessage(context.Context, *CreateMessageRequest, *CreateMessageResponse) error + // List the messages within a conversation in reverse chronological order, using sent_before to + // offset as older messages need to be loaded + ListMessages(context.Context, *ListMessagesRequest, *ListMessagesResponse) error + // RecentMessages returns the most recent messages in a group of conversations. By default the + // most messages retrieved per conversation is 10, however this can be overriden using the + // limit_per_conversation option + RecentMessages(context.Context, *RecentMessagesRequest, *RecentMessagesResponse) error +} + +func RegisterStreamsHandler(s server.Server, hdlr StreamsHandler, opts ...server.HandlerOption) error { + type streams interface { + CreateConversation(ctx context.Context, in *CreateConversationRequest, out *CreateConversationResponse) error + ReadConversation(ctx context.Context, in *ReadConversationRequest, out *ReadConversationResponse) error + UpdateConversation(ctx context.Context, in *UpdateConversationRequest, out *UpdateConversationResponse) error + DeleteConversation(ctx context.Context, in *DeleteConversationRequest, out *DeleteConversationResponse) error + ListConversations(ctx context.Context, in *ListConversationsRequest, out *ListConversationsResponse) error + CreateMessage(ctx context.Context, in *CreateMessageRequest, out *CreateMessageResponse) error + ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error + RecentMessages(ctx context.Context, in *RecentMessagesRequest, out *RecentMessagesResponse) error + } + type Streams struct { + streams + } + h := &streamsHandler{hdlr} + return s.Handle(s.NewHandler(&Streams{h}, opts...)) +} + +type streamsHandler struct { + StreamsHandler +} + +func (h *streamsHandler) CreateConversation(ctx context.Context, in *CreateConversationRequest, out *CreateConversationResponse) error { + return h.StreamsHandler.CreateConversation(ctx, in, out) +} + +func (h *streamsHandler) ReadConversation(ctx context.Context, in *ReadConversationRequest, out *ReadConversationResponse) error { + return h.StreamsHandler.ReadConversation(ctx, in, out) +} + +func (h *streamsHandler) UpdateConversation(ctx context.Context, in *UpdateConversationRequest, out *UpdateConversationResponse) error { + return h.StreamsHandler.UpdateConversation(ctx, in, out) +} + +func (h *streamsHandler) DeleteConversation(ctx context.Context, in *DeleteConversationRequest, out *DeleteConversationResponse) error { + return h.StreamsHandler.DeleteConversation(ctx, in, out) +} + +func (h *streamsHandler) ListConversations(ctx context.Context, in *ListConversationsRequest, out *ListConversationsResponse) error { + return h.StreamsHandler.ListConversations(ctx, in, out) +} + +func (h *streamsHandler) CreateMessage(ctx context.Context, in *CreateMessageRequest, out *CreateMessageResponse) error { + return h.StreamsHandler.CreateMessage(ctx, in, out) +} + +func (h *streamsHandler) ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error { + return h.StreamsHandler.ListMessages(ctx, in, out) +} + +func (h *streamsHandler) RecentMessages(ctx context.Context, in *RecentMessagesRequest, out *RecentMessagesResponse) error { + return h.StreamsHandler.RecentMessages(ctx, in, out) +} diff --git a/streams/proto/streams.proto b/streams/proto/streams.proto new file mode 100644 index 0000000..d76069d --- /dev/null +++ b/streams/proto/streams.proto @@ -0,0 +1,113 @@ +syntax = "proto3"; + +package streams; +option go_package = "proto;streams"; +import "google/protobuf/timestamp.proto"; +import "google/protobuf/wrappers.proto"; + +service Streams { + // Create a conversation + rpc CreateConversation(CreateConversationRequest) returns (CreateConversationResponse); + // Read a conversation using its ID, can filter using group ID if provided + rpc ReadConversation(ReadConversationRequest) returns (ReadConversationResponse); + // Update a conversations topic + rpc UpdateConversation(UpdateConversationRequest) returns (UpdateConversationResponse); + // Delete a conversation and all the messages within + rpc DeleteConversation(DeleteConversationRequest) returns (DeleteConversationResponse); + // List all the conversations for a group + rpc ListConversations(ListConversationsRequest) returns (ListConversationsResponse); + // Create a message within a conversation + rpc CreateMessage(CreateMessageRequest) returns (CreateMessageResponse); + // List the messages within a conversation in reverse chronological order, using sent_before to + // offset as older messages need to be loaded + rpc ListMessages(ListMessagesRequest) returns (ListMessagesResponse); + // RecentMessages returns the most recent messages in a group of conversations. By default the + // most messages retrieved per conversation is 25, however this can be overriden using the + // limit_per_conversation option + rpc RecentMessages(RecentMessagesRequest) returns (RecentMessagesResponse); +} + +message Conversation { + string id = 1; + string group_id = 2; + string topic = 3; + google.protobuf.Timestamp created_at = 4; +} + +message Message { + string id = 1; + string author_id = 2; + string conversation_id = 3; + string text = 4; + google.protobuf.Timestamp sent_at = 5; +} + +message CreateConversationRequest { + string group_id = 1; + string topic = 2; +} + +message CreateConversationResponse { + Conversation conversation = 1; +} + +message ReadConversationRequest { + string id = 1; + google.protobuf.StringValue group_id = 2; +} + +message ReadConversationResponse { + Conversation conversation = 1; +} + +message ListConversationsRequest { + string group_id = 1; +} + +message ListConversationsResponse { + repeated Conversation conversations = 1; +} + +message UpdateConversationRequest { + string id = 1; + string topic = 2; +} + +message UpdateConversationResponse { + Conversation conversation = 1; +} + +message DeleteConversationRequest { + string id = 1; +} + +message DeleteConversationResponse {} + +message CreateMessageRequest { + string conversation_id = 1; + string author_id = 2; + string text = 3; +} + +message CreateMessageResponse { + Message message = 1; +} + +message ListMessagesRequest { + string conversation_id = 1; + google.protobuf.Timestamp sent_before = 2; + google.protobuf.Int32Value limit = 3; +} + +message ListMessagesResponse { + repeated Message messages = 1; +} + +message RecentMessagesRequest { + repeated string conversation_ids = 1; + google.protobuf.Int32Value limit_per_conversation = 2; +} + +message RecentMessagesResponse { + repeated Message messages = 1; +} \ No newline at end of file