From b3a93ce52a4452c3ee401c7fe9924f6b76d64fd9 Mon Sep 17 00:00:00 2001 From: Ben Toogood Date: Fri, 16 Oct 2020 10:28:40 +0100 Subject: [PATCH] Chat add support for CLI --- chat/README.md | 37 ++++++++- chat/handler/handler.go | 75 +++++++++++++---- chat/proto/chat.pb.go | 161 ++++++++++++++++++++++++++++++------ chat/proto/chat.pb.micro.go | 19 +++++ chat/proto/chat.proto | 19 +++++ 5 files changed, 270 insertions(+), 41 deletions(-) diff --git a/chat/README.md b/chat/README.md index 19fb752..7410db8 100644 --- a/chat/README.md +++ b/chat/README.md @@ -2,4 +2,39 @@ The chat service is an example Micro service which leverages bidirectional streaming, the store and events to build a chat backend. There is both a server and client which can be run together to demonstrate the application (see client/main.go for more instructions on running the service). -The service is documented inline and is designed to act as a reference for the events package. \ No newline at end of file +The service is documented inline and is designed to act as a reference for the events package. + +### Calling the service + +You can call the service via the CLI: + +Create a chat: +```bash +> micro chat new --user_ids=JohnBarry +{ + "chat_id": "3c9ea66c-d516-45d4-abe8-082089e18b27" +} +``` + +Send a message to the chat: +```bash +> micro chat send --chat_id=bed4f0f0-da12-46d2-90d2-17ae1714a214 --user_id=John --subject=Hello --text='Hey Barry' +{} +``` + +View the chat history +```bash +> micro chat history --chat_id=bed4f0f0-da12-46d2-90d2-17ae1714a214 +{ + "messages": [ + { + "id": "a61284a8-f471-4734-9192-640d89762e98", + "client_id": "6ba0d2a6-96fa-47d8-8f6f-7f75b4cc8b3e", + "chat_id": "bed4f0f0-da12-46d2-90d2-17ae1714a214", + "user_id": "John", + "subject": "Hello", + "text": "Hey Barry" + } + ] +} +``` \ No newline at end of file diff --git a/chat/handler/handler.go b/chat/handler/handler.go index da83307..c454513 100644 --- a/chat/handler/handler.go +++ b/chat/handler/handler.go @@ -138,6 +138,38 @@ func (h *handler) History(ctx context.Context, req *pb.HistoryRequest, rsp *pb.H return nil } +// Send a single message to the chat, designed for ease of use via the API / CLI +func (h *handler) Send(ctx context.Context, req *pb.SendRequest, rsp *pb.SendResponse) error { + // validate the request + if len(req.ChatId) == 0 { + return errors.BadRequest("chat.Send.MissingChatID", "ChatID is missing") + } + if len(req.UserId) == 0 { + return errors.BadRequest("chat.Send.MissingUserID", "UserID is missing") + } + if len(req.Text) == 0 { + return errors.BadRequest("chat.Send.MissingText", "Text is missing") + } + + // construct the message + msg := &pb.Message{ + Id: uuid.New().String(), + ClientId: req.ClientId, + ChatId: req.ChatId, + UserId: req.UserId, + Subject: req.Subject, + Text: req.Text, + } + + // default the client id if not provided + if len(msg.ClientId) == 0 { + msg.ClientId = uuid.New().String() + } + + // create the message + return h.createMessage(msg) +} + // Connect to a chat using a bidirectional stream enabling the client to send and recieve messages // over a single RPC. When a message is sent on the stream, it will be added to the chat history // and sent to the other connected users. When opening the connection, the client should provide @@ -236,28 +268,39 @@ func (h *handler) Connect(ctx context.Context, stream pb.Chat_ConnectStream) err // an error occured in another goroutine, terminate the stream return err case msg := <-msgChan: - // a message was recieved from the client. validate it hasn't been recieved before - if _, err := store.Read(messageStoreKeyPrefix + msg.ClientId); err == nil { - // the message has already been processed - continue - } else if err != store.ErrNotFound { - // an unexpected error occured - return err - } - // set the defaults msg.UserId = userID msg.ChatId = chatID - // send the message to the event stream - if err := events.Publish(chatEventKeyPrefix+chatID, msg); err != nil { - return err - } - - // record the messages client id - if err := store.Write(&store.Record{Key: messageStoreKeyPrefix + msg.ClientId}); err != nil { + // create the message + if err := h.createMessage(msg); err != nil { return err } } } } + +// createMessage is a helper function which creates a message in the event stream. It handles the +// logic for ensuring client id is unique. +func (h *handler) createMessage(msg *pb.Message) error { + // a message was recieved from the client. validate it hasn't been recieved before + if _, err := store.Read(messageStoreKeyPrefix + msg.ClientId); err == nil { + // the message has already been processed + return nil + } else if err != store.ErrNotFound { + // an unexpected error occured + return err + } + + // send the message to the event stream + if err := events.Publish(chatEventKeyPrefix+msg.ChatId, msg); err != nil { + return err + } + + // record the messages client id + if err := store.Write(&store.Record{Key: messageStoreKeyPrefix + msg.ClientId}); err != nil { + return err + } + + return nil +} diff --git a/chat/proto/chat.pb.go b/chat/proto/chat.pb.go index 8806d76..cca8e16 100644 --- a/chat/proto/chat.pb.go +++ b/chat/proto/chat.pb.go @@ -182,6 +182,115 @@ func (m *HistoryResponse) GetMessages() []*Message { return nil } +// SendRequest contains a single message to send to a chat +type SendRequest struct { + // a client side id, should be validated by the server to make the request retry safe + ClientId string `protobuf:"bytes,1,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` + // id of the chat the message is being sent to / from + ChatId string `protobuf:"bytes,2,opt,name=chat_id,json=chatId,proto3" json:"chat_id,omitempty"` + // id of the user who sent the message + UserId string `protobuf:"bytes,3,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + // subject of the message + Subject string `protobuf:"bytes,4,opt,name=subject,proto3" json:"subject,omitempty"` + // text of the message + Text string `protobuf:"bytes,5,opt,name=text,proto3" json:"text,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SendRequest) Reset() { *m = SendRequest{} } +func (m *SendRequest) String() string { return proto.CompactTextString(m) } +func (*SendRequest) ProtoMessage() {} +func (*SendRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_825b1469f80f958d, []int{4} +} + +func (m *SendRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SendRequest.Unmarshal(m, b) +} +func (m *SendRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SendRequest.Marshal(b, m, deterministic) +} +func (m *SendRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SendRequest.Merge(m, src) +} +func (m *SendRequest) XXX_Size() int { + return xxx_messageInfo_SendRequest.Size(m) +} +func (m *SendRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SendRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SendRequest proto.InternalMessageInfo + +func (m *SendRequest) GetClientId() string { + if m != nil { + return m.ClientId + } + return "" +} + +func (m *SendRequest) GetChatId() string { + if m != nil { + return m.ChatId + } + return "" +} + +func (m *SendRequest) GetUserId() string { + if m != nil { + return m.UserId + } + return "" +} + +func (m *SendRequest) GetSubject() string { + if m != nil { + return m.Subject + } + return "" +} + +func (m *SendRequest) GetText() string { + if m != nil { + return m.Text + } + return "" +} + +// SendResponse is a blank message returned when a message is successfully created +type SendResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SendResponse) Reset() { *m = SendResponse{} } +func (m *SendResponse) String() string { return proto.CompactTextString(m) } +func (*SendResponse) ProtoMessage() {} +func (*SendResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_825b1469f80f958d, []int{5} +} + +func (m *SendResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SendResponse.Unmarshal(m, b) +} +func (m *SendResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SendResponse.Marshal(b, m, deterministic) +} +func (m *SendResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SendResponse.Merge(m, src) +} +func (m *SendResponse) XXX_Size() int { + return xxx_messageInfo_SendResponse.Size(m) +} +func (m *SendResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SendResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SendResponse proto.InternalMessageInfo + // Message sent to a chat type Message struct { // id of the message, allocated by the server @@ -207,7 +316,7 @@ func (m *Message) Reset() { *m = Message{} } func (m *Message) String() string { return proto.CompactTextString(m) } func (*Message) ProtoMessage() {} func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_825b1469f80f958d, []int{4} + return fileDescriptor_825b1469f80f958d, []int{6} } func (m *Message) XXX_Unmarshal(b []byte) error { @@ -282,33 +391,37 @@ func init() { proto.RegisterType((*NewResponse)(nil), "chat.NewResponse") proto.RegisterType((*HistoryRequest)(nil), "chat.HistoryRequest") proto.RegisterType((*HistoryResponse)(nil), "chat.HistoryResponse") + proto.RegisterType((*SendRequest)(nil), "chat.SendRequest") + proto.RegisterType((*SendResponse)(nil), "chat.SendResponse") proto.RegisterType((*Message)(nil), "chat.Message") } func init() { proto.RegisterFile("chat/proto/chat.proto", fileDescriptor_825b1469f80f958d) } var fileDescriptor_825b1469f80f958d = []byte{ - // 351 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0xcd, 0x4e, 0xf2, 0x40, - 0x14, 0xcd, 0xd0, 0x7e, 0x1d, 0xb8, 0xe4, 0x43, 0x9d, 0x48, 0x1c, 0x71, 0x43, 0xba, 0xd0, 0x22, - 0x09, 0x35, 0x98, 0xb8, 0xd1, 0x8d, 0xb2, 0x91, 0x85, 0x2c, 0xba, 0x74, 0x43, 0x4a, 0x3b, 0x81, - 0x31, 0xd2, 0xc1, 0xce, 0x54, 0xf4, 0x49, 0x7c, 0x0c, 0x5f, 0xd1, 0xcc, 0x4c, 0xf9, 0xa9, 0x89, - 0xbb, 0xf3, 0x33, 0xf7, 0xf6, 0xdc, 0x93, 0x42, 0x3b, 0x59, 0xc4, 0x2a, 0x5c, 0xe5, 0x42, 0x89, - 0x50, 0xc3, 0x81, 0x81, 0xc4, 0xd5, 0xd8, 0xbf, 0x00, 0x98, 0xb0, 0x75, 0xc4, 0xde, 0x0a, 0x26, - 0x15, 0x39, 0x85, 0x7a, 0x21, 0x59, 0x3e, 0xe5, 0xa9, 0xa4, 0xa8, 0xeb, 0x04, 0x8d, 0x08, 0x6b, - 0x3e, 0x4e, 0xa5, 0x7f, 0x0e, 0x4d, 0xf3, 0x50, 0xae, 0x44, 0x26, 0x19, 0x39, 0x01, 0xac, 0xe7, - 0xa7, 0x3c, 0xa5, 0xa8, 0x8b, 0x82, 0x46, 0xe4, 0x69, 0x3a, 0x4e, 0xfd, 0x1e, 0xb4, 0x1e, 0xb9, - 0x54, 0x22, 0xff, 0xdc, 0x2c, 0xfd, 0xf3, 0xe9, 0x1d, 0x1c, 0x6c, 0x9f, 0x96, 0x6b, 0x7b, 0x50, - 0x5f, 0x32, 0x29, 0xe3, 0x39, 0xb3, 0x01, 0x9a, 0xc3, 0xff, 0x03, 0x93, 0xf9, 0xc9, 0xaa, 0xd1, - 0xd6, 0xf6, 0xbf, 0x11, 0xe0, 0x52, 0x25, 0x2d, 0xa8, 0x6d, 0xb7, 0xd7, 0x78, 0x4a, 0xce, 0xa0, - 0x91, 0xbc, 0x72, 0x96, 0x99, 0x8f, 0xd6, 0x8c, 0x5c, 0xb7, 0xc2, 0x38, 0xdd, 0xcf, 0xe3, 0xec, - 0xe7, 0xd1, 0x46, 0x79, 0x3d, 0x75, 0xad, 0x61, 0x8f, 0xd7, 0x86, 0xd4, 0xcb, 0x62, 0x45, 0xff, - 0x75, 0x51, 0xe0, 0x44, 0x9e, 0xa6, 0xf7, 0x8a, 0x50, 0xc0, 0xb2, 0x98, 0xbd, 0xb0, 0x44, 0x51, - 0xcf, 0x4c, 0x6c, 0x28, 0x21, 0xe0, 0x2a, 0xf6, 0xa1, 0x28, 0x36, 0xb2, 0xc1, 0xc3, 0x2f, 0x04, - 0xee, 0x68, 0x11, 0x2b, 0x72, 0x09, 0xce, 0x84, 0xad, 0xc9, 0xa1, 0x3d, 0x6d, 0xd7, 0x7f, 0xe7, - 0x68, 0x4f, 0x29, 0x1b, 0xb9, 0x01, 0x5c, 0x96, 0x44, 0x8e, 0xad, 0x5b, 0xad, 0xb7, 0xd3, 0xfe, - 0xa5, 0x96, 0x73, 0x7d, 0xc0, 0x23, 0x91, 0x65, 0x3a, 0x4b, 0xb5, 0xc2, 0x4e, 0x95, 0x06, 0xe8, - 0x0a, 0x3d, 0xf4, 0x9f, 0x7b, 0x73, 0xae, 0x16, 0xc5, 0x6c, 0x90, 0x88, 0x65, 0xb8, 0xe4, 0x49, - 0x2e, 0x42, 0xc9, 0xf2, 0x77, 0x9e, 0x30, 0x19, 0xee, 0x7e, 0x9f, 0x5b, 0x0d, 0x67, 0x9e, 0xc1, - 0xd7, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x53, 0xab, 0xf2, 0x77, 0x58, 0x02, 0x00, 0x00, + // 370 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0xcf, 0x4e, 0x3a, 0x31, + 0x10, 0xc7, 0xd3, 0xdd, 0xfd, 0x6d, 0x61, 0xf8, 0x89, 0xda, 0x48, 0x58, 0xd7, 0x0b, 0xe9, 0x41, + 0x41, 0x23, 0x18, 0x4c, 0x3c, 0x79, 0x31, 0x5c, 0xe4, 0x20, 0x87, 0xf5, 0x01, 0xc8, 0x42, 0x1b, + 0x59, 0xa3, 0xbb, 0x48, 0x4b, 0xd0, 0x57, 0xf0, 0x65, 0x7c, 0x0b, 0x9f, 0xcb, 0xf4, 0x0f, 0xd0, + 0x35, 0x72, 0x9b, 0xf9, 0xce, 0x74, 0xfa, 0xf9, 0x76, 0x0a, 0x8d, 0xe9, 0x2c, 0x95, 0xbd, 0xf9, + 0xa2, 0x90, 0x45, 0x4f, 0x85, 0x5d, 0x1d, 0x92, 0x40, 0xc5, 0xf4, 0x0c, 0x60, 0xc4, 0x57, 0x09, + 0x7f, 0x5b, 0x72, 0x21, 0xc9, 0x31, 0x54, 0x96, 0x82, 0x2f, 0xc6, 0x19, 0x13, 0x11, 0x6a, 0xf9, + 0xed, 0x6a, 0x82, 0x55, 0x3e, 0x64, 0x82, 0x9e, 0x42, 0x4d, 0x37, 0x8a, 0x79, 0x91, 0x0b, 0x4e, + 0x9a, 0x80, 0xd5, 0xf9, 0x71, 0xc6, 0x22, 0xd4, 0x42, 0xed, 0x6a, 0x12, 0xaa, 0x74, 0xc8, 0x68, + 0x07, 0xea, 0xf7, 0x99, 0x90, 0xc5, 0xe2, 0x63, 0x3d, 0x74, 0x67, 0xeb, 0x2d, 0xec, 0x6f, 0x5a, + 0xed, 0xd8, 0x0e, 0x54, 0x5e, 0xb9, 0x10, 0xe9, 0x13, 0x37, 0x00, 0xb5, 0xfe, 0x5e, 0x57, 0x33, + 0x3f, 0x18, 0x35, 0xd9, 0x94, 0xe9, 0x27, 0x82, 0xda, 0x23, 0xcf, 0xd9, 0xfa, 0x9a, 0x13, 0xa8, + 0x4e, 0x5f, 0x32, 0x9e, 0x3b, 0x17, 0x55, 0x8c, 0x30, 0x64, 0x2e, 0x83, 0xe7, 0x32, 0xa8, 0x82, + 0x75, 0x1c, 0xf9, 0xa6, 0x60, 0x0c, 0x93, 0x08, 0xb0, 0x58, 0x4e, 0x9e, 0xf9, 0x54, 0x46, 0x81, + 0x2e, 0xac, 0x53, 0x42, 0x20, 0x90, 0xfc, 0x5d, 0x46, 0xff, 0xb4, 0xac, 0x63, 0x5a, 0x87, 0xff, + 0x86, 0xc5, 0xf8, 0xa0, 0x5f, 0x08, 0xb0, 0x45, 0x26, 0x75, 0xf0, 0x36, 0x44, 0x5e, 0xc6, 0xca, + 0xa0, 0xde, 0x6e, 0x50, 0x7f, 0x17, 0x68, 0x50, 0x02, 0x6d, 0x02, 0x16, 0x6a, 0x58, 0x6a, 0x88, + 0xfc, 0x24, 0x54, 0xe9, 0x9d, 0x74, 0x1d, 0x84, 0x7f, 0x3b, 0xc0, 0x5b, 0x07, 0xfd, 0x6f, 0x04, + 0xc1, 0x60, 0x96, 0x4a, 0x72, 0x0e, 0xfe, 0x88, 0xaf, 0xc8, 0x81, 0x79, 0xf7, 0xed, 0xe7, 0x88, + 0x0f, 0x1d, 0xc5, 0xae, 0xeb, 0x06, 0xb0, 0xdd, 0x20, 0x39, 0x32, 0xd5, 0xf2, 0xee, 0xe3, 0xc6, + 0x2f, 0xd5, 0x9e, 0xbb, 0x84, 0x40, 0x3d, 0x17, 0xb1, 0x23, 0x9d, 0x35, 0xc6, 0xc4, 0x95, 0x6c, + 0xfb, 0x05, 0xe0, 0x41, 0x91, 0xe7, 0x0a, 0xbd, 0xfc, 0x1d, 0xe2, 0x72, 0xda, 0x46, 0x57, 0x68, + 0x12, 0xea, 0xef, 0x7d, 0xfd, 0x13, 0x00, 0x00, 0xff, 0xff, 0x81, 0x53, 0x8a, 0xc9, 0xf7, 0x02, + 0x00, 0x00, } diff --git a/chat/proto/chat.pb.micro.go b/chat/proto/chat.pb.micro.go index 482e3e3..5e6ec91 100644 --- a/chat/proto/chat.pb.micro.go +++ b/chat/proto/chat.pb.micro.go @@ -48,6 +48,8 @@ type ChatService interface { New(ctx context.Context, in *NewRequest, opts ...client.CallOption) (*NewResponse, error) // History returns the historical messages in a chat History(ctx context.Context, in *HistoryRequest, opts ...client.CallOption) (*HistoryResponse, error) + // Send a single message to the chat + Send(ctx context.Context, in *SendRequest, opts ...client.CallOption) (*SendResponse, error) // Connect to a chat using a bidirectional stream enabling the client to send and recieve messages // over a single RPC. When a message is sent on the stream, it will be added to the chat history // and sent to the other connected users. When opening the connection, the client should provide @@ -87,6 +89,16 @@ func (c *chatService) History(ctx context.Context, in *HistoryRequest, opts ...c return out, nil } +func (c *chatService) Send(ctx context.Context, in *SendRequest, opts ...client.CallOption) (*SendResponse, error) { + req := c.c.NewRequest(c.name, "Chat.Send", in) + out := new(SendResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *chatService) Connect(ctx context.Context, opts ...client.CallOption) (Chat_ConnectService, error) { req := c.c.NewRequest(c.name, "Chat.Connect", &Message{}) stream, err := c.c.Stream(ctx, req, opts...) @@ -147,6 +159,8 @@ type ChatHandler interface { New(context.Context, *NewRequest, *NewResponse) error // History returns the historical messages in a chat History(context.Context, *HistoryRequest, *HistoryResponse) error + // Send a single message to the chat + Send(context.Context, *SendRequest, *SendResponse) error // Connect to a chat using a bidirectional stream enabling the client to send and recieve messages // over a single RPC. When a message is sent on the stream, it will be added to the chat history // and sent to the other connected users. When opening the connection, the client should provide @@ -158,6 +172,7 @@ func RegisterChatHandler(s server.Server, hdlr ChatHandler, opts ...server.Handl type chat interface { New(ctx context.Context, in *NewRequest, out *NewResponse) error History(ctx context.Context, in *HistoryRequest, out *HistoryResponse) error + Send(ctx context.Context, in *SendRequest, out *SendResponse) error Connect(ctx context.Context, stream server.Stream) error } type Chat struct { @@ -179,6 +194,10 @@ func (h *chatHandler) History(ctx context.Context, in *HistoryRequest, out *Hist return h.ChatHandler.History(ctx, in, out) } +func (h *chatHandler) Send(ctx context.Context, in *SendRequest, out *SendResponse) error { + return h.ChatHandler.Send(ctx, in, out) +} + func (h *chatHandler) Connect(ctx context.Context, stream server.Stream) error { return h.ChatHandler.Connect(ctx, &chatConnectStream{stream}) } diff --git a/chat/proto/chat.proto b/chat/proto/chat.proto index 981488a..991608d 100644 --- a/chat/proto/chat.proto +++ b/chat/proto/chat.proto @@ -10,6 +10,8 @@ service Chat { rpc New(NewRequest) returns (NewResponse); // History returns the historical messages in a chat rpc History(HistoryRequest) returns (HistoryResponse); + // Send a single message to the chat + rpc Send(SendRequest) returns (SendResponse); // Connect to a chat using a bidirectional stream enabling the client to send and recieve messages // over a single RPC. When a message is sent on the stream, it will be added to the chat history // and sent to the other connected users. When opening the connection, the client should provide @@ -39,6 +41,23 @@ message HistoryResponse { repeated Message messages = 1; } +// SendRequest contains a single message to send to a chat +message SendRequest { + // a client side id, should be validated by the server to make the request retry safe + string client_id = 1; + // id of the chat the message is being sent to / from + string chat_id = 2; + // id of the user who sent the message + string user_id = 3; + // subject of the message + string subject = 4; + // text of the message + string text = 5; +} + +// SendResponse is a blank message returned when a message is successfully created +message SendResponse {} + // Message sent to a chat message Message { // id of the message, allocated by the server