Add chat client

This commit is contained in:
Ben Toogood
2020-10-15 15:05:17 +01:00
parent 68f3e52656
commit 9e1c0e990f
7 changed files with 209 additions and 277 deletions

View File

@@ -7,12 +7,13 @@ import (
"github.com/micro/micro/v3/service/context/metadata"
// import the proto, it's standard to import the services own proto under the alias pb
"github.com/google/uuid"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/events"
"github.com/micro/micro/v3/service/logger"
"github.com/micro/micro/v3/service/store"
// it's standard to import the services own proto under the alias pb
pb "github.com/micro/services/chat/proto"
)
@@ -22,8 +23,9 @@ func New() pb.ChatHandler {
}
const (
storeKeyPrefix = "chat/"
eventKeyPrefix = "chat/"
chatStoreKeyPrefix = "chats/"
chatEventKeyPrefix = "chats/"
messageStoreKeyPrefix = "messages/"
)
// handler satisfies the ChatHandler interface. You can see this inteface defined in chat.pb.micro.go
@@ -58,11 +60,11 @@ func (h *handler) New(ctx context.Context, req *pb.NewRequest, rsp *pb.NewRespon
sort.Strings(sortedIDs)
// key to lookup the chat in the store using, e.g. "chat/usera-userb-userc"
key := storeKeyPrefix + strings.Join(sortedIDs, "-")
key := chatStoreKeyPrefix + strings.Join(sortedIDs, "-")
// read from the store to check if a chat with these users already exists
recs, err := store.Read(key)
if err != nil {
if err == nil {
// if an error wasn't returned, at least one record was found. The value returned by the store
// is the bytes representation of the chat id. We'll convert this back into a string and return
// it to the client.
@@ -80,9 +82,9 @@ func (h *handler) New(ctx context.Context, req *pb.NewRequest, rsp *pb.NewRespon
// no chat id was returned so we'll generate one, write it to the store and then return it to the
// client
chatID := uuid.New().String()
record := store.Record{Key: key, Value: []byte(chatID)}
record := store.Record{Key: chatStoreKeyPrefix + chatID, Value: []byte(chatID)}
if err := store.Write(&record); err != nil {
logger.Errorf("Error writing to the store. Key: %v. Error: %v", key, err)
logger.Errorf("Error writing to the store. Key: %v. Error: %v", record.Key, err)
return errors.InternalServerError("chat.New.Unknown", "Error writing to the store")
}
@@ -104,7 +106,7 @@ func (h *handler) History(ctx context.Context, req *pb.HistoryRequest, rsp *pb.H
}
// lookup the chat from the store to ensure it's valid
if _, err := store.Read(storeKeyPrefix + req.ChatId); err == store.ErrNotFound {
if _, err := store.Read(chatStoreKeyPrefix + req.ChatId); err == store.ErrNotFound {
return errors.BadRequest("chat.History.InvalidChatID", "Chat not found with this ID")
} else if err != nil {
logger.Errorf("Error reading from the store. Chat ID: %v. Error: %v", req.ChatId, err)
@@ -114,7 +116,7 @@ func (h *handler) History(ctx context.Context, req *pb.HistoryRequest, rsp *pb.H
// lookup the historical messages for the chat using the event store. lots of packages in micro
// support options, in this case we'll pass the ReadLimit option to restrict the number of messages
// we'll load from the events store.
messages, err := events.Read(eventKeyPrefix+req.ChatId, events.ReadLimit(50))
messages, err := events.Read(chatEventKeyPrefix+req.ChatId, events.ReadLimit(50))
if err != nil {
logger.Errorf("Error reading from the event store. Chat ID: %v. Error: %v", req.ChatId, err)
return errors.InternalServerError("chat.History.Unknown", "Error reading from the event store")
@@ -143,17 +145,17 @@ func (h *handler) History(ctx context.Context, req *pb.HistoryRequest, rsp *pb.H
func (h *handler) Connect(ctx context.Context, stream pb.Chat_ConnectStream) error {
// the client passed the chat id and user id in the request context. we'll load that information
// now and validate it. If any information is missing we'll return a BadRequest error to the client
userID, ok := metadata.Get(ctx, "UserID")
userID, ok := metadata.Get(ctx, "user-id")
if !ok {
return errors.BadRequest("chat.Connect.MissingUserID", "UserID missing in context")
}
chatID, ok := metadata.Get(ctx, "ChatID")
chatID, ok := metadata.Get(ctx, "chat-id")
if !ok {
return errors.BadRequest("chat.Connect.MissingChatID", "ChatId missing in context")
}
// lookup the chat from the store to ensure it's valid
if _, err := store.Read(storeKeyPrefix + chatID); err == store.ErrNotFound {
if _, err := store.Read(chatStoreKeyPrefix + chatID); err == store.ErrNotFound {
return errors.BadRequest("chat.Connect.InvalidChatID", "Chat not found with this ID")
} else if err != nil {
logger.Errorf("Error reading from the store. Chat ID: %v. Error: %v", chatID, err)
@@ -175,9 +177,8 @@ func (h *handler) Connect(ctx context.Context, stream pb.Chat_ConnectStream) err
// create an event stream to consume messages posted by other users into the chat. we'll use the
// user id as a queue to ensure each user recieves the message
evStream, err := events.Subscribe(eventKeyPrefix+chatID, events.WithQueue(userID))
evStream, err := events.Subscribe(chatEventKeyPrefix+chatID, events.WithQueue(userID))
if err != nil {
defer cancel()
logger.Errorf("Error streaming events. Chat ID: %v. Error: %v", chatID, err)
return errors.InternalServerError("chat.Connect.Unknown", "Error connecting to the event stream")
}
@@ -235,8 +236,26 @@ func (h *handler) Connect(ctx context.Context, stream pb.Chat_ConnectStream) err
// an error occured in another goroutine, terminate the stream
return err
case msg := <-msgChan:
// a message was recieved from the client, send it to the event stream
if err := events.Publish(eventKeyPrefix+chatID, msg); err != nil {
// a message was recieved from the client. validate it hasn't been recieved before
if _, err := store.Read(messageStoreKeyPrefix + msg.ClientId); err == nil {
// the message has already been processed
continue
} else if err != store.ErrNotFound {
// an unexpected error occured
return err
}
// set the defaults
msg.UserId = userID
msg.ChatId = chatID
// send the message to the event stream
if err := events.Publish(chatEventKeyPrefix+chatID, msg); err != nil {
return err
}
// record the messages client id
if err := store.Write(&store.Record{Key: messageStoreKeyPrefix + msg.ClientId}); err != nil {
return err
}
}