From 68f3e526560fe4cc0462b8a9abd9c6b1bdf316c3 Mon Sep 17 00:00:00 2001 From: Ben Toogood Date: Thu, 15 Oct 2020 12:28:27 +0100 Subject: [PATCH] Chat Handler --- chat/handler/handler.go | 244 ++++++++++++++++++++++++++++ chat/main.go | 25 +++ chat/proto/chat.pb.go | 314 ++++++++++++++++++++++++++++++++++++ chat/proto/chat.pb.micro.go | 225 ++++++++++++++++++++++++++ chat/proto/chat.proto | 2 +- go.mod | 5 +- go.sum | 3 + 7 files changed, 814 insertions(+), 4 deletions(-) create mode 100644 chat/handler/handler.go create mode 100644 chat/main.go create mode 100644 chat/proto/chat.pb.go create mode 100644 chat/proto/chat.pb.micro.go diff --git a/chat/handler/handler.go b/chat/handler/handler.go new file mode 100644 index 0000000..b6e2527 --- /dev/null +++ b/chat/handler/handler.go @@ -0,0 +1,244 @@ +package handler + +import ( + "context" + "sort" + "strings" + + "github.com/micro/micro/v3/service/context/metadata" + + // import the proto, it's standard to import the services own proto under the alias pb + "github.com/google/uuid" + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/events" + "github.com/micro/micro/v3/service/logger" + "github.com/micro/micro/v3/service/store" + pb "github.com/micro/services/chat/proto" +) + +// New returns an initialized chat handler +func New() pb.ChatHandler { + return new(handler) +} + +const ( + storeKeyPrefix = "chat/" + eventKeyPrefix = "chat/" +) + +// handler satisfies the ChatHandler interface. You can see this inteface defined in chat.pb.micro.go +type handler struct{} + +// New creates a chat for a group of users. The RPC is idempotent so if it's called multiple times +// for the same users, the same response will be returned. It's good practice to design APIs as +// idempotent since this enables safe retries. +func (h *handler) New(ctx context.Context, req *pb.NewRequest, rsp *pb.NewResponse) error { + // in a real world application we would authorize the request to ensure the authenticated user + // is part of the chat they're attempting to create. We could do this by getting the user id from + // auth.AccountFromContext(ctx) and then validating the presence of their id in req.UserIds. If + // the user is not part of the request then we'd return a Forbidden error, which the micro api + // would transform to a 403 status code. + + // validate the request + if len(req.UserIds) == 0 { + // Return a bad request error to the client, the first argument is a unique id which the client + // can check for. The second argument is a human readable description. Returning the correct type + // of error is important as it's used by the network to know if a request should be retried. Only + // 500 (InternalServerError) and 408 (Timeout) errors are retried. + return errors.BadRequest("chat.New.MissingUserIDs", "One or more user IDs are required") + } + + // construct a key to identify the chat, we'll do this by sorting the user ids alphabetically and + // then joining them. When a service calls the store, the data returned will be automatically scoped + // to the service however it's still advised to use a prefix when writing data since this allows + // other types of keys to be written in the future. We'll make a copy of the req.UserIds object as + // it's a good practice to not mutate the request object. + sortedIDs := make([]string, len(req.UserIds)) + copy(sortedIDs, req.UserIds) + sort.Strings(sortedIDs) + + // key to lookup the chat in the store using, e.g. "chat/usera-userb-userc" + key := storeKeyPrefix + strings.Join(sortedIDs, "-") + + // read from the store to check if a chat with these users already exists + recs, err := store.Read(key) + if err != nil { + // if an error wasn't returned, at least one record was found. The value returned by the store + // is the bytes representation of the chat id. We'll convert this back into a string and return + // it to the client. + rsp.ChatId = string(recs[0].Value) + return nil + } else if err != store.ErrNotFound { + // if no records were found then we'd expect to get a store.ErrNotFound error returned. If this + // wasn't the case, the service could've experienced an issue connecting to the store so we should + // log the error and return an InternalServerError to the client, indicating the request should + // be retried + logger.Errorf("Error reading from the store. Key: %v. Error: %v", key, err) + return errors.InternalServerError("chat.New.Unknown", "Error reading from the store") + } + + // no chat id was returned so we'll generate one, write it to the store and then return it to the + // client + chatID := uuid.New().String() + record := store.Record{Key: key, Value: []byte(chatID)} + if err := store.Write(&record); err != nil { + logger.Errorf("Error writing to the store. Key: %v. Error: %v", key, err) + return errors.InternalServerError("chat.New.Unknown", "Error writing to the store") + } + + // The chat was successfully created so we'll log the event and then return the id to the client. + // Note that we'll use logger.Infof here vs the Errorf above. + logger.Infof("New chat created with ID %v", chatID) + rsp.ChatId = chatID + return nil +} + +// History returns the historical messages in a chat +func (h *handler) History(ctx context.Context, req *pb.HistoryRequest, rsp *pb.HistoryResponse) error { + // as per the New function, in a real world application we would authorize the request to ensure + // the authenticated user is part of the chat they're attempting to read the history of + + // validate the request + if len(req.ChatId) == 0 { + return errors.BadRequest("chat.History.MissingChatID", "ChatID is missing") + } + + // lookup the chat from the store to ensure it's valid + if _, err := store.Read(storeKeyPrefix + req.ChatId); err == store.ErrNotFound { + return errors.BadRequest("chat.History.InvalidChatID", "Chat not found with this ID") + } else if err != nil { + logger.Errorf("Error reading from the store. Chat ID: %v. Error: %v", req.ChatId, err) + return errors.InternalServerError("chat.History.Unknown", "Error reading from the store") + } + + // lookup the historical messages for the chat using the event store. lots of packages in micro + // support options, in this case we'll pass the ReadLimit option to restrict the number of messages + // we'll load from the events store. + messages, err := events.Read(eventKeyPrefix+req.ChatId, events.ReadLimit(50)) + if err != nil { + logger.Errorf("Error reading from the event store. Chat ID: %v. Error: %v", req.ChatId, err) + return errors.InternalServerError("chat.History.Unknown", "Error reading from the event store") + } + + // we've loaded the messages from the event store. next we need to serialize them and return them + // to the client. The message is stored in the event payload, to retrieve it we need to unmarshal + // the event into a message struct. + rsp.Messages = make([]*pb.Message, len(messages)) + for i, ev := range messages { + var msg pb.Message + if err := ev.Unmarshal(&msg); err != nil { + logger.Errorf("Error unmarshaling event: %v", err) + return errors.InternalServerError("chat.History.Unknown", "Error unmarshaling event") + } + rsp.Messages[i] = &msg + } + + return nil +} + +// Connect to a chat using a bidirectional stream enabling the client to send and recieve messages +// over a single RPC. When a message is sent on the stream, it will be added to the chat history +// and sent to the other connected users. When opening the connection, the client should provide +// the chat_id and user_id in the context so the server knows which messages to stream. +func (h *handler) Connect(ctx context.Context, stream pb.Chat_ConnectStream) error { + // the client passed the chat id and user id in the request context. we'll load that information + // now and validate it. If any information is missing we'll return a BadRequest error to the client + userID, ok := metadata.Get(ctx, "UserID") + if !ok { + return errors.BadRequest("chat.Connect.MissingUserID", "UserID missing in context") + } + chatID, ok := metadata.Get(ctx, "ChatID") + if !ok { + return errors.BadRequest("chat.Connect.MissingChatID", "ChatId missing in context") + } + + // lookup the chat from the store to ensure it's valid + if _, err := store.Read(storeKeyPrefix + chatID); err == store.ErrNotFound { + return errors.BadRequest("chat.Connect.InvalidChatID", "Chat not found with this ID") + } else if err != nil { + logger.Errorf("Error reading from the store. Chat ID: %v. Error: %v", chatID, err) + return errors.InternalServerError("chat.Connect.Unknown", "Error reading from the store") + } + + // as per the New and Connect functions, at this point in a real world application we would + // authorize the request to ensure the authenticated user is part of the chat they're attempting + // to read the history of + + // create a new context which can be cancelled, in the case either the consumer of publisher errors + // we don't want one to keep running in the background + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // create a channel to send errors on, because the subscriber / publisher will run in seperate go- + // routines, they need a way of returning errors to the client + errChan := make(chan error) + + // create an event stream to consume messages posted by other users into the chat. we'll use the + // user id as a queue to ensure each user recieves the message + evStream, err := events.Subscribe(eventKeyPrefix+chatID, events.WithQueue(userID)) + if err != nil { + defer cancel() + logger.Errorf("Error streaming events. Chat ID: %v. Error: %v", chatID, err) + return errors.InternalServerError("chat.Connect.Unknown", "Error connecting to the event stream") + } + go func() { + for { + select { + case <-cancelCtx.Done(): + // the context has been cancelled or timed out, stop subscribing to new messages + return + case ev := <-evStream: + // recieved a message, unmarshal it into a message struct. if an error occurs log it and + // cancel the context + var msg pb.Message + if err := ev.Unmarshal(&msg); err != nil { + logger.Errorf("Error unmarshaling message. ChatID: %v. Error: %v", chatID, err) + errChan <- err + return + } + + // ignore any messages published by the current user + if msg.UserId == userID { + continue + } + + // publish the message to the stream + if err := stream.Send(&msg); err != nil { + logger.Errorf("Error sending message to stream. ChatID: %v. Message ID: %v. Error: %v", chatID, msg.Id, err) + errChan <- err + return + } + } + } + }() + + // transform the stream.Recv into a channel which can be used in the select statement below + msgChan := make(chan *pb.Message) + go func() { + for { + msg, err := stream.Recv() + if err != nil { + errChan <- err + close(msgChan) + return + } + msgChan <- msg + } + }() + + for { + select { + case <-cancelCtx.Done(): + // the context has been cancelled or timed out, stop subscribing to new messages + return nil + case err := <-errChan: + // an error occured in another goroutine, terminate the stream + return err + case msg := <-msgChan: + // a message was recieved from the client, send it to the event stream + if err := events.Publish(eventKeyPrefix+chatID, msg); err != nil { + return err + } + } + } +} diff --git a/chat/main.go b/chat/main.go new file mode 100644 index 0000000..46d608b --- /dev/null +++ b/chat/main.go @@ -0,0 +1,25 @@ +package main + +import ( + "github.com/micro/micro/v3/service" + "github.com/micro/micro/v3/service/logger" + + "github.com/micro/services/chat/handler" + pb "github.com/micro/services/chat/proto" +) + +func main() { + // Create the service + srv := service.New( + service.Name("chat"), + service.Version("latest"), + ) + + // Register the handler against the server + pb.RegisterChatHandler(srv.Server(), handler.New()) + + // Run the service + if err := srv.Run(); err != nil { + logger.Fatal(err) + } +} diff --git a/chat/proto/chat.pb.go b/chat/proto/chat.pb.go new file mode 100644 index 0000000..e67ba30 --- /dev/null +++ b/chat/proto/chat.pb.go @@ -0,0 +1,314 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: proto/chat.proto + +package chat + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// NewRequest contains the infromation needed to create a new chat +type NewRequest struct { + UserIds []string `protobuf:"bytes,1,rep,name=user_ids,json=userIds,proto3" json:"user_ids,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NewRequest) Reset() { *m = NewRequest{} } +func (m *NewRequest) String() string { return proto.CompactTextString(m) } +func (*NewRequest) ProtoMessage() {} +func (*NewRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_ed7e7dde45555b7d, []int{0} +} + +func (m *NewRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_NewRequest.Unmarshal(m, b) +} +func (m *NewRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_NewRequest.Marshal(b, m, deterministic) +} +func (m *NewRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_NewRequest.Merge(m, src) +} +func (m *NewRequest) XXX_Size() int { + return xxx_messageInfo_NewRequest.Size(m) +} +func (m *NewRequest) XXX_DiscardUnknown() { + xxx_messageInfo_NewRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_NewRequest proto.InternalMessageInfo + +func (m *NewRequest) GetUserIds() []string { + if m != nil { + return m.UserIds + } + return nil +} + +// NewResponse contains the chat id for the users +type NewResponse struct { + ChatId string `protobuf:"bytes,1,opt,name=chat_id,json=chatId,proto3" json:"chat_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NewResponse) Reset() { *m = NewResponse{} } +func (m *NewResponse) String() string { return proto.CompactTextString(m) } +func (*NewResponse) ProtoMessage() {} +func (*NewResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_ed7e7dde45555b7d, []int{1} +} + +func (m *NewResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_NewResponse.Unmarshal(m, b) +} +func (m *NewResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_NewResponse.Marshal(b, m, deterministic) +} +func (m *NewResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_NewResponse.Merge(m, src) +} +func (m *NewResponse) XXX_Size() int { + return xxx_messageInfo_NewResponse.Size(m) +} +func (m *NewResponse) XXX_DiscardUnknown() { + xxx_messageInfo_NewResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_NewResponse proto.InternalMessageInfo + +func (m *NewResponse) GetChatId() string { + if m != nil { + return m.ChatId + } + return "" +} + +// HistoryRequest contains the id of the chat we want the history for. This RPC will return all +// historical messages, however in a real life application we'd introduce some form of pagination +// here, only loading the older messages when required. +type HistoryRequest struct { + ChatId string `protobuf:"bytes,1,opt,name=chat_id,json=chatId,proto3" json:"chat_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HistoryRequest) Reset() { *m = HistoryRequest{} } +func (m *HistoryRequest) String() string { return proto.CompactTextString(m) } +func (*HistoryRequest) ProtoMessage() {} +func (*HistoryRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_ed7e7dde45555b7d, []int{2} +} + +func (m *HistoryRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HistoryRequest.Unmarshal(m, b) +} +func (m *HistoryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HistoryRequest.Marshal(b, m, deterministic) +} +func (m *HistoryRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_HistoryRequest.Merge(m, src) +} +func (m *HistoryRequest) XXX_Size() int { + return xxx_messageInfo_HistoryRequest.Size(m) +} +func (m *HistoryRequest) XXX_DiscardUnknown() { + xxx_messageInfo_HistoryRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_HistoryRequest proto.InternalMessageInfo + +func (m *HistoryRequest) GetChatId() string { + if m != nil { + return m.ChatId + } + return "" +} + +// HistoryResponse contains the historical messages in a chat +type HistoryResponse struct { + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HistoryResponse) Reset() { *m = HistoryResponse{} } +func (m *HistoryResponse) String() string { return proto.CompactTextString(m) } +func (*HistoryResponse) ProtoMessage() {} +func (*HistoryResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_ed7e7dde45555b7d, []int{3} +} + +func (m *HistoryResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HistoryResponse.Unmarshal(m, b) +} +func (m *HistoryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HistoryResponse.Marshal(b, m, deterministic) +} +func (m *HistoryResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_HistoryResponse.Merge(m, src) +} +func (m *HistoryResponse) XXX_Size() int { + return xxx_messageInfo_HistoryResponse.Size(m) +} +func (m *HistoryResponse) XXX_DiscardUnknown() { + xxx_messageInfo_HistoryResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_HistoryResponse proto.InternalMessageInfo + +func (m *HistoryResponse) GetMessages() []*Message { + if m != nil { + return m.Messages + } + return nil +} + +// Message sent to a chat +type Message struct { + // id of the message, allocated by the server + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // a client side id, should be validated by the server to make the request retry safe + ClientId string `protobuf:"bytes,2,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` + // id of the chat the message is being sent to / from + ChatId string `protobuf:"bytes,3,opt,name=chat_id,json=chatId,proto3" json:"chat_id,omitempty"` + // id of the user who sent the message + UserId string `protobuf:"bytes,4,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + // time time the message was sent in unix format + SentAt int32 `protobuf:"varint,5,opt,name=sent_at,json=sentAt,proto3" json:"sent_at,omitempty"` + // subject of the message + Subject string `protobuf:"bytes,6,opt,name=subject,proto3" json:"subject,omitempty"` + // text of the message + Text string `protobuf:"bytes,7,opt,name=text,proto3" json:"text,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_ed7e7dde45555b7d, []int{4} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *Message) GetClientId() string { + if m != nil { + return m.ClientId + } + return "" +} + +func (m *Message) GetChatId() string { + if m != nil { + return m.ChatId + } + return "" +} + +func (m *Message) GetUserId() string { + if m != nil { + return m.UserId + } + return "" +} + +func (m *Message) GetSentAt() int32 { + if m != nil { + return m.SentAt + } + return 0 +} + +func (m *Message) GetSubject() string { + if m != nil { + return m.Subject + } + return "" +} + +func (m *Message) GetText() string { + if m != nil { + return m.Text + } + return "" +} + +func init() { + proto.RegisterType((*NewRequest)(nil), "chat.NewRequest") + proto.RegisterType((*NewResponse)(nil), "chat.NewResponse") + proto.RegisterType((*HistoryRequest)(nil), "chat.HistoryRequest") + proto.RegisterType((*HistoryResponse)(nil), "chat.HistoryResponse") + proto.RegisterType((*Message)(nil), "chat.Message") +} + +func init() { proto.RegisterFile("proto/chat.proto", fileDescriptor_ed7e7dde45555b7d) } + +var fileDescriptor_ed7e7dde45555b7d = []byte{ + // 351 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0x41, 0x4f, 0xf2, 0x40, + 0x10, 0xcd, 0x42, 0x69, 0x61, 0xc8, 0xc7, 0x87, 0x1b, 0x8d, 0x2b, 0x5e, 0x48, 0x0f, 0x5a, 0x24, + 0xa1, 0x06, 0x13, 0x2f, 0x7a, 0x51, 0x2e, 0x72, 0x90, 0x43, 0x8f, 0x5e, 0x48, 0x69, 0x37, 0xb0, + 0x46, 0xba, 0xd8, 0xd9, 0x8a, 0xfe, 0x12, 0x7f, 0x86, 0x7f, 0xd1, 0xec, 0x6e, 0x41, 0x30, 0xf1, + 0x36, 0xef, 0xbd, 0x99, 0xd9, 0x37, 0x2f, 0x0b, 0xed, 0x55, 0x2e, 0x95, 0x0c, 0x93, 0x45, 0xac, + 0x06, 0xa6, 0xa4, 0x8e, 0xae, 0xfd, 0x73, 0x80, 0x09, 0x5f, 0x47, 0xfc, 0xb5, 0xe0, 0xa8, 0xe8, + 0x09, 0xd4, 0x0b, 0xe4, 0xf9, 0x54, 0xa4, 0xc8, 0x48, 0xb7, 0x1a, 0x34, 0x22, 0x4f, 0xe3, 0x71, + 0x8a, 0xfe, 0x19, 0x34, 0x4d, 0x23, 0xae, 0x64, 0x86, 0x9c, 0x1e, 0x83, 0xa7, 0xe7, 0xa7, 0x22, + 0x65, 0xa4, 0x4b, 0x82, 0x46, 0xe4, 0x6a, 0x38, 0x4e, 0xfd, 0x1e, 0xb4, 0x1e, 0x04, 0x2a, 0x99, + 0x7f, 0x6c, 0x96, 0xfe, 0xd9, 0x7a, 0x0b, 0xff, 0xb7, 0xad, 0xe5, 0xda, 0x1e, 0xd4, 0x97, 0x1c, + 0x31, 0x9e, 0x73, 0x6b, 0xa0, 0x39, 0xfc, 0x37, 0x30, 0x9e, 0x1f, 0x2d, 0x1b, 0x6d, 0x65, 0xff, + 0x8b, 0x80, 0x57, 0xb2, 0xb4, 0x05, 0x95, 0xed, 0xf6, 0x8a, 0x48, 0xe9, 0x29, 0x34, 0x92, 0x17, + 0xc1, 0x33, 0xf3, 0x68, 0xc5, 0xd0, 0x75, 0x4b, 0x8c, 0xd3, 0x5d, 0x3f, 0xd5, 0x5d, 0x3f, 0x5a, + 0x28, 0xaf, 0x67, 0x8e, 0x15, 0xec, 0xf1, 0x5a, 0x40, 0xbd, 0x2c, 0x56, 0xac, 0xd6, 0x25, 0x41, + 0x2d, 0x72, 0x35, 0xbc, 0x53, 0x94, 0x81, 0x87, 0xc5, 0xec, 0x99, 0x27, 0x8a, 0xb9, 0x66, 0x62, + 0x03, 0x29, 0x05, 0x47, 0xf1, 0x77, 0xc5, 0x3c, 0x43, 0x9b, 0x7a, 0xf8, 0x49, 0xc0, 0x19, 0x2d, + 0x62, 0x45, 0x2f, 0xa0, 0x3a, 0xe1, 0x6b, 0xda, 0xb6, 0xa7, 0xfd, 0xe4, 0xdf, 0x39, 0xd8, 0x61, + 0xca, 0x44, 0xae, 0xc1, 0x2b, 0x43, 0xa2, 0x87, 0x56, 0xdd, 0x8f, 0xb7, 0x73, 0xf4, 0x8b, 0x2d, + 0xe7, 0xfa, 0xe0, 0x8d, 0x64, 0x96, 0x69, 0x2f, 0xfb, 0x11, 0x76, 0xf6, 0x61, 0x40, 0x2e, 0xc9, + 0x7d, 0xff, 0xa9, 0x37, 0x17, 0x6a, 0x51, 0xcc, 0x06, 0x89, 0x5c, 0x86, 0x4b, 0x91, 0xe4, 0x32, + 0x44, 0x9e, 0xbf, 0x89, 0x84, 0xa3, 0xf9, 0x33, 0xa1, 0xf9, 0x33, 0x37, 0xba, 0x9c, 0xb9, 0xa6, + 0xbe, 0xfa, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x71, 0x1b, 0x44, 0xc0, 0x53, 0x02, 0x00, 0x00, +} diff --git a/chat/proto/chat.pb.micro.go b/chat/proto/chat.pb.micro.go new file mode 100644 index 0000000..5b2fa6b --- /dev/null +++ b/chat/proto/chat.pb.micro.go @@ -0,0 +1,225 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: proto/chat.proto + +package chat + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + api "github.com/micro/micro/v3/service/api" + client "github.com/micro/micro/v3/service/client" + server "github.com/micro/micro/v3/service/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ api.Endpoint +var _ context.Context +var _ client.Option +var _ server.Option + +// Api Endpoints for Chat service + +func NewChatEndpoints() []*api.Endpoint { + return []*api.Endpoint{} +} + +// Client API for Chat service + +type ChatService interface { + // New creates a chat for a group of users. The RPC is idempotent so if it's called multiple times + // for the same users, the same response will be returned. It's good practice to design APIs as + // idempotent since this enables safe retries. + New(ctx context.Context, in *NewRequest, opts ...client.CallOption) (*NewResponse, error) + // History returns the historical messages in a chat + History(ctx context.Context, in *HistoryRequest, opts ...client.CallOption) (*HistoryResponse, error) + // Connect to a chat using a bidirectional stream enabling the client to send and recieve messages + // over a single RPC. When a message is sent on the stream, it will be added to the chat history + // and sent to the other connected users. When opening the connection, the client should provide + // the chat_id and user_id in the context so the server knows which messages to stream. + Connect(ctx context.Context, opts ...client.CallOption) (Chat_ConnectService, error) +} + +type chatService struct { + c client.Client + name string +} + +func NewChatService(name string, c client.Client) ChatService { + return &chatService{ + c: c, + name: name, + } +} + +func (c *chatService) New(ctx context.Context, in *NewRequest, opts ...client.CallOption) (*NewResponse, error) { + req := c.c.NewRequest(c.name, "Chat.New", in) + out := new(NewResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *chatService) History(ctx context.Context, in *HistoryRequest, opts ...client.CallOption) (*HistoryResponse, error) { + req := c.c.NewRequest(c.name, "Chat.History", in) + out := new(HistoryResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *chatService) Connect(ctx context.Context, opts ...client.CallOption) (Chat_ConnectService, error) { + req := c.c.NewRequest(c.name, "Chat.Connect", &Message{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + return &chatServiceConnect{stream}, nil +} + +type Chat_ConnectService interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error + Recv() (*Message, error) +} + +type chatServiceConnect struct { + stream client.Stream +} + +func (x *chatServiceConnect) Close() error { + return x.stream.Close() +} + +func (x *chatServiceConnect) Context() context.Context { + return x.stream.Context() +} + +func (x *chatServiceConnect) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *chatServiceConnect) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *chatServiceConnect) Send(m *Message) error { + return x.stream.Send(m) +} + +func (x *chatServiceConnect) Recv() (*Message, error) { + m := new(Message) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Chat service + +type ChatHandler interface { + // New creates a chat for a group of users. The RPC is idempotent so if it's called multiple times + // for the same users, the same response will be returned. It's good practice to design APIs as + // idempotent since this enables safe retries. + New(context.Context, *NewRequest, *NewResponse) error + // History returns the historical messages in a chat + History(context.Context, *HistoryRequest, *HistoryResponse) error + // Connect to a chat using a bidirectional stream enabling the client to send and recieve messages + // over a single RPC. When a message is sent on the stream, it will be added to the chat history + // and sent to the other connected users. When opening the connection, the client should provide + // the chat_id and user_id in the context so the server knows which messages to stream. + Connect(context.Context, Chat_ConnectStream) error +} + +func RegisterChatHandler(s server.Server, hdlr ChatHandler, opts ...server.HandlerOption) error { + type chat interface { + New(ctx context.Context, in *NewRequest, out *NewResponse) error + History(ctx context.Context, in *HistoryRequest, out *HistoryResponse) error + Connect(ctx context.Context, stream server.Stream) error + } + type Chat struct { + chat + } + h := &chatHandler{hdlr} + return s.Handle(s.NewHandler(&Chat{h}, opts...)) +} + +type chatHandler struct { + ChatHandler +} + +func (h *chatHandler) New(ctx context.Context, in *NewRequest, out *NewResponse) error { + return h.ChatHandler.New(ctx, in, out) +} + +func (h *chatHandler) History(ctx context.Context, in *HistoryRequest, out *HistoryResponse) error { + return h.ChatHandler.History(ctx, in, out) +} + +func (h *chatHandler) Connect(ctx context.Context, stream server.Stream) error { + return h.ChatHandler.Connect(ctx, &chatConnectStream{stream}) +} + +type Chat_ConnectStream interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error + Recv() (*Message, error) +} + +type chatConnectStream struct { + stream server.Stream +} + +func (x *chatConnectStream) Close() error { + return x.stream.Close() +} + +func (x *chatConnectStream) Context() context.Context { + return x.stream.Context() +} + +func (x *chatConnectStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *chatConnectStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *chatConnectStream) Send(m *Message) error { + return x.stream.Send(m) +} + +func (x *chatConnectStream) Recv() (*Message, error) { + m := new(Message) + if err := x.stream.Recv(m); err != nil { + return nil, err + } + return m, nil +} diff --git a/chat/proto/chat.proto b/chat/proto/chat.proto index 8670b78..ec8208c 100644 --- a/chat/proto/chat.proto +++ b/chat/proto/chat.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package chat; -option go_package = "github.com/micro/services/chat;chat"; +option go_package = "github.com/micro/services/chat/proto;chat"; service Chat { // New creates a chat for a group of users. The RPC is idempotent so if it's called multiple times diff --git a/go.mod b/go.mod index db81a78..75e32d3 100644 --- a/go.mod +++ b/go.mod @@ -17,14 +17,13 @@ require ( github.com/gosimple/slug v1.9.0 github.com/hashicorp/go-retryablehttp v0.6.7 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect - github.com/micro/go-micro v1.18.0 // indirect - github.com/micro/go-micro/v3 v3.0.0-beta.3 + github.com/micro/go-micro v1.18.0 github.com/micro/go-plugins/broker/nats/v3 v3.0.0-20200908121001-4ea6f6760baf // indirect github.com/micro/go-plugins/events/stream/nats/v3 v3.0.0-20200908121001-4ea6f6760baf // indirect github.com/micro/go-plugins/metrics/prometheus/v3 v3.0.0-20200908121001-4ea6f6760baf // indirect github.com/micro/go-plugins/registry/etcd/v3 v3.0.0-20200908121001-4ea6f6760baf // indirect github.com/micro/go-plugins/store/cockroach/v3 v3.0.0-20200908121001-4ea6f6760baf // indirect - github.com/micro/micro/v3 v3.0.0-beta.5.1 + github.com/micro/micro/v3 v3.0.0-beta.6.0.20201015084013-5adee1bbfb5e github.com/miekg/dns v1.1.31 // indirect github.com/ulikunitz/xz v0.5.8 // indirect github.com/xanzy/go-gitlab v0.38.1 // indirect diff --git a/go.sum b/go.sum index 8efcd88..dfe720c 100644 --- a/go.sum +++ b/go.sum @@ -433,6 +433,9 @@ github.com/micro/micro/v3 v3.0.0-beta.4.0.20200918115538-32e9a17127d7 h1:Q9OvH5o github.com/micro/micro/v3 v3.0.0-beta.4.0.20200918115538-32e9a17127d7/go.mod h1:NeLvbrc7IpTU0EpRsJnEQyoH6Db3N3cVjt12mn4crmc= github.com/micro/micro/v3 v3.0.0-beta.5.1 h1:4v17JbrrLUN76SUKDgZoAXl9+cr5fSDHWeSnLZlqr7M= github.com/micro/micro/v3 v3.0.0-beta.5.1/go.mod h1:l4rOO3QkRJ7wCYZOlKh6n9ANRXs8OxN9H/9z4naUcqo= +github.com/micro/micro/v3 v3.0.0-beta.6 h1:TtklfOdte5Piu80clLpfb+4yK9jovt1s1svobChu/o0= +github.com/micro/micro/v3 v3.0.0-beta.6.0.20201015084013-5adee1bbfb5e h1:VHBDhPnLcXqgCiYfjC8ip+r8UbV/9JekqZ6u8Kme4mc= +github.com/micro/micro/v3 v3.0.0-beta.6.0.20201015084013-5adee1bbfb5e/go.mod h1:PK1Fa+RtdIVinOLBPgJQJP5Ov8V0ExXx+ywA80f9TdQ= github.com/micro/protoc-gen-micro v1.0.0/go.mod h1:C8ij4DJhapBmypcT00AXdb0cZ675/3PqUO02buWWqbE= github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.15/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=