Chat add support for CLI

This commit is contained in:
Ben Toogood
2020-10-16 10:28:40 +01:00
parent f2fcf3d3fb
commit b3a93ce52a
5 changed files with 270 additions and 41 deletions

View File

@@ -3,3 +3,38 @@
The chat service is an example Micro service which leverages bidirectional streaming, the store and events to build a chat backend. There is both a server and client which can be run together to demonstrate the application (see client/main.go for more instructions on running the service).
The service is documented inline and is designed to act as a reference for the events package.
### Calling the service
You can call the service via the CLI:
Create a chat:
```bash
> micro chat new --user_ids=JohnBarry
{
"chat_id": "3c9ea66c-d516-45d4-abe8-082089e18b27"
}
```
Send a message to the chat:
```bash
> micro chat send --chat_id=bed4f0f0-da12-46d2-90d2-17ae1714a214 --user_id=John --subject=Hello --text='Hey Barry'
{}
```
View the chat history
```bash
> micro chat history --chat_id=bed4f0f0-da12-46d2-90d2-17ae1714a214
{
"messages": [
{
"id": "a61284a8-f471-4734-9192-640d89762e98",
"client_id": "6ba0d2a6-96fa-47d8-8f6f-7f75b4cc8b3e",
"chat_id": "bed4f0f0-da12-46d2-90d2-17ae1714a214",
"user_id": "John",
"subject": "Hello",
"text": "Hey Barry"
}
]
}
```

View File

@@ -138,6 +138,38 @@ func (h *handler) History(ctx context.Context, req *pb.HistoryRequest, rsp *pb.H
return nil
}
// Send a single message to the chat, designed for ease of use via the API / CLI
func (h *handler) Send(ctx context.Context, req *pb.SendRequest, rsp *pb.SendResponse) error {
// validate the request
if len(req.ChatId) == 0 {
return errors.BadRequest("chat.Send.MissingChatID", "ChatID is missing")
}
if len(req.UserId) == 0 {
return errors.BadRequest("chat.Send.MissingUserID", "UserID is missing")
}
if len(req.Text) == 0 {
return errors.BadRequest("chat.Send.MissingText", "Text is missing")
}
// construct the message
msg := &pb.Message{
Id: uuid.New().String(),
ClientId: req.ClientId,
ChatId: req.ChatId,
UserId: req.UserId,
Subject: req.Subject,
Text: req.Text,
}
// default the client id if not provided
if len(msg.ClientId) == 0 {
msg.ClientId = uuid.New().String()
}
// create the message
return h.createMessage(msg)
}
// Connect to a chat using a bidirectional stream enabling the client to send and recieve messages
// over a single RPC. When a message is sent on the stream, it will be added to the chat history
// and sent to the other connected users. When opening the connection, the client should provide
@@ -236,28 +268,39 @@ func (h *handler) Connect(ctx context.Context, stream pb.Chat_ConnectStream) err
// an error occured in another goroutine, terminate the stream
return err
case msg := <-msgChan:
// a message was recieved from the client. validate it hasn't been recieved before
if _, err := store.Read(messageStoreKeyPrefix + msg.ClientId); err == nil {
// the message has already been processed
continue
} else if err != store.ErrNotFound {
// an unexpected error occured
return err
}
// set the defaults
msg.UserId = userID
msg.ChatId = chatID
// send the message to the event stream
if err := events.Publish(chatEventKeyPrefix+chatID, msg); err != nil {
return err
}
// record the messages client id
if err := store.Write(&store.Record{Key: messageStoreKeyPrefix + msg.ClientId}); err != nil {
// create the message
if err := h.createMessage(msg); err != nil {
return err
}
}
}
}
// createMessage is a helper function which creates a message in the event stream. It handles the
// logic for ensuring client id is unique.
func (h *handler) createMessage(msg *pb.Message) error {
// a message was recieved from the client. validate it hasn't been recieved before
if _, err := store.Read(messageStoreKeyPrefix + msg.ClientId); err == nil {
// the message has already been processed
return nil
} else if err != store.ErrNotFound {
// an unexpected error occured
return err
}
// send the message to the event stream
if err := events.Publish(chatEventKeyPrefix+msg.ChatId, msg); err != nil {
return err
}
// record the messages client id
if err := store.Write(&store.Record{Key: messageStoreKeyPrefix + msg.ClientId}); err != nil {
return err
}
return nil
}

View File

@@ -182,6 +182,115 @@ func (m *HistoryResponse) GetMessages() []*Message {
return nil
}
// SendRequest contains a single message to send to a chat
type SendRequest struct {
// a client side id, should be validated by the server to make the request retry safe
ClientId string `protobuf:"bytes,1,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"`
// id of the chat the message is being sent to / from
ChatId string `protobuf:"bytes,2,opt,name=chat_id,json=chatId,proto3" json:"chat_id,omitempty"`
// id of the user who sent the message
UserId string `protobuf:"bytes,3,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
// subject of the message
Subject string `protobuf:"bytes,4,opt,name=subject,proto3" json:"subject,omitempty"`
// text of the message
Text string `protobuf:"bytes,5,opt,name=text,proto3" json:"text,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SendRequest) Reset() { *m = SendRequest{} }
func (m *SendRequest) String() string { return proto.CompactTextString(m) }
func (*SendRequest) ProtoMessage() {}
func (*SendRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_825b1469f80f958d, []int{4}
}
func (m *SendRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendRequest.Unmarshal(m, b)
}
func (m *SendRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SendRequest.Marshal(b, m, deterministic)
}
func (m *SendRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_SendRequest.Merge(m, src)
}
func (m *SendRequest) XXX_Size() int {
return xxx_messageInfo_SendRequest.Size(m)
}
func (m *SendRequest) XXX_DiscardUnknown() {
xxx_messageInfo_SendRequest.DiscardUnknown(m)
}
var xxx_messageInfo_SendRequest proto.InternalMessageInfo
func (m *SendRequest) GetClientId() string {
if m != nil {
return m.ClientId
}
return ""
}
func (m *SendRequest) GetChatId() string {
if m != nil {
return m.ChatId
}
return ""
}
func (m *SendRequest) GetUserId() string {
if m != nil {
return m.UserId
}
return ""
}
func (m *SendRequest) GetSubject() string {
if m != nil {
return m.Subject
}
return ""
}
func (m *SendRequest) GetText() string {
if m != nil {
return m.Text
}
return ""
}
// SendResponse is a blank message returned when a message is successfully created
type SendResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SendResponse) Reset() { *m = SendResponse{} }
func (m *SendResponse) String() string { return proto.CompactTextString(m) }
func (*SendResponse) ProtoMessage() {}
func (*SendResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_825b1469f80f958d, []int{5}
}
func (m *SendResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendResponse.Unmarshal(m, b)
}
func (m *SendResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SendResponse.Marshal(b, m, deterministic)
}
func (m *SendResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_SendResponse.Merge(m, src)
}
func (m *SendResponse) XXX_Size() int {
return xxx_messageInfo_SendResponse.Size(m)
}
func (m *SendResponse) XXX_DiscardUnknown() {
xxx_messageInfo_SendResponse.DiscardUnknown(m)
}
var xxx_messageInfo_SendResponse proto.InternalMessageInfo
// Message sent to a chat
type Message struct {
// id of the message, allocated by the server
@@ -207,7 +316,7 @@ func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_825b1469f80f958d, []int{4}
return fileDescriptor_825b1469f80f958d, []int{6}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
@@ -282,33 +391,37 @@ func init() {
proto.RegisterType((*NewResponse)(nil), "chat.NewResponse")
proto.RegisterType((*HistoryRequest)(nil), "chat.HistoryRequest")
proto.RegisterType((*HistoryResponse)(nil), "chat.HistoryResponse")
proto.RegisterType((*SendRequest)(nil), "chat.SendRequest")
proto.RegisterType((*SendResponse)(nil), "chat.SendResponse")
proto.RegisterType((*Message)(nil), "chat.Message")
}
func init() { proto.RegisterFile("chat/proto/chat.proto", fileDescriptor_825b1469f80f958d) }
var fileDescriptor_825b1469f80f958d = []byte{
// 351 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0xcd, 0x4e, 0xf2, 0x40,
0x14, 0xcd, 0xd0, 0x7e, 0x1d, 0xb8, 0xe4, 0x43, 0x9d, 0x48, 0x1c, 0x71, 0x43, 0xba, 0xd0, 0x22,
0x09, 0x35, 0x98, 0xb8, 0xd1, 0x8d, 0xb2, 0x91, 0x85, 0x2c, 0xba, 0x74, 0x43, 0x4a, 0x3b, 0x81,
0x31, 0xd2, 0xc1, 0xce, 0x54, 0xf4, 0x49, 0x7c, 0x0c, 0x5f, 0xd1, 0xcc, 0x4c, 0xf9, 0xa9, 0x89,
0xbb, 0xf3, 0x33, 0xf7, 0xf6, 0xdc, 0x93, 0x42, 0x3b, 0x59, 0xc4, 0x2a, 0x5c, 0xe5, 0x42, 0x89,
0x50, 0xc3, 0x81, 0x81, 0xc4, 0xd5, 0xd8, 0xbf, 0x00, 0x98, 0xb0, 0x75, 0xc4, 0xde, 0x0a, 0x26,
0x15, 0x39, 0x85, 0x7a, 0x21, 0x59, 0x3e, 0xe5, 0xa9, 0xa4, 0xa8, 0xeb, 0x04, 0x8d, 0x08, 0x6b,
0x3e, 0x4e, 0xa5, 0x7f, 0x0e, 0x4d, 0xf3, 0x50, 0xae, 0x44, 0x26, 0x19, 0x39, 0x01, 0xac, 0xe7,
0xa7, 0x3c, 0xa5, 0xa8, 0x8b, 0x82, 0x46, 0xe4, 0x69, 0x3a, 0x4e, 0xfd, 0x1e, 0xb4, 0x1e, 0xb9,
0x54, 0x22, 0xff, 0xdc, 0x2c, 0xfd, 0xf3, 0xe9, 0x1d, 0x1c, 0x6c, 0x9f, 0x96, 0x6b, 0x7b, 0x50,
0x5f, 0x32, 0x29, 0xe3, 0x39, 0xb3, 0x01, 0x9a, 0xc3, 0xff, 0x03, 0x93, 0xf9, 0xc9, 0xaa, 0xd1,
0xd6, 0xf6, 0xbf, 0x11, 0xe0, 0x52, 0x25, 0x2d, 0xa8, 0x6d, 0xb7, 0xd7, 0x78, 0x4a, 0xce, 0xa0,
0x91, 0xbc, 0x72, 0x96, 0x99, 0x8f, 0xd6, 0x8c, 0x5c, 0xb7, 0xc2, 0x38, 0xdd, 0xcf, 0xe3, 0xec,
0xe7, 0xd1, 0x46, 0x79, 0x3d, 0x75, 0xad, 0x61, 0x8f, 0xd7, 0x86, 0xd4, 0xcb, 0x62, 0x45, 0xff,
0x75, 0x51, 0xe0, 0x44, 0x9e, 0xa6, 0xf7, 0x8a, 0x50, 0xc0, 0xb2, 0x98, 0xbd, 0xb0, 0x44, 0x51,
0xcf, 0x4c, 0x6c, 0x28, 0x21, 0xe0, 0x2a, 0xf6, 0xa1, 0x28, 0x36, 0xb2, 0xc1, 0xc3, 0x2f, 0x04,
0xee, 0x68, 0x11, 0x2b, 0x72, 0x09, 0xce, 0x84, 0xad, 0xc9, 0xa1, 0x3d, 0x6d, 0xd7, 0x7f, 0xe7,
0x68, 0x4f, 0x29, 0x1b, 0xb9, 0x01, 0x5c, 0x96, 0x44, 0x8e, 0xad, 0x5b, 0xad, 0xb7, 0xd3, 0xfe,
0xa5, 0x96, 0x73, 0x7d, 0xc0, 0x23, 0x91, 0x65, 0x3a, 0x4b, 0xb5, 0xc2, 0x4e, 0x95, 0x06, 0xe8,
0x0a, 0x3d, 0xf4, 0x9f, 0x7b, 0x73, 0xae, 0x16, 0xc5, 0x6c, 0x90, 0x88, 0x65, 0xb8, 0xe4, 0x49,
0x2e, 0x42, 0xc9, 0xf2, 0x77, 0x9e, 0x30, 0x19, 0xee, 0x7e, 0x9f, 0x5b, 0x0d, 0x67, 0x9e, 0xc1,
0xd7, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x53, 0xab, 0xf2, 0x77, 0x58, 0x02, 0x00, 0x00,
// 370 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0xcf, 0x4e, 0x3a, 0x31,
0x10, 0xc7, 0xd3, 0xdd, 0xfd, 0x6d, 0x61, 0xf8, 0x89, 0xda, 0x48, 0x58, 0xd7, 0x0b, 0xe9, 0x41,
0x41, 0x23, 0x18, 0x4c, 0x3c, 0x79, 0x31, 0x5c, 0xe4, 0x20, 0x87, 0xf5, 0x01, 0xc8, 0x42, 0x1b,
0x59, 0xa3, 0xbb, 0x48, 0x4b, 0xd0, 0x57, 0xf0, 0x65, 0x7c, 0x0b, 0x9f, 0xcb, 0xf4, 0x0f, 0xd0,
0x35, 0x72, 0x9b, 0xf9, 0xce, 0x74, 0xfa, 0xf9, 0x76, 0x0a, 0x8d, 0xe9, 0x2c, 0x95, 0xbd, 0xf9,
0xa2, 0x90, 0x45, 0x4f, 0x85, 0x5d, 0x1d, 0x92, 0x40, 0xc5, 0xf4, 0x0c, 0x60, 0xc4, 0x57, 0x09,
0x7f, 0x5b, 0x72, 0x21, 0xc9, 0x31, 0x54, 0x96, 0x82, 0x2f, 0xc6, 0x19, 0x13, 0x11, 0x6a, 0xf9,
0xed, 0x6a, 0x82, 0x55, 0x3e, 0x64, 0x82, 0x9e, 0x42, 0x4d, 0x37, 0x8a, 0x79, 0x91, 0x0b, 0x4e,
0x9a, 0x80, 0xd5, 0xf9, 0x71, 0xc6, 0x22, 0xd4, 0x42, 0xed, 0x6a, 0x12, 0xaa, 0x74, 0xc8, 0x68,
0x07, 0xea, 0xf7, 0x99, 0x90, 0xc5, 0xe2, 0x63, 0x3d, 0x74, 0x67, 0xeb, 0x2d, 0xec, 0x6f, 0x5a,
0xed, 0xd8, 0x0e, 0x54, 0x5e, 0xb9, 0x10, 0xe9, 0x13, 0x37, 0x00, 0xb5, 0xfe, 0x5e, 0x57, 0x33,
0x3f, 0x18, 0x35, 0xd9, 0x94, 0xe9, 0x27, 0x82, 0xda, 0x23, 0xcf, 0xd9, 0xfa, 0x9a, 0x13, 0xa8,
0x4e, 0x5f, 0x32, 0x9e, 0x3b, 0x17, 0x55, 0x8c, 0x30, 0x64, 0x2e, 0x83, 0xe7, 0x32, 0xa8, 0x82,
0x75, 0x1c, 0xf9, 0xa6, 0x60, 0x0c, 0x93, 0x08, 0xb0, 0x58, 0x4e, 0x9e, 0xf9, 0x54, 0x46, 0x81,
0x2e, 0xac, 0x53, 0x42, 0x20, 0x90, 0xfc, 0x5d, 0x46, 0xff, 0xb4, 0xac, 0x63, 0x5a, 0x87, 0xff,
0x86, 0xc5, 0xf8, 0xa0, 0x5f, 0x08, 0xb0, 0x45, 0x26, 0x75, 0xf0, 0x36, 0x44, 0x5e, 0xc6, 0xca,
0xa0, 0xde, 0x6e, 0x50, 0x7f, 0x17, 0x68, 0x50, 0x02, 0x6d, 0x02, 0x16, 0x6a, 0x58, 0x6a, 0x88,
0xfc, 0x24, 0x54, 0xe9, 0x9d, 0x74, 0x1d, 0x84, 0x7f, 0x3b, 0xc0, 0x5b, 0x07, 0xfd, 0x6f, 0x04,
0xc1, 0x60, 0x96, 0x4a, 0x72, 0x0e, 0xfe, 0x88, 0xaf, 0xc8, 0x81, 0x79, 0xf7, 0xed, 0xe7, 0x88,
0x0f, 0x1d, 0xc5, 0xae, 0xeb, 0x06, 0xb0, 0xdd, 0x20, 0x39, 0x32, 0xd5, 0xf2, 0xee, 0xe3, 0xc6,
0x2f, 0xd5, 0x9e, 0xbb, 0x84, 0x40, 0x3d, 0x17, 0xb1, 0x23, 0x9d, 0x35, 0xc6, 0xc4, 0x95, 0x6c,
0xfb, 0x05, 0xe0, 0x41, 0x91, 0xe7, 0x0a, 0xbd, 0xfc, 0x1d, 0xe2, 0x72, 0xda, 0x46, 0x57, 0x68,
0x12, 0xea, 0xef, 0x7d, 0xfd, 0x13, 0x00, 0x00, 0xff, 0xff, 0x81, 0x53, 0x8a, 0xc9, 0xf7, 0x02,
0x00, 0x00,
}

View File

@@ -48,6 +48,8 @@ type ChatService interface {
New(ctx context.Context, in *NewRequest, opts ...client.CallOption) (*NewResponse, error)
// History returns the historical messages in a chat
History(ctx context.Context, in *HistoryRequest, opts ...client.CallOption) (*HistoryResponse, error)
// Send a single message to the chat
Send(ctx context.Context, in *SendRequest, opts ...client.CallOption) (*SendResponse, error)
// Connect to a chat using a bidirectional stream enabling the client to send and recieve messages
// over a single RPC. When a message is sent on the stream, it will be added to the chat history
// and sent to the other connected users. When opening the connection, the client should provide
@@ -87,6 +89,16 @@ func (c *chatService) History(ctx context.Context, in *HistoryRequest, opts ...c
return out, nil
}
func (c *chatService) Send(ctx context.Context, in *SendRequest, opts ...client.CallOption) (*SendResponse, error) {
req := c.c.NewRequest(c.name, "Chat.Send", in)
out := new(SendResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *chatService) Connect(ctx context.Context, opts ...client.CallOption) (Chat_ConnectService, error) {
req := c.c.NewRequest(c.name, "Chat.Connect", &Message{})
stream, err := c.c.Stream(ctx, req, opts...)
@@ -147,6 +159,8 @@ type ChatHandler interface {
New(context.Context, *NewRequest, *NewResponse) error
// History returns the historical messages in a chat
History(context.Context, *HistoryRequest, *HistoryResponse) error
// Send a single message to the chat
Send(context.Context, *SendRequest, *SendResponse) error
// Connect to a chat using a bidirectional stream enabling the client to send and recieve messages
// over a single RPC. When a message is sent on the stream, it will be added to the chat history
// and sent to the other connected users. When opening the connection, the client should provide
@@ -158,6 +172,7 @@ func RegisterChatHandler(s server.Server, hdlr ChatHandler, opts ...server.Handl
type chat interface {
New(ctx context.Context, in *NewRequest, out *NewResponse) error
History(ctx context.Context, in *HistoryRequest, out *HistoryResponse) error
Send(ctx context.Context, in *SendRequest, out *SendResponse) error
Connect(ctx context.Context, stream server.Stream) error
}
type Chat struct {
@@ -179,6 +194,10 @@ func (h *chatHandler) History(ctx context.Context, in *HistoryRequest, out *Hist
return h.ChatHandler.History(ctx, in, out)
}
func (h *chatHandler) Send(ctx context.Context, in *SendRequest, out *SendResponse) error {
return h.ChatHandler.Send(ctx, in, out)
}
func (h *chatHandler) Connect(ctx context.Context, stream server.Stream) error {
return h.ChatHandler.Connect(ctx, &chatConnectStream{stream})
}

View File

@@ -10,6 +10,8 @@ service Chat {
rpc New(NewRequest) returns (NewResponse);
// History returns the historical messages in a chat
rpc History(HistoryRequest) returns (HistoryResponse);
// Send a single message to the chat
rpc Send(SendRequest) returns (SendResponse);
// Connect to a chat using a bidirectional stream enabling the client to send and recieve messages
// over a single RPC. When a message is sent on the stream, it will be added to the chat history
// and sent to the other connected users. When opening the connection, the client should provide
@@ -39,6 +41,23 @@ message HistoryResponse {
repeated Message messages = 1;
}
// SendRequest contains a single message to send to a chat
message SendRequest {
// a client side id, should be validated by the server to make the request retry safe
string client_id = 1;
// id of the chat the message is being sent to / from
string chat_id = 2;
// id of the user who sent the message
string user_id = 3;
// subject of the message
string subject = 4;
// text of the message
string text = 5;
}
// SendResponse is a blank message returned when a message is successfully created
message SendResponse {}
// Message sent to a chat
message Message {
// id of the message, allocated by the server