* Streams Service (WIP)

* Complete Streams Service
This commit is contained in:
ben-toogood
2021-01-25 14:35:11 +00:00
committed by GitHub
parent 055517ec14
commit 47fe6b39ec
28 changed files with 2971 additions and 0 deletions

2
streams/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
streams

3
streams/Dockerfile Normal file
View File

@@ -0,0 +1,3 @@
FROM alpine
ADD streams /streams
ENTRYPOINT [ "/streams" ]

22
streams/Makefile Normal file
View File

@@ -0,0 +1,22 @@
GOPATH:=$(shell go env GOPATH)
.PHONY: init
init:
go get -u github.com/golang/protobuf/proto
go get -u github.com/golang/protobuf/protoc-gen-go
go get github.com/micro/micro/v3/cmd/protoc-gen-micro
.PHONY: proto
proto:
protoc --proto_path=. --micro_out=. --go_out=:. proto/streams.proto
.PHONY: build
build:
go build -o streams *.go
.PHONY: test
test:
go test -v ./... -cover
.PHONY: docker
docker:
docker build . -t streams:latest

23
streams/README.md Normal file
View File

@@ -0,0 +1,23 @@
# Streams Service
This is the Streams service
Generated with
```
micro new streams
```
## Usage
Generate the proto code
```
make proto
```
Run the service
```
micro run .
```

2
streams/generate.go Normal file
View File

@@ -0,0 +1,2 @@
package main
//go:generate make proto

View File

@@ -0,0 +1,37 @@
package handler
import (
"context"
"github.com/google/uuid"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/streams/proto"
)
// Create a conversation
func (s *Streams) CreateConversation(ctx context.Context, req *pb.CreateConversationRequest, rsp *pb.CreateConversationResponse) error {
// validate the request
if len(req.GroupId) == 0 {
return ErrMissingGroupID
}
if len(req.Topic) == 0 {
return ErrMissingTopic
}
// write the conversation to the database
conv := &Conversation{
ID: uuid.New().String(),
Topic: req.Topic,
GroupID: req.GroupId,
CreatedAt: s.Time(),
}
if err := s.DB.Create(conv).Error; err != nil {
logger.Errorf("Error creating conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Conversation = conv.Serialize()
return nil
}

View File

@@ -0,0 +1,60 @@
package handler_test
import (
"context"
"testing"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func TestCreateConversation(t *testing.T) {
tt := []struct {
Name string
GroupID string
Topic string
Error error
}{
{
Name: "MissingGroupID",
Topic: "HelloWorld",
Error: handler.ErrMissingGroupID,
},
{
Name: "MissingTopic",
GroupID: uuid.New().String(),
Error: handler.ErrMissingTopic,
},
{
Name: "Valid",
GroupID: uuid.New().String(),
Topic: "HelloWorld",
},
}
h := testHandler(t)
for _, tc := range tt {
t.Run(tc.Name, func(t *testing.T) {
var rsp pb.CreateConversationResponse
err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: tc.Topic, GroupId: tc.GroupID,
}, &rsp)
assert.Equal(t, tc.Error, err)
if tc.Error != nil {
assert.Nil(t, rsp.Conversation)
return
}
assertConversationsMatch(t, &pb.Conversation{
CreatedAt: timestamppb.New(h.Time()),
GroupId: tc.GroupID,
Topic: tc.Topic,
}, rsp.Conversation)
})
}
}

View File

@@ -0,0 +1,53 @@
package handler
import (
"context"
"github.com/google/uuid"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/streams/proto"
"gorm.io/gorm"
)
// Create a message within a conversation
func (s *Streams) CreateMessage(ctx context.Context, req *pb.CreateMessageRequest, rsp *pb.CreateMessageResponse) error {
// validate the request
if len(req.AuthorId) == 0 {
return ErrMissingAuthorID
}
if len(req.ConversationId) == 0 {
return ErrMissingConversationID
}
if len(req.Text) == 0 {
return ErrMissingText
}
return s.DB.Transaction(func(tx *gorm.DB) error {
// lookup the conversation
var conv Conversation
if err := s.DB.Where(&Conversation{ID: req.ConversationId}).First(&conv).Error; err == gorm.ErrRecordNotFound {
return ErrNotFound
} else if err != nil {
logger.Errorf("Error reading conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// create the message
msg := &Message{
ID: uuid.New().String(),
SentAt: s.Time(),
Text: req.Text,
AuthorID: req.AuthorId,
ConversationID: req.ConversationId,
}
if err := s.DB.Create(msg).Error; err != nil {
logger.Errorf("Error creating message: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Message = msg.Serialize()
return nil
})
}

View File

@@ -0,0 +1,89 @@
package handler_test
import (
"context"
"testing"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func TestCreateMessage(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp pb.CreateConversationResponse
err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
tt := []struct {
Name string
AuthorID string
ConversationID string
Text string
Error error
}{
{
Name: "MissingConversationID",
Text: "HelloWorld",
AuthorID: uuid.New().String(),
Error: handler.ErrMissingConversationID,
},
{
Name: "MissingAuthorID",
ConversationID: uuid.New().String(),
Text: "HelloWorld",
Error: handler.ErrMissingAuthorID,
},
{
Name: "MissingText",
ConversationID: uuid.New().String(),
AuthorID: uuid.New().String(),
Error: handler.ErrMissingText,
},
{
Name: "ConversationNotFound",
ConversationID: uuid.New().String(),
AuthorID: uuid.New().String(),
Text: "HelloWorld",
Error: handler.ErrNotFound,
},
{
Name: "Valid",
ConversationID: cRsp.Conversation.Id,
AuthorID: uuid.New().String(),
Text: "HelloWorld",
},
}
for _, tc := range tt {
t.Run(tc.Name, func(t *testing.T) {
var rsp pb.CreateMessageResponse
err := h.CreateMessage(context.TODO(), &pb.CreateMessageRequest{
Text: tc.Text, ConversationId: tc.ConversationID, AuthorId: tc.AuthorID,
}, &rsp)
assert.Equal(t, tc.Error, err)
if tc.Error != nil {
assert.Nil(t, rsp.Message)
return
}
assertMessagesMatch(t, &pb.Message{
AuthorId: tc.AuthorID,
ConversationId: tc.ConversationID,
SentAt: timestamppb.New(h.Time()),
Text: tc.Text,
}, rsp.Message)
})
}
}

View File

@@ -0,0 +1,34 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/streams/proto"
"gorm.io/gorm"
)
// Delete a conversation and all the messages within
func (s *Streams) DeleteConversation(ctx context.Context, req *pb.DeleteConversationRequest, rsp *pb.DeleteConversationResponse) error {
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
return s.DB.Transaction(func(tx *gorm.DB) error {
// delete all the messages
if err := tx.Where(&Message{ConversationID: req.Id}).Delete(&Message{}).Error; err != nil {
logger.Errorf("Error deleting messages: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// delete the conversation
if err := tx.Where(&Conversation{ID: req.Id}).Delete(&Conversation{}).Error; err != nil {
logger.Errorf("Error deleting conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
return nil
})
}

View File

@@ -0,0 +1,50 @@
package handler_test
import (
"context"
"testing"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func TestDeleteConversation(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp pb.CreateConversationResponse
err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
t.Run("MissingID", func(t *testing.T) {
err := h.DeleteConversation(context.TODO(), &pb.DeleteConversationRequest{}, &pb.DeleteConversationResponse{})
assert.Equal(t, handler.ErrMissingID, err)
})
t.Run("Valid", func(t *testing.T) {
err := h.DeleteConversation(context.TODO(), &pb.DeleteConversationRequest{
Id: cRsp.Conversation.Id,
}, &pb.DeleteConversationResponse{})
assert.NoError(t, err)
err = h.ReadConversation(context.TODO(), &pb.ReadConversationRequest{
Id: cRsp.Conversation.Id,
}, &pb.ReadConversationResponse{})
assert.Equal(t, handler.ErrNotFound, err)
})
t.Run("Retry", func(t *testing.T) {
err := h.DeleteConversation(context.TODO(), &pb.DeleteConversationRequest{
Id: cRsp.Conversation.Id,
}, &pb.DeleteConversationResponse{})
assert.NoError(t, err)
})
}

View File

@@ -0,0 +1,31 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/streams/proto"
)
// List all the conversations for a group
func (s *Streams) ListConversations(ctx context.Context, req *pb.ListConversationsRequest, rsp *pb.ListConversationsResponse) error {
// validate the request
if len(req.GroupId) == 0 {
return ErrMissingGroupID
}
// query the database
var convs []Conversation
if err := s.DB.Where(&Conversation{GroupID: req.GroupId}).Find(&convs).Error; err != nil {
logger.Errorf("Error reading conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Conversations = make([]*pb.Conversation, len(convs))
for i, c := range convs {
rsp.Conversations[i] = c.Serialize()
}
return nil
}

View File

@@ -0,0 +1,55 @@
package handler_test
import (
"context"
"testing"
"github.com/google/uuid"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"github.com/stretchr/testify/assert"
)
func TestListConversations(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp1 pb.CreateConversationResponse
err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp1)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
var cRsp2 pb.CreateConversationResponse
err = h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: "FooBar", GroupId: uuid.New().String(),
}, &cRsp2)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
t.Run("MissingGroupID", func(t *testing.T) {
var rsp pb.ListConversationsResponse
err := h.ListConversations(context.TODO(), &pb.ListConversationsRequest{}, &rsp)
assert.Equal(t, handler.ErrMissingGroupID, err)
assert.Nil(t, rsp.Conversations)
})
t.Run("Valid", func(t *testing.T) {
var rsp pb.ListConversationsResponse
err := h.ListConversations(context.TODO(), &pb.ListConversationsRequest{
GroupId: cRsp1.Conversation.GroupId,
}, &rsp)
assert.NoError(t, err)
if len(rsp.Conversations) != 1 {
t.Fatalf("Expected 1 conversation to be returned, got %v", len(rsp.Conversations))
return
}
assertConversationsMatch(t, cRsp1.Conversation, rsp.Conversations[0])
})
}

View File

@@ -0,0 +1,45 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/streams/proto"
)
const DefaultLimit = 25
// List the messages within a conversation in reverse chronological order, using sent_before to
// offset as older messages need to be loaded
func (s *Streams) ListMessages(ctx context.Context, req *pb.ListMessagesRequest, rsp *pb.ListMessagesResponse) error {
// validate the request
if len(req.ConversationId) == 0 {
return ErrMissingConversationID
}
// construct the query
q := s.DB.Where(&Message{ConversationID: req.ConversationId}).Order("sent_at DESC")
if req.SentBefore != nil {
q = q.Where("sent_at < ?", req.SentBefore.AsTime())
}
if req.Limit != nil {
q.Limit(int(req.Limit.Value))
} else {
q.Limit(DefaultLimit)
}
// execute the query
var msgs []Message
if err := q.Find(&msgs).Error; err != nil {
logger.Errorf("Error reading messages: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Messages = make([]*pb.Message, len(msgs))
for i, m := range msgs {
rsp.Messages[i] = m.Serialize()
}
return nil
}

View File

@@ -0,0 +1,116 @@
package handler_test
import (
"context"
"sort"
"strconv"
"testing"
"time"
"github.com/google/uuid"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func TestListMessages(t *testing.T) {
h := testHandler(t)
h.Time = time.Now
// seed some data
var convRsp pb.CreateConversationResponse
err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: "TestListMessages", GroupId: uuid.New().String(),
}, &convRsp)
assert.NoError(t, err)
if err != nil {
return
}
msgs := make([]*pb.Message, 50)
for i := 0; i < len(msgs); i++ {
var rsp pb.CreateMessageResponse
err := h.CreateMessage(context.TODO(), &pb.CreateMessageRequest{
ConversationId: convRsp.Conversation.Id,
AuthorId: uuid.New().String(),
Text: strconv.Itoa(i),
}, &rsp)
assert.NoError(t, err)
msgs[i] = rsp.Message
}
t.Run("MissingConversationID", func(t *testing.T) {
var rsp pb.ListMessagesResponse
err := h.ListMessages(context.TODO(), &pb.ListMessagesRequest{}, &rsp)
assert.Equal(t, handler.ErrMissingConversationID, err)
assert.Nil(t, rsp.Messages)
})
t.Run("NoOffset", func(t *testing.T) {
var rsp pb.ListMessagesResponse
err := h.ListMessages(context.TODO(), &pb.ListMessagesRequest{
ConversationId: convRsp.Conversation.Id,
}, &rsp)
assert.NoError(t, err)
if len(rsp.Messages) != handler.DefaultLimit {
t.Fatalf("Expected %v messages but got %v", handler.DefaultLimit, len(rsp.Messages))
return
}
expected := msgs[25:]
sortMessages(rsp.Messages)
for i, msg := range rsp.Messages {
assertMessagesMatch(t, expected[i], msg)
}
})
t.Run("LimitSet", func(t *testing.T) {
var rsp pb.ListMessagesResponse
err := h.ListMessages(context.TODO(), &pb.ListMessagesRequest{
ConversationId: convRsp.Conversation.Id,
Limit: &wrapperspb.Int32Value{Value: 10},
}, &rsp)
assert.NoError(t, err)
if len(rsp.Messages) != 10 {
t.Fatalf("Expected %v messages but got %v", 10, len(rsp.Messages))
return
}
expected := msgs[40:]
sortMessages(rsp.Messages)
for i, msg := range rsp.Messages {
assertMessagesMatch(t, expected[i], msg)
}
})
t.Run("OffsetAndLimit", func(t *testing.T) {
var rsp pb.ListMessagesResponse
err := h.ListMessages(context.TODO(), &pb.ListMessagesRequest{
ConversationId: convRsp.Conversation.Id,
Limit: &wrapperspb.Int32Value{Value: 5},
SentBefore: msgs[20].SentAt,
}, &rsp)
assert.NoError(t, err)
if len(rsp.Messages) != 5 {
t.Fatalf("Expected %v messages but got %v", 5, len(rsp.Messages))
return
}
expected := msgs[15:20]
sortMessages(rsp.Messages)
for i, msg := range rsp.Messages {
assertMessagesMatch(t, expected[i], msg)
}
})
}
// sortMessages by the time they were sent
func sortMessages(msgs []*pb.Message) {
sort.Slice(msgs, func(i, j int) bool {
if msgs[i].SentAt == nil || msgs[j].SentAt == nil {
return true
}
return msgs[i].SentAt.AsTime().Before(msgs[j].SentAt.AsTime())
})
}

View File

@@ -0,0 +1,38 @@
package handler
import (
"context"
"gorm.io/gorm"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/streams/proto"
)
// Read a conversation using its ID, can filter using group ID if provided
func (s *Streams) ReadConversation(ctx context.Context, req *pb.ReadConversationRequest, rsp *pb.ReadConversationResponse) error {
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
// construct the query
q := Conversation{ID: req.Id}
if req.GroupId != nil {
q.GroupID = req.GroupId.Value
}
// execute the query
var conv Conversation
if err := s.DB.Where(&q).First(&conv).Error; err == gorm.ErrRecordNotFound {
return ErrNotFound
} else if err != nil {
logger.Errorf("Error reading conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Conversation = conv.Serialize()
return nil
}

View File

@@ -0,0 +1,70 @@
package handler_test
import (
"context"
"testing"
"github.com/google/uuid"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func TestReadConversation(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp pb.CreateConversationResponse
err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
tt := []struct {
Name string
ID string
GroupID *wrapperspb.StringValue
Error error
Result *pb.Conversation
}{
{
Name: "MissingID",
Error: handler.ErrMissingID,
},
{
Name: "IncorrectID",
ID: uuid.New().String(),
Error: handler.ErrNotFound,
},
{
Name: "FoundUsingIDOnly",
ID: cRsp.Conversation.Id,
Result: cRsp.Conversation,
},
{
Name: "IncorrectGroupID",
ID: cRsp.Conversation.Id,
Error: handler.ErrNotFound,
GroupID: &wrapperspb.StringValue{Value: uuid.New().String()},
},
}
for _, tc := range tt {
t.Run(tc.Name, func(t *testing.T) {
var rsp pb.ReadConversationResponse
err := h.ReadConversation(context.TODO(), &pb.ReadConversationRequest{
Id: tc.ID, GroupId: tc.GroupID,
}, &rsp)
assert.Equal(t, tc.Error, err)
if tc.Result == nil {
assert.Nil(t, rsp.Conversation)
} else {
assertConversationsMatch(t, tc.Result, rsp.Conversation)
}
})
}
}

View File

@@ -0,0 +1,49 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/streams/proto"
"gorm.io/gorm"
)
// RecentMessages returns the most recent messages in a group of conversations. By default the
// most messages retrieved per conversation is 25, however this can be overriden using the
// limit_per_conversation option
func (s *Streams) RecentMessages(ctx context.Context, req *pb.RecentMessagesRequest, rsp *pb.RecentMessagesResponse) error {
// validate the request
if len(req.ConversationIds) == 0 {
return ErrMissingConversationIDs
}
limit := DefaultLimit
if req.LimitPerConversation != nil {
limit = int(req.LimitPerConversation.Value)
}
// query the database
var msgs []Message
err := s.DB.Transaction(func(tx *gorm.DB) error {
for _, id := range req.ConversationIds {
var cms []Message
if err := tx.Where(&Message{ConversationID: id}).Order("sent_at DESC").Limit(limit).Find(&cms).Error; err != nil {
return err
}
msgs = append(msgs, cms...)
}
return nil
})
if err != nil {
logger.Errorf("Error reading messages: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Messages = make([]*pb.Message, len(msgs))
for i, m := range msgs {
rsp.Messages[i] = m.Serialize()
}
return nil
}

View File

@@ -0,0 +1,101 @@
package handler_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/google/uuid"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func TestRecentMessages(t *testing.T) {
h := testHandler(t)
h.Time = time.Now
// seed some data
ids := make([]string, 3)
convos := make(map[string][]*pb.Message, 3)
for i := 0; i < 3; i++ {
var convRsp pb.CreateConversationResponse
err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: "TestRecentMessages", GroupId: uuid.New().String(),
}, &convRsp)
assert.NoError(t, err)
if err != nil {
return
}
convos[convRsp.Conversation.Id] = make([]*pb.Message, 50)
ids[i] = convRsp.Conversation.Id
for j := 0; j < 50; j++ {
var rsp pb.CreateMessageResponse
err := h.CreateMessage(context.TODO(), &pb.CreateMessageRequest{
ConversationId: convRsp.Conversation.Id,
AuthorId: uuid.New().String(),
Text: fmt.Sprintf("Conversation %v, Message %v", i, j),
}, &rsp)
assert.NoError(t, err)
convos[convRsp.Conversation.Id][j] = rsp.Message
}
}
t.Run("MissingConversationIDs", func(t *testing.T) {
var rsp pb.RecentMessagesResponse
err := h.RecentMessages(context.TODO(), &pb.RecentMessagesRequest{}, &rsp)
assert.Equal(t, handler.ErrMissingConversationIDs, err)
assert.Nil(t, rsp.Messages)
})
t.Run("LimitSet", func(t *testing.T) {
var rsp pb.RecentMessagesResponse
err := h.RecentMessages(context.TODO(), &pb.RecentMessagesRequest{
ConversationIds: ids,
LimitPerConversation: &wrapperspb.Int32Value{Value: 10},
}, &rsp)
assert.NoError(t, err)
if len(rsp.Messages) != 30 {
t.Fatalf("Expected %v messages but got %v", 30, len(rsp.Messages))
return
}
var expected []*pb.Message
for _, msgs := range convos {
expected = append(expected, msgs[40:]...)
}
sortMessages(expected)
sortMessages(rsp.Messages)
for i, msg := range rsp.Messages {
assertMessagesMatch(t, expected[i], msg)
}
})
t.Run("NoLimitSet", func(t *testing.T) {
reducedIDs := ids[:2]
var rsp pb.RecentMessagesResponse
err := h.RecentMessages(context.TODO(), &pb.RecentMessagesRequest{
ConversationIds: reducedIDs,
}, &rsp)
assert.NoError(t, err)
if len(rsp.Messages) != 50 {
t.Fatalf("Expected %v messages but got %v", 50, len(rsp.Messages))
return
}
var expected []*pb.Message
for _, id := range reducedIDs {
expected = append(expected, convos[id][25:]...)
}
sortMessages(expected)
sortMessages(rsp.Messages)
for i, msg := range rsp.Messages {
assertMessagesMatch(t, expected[i], msg)
}
})
}

View File

@@ -0,0 +1,60 @@
package handler
import (
"time"
"github.com/micro/micro/v3/service/errors"
pb "github.com/micro/services/streams/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/gorm"
)
var (
ErrMissingID = errors.BadRequest("MISSING_ID", "Missing ID")
ErrMissingGroupID = errors.BadRequest("MISSING_GROUP_ID", "Missing GroupID")
ErrMissingTopic = errors.BadRequest("MISSING_TOPIC", "Missing Topic")
ErrMissingAuthorID = errors.BadRequest("MISSING_AUTHOR_ID", "Missing Author ID")
ErrMissingText = errors.BadRequest("MISSING_TEXT", "Missing text")
ErrMissingConversationID = errors.BadRequest("MISSING_CONVERSATION_ID", "Missing Conversation ID")
ErrMissingConversationIDs = errors.BadRequest("MISSING_CONVERSATION_IDS", "One or more Conversation IDs are required")
ErrNotFound = errors.NotFound("NOT_FOUND", "Conversation not found")
)
type Streams struct {
DB *gorm.DB
Time func() time.Time
}
type Message struct {
ID string
AuthorID string
ConversationID string
Text string
SentAt time.Time
}
func (m *Message) Serialize() *pb.Message {
return &pb.Message{
Id: m.ID,
AuthorId: m.AuthorID,
ConversationId: m.ConversationID,
Text: m.Text,
SentAt: timestamppb.New(m.SentAt),
}
}
type Conversation struct {
ID string
GroupID string
Topic string
CreatedAt time.Time
}
func (c *Conversation) Serialize() *pb.Conversation {
return &pb.Conversation{
Id: c.ID,
GroupId: c.GroupID,
Topic: c.Topic,
CreatedAt: timestamppb.New(c.CreatedAt),
}
}

View File

@@ -0,0 +1,84 @@
package handler_test
import (
"testing"
"time"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"github.com/stretchr/testify/assert"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
func testHandler(t *testing.T) *handler.Streams {
// connect to the database
db, err := gorm.Open(postgres.Open("postgresql://postgres@localhost:5432/streams?sslmode=disable"), &gorm.Config{})
if err != nil {
t.Fatalf("Error connecting to database: %v", err)
}
// migrate the database
if err := db.AutoMigrate(&handler.Conversation{}, &handler.Message{}); err != nil {
t.Fatalf("Error migrating database: %v", err)
}
// clean any data from a previous run
if err := db.Exec("TRUNCATE TABLE conversations, messages CASCADE").Error; err != nil {
t.Fatalf("Error cleaning database: %v", err)
}
return &handler.Streams{DB: db, Time: func() time.Time { return time.Unix(1611327673, 0) }}
}
func assertConversationsMatch(t *testing.T, exp, act *pb.Conversation) {
if act == nil {
t.Errorf("Conversation not returned")
return
}
// adapt this check so we can reuse the func in testing create, where we don't know the exact id
// which will be generated
if len(exp.Id) > 0 {
assert.Equal(t, exp.Id, act.Id)
} else {
assert.NotEmpty(t, act.Id)
}
assert.Equal(t, exp.Topic, act.Topic)
assert.Equal(t, exp.GroupId, act.GroupId)
if act.CreatedAt == nil {
t.Errorf("CreatedAt not set")
return
}
assert.True(t, exp.CreatedAt.AsTime().Equal(act.CreatedAt.AsTime()))
}
func assertMessagesMatch(t *testing.T, exp, act *pb.Message) {
if act == nil {
t.Errorf("Message not returned")
return
}
// adapt this check so we can reuse the func in testing create, where we don't know the exact id
// which will be generated
if len(exp.Id) > 0 {
assert.Equal(t, exp.Id, act.Id)
} else {
assert.NotEmpty(t, act.Id)
}
assert.Equal(t, exp.Text, act.Text)
assert.Equal(t, exp.AuthorId, act.AuthorId)
assert.Equal(t, exp.ConversationId, act.ConversationId)
if act.SentAt == nil {
t.Errorf("SentAt not set")
return
}
assert.True(t, exp.SentAt.AsTime().Equal(act.SentAt.AsTime()))
}

View File

@@ -0,0 +1,41 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/streams/proto"
"gorm.io/gorm"
)
// Update a conversations topic
func (s *Streams) UpdateConversation(ctx context.Context, req *pb.UpdateConversationRequest, rsp *pb.UpdateConversationResponse) error {
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
if len(req.Topic) == 0 {
return ErrMissingTopic
}
// lookup the conversation
var conv Conversation
if err := s.DB.Where(&Conversation{ID: req.Id}).First(&conv).Error; err == gorm.ErrRecordNotFound {
return ErrNotFound
} else if err != nil {
logger.Errorf("Error reading conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// update the conversation
conv.Topic = req.Topic
if err := s.DB.Save(&conv).Error; err != nil {
logger.Errorf("Error updating conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the result
rsp.Conversation = conv.Serialize()
return nil
}

View File

@@ -0,0 +1,66 @@
package handler_test
import (
"context"
"testing"
"github.com/google/uuid"
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"github.com/stretchr/testify/assert"
)
func TestUpdateConversation(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp pb.CreateConversationResponse
err := h.CreateConversation(context.TODO(), &pb.CreateConversationRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
t.Run("MissingID", func(t *testing.T) {
err := h.UpdateConversation(context.TODO(), &pb.UpdateConversationRequest{
Topic: "NewTopic",
}, &pb.UpdateConversationResponse{})
assert.Equal(t, handler.ErrMissingID, err)
})
t.Run("MissingTopic", func(t *testing.T) {
err := h.UpdateConversation(context.TODO(), &pb.UpdateConversationRequest{
Id: uuid.New().String(),
}, &pb.UpdateConversationResponse{})
assert.Equal(t, handler.ErrMissingTopic, err)
})
t.Run("InvalidID", func(t *testing.T) {
err := h.UpdateConversation(context.TODO(), &pb.UpdateConversationRequest{
Id: uuid.New().String(),
Topic: "NewTopic",
}, &pb.UpdateConversationResponse{})
assert.Equal(t, handler.ErrNotFound, err)
})
t.Run("Valid", func(t *testing.T) {
err := h.UpdateConversation(context.TODO(), &pb.UpdateConversationRequest{
Id: cRsp.Conversation.Id,
Topic: "NewTopic",
}, &pb.UpdateConversationResponse{})
assert.NoError(t, err)
var rsp pb.ReadConversationResponse
err = h.ReadConversation(context.TODO(), &pb.ReadConversationRequest{
Id: cRsp.Conversation.Id,
}, &rsp)
assert.NoError(t, err)
if rsp.Conversation == nil {
t.Fatal("No conversation returned")
return
}
assert.Equal(t, "NewTopic", rsp.Conversation.Topic)
})
}

25
streams/main.go Normal file
View File

@@ -0,0 +1,25 @@
package main
import (
"github.com/micro/services/streams/handler"
pb "github.com/micro/services/streams/proto"
"github.com/micro/micro/v3/service"
"github.com/micro/micro/v3/service/logger"
)
func main() {
// Create service
srv := service.New(
service.Name("streams"),
service.Version("latest"),
)
// Register handler
pb.RegisterStreamsHandler(srv.Server(), new(handler.Streams))
// Run service
if err := srv.Run(); err != nil {
logger.Fatal(err)
}
}

1
streams/micro.mu Normal file
View File

@@ -0,0 +1 @@
service streams

1465
streams/proto/streams.pb.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,236 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: proto/streams.proto
package streams
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
_ "github.com/golang/protobuf/ptypes/timestamp"
_ "github.com/golang/protobuf/ptypes/wrappers"
math "math"
)
import (
context "context"
api "github.com/micro/micro/v3/service/api"
client "github.com/micro/micro/v3/service/client"
server "github.com/micro/micro/v3/service/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ api.Endpoint
var _ context.Context
var _ client.Option
var _ server.Option
// Api Endpoints for Streams service
func NewStreamsEndpoints() []*api.Endpoint {
return []*api.Endpoint{}
}
// Client API for Streams service
type StreamsService interface {
// Create a conversation
CreateConversation(ctx context.Context, in *CreateConversationRequest, opts ...client.CallOption) (*CreateConversationResponse, error)
// Read a conversation using its ID, can filter using group ID if provided
ReadConversation(ctx context.Context, in *ReadConversationRequest, opts ...client.CallOption) (*ReadConversationResponse, error)
// Update a conversations topic
UpdateConversation(ctx context.Context, in *UpdateConversationRequest, opts ...client.CallOption) (*UpdateConversationResponse, error)
// Delete a conversation and all the messages within
DeleteConversation(ctx context.Context, in *DeleteConversationRequest, opts ...client.CallOption) (*DeleteConversationResponse, error)
// List all the conversations for a group
ListConversations(ctx context.Context, in *ListConversationsRequest, opts ...client.CallOption) (*ListConversationsResponse, error)
// Create a message within a conversation
CreateMessage(ctx context.Context, in *CreateMessageRequest, opts ...client.CallOption) (*CreateMessageResponse, error)
// List the messages within a conversation in reverse chronological order, using sent_before to
// offset as older messages need to be loaded
ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error)
// RecentMessages returns the most recent messages in a group of conversations. By default the
// most messages retrieved per conversation is 10, however this can be overriden using the
// limit_per_conversation option
RecentMessages(ctx context.Context, in *RecentMessagesRequest, opts ...client.CallOption) (*RecentMessagesResponse, error)
}
type streamsService struct {
c client.Client
name string
}
func NewStreamsService(name string, c client.Client) StreamsService {
return &streamsService{
c: c,
name: name,
}
}
func (c *streamsService) CreateConversation(ctx context.Context, in *CreateConversationRequest, opts ...client.CallOption) (*CreateConversationResponse, error) {
req := c.c.NewRequest(c.name, "Streams.CreateConversation", in)
out := new(CreateConversationResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamsService) ReadConversation(ctx context.Context, in *ReadConversationRequest, opts ...client.CallOption) (*ReadConversationResponse, error) {
req := c.c.NewRequest(c.name, "Streams.ReadConversation", in)
out := new(ReadConversationResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamsService) UpdateConversation(ctx context.Context, in *UpdateConversationRequest, opts ...client.CallOption) (*UpdateConversationResponse, error) {
req := c.c.NewRequest(c.name, "Streams.UpdateConversation", in)
out := new(UpdateConversationResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamsService) DeleteConversation(ctx context.Context, in *DeleteConversationRequest, opts ...client.CallOption) (*DeleteConversationResponse, error) {
req := c.c.NewRequest(c.name, "Streams.DeleteConversation", in)
out := new(DeleteConversationResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamsService) ListConversations(ctx context.Context, in *ListConversationsRequest, opts ...client.CallOption) (*ListConversationsResponse, error) {
req := c.c.NewRequest(c.name, "Streams.ListConversations", in)
out := new(ListConversationsResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamsService) CreateMessage(ctx context.Context, in *CreateMessageRequest, opts ...client.CallOption) (*CreateMessageResponse, error) {
req := c.c.NewRequest(c.name, "Streams.CreateMessage", in)
out := new(CreateMessageResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamsService) ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error) {
req := c.c.NewRequest(c.name, "Streams.ListMessages", in)
out := new(ListMessagesResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *streamsService) RecentMessages(ctx context.Context, in *RecentMessagesRequest, opts ...client.CallOption) (*RecentMessagesResponse, error) {
req := c.c.NewRequest(c.name, "Streams.RecentMessages", in)
out := new(RecentMessagesResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Streams service
type StreamsHandler interface {
// Create a conversation
CreateConversation(context.Context, *CreateConversationRequest, *CreateConversationResponse) error
// Read a conversation using its ID, can filter using group ID if provided
ReadConversation(context.Context, *ReadConversationRequest, *ReadConversationResponse) error
// Update a conversations topic
UpdateConversation(context.Context, *UpdateConversationRequest, *UpdateConversationResponse) error
// Delete a conversation and all the messages within
DeleteConversation(context.Context, *DeleteConversationRequest, *DeleteConversationResponse) error
// List all the conversations for a group
ListConversations(context.Context, *ListConversationsRequest, *ListConversationsResponse) error
// Create a message within a conversation
CreateMessage(context.Context, *CreateMessageRequest, *CreateMessageResponse) error
// List the messages within a conversation in reverse chronological order, using sent_before to
// offset as older messages need to be loaded
ListMessages(context.Context, *ListMessagesRequest, *ListMessagesResponse) error
// RecentMessages returns the most recent messages in a group of conversations. By default the
// most messages retrieved per conversation is 10, however this can be overriden using the
// limit_per_conversation option
RecentMessages(context.Context, *RecentMessagesRequest, *RecentMessagesResponse) error
}
func RegisterStreamsHandler(s server.Server, hdlr StreamsHandler, opts ...server.HandlerOption) error {
type streams interface {
CreateConversation(ctx context.Context, in *CreateConversationRequest, out *CreateConversationResponse) error
ReadConversation(ctx context.Context, in *ReadConversationRequest, out *ReadConversationResponse) error
UpdateConversation(ctx context.Context, in *UpdateConversationRequest, out *UpdateConversationResponse) error
DeleteConversation(ctx context.Context, in *DeleteConversationRequest, out *DeleteConversationResponse) error
ListConversations(ctx context.Context, in *ListConversationsRequest, out *ListConversationsResponse) error
CreateMessage(ctx context.Context, in *CreateMessageRequest, out *CreateMessageResponse) error
ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error
RecentMessages(ctx context.Context, in *RecentMessagesRequest, out *RecentMessagesResponse) error
}
type Streams struct {
streams
}
h := &streamsHandler{hdlr}
return s.Handle(s.NewHandler(&Streams{h}, opts...))
}
type streamsHandler struct {
StreamsHandler
}
func (h *streamsHandler) CreateConversation(ctx context.Context, in *CreateConversationRequest, out *CreateConversationResponse) error {
return h.StreamsHandler.CreateConversation(ctx, in, out)
}
func (h *streamsHandler) ReadConversation(ctx context.Context, in *ReadConversationRequest, out *ReadConversationResponse) error {
return h.StreamsHandler.ReadConversation(ctx, in, out)
}
func (h *streamsHandler) UpdateConversation(ctx context.Context, in *UpdateConversationRequest, out *UpdateConversationResponse) error {
return h.StreamsHandler.UpdateConversation(ctx, in, out)
}
func (h *streamsHandler) DeleteConversation(ctx context.Context, in *DeleteConversationRequest, out *DeleteConversationResponse) error {
return h.StreamsHandler.DeleteConversation(ctx, in, out)
}
func (h *streamsHandler) ListConversations(ctx context.Context, in *ListConversationsRequest, out *ListConversationsResponse) error {
return h.StreamsHandler.ListConversations(ctx, in, out)
}
func (h *streamsHandler) CreateMessage(ctx context.Context, in *CreateMessageRequest, out *CreateMessageResponse) error {
return h.StreamsHandler.CreateMessage(ctx, in, out)
}
func (h *streamsHandler) ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error {
return h.StreamsHandler.ListMessages(ctx, in, out)
}
func (h *streamsHandler) RecentMessages(ctx context.Context, in *RecentMessagesRequest, out *RecentMessagesResponse) error {
return h.StreamsHandler.RecentMessages(ctx, in, out)
}

113
streams/proto/streams.proto Normal file
View File

@@ -0,0 +1,113 @@
syntax = "proto3";
package streams;
option go_package = "proto;streams";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
service Streams {
// Create a conversation
rpc CreateConversation(CreateConversationRequest) returns (CreateConversationResponse);
// Read a conversation using its ID, can filter using group ID if provided
rpc ReadConversation(ReadConversationRequest) returns (ReadConversationResponse);
// Update a conversations topic
rpc UpdateConversation(UpdateConversationRequest) returns (UpdateConversationResponse);
// Delete a conversation and all the messages within
rpc DeleteConversation(DeleteConversationRequest) returns (DeleteConversationResponse);
// List all the conversations for a group
rpc ListConversations(ListConversationsRequest) returns (ListConversationsResponse);
// Create a message within a conversation
rpc CreateMessage(CreateMessageRequest) returns (CreateMessageResponse);
// List the messages within a conversation in reverse chronological order, using sent_before to
// offset as older messages need to be loaded
rpc ListMessages(ListMessagesRequest) returns (ListMessagesResponse);
// RecentMessages returns the most recent messages in a group of conversations. By default the
// most messages retrieved per conversation is 25, however this can be overriden using the
// limit_per_conversation option
rpc RecentMessages(RecentMessagesRequest) returns (RecentMessagesResponse);
}
message Conversation {
string id = 1;
string group_id = 2;
string topic = 3;
google.protobuf.Timestamp created_at = 4;
}
message Message {
string id = 1;
string author_id = 2;
string conversation_id = 3;
string text = 4;
google.protobuf.Timestamp sent_at = 5;
}
message CreateConversationRequest {
string group_id = 1;
string topic = 2;
}
message CreateConversationResponse {
Conversation conversation = 1;
}
message ReadConversationRequest {
string id = 1;
google.protobuf.StringValue group_id = 2;
}
message ReadConversationResponse {
Conversation conversation = 1;
}
message ListConversationsRequest {
string group_id = 1;
}
message ListConversationsResponse {
repeated Conversation conversations = 1;
}
message UpdateConversationRequest {
string id = 1;
string topic = 2;
}
message UpdateConversationResponse {
Conversation conversation = 1;
}
message DeleteConversationRequest {
string id = 1;
}
message DeleteConversationResponse {}
message CreateMessageRequest {
string conversation_id = 1;
string author_id = 2;
string text = 3;
}
message CreateMessageResponse {
Message message = 1;
}
message ListMessagesRequest {
string conversation_id = 1;
google.protobuf.Timestamp sent_before = 2;
google.protobuf.Int32Value limit = 3;
}
message ListMessagesResponse {
repeated Message messages = 1;
}
message RecentMessagesRequest {
repeated string conversation_ids = 1;
google.protobuf.Int32Value limit_per_conversation = 2;
}
message RecentMessagesResponse {
repeated Message messages = 1;
}