Update threads to use model/store (#97)

This commit is contained in:
Asim Aslam
2021-05-11 12:41:53 +01:00
committed by GitHub
parent f74cfdf674
commit 7e13403d9b
31 changed files with 1309 additions and 1143 deletions

175
pkg/model/model.go Normal file
View File

@@ -0,0 +1,175 @@
// package model helps with data modelling on top of the store
package model
import (
"context"
"encoding/json"
"errors"
"github.com/micro/micro/v3/service/store"
)
var (
ErrNotFound = errors.New("not found")
ErrAlreadyExists = errors.New("already exists")
)
type Entity interface {
// The primary key
Key(ctx context.Context) string
// The index for the entity
Index(ctx context.Context) string
// The raw value of the entity
Value() interface{}
}
type Query struct {
Limit uint
Offset uint
Order string
}
func Create(ctx context.Context, e Entity) error {
key := e.Key(ctx)
val := e.Value()
idx := e.Index(ctx)
// read the existing record
recs, err := store.Read(key, store.ReadLimit(1))
if err != nil && err != store.ErrNotFound {
return err
}
if len(recs) > 0 {
return ErrAlreadyExists
}
// write the record
if err := store.Write(store.NewRecord(key, val)); err != nil {
return err
}
// only write the index if it exists
if len(idx) == 0 {
return nil
}
// write the index
return store.Write(store.NewRecord(idx, val))
}
func ReadIndex(ctx context.Context, e Entity) error {
recs, err := store.Read(e.Index(ctx), store.ReadLimit(1))
if err == store.ErrNotFound {
return ErrNotFound
} else if err != nil {
return err
}
if len(recs) == 0 {
return ErrNotFound
}
return recs[0].Decode(e)
}
func Read(ctx context.Context, e Entity) error {
recs, err := store.Read(e.Key(ctx), store.ReadLimit(1))
if err == store.ErrNotFound {
return ErrNotFound
} else if err != nil {
return err
}
if len(recs) == 0 {
return ErrNotFound
}
return recs[0].Decode(e)
}
func Update(ctx context.Context, e Entity) error {
key := e.Key(ctx)
val := e.Value()
idx := e.Index(ctx)
// write the record
if err := store.Write(store.NewRecord(key, val)); err != nil {
return err
}
// only write the index if it exists
if len(idx) == 0 {
return nil
}
// write the index
return store.Write(store.NewRecord(idx, val))
}
func List(ctx context.Context, e Entity, rsp interface{}, q Query) error {
opts := []store.ReadOption{
store.ReadPrefix(),
}
if q.Limit > 0 {
opts = append(opts, store.ReadLimit(q.Limit))
}
if q.Offset > 0 {
opts = append(opts, store.ReadOffset(q.Offset))
}
if len(q.Order) > 0 {
if q.Order == "desc" {
opts = append(opts, store.ReadOrder(store.OrderDesc))
} else {
opts = append(opts, store.ReadOrder(store.OrderAsc))
}
}
recs, err := store.Read(e.Index(ctx), opts...)
if err != nil {
return err
}
jsBuffer := []byte("[")
for i, rec := range recs {
jsBuffer = append(jsBuffer, rec.Value...)
if i < len(recs)-1 {
jsBuffer = append(jsBuffer, []byte(",")...)
}
}
jsBuffer = append(jsBuffer, []byte("]")...)
return json.Unmarshal(jsBuffer, rsp)
}
func Delete(ctx context.Context, e Entity) error {
key := e.Key(ctx)
idx := e.Index(ctx)
if len(key) > 0 {
if err := store.Delete(key); err != nil && err != store.ErrNotFound {
return err
}
}
recs, err := store.Read(idx, store.ReadPrefix())
if err != nil && err != store.ErrNotFound {
return err
}
// delete every record by index
for _, rec := range recs {
var val interface{}
if err := rec.Decode(val); err != nil || val == nil {
continue
}
// convert to an entity
e, ok := val.(Entity)
if !ok {
continue
}
if err := store.Delete(e.Key(ctx)); err != store.ErrNotFound {
return err
}
}
return nil
}

View File

@@ -2,17 +2,16 @@ package handler
import ( import (
"context" "context"
"strings"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/micro/v3/service/auth" "github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger" "github.com/micro/micro/v3/service/logger"
"github.com/micro/services/pkg/model"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"gorm.io/gorm"
) )
// Create a message within a conversation // Create a message within a thread
func (s *Threads) CreateMessage(ctx context.Context, req *pb.CreateMessageRequest, rsp *pb.CreateMessageResponse) error { func (s *Threads) CreateMessage(ctx context.Context, req *pb.CreateMessageRequest, rsp *pb.CreateMessageResponse) error {
_, ok := auth.AccountFromContext(ctx) _, ok := auth.AccountFromContext(ctx)
if !ok { if !ok {
@@ -22,52 +21,54 @@ func (s *Threads) CreateMessage(ctx context.Context, req *pb.CreateMessageReques
if len(req.AuthorId) == 0 { if len(req.AuthorId) == 0 {
return ErrMissingAuthorID return ErrMissingAuthorID
} }
if len(req.ConversationId) == 0 { if len(req.ThreadId) == 0 {
return ErrMissingConversationID return ErrMissingThreadID
} }
if len(req.Text) == 0 { if len(req.Text) == 0 {
return ErrMissingText return ErrMissingText
} }
db, err := s.GetDBConn(ctx) // lookup the thread
if err != nil { conv := Thread{ID: req.ThreadId}
logger.Errorf("Error connecting to DB: %v", err)
return errors.InternalServerError("DB_ERROR", "Error connecting to DB") if err := model.Read(ctx, &conv); err == model.ErrNotFound {
}
// lookup the conversation
var conv Conversation
if err := db.Where(&Conversation{ID: req.ConversationId}).First(&conv).Error; err == gorm.ErrRecordNotFound {
return ErrNotFound return ErrNotFound
} else if err != nil { } else if err != nil {
logger.Errorf("Error reading conversation: %v", err) logger.Errorf("Error reading thread: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
} }
// create the message // create the message
msg := &Message{ msg := &Message{
ID: req.Id, ID: req.Id,
SentAt: s.Time(), SentAt: s.Time(),
Text: req.Text, Text: req.Text,
AuthorID: req.AuthorId, AuthorID: req.AuthorId,
ConversationID: req.ConversationId, ThreadID: req.ThreadId,
} }
if len(msg.ID) == 0 { if len(msg.ID) == 0 {
msg.ID = uuid.New().String() msg.ID = uuid.New().String()
} }
if err := db.Create(msg).Error; err == nil {
if err := model.Create(ctx, msg); err == nil {
rsp.Message = msg.Serialize() rsp.Message = msg.Serialize()
return nil return nil
} else if !strings.Contains(err.Error(), "messages_pkey") { } else if err != model.ErrAlreadyExists {
logger.Errorf("Error creating message: %v", err) logger.Errorf("Error creating message: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
} }
// a message already exists with this id // a message already exists with this id
var existing Message existing := &Message{ID: msg.ID, ThreadID: req.ThreadId}
if err := db.Where(&Message{ID: msg.ID}).First(&existing).Error; err != nil {
if err := model.Read(ctx, existing); err == model.ErrNotFound {
return ErrNotFound
} else if err != nil {
logger.Errorf("Error creating message: %v", err) logger.Errorf("Error creating message: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
} }
// return the message
rsp.Message = existing.Serialize() rsp.Message = existing.Serialize()
return nil return nil
} }

View File

@@ -5,7 +5,6 @@ import (
"github.com/micro/services/threads/handler" "github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -15,68 +14,68 @@ func TestCreateMessage(t *testing.T) {
h := testHandler(t) h := testHandler(t)
// seed some data // seed some data
var cRsp pb.CreateConversationResponse var cRsp pb.CreateThreadResponse
err := h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{ err := h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(), Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp) }, &cRsp)
if err != nil { if err != nil {
t.Fatalf("Error creating conversation: %v", err) t.Fatalf("Error creating thread: %v", err)
return return
} }
iid := uuid.New().String() iid := uuid.New().String()
tt := []struct { tt := []struct {
Name string Name string
AuthorID string AuthorID string
ConversationID string ThreadID string
ID string ID string
Text string Text string
Error error Error error
}{ }{
{ {
Name: "MissingConversationID", Name: "MissingThreadID",
Text: "HelloWorld", Text: "HelloWorld",
AuthorID: uuid.New().String(), AuthorID: uuid.New().String(),
Error: handler.ErrMissingConversationID, Error: handler.ErrMissingThreadID,
}, },
{ {
Name: "MissingAuthorID", Name: "MissingAuthorID",
ConversationID: uuid.New().String(), ThreadID: uuid.New().String(),
Text: "HelloWorld", Text: "HelloWorld",
Error: handler.ErrMissingAuthorID, Error: handler.ErrMissingAuthorID,
}, },
{ {
Name: "MissingText", Name: "MissingText",
ConversationID: uuid.New().String(), ThreadID: uuid.New().String(),
AuthorID: uuid.New().String(), AuthorID: uuid.New().String(),
Error: handler.ErrMissingText, Error: handler.ErrMissingText,
}, },
{ {
Name: "ConversationNotFound", Name: "ThreadNotFound",
ConversationID: uuid.New().String(), ThreadID: uuid.New().String(),
AuthorID: uuid.New().String(), AuthorID: uuid.New().String(),
Text: "HelloWorld", Text: "HelloWorld",
Error: handler.ErrNotFound, Error: handler.ErrNotFound,
}, },
{ {
Name: "NoID", Name: "NoID",
ConversationID: cRsp.Conversation.Id, ThreadID: cRsp.Thread.Id,
AuthorID: uuid.New().String(), AuthorID: uuid.New().String(),
Text: "HelloWorld", Text: "HelloWorld",
}, },
{ {
Name: "WithID", Name: "WithID",
ConversationID: cRsp.Conversation.Id, ThreadID: cRsp.Thread.Id,
Text: "HelloWorld", Text: "HelloWorld",
AuthorID: "johndoe", AuthorID: "johndoe",
ID: iid, ID: iid,
}, },
{ {
Name: "RepeatID", Name: "RepeatID",
ConversationID: cRsp.Conversation.Id, ThreadID: cRsp.Thread.Id,
Text: "HelloWorld", Text: "HelloWorld",
AuthorID: "johndoe", AuthorID: "johndoe",
ID: iid, ID: iid,
}, },
} }
@@ -84,10 +83,10 @@ func TestCreateMessage(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) { t.Run(tc.Name, func(t *testing.T) {
var rsp pb.CreateMessageResponse var rsp pb.CreateMessageResponse
err := h.CreateMessage(microAccountCtx(), &pb.CreateMessageRequest{ err := h.CreateMessage(microAccountCtx(), &pb.CreateMessageRequest{
AuthorId: tc.AuthorID, AuthorId: tc.AuthorID,
ConversationId: tc.ConversationID, ThreadId: tc.ThreadID,
Text: tc.Text, Text: tc.Text,
Id: tc.ID, Id: tc.ID,
}, &rsp) }, &rsp)
assert.Equal(t, tc.Error, err) assert.Equal(t, tc.Error, err)
@@ -97,11 +96,11 @@ func TestCreateMessage(t *testing.T) {
} }
assertMessagesMatch(t, &pb.Message{ assertMessagesMatch(t, &pb.Message{
Id: tc.ID, Id: tc.ID,
AuthorId: tc.AuthorID, AuthorId: tc.AuthorID,
ConversationId: tc.ConversationID, ThreadId: tc.ThreadID,
SentAt: timestamppb.New(h.Time()), SentAt: handler.FormatTime(h.Time()),
Text: tc.Text, Text: tc.Text,
}, rsp.Message) }, rsp.Message)
}) })
} }

View File

@@ -7,11 +7,12 @@ import (
"github.com/micro/micro/v3/service/auth" "github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger" "github.com/micro/micro/v3/service/logger"
"github.com/micro/services/pkg/model"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
) )
// Create a conversation // Create a thread
func (s *Threads) CreateConversation(ctx context.Context, req *pb.CreateConversationRequest, rsp *pb.CreateConversationResponse) error { func (s *Threads) CreateThread(ctx context.Context, req *pb.CreateThreadRequest, rsp *pb.CreateThreadResponse) error {
_, ok := auth.AccountFromContext(ctx) _, ok := auth.AccountFromContext(ctx)
if !ok { if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized") errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
@@ -24,24 +25,21 @@ func (s *Threads) CreateConversation(ctx context.Context, req *pb.CreateConversa
return ErrMissingTopic return ErrMissingTopic
} }
// write the conversation to the database // write the thread to the database
conv := &Conversation{ thread := &Thread{
ID: uuid.New().String(), ID: uuid.New().String(),
Topic: req.Topic, Topic: req.Topic,
GroupID: req.GroupId, GroupID: req.GroupId,
CreatedAt: s.Time(), CreatedAt: s.Time(),
} }
db, err := s.GetDBConn(ctx)
if err != nil { // write the thread to the database
logger.Errorf("Error connecting to DB: %v", err) if err := model.Create(ctx, thread); err != nil {
return errors.InternalServerError("DB_ERROR", "Error connecting to DB") logger.Errorf("Error creating thread: %v", err)
} return err
if err := db.Create(conv).Error; err != nil {
logger.Errorf("Error creating conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
} }
// serialize the response // serialize the response
rsp.Conversation = conv.Serialize() rsp.Thread = thread.Serialize()
return nil return nil
} }

View File

@@ -5,13 +5,12 @@ import (
"github.com/micro/services/threads/handler" "github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestCreateConversation(t *testing.T) { func TestCreateThread(t *testing.T) {
tt := []struct { tt := []struct {
Name string Name string
GroupID string GroupID string
@@ -38,22 +37,22 @@ func TestCreateConversation(t *testing.T) {
h := testHandler(t) h := testHandler(t)
for _, tc := range tt { for _, tc := range tt {
t.Run(tc.Name, func(t *testing.T) { t.Run(tc.Name, func(t *testing.T) {
var rsp pb.CreateConversationResponse var rsp pb.CreateThreadResponse
err := h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{ err := h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: tc.Topic, GroupId: tc.GroupID, Topic: tc.Topic, GroupId: tc.GroupID,
}, &rsp) }, &rsp)
assert.Equal(t, tc.Error, err) assert.Equal(t, tc.Error, err)
if tc.Error != nil { if tc.Error != nil {
assert.Nil(t, rsp.Conversation) assert.Nil(t, rsp.Thread)
return return
} }
assertConversationsMatch(t, &pb.Conversation{ assertThreadsMatch(t, &pb.Thread{
CreatedAt: timestamppb.New(h.Time()), CreatedAt: handler.FormatTime(h.Time()),
GroupId: tc.GroupID, GroupId: tc.GroupID,
Topic: tc.Topic, Topic: tc.Topic,
}, rsp.Conversation) }, rsp.Thread)
}) })
} }
} }

View File

@@ -1,44 +0,0 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/threads/proto"
"gorm.io/gorm"
)
// Delete a conversation and all the messages within
func (s *Threads) DeleteConversation(ctx context.Context, req *pb.DeleteConversationRequest, rsp *pb.DeleteConversationResponse) error {
_, ok := auth.AccountFromContext(ctx)
if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
}
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
db, err := s.GetDBConn(ctx)
if err != nil {
logger.Errorf("Error connecting to DB: %v", err)
return errors.InternalServerError("DB_ERROR", "Error connecting to DB")
}
return db.Transaction(func(tx *gorm.DB) error {
// delete all the messages
if err := tx.Where(&Message{ConversationID: req.Id}).Delete(&Message{}).Error; err != nil {
logger.Errorf("Error deleting messages: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// delete the conversation
if err := tx.Where(&Conversation{ID: req.Id}).Delete(&Conversation{}).Error; err != nil {
logger.Errorf("Error deleting conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
return nil
})
}

View File

@@ -1,49 +0,0 @@
package handler_test
import (
"testing"
"github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func TestDeleteConversation(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp pb.CreateConversationResponse
err := h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
t.Run("MissingID", func(t *testing.T) {
err := h.DeleteConversation(microAccountCtx(), &pb.DeleteConversationRequest{}, &pb.DeleteConversationResponse{})
assert.Equal(t, handler.ErrMissingID, err)
})
t.Run("Valid", func(t *testing.T) {
err := h.DeleteConversation(microAccountCtx(), &pb.DeleteConversationRequest{
Id: cRsp.Conversation.Id,
}, &pb.DeleteConversationResponse{})
assert.NoError(t, err)
err = h.ReadConversation(microAccountCtx(), &pb.ReadConversationRequest{
Id: cRsp.Conversation.Id,
}, &pb.ReadConversationResponse{})
assert.Equal(t, handler.ErrNotFound, err)
})
t.Run("Retry", func(t *testing.T) {
err := h.DeleteConversation(microAccountCtx(), &pb.DeleteConversationRequest{
Id: cRsp.Conversation.Id,
}, &pb.DeleteConversationResponse{})
assert.NoError(t, err)
})
}

View File

@@ -0,0 +1,41 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
"github.com/micro/services/pkg/model"
pb "github.com/micro/services/threads/proto"
)
// Delete a thread and all the messages within
func (s *Threads) DeleteThread(ctx context.Context, req *pb.DeleteThreadRequest, rsp *pb.DeleteThreadResponse) error {
_, ok := auth.AccountFromContext(ctx)
if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
}
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
thread := Thread{ID: req.Id}
// delete the thread
if err := model.Delete(ctx, &thread); err != nil {
logger.Errorf("Error deleting thread: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
message := Message{ThreadID: req.Id}
// delete the messages
if err := model.Delete(ctx, &message); err != nil {
logger.Errorf("Error deleting messages: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
return nil
}

View File

@@ -0,0 +1,49 @@
package handler_test
import (
"testing"
"github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func TestDeleteThread(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp pb.CreateThreadResponse
err := h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp)
if err != nil {
t.Fatalf("Error creating thread: %v", err)
return
}
t.Run("MissingID", func(t *testing.T) {
err := h.DeleteThread(microAccountCtx(), &pb.DeleteThreadRequest{}, &pb.DeleteThreadResponse{})
assert.Equal(t, handler.ErrMissingID, err)
})
t.Run("Valid", func(t *testing.T) {
err := h.DeleteThread(microAccountCtx(), &pb.DeleteThreadRequest{
Id: cRsp.Thread.Id,
}, &pb.DeleteThreadResponse{})
assert.NoError(t, err)
err = h.ReadThread(microAccountCtx(), &pb.ReadThreadRequest{
Id: cRsp.Thread.Id,
}, &pb.ReadThreadResponse{})
assert.Equal(t, handler.ErrNotFound, err)
})
t.Run("Retry", func(t *testing.T) {
err := h.DeleteThread(microAccountCtx(), &pb.DeleteThreadRequest{
Id: cRsp.Thread.Id,
}, &pb.DeleteThreadResponse{})
assert.NoError(t, err)
})
}

144
threads/handler/handler.go Normal file
View File

@@ -0,0 +1,144 @@
package handler
import (
"context"
"fmt"
"time"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/services/pkg/tenant"
pb "github.com/micro/services/threads/proto"
)
var (
ErrMissingID = errors.BadRequest("MISSING_ID", "Missing ID")
ErrMissingGroupID = errors.BadRequest("MISSING_GROUP_ID", "Missing GroupID")
ErrMissingTopic = errors.BadRequest("MISSING_TOPIC", "Missing Topic")
ErrMissingAuthorID = errors.BadRequest("MISSING_AUTHOR_ID", "Missing Author ID")
ErrMissingText = errors.BadRequest("MISSING_TEXT", "Missing text")
ErrMissingThreadID = errors.BadRequest("MISSING_CONVERSATION_ID", "Missing Thread ID")
ErrMissingThreadIDs = errors.BadRequest("MISSING_CONVERSATION_IDS", "One or more Thread IDs are required")
ErrNotFound = errors.NotFound("NOT_FOUND", "Thread not found")
)
type Threads struct {
Time func() time.Time
}
type Message struct {
ID string
AuthorID string
ThreadID string
Text string
SentAt time.Time
}
func (m *Message) Serialize() *pb.Message {
return &pb.Message{
Id: m.ID,
AuthorId: m.AuthorID,
ThreadId: m.ThreadID,
Text: m.Text,
SentAt: m.SentAt.Format(time.RFC3339Nano),
}
}
type Thread struct {
ID string
GroupID string
Topic string
CreatedAt time.Time
}
func (c *Thread) Serialize() *pb.Thread {
return &pb.Thread{
Id: c.ID,
GroupId: c.GroupID,
Topic: c.Topic,
CreatedAt: c.CreatedAt.Format(time.RFC3339Nano),
}
}
func ParseTime(v string) time.Time {
t, err := time.Parse(time.RFC3339Nano, v)
if err == nil {
return t
}
t, err = time.Parse(time.RFC3339, v)
if err == nil {
return t
}
return time.Time{}
}
func FormatTime(t time.Time) string {
return t.Format(time.RFC3339Nano)
}
func (t *Thread) Key(ctx context.Context) string {
if len(t.ID) == 0 {
return ""
}
key := fmt.Sprintf("thread:%s", t.ID)
tnt, ok := tenant.FromContext(ctx)
if !ok {
return key
}
return fmt.Sprintf("%s/%s", tnt, key)
}
func (t *Thread) Index(ctx context.Context) string {
key := fmt.Sprintf("threadsByGroupID:%s:%s", t.GroupID, t.ID)
tnt, ok := tenant.FromContext(ctx)
if !ok {
return key
}
return fmt.Sprintf("%s/%s", tnt, key)
}
func (t *Thread) Value() interface{} {
return t
}
func (m *Message) Key(ctx context.Context) string {
if len(m.ID) == 0 {
return ""
}
key := fmt.Sprintf("message:%s:%s", m.ID, m.ThreadID)
t, ok := tenant.FromContext(ctx)
if !ok {
return key
}
return fmt.Sprintf("%s/%s", t, key)
}
func (m *Message) Index(ctx context.Context) string {
key := fmt.Sprintf("messagesByThreadID:%s", m.ThreadID)
if !m.SentAt.IsZero() {
key = fmt.Sprintf("%s:%d", key, m.SentAt.UnixNano())
if len(m.ID) > 0 {
key = fmt.Sprintf("%s:%s", key, m.ID)
}
}
t, ok := tenant.FromContext(ctx)
if !ok {
return key
}
return fmt.Sprintf("%s/%s", t, key)
}
func (m *Message) Value() interface{} {
return m
}

View File

@@ -2,43 +2,25 @@ package handler_test
import ( import (
"context" "context"
"database/sql"
"os"
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/micro/micro/v3/service/auth" "github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/store"
"github.com/micro/micro/v3/service/store/memory"
"github.com/micro/services/threads/handler" "github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func testHandler(t *testing.T) *handler.Threads { func testHandler(t *testing.T) *handler.Threads {
// connect to the database store.DefaultStore = memory.NewStore()
addr := os.Getenv("POSTGRES_URL") return &handler.Threads{Time: func() time.Time { return time.Unix(1611327673, 0) }}
if len(addr) == 0 {
addr = "postgresql://postgres@localhost:5432/postgres?sslmode=disable"
}
sqlDB, err := sql.Open("pgx", addr)
if err != nil {
t.Fatalf("Failed to open connection to DB %s", err)
}
// clean any data from a previous run
if _, err := sqlDB.Exec("DROP TABLE IF EXISTS micro_conversations, micro_messages CASCADE"); err != nil {
t.Fatalf("Error cleaning database: %v", err)
}
h := &handler.Threads{Time: func() time.Time { return time.Unix(1611327673, 0) }}
h.DBConn(sqlDB).Migrations(&handler.Conversation{}, &handler.Message{})
return h
} }
func assertConversationsMatch(t *testing.T, exp, act *pb.Conversation) { func assertThreadsMatch(t *testing.T, exp, act *pb.Thread) {
if act == nil { if act == nil {
t.Errorf("Conversation not returned") t.Errorf("Thread not returned")
return return
} }
@@ -53,7 +35,7 @@ func assertConversationsMatch(t *testing.T, exp, act *pb.Conversation) {
assert.Equal(t, exp.Topic, act.Topic) assert.Equal(t, exp.Topic, act.Topic)
assert.Equal(t, exp.GroupId, act.GroupId) assert.Equal(t, exp.GroupId, act.GroupId)
if act.CreatedAt == nil { if act.CreatedAt == "" {
t.Errorf("CreatedAt not set") t.Errorf("CreatedAt not set")
return return
} }
@@ -77,9 +59,9 @@ func assertMessagesMatch(t *testing.T, exp, act *pb.Message) {
assert.Equal(t, exp.Text, act.Text) assert.Equal(t, exp.Text, act.Text)
assert.Equal(t, exp.AuthorId, act.AuthorId) assert.Equal(t, exp.AuthorId, act.AuthorId)
assert.Equal(t, exp.ConversationId, act.ConversationId) assert.Equal(t, exp.ThreadId, act.ThreadId)
if act.SentAt == nil { if act.SentAt == "" {
t.Errorf("SentAt not set") t.Errorf("SentAt not set")
return return
} }
@@ -88,8 +70,8 @@ func assertMessagesMatch(t *testing.T, exp, act *pb.Message) {
} }
// postgres has a resolution of 100microseconds so just test that it's accurate to the second // postgres has a resolution of 100microseconds so just test that it's accurate to the second
func microSecondTime(t *timestamp.Timestamp) time.Time { func microSecondTime(t string) time.Time {
tt := t.AsTime() tt := handler.ParseTime(t)
return time.Unix(tt.Unix(), int64(tt.Nanosecond()-tt.Nanosecond()%1000)) return time.Unix(tt.Unix(), int64(tt.Nanosecond()-tt.Nanosecond()%1000))
} }

View File

@@ -1,41 +0,0 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/threads/proto"
)
// List all the conversations for a group
func (s *Threads) ListConversations(ctx context.Context, req *pb.ListConversationsRequest, rsp *pb.ListConversationsResponse) error {
_, ok := auth.AccountFromContext(ctx)
if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
}
// validate the request
if len(req.GroupId) == 0 {
return ErrMissingGroupID
}
db, err := s.GetDBConn(ctx)
if err != nil {
logger.Errorf("Error connecting to DB: %v", err)
return errors.InternalServerError("DB_ERROR", "Error connecting to DB")
}
// query the database
var convs []Conversation
if err := db.Where(&Conversation{GroupID: req.GroupId}).Find(&convs).Error; err != nil {
logger.Errorf("Error reading conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Conversations = make([]*pb.Conversation, len(convs))
for i, c := range convs {
rsp.Conversations[i] = c.Serialize()
}
return nil
}

View File

@@ -1,54 +0,0 @@
package handler_test
import (
"testing"
"github.com/google/uuid"
"github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto"
"github.com/stretchr/testify/assert"
)
func TestListConversations(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp1 pb.CreateConversationResponse
err := h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp1)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
var cRsp2 pb.CreateConversationResponse
err = h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{
Topic: "FooBar", GroupId: uuid.New().String(),
}, &cRsp2)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
t.Run("MissingGroupID", func(t *testing.T) {
var rsp pb.ListConversationsResponse
err := h.ListConversations(microAccountCtx(), &pb.ListConversationsRequest{}, &rsp)
assert.Equal(t, handler.ErrMissingGroupID, err)
assert.Nil(t, rsp.Conversations)
})
t.Run("Valid", func(t *testing.T) {
var rsp pb.ListConversationsResponse
err := h.ListConversations(microAccountCtx(), &pb.ListConversationsRequest{
GroupId: cRsp1.Conversation.GroupId,
}, &rsp)
assert.NoError(t, err)
if len(rsp.Conversations) != 1 {
t.Fatalf("Expected 1 conversation to be returned, got %v", len(rsp.Conversations))
return
}
assertConversationsMatch(t, cRsp1.Conversation, rsp.Conversations[0])
})
}

View File

@@ -6,12 +6,13 @@ import (
"github.com/micro/micro/v3/service/auth" "github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger" "github.com/micro/micro/v3/service/logger"
"github.com/micro/micro/v3/service/store"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
) )
const DefaultLimit = 25 const DefaultLimit = 25
// List the messages within a conversation in reverse chronological order, using sent_before to // List the messages within a thread in reverse chronological order, using sent_before to
// offset as older messages need to be loaded // offset as older messages need to be loaded
func (s *Threads) ListMessages(ctx context.Context, req *pb.ListMessagesRequest, rsp *pb.ListMessagesResponse) error { func (s *Threads) ListMessages(ctx context.Context, req *pb.ListMessagesRequest, rsp *pb.ListMessagesResponse) error {
_, ok := auth.AccountFromContext(ctx) _, ok := auth.AccountFromContext(ctx)
@@ -19,37 +20,50 @@ func (s *Threads) ListMessages(ctx context.Context, req *pb.ListMessagesRequest,
errors.Unauthorized("UNAUTHORIZED", "Unauthorized") errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
} }
// validate the request // validate the request
if len(req.ConversationId) == 0 { if len(req.ThreadId) == 0 {
return ErrMissingConversationID return ErrMissingThreadID
} }
db, err := s.GetDBConn(ctx) // default order is descending
if err != nil { order := store.OrderDesc
logger.Errorf("Error connecting to DB: %v", err) if req.Order == "asc" {
return errors.InternalServerError("DB_ERROR", "Error connecting to DB") order = store.OrderAsc
} }
// construct the query
q := db.Where(&Message{ConversationID: req.ConversationId}).Order("sent_at DESC") opts := []store.ReadOption{
if req.SentBefore != nil { store.ReadPrefix(),
q = q.Where("sent_at < ?", req.SentBefore.AsTime()) store.ReadOrder(order),
} }
if req.Limit != nil {
q.Limit(int(req.Limit.Value)) if req.Limit > 0 {
opts = append(opts, store.ReadLimit(uint(req.Limit)))
} else { } else {
q.Limit(DefaultLimit) opts = append(opts, store.ReadLimit(uint(DefaultLimit)))
}
if req.Offset > 0 {
opts = append(opts, store.ReadOffset(uint(req.Offset)))
} }
// execute the query message := &Message{
var msgs []Message ThreadID: req.ThreadId,
if err := q.Find(&msgs).Error; err != nil { }
// read all the records with the chat ID suffix
recs, err := store.Read(message.Index(ctx), opts...)
if err != nil {
logger.Errorf("Error reading messages: %v", err) logger.Errorf("Error reading messages: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database") return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
} }
// serialize the response // return all the messages
rsp.Messages = make([]*pb.Message, len(msgs)) for _, rec := range recs {
for i, m := range msgs { m := &Message{}
rsp.Messages[i] = m.Serialize() rec.Decode(&m)
if len(m.ID) == 0 || m.ThreadID != req.ThreadId {
continue
}
rsp.Messages = append(rsp.Messages, m.Serialize())
} }
return nil return nil
} }

View File

@@ -10,7 +10,6 @@ import (
"github.com/micro/services/threads/handler" "github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
) )
func TestListMessages(t *testing.T) { func TestListMessages(t *testing.T) {
@@ -18,8 +17,8 @@ func TestListMessages(t *testing.T) {
h.Time = time.Now h.Time = time.Now
// seed some data // seed some data
var convRsp pb.CreateConversationResponse var convRsp pb.CreateThreadResponse
err := h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{ err := h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: "TestListMessages", GroupId: uuid.New().String(), Topic: "TestListMessages", GroupId: uuid.New().String(),
}, &convRsp) }, &convRsp)
assert.NoError(t, err) assert.NoError(t, err)
@@ -31,25 +30,25 @@ func TestListMessages(t *testing.T) {
for i := 0; i < len(msgs); i++ { for i := 0; i < len(msgs); i++ {
var rsp pb.CreateMessageResponse var rsp pb.CreateMessageResponse
err := h.CreateMessage(microAccountCtx(), &pb.CreateMessageRequest{ err := h.CreateMessage(microAccountCtx(), &pb.CreateMessageRequest{
ConversationId: convRsp.Conversation.Id, ThreadId: convRsp.Thread.Id,
AuthorId: uuid.New().String(), AuthorId: uuid.New().String(),
Text: strconv.Itoa(i), Text: strconv.Itoa(i),
}, &rsp) }, &rsp)
assert.NoError(t, err) assert.NoError(t, err)
msgs[i] = rsp.Message msgs[i] = rsp.Message
} }
t.Run("MissingConversationID", func(t *testing.T) { t.Run("MissingThreadID", func(t *testing.T) {
var rsp pb.ListMessagesResponse var rsp pb.ListMessagesResponse
err := h.ListMessages(microAccountCtx(), &pb.ListMessagesRequest{}, &rsp) err := h.ListMessages(microAccountCtx(), &pb.ListMessagesRequest{}, &rsp)
assert.Equal(t, handler.ErrMissingConversationID, err) assert.Equal(t, handler.ErrMissingThreadID, err)
assert.Nil(t, rsp.Messages) assert.Nil(t, rsp.Messages)
}) })
t.Run("NoOffset", func(t *testing.T) { t.Run("NoOffset", func(t *testing.T) {
var rsp pb.ListMessagesResponse var rsp pb.ListMessagesResponse
err := h.ListMessages(microAccountCtx(), &pb.ListMessagesRequest{ err := h.ListMessages(microAccountCtx(), &pb.ListMessagesRequest{
ConversationId: convRsp.Conversation.Id, ThreadId: convRsp.Thread.Id,
}, &rsp) }, &rsp)
assert.NoError(t, err) assert.NoError(t, err)
@@ -67,8 +66,8 @@ func TestListMessages(t *testing.T) {
t.Run("LimitSet", func(t *testing.T) { t.Run("LimitSet", func(t *testing.T) {
var rsp pb.ListMessagesResponse var rsp pb.ListMessagesResponse
err := h.ListMessages(microAccountCtx(), &pb.ListMessagesRequest{ err := h.ListMessages(microAccountCtx(), &pb.ListMessagesRequest{
ConversationId: convRsp.Conversation.Id, ThreadId: convRsp.Thread.Id,
Limit: &wrapperspb.Int32Value{Value: 10}, Limit: 10,
}, &rsp) }, &rsp)
assert.NoError(t, err) assert.NoError(t, err)
@@ -86,9 +85,9 @@ func TestListMessages(t *testing.T) {
t.Run("OffsetAndLimit", func(t *testing.T) { t.Run("OffsetAndLimit", func(t *testing.T) {
var rsp pb.ListMessagesResponse var rsp pb.ListMessagesResponse
err := h.ListMessages(microAccountCtx(), &pb.ListMessagesRequest{ err := h.ListMessages(microAccountCtx(), &pb.ListMessagesRequest{
ConversationId: convRsp.Conversation.Id, ThreadId: convRsp.Thread.Id,
Limit: &wrapperspb.Int32Value{Value: 5}, Limit: 5,
SentBefore: msgs[20].SentAt, Offset: 30,
}, &rsp) }, &rsp)
assert.NoError(t, err) assert.NoError(t, err)
@@ -107,9 +106,9 @@ func TestListMessages(t *testing.T) {
// sortMessages by the time they were sent // sortMessages by the time they were sent
func sortMessages(msgs []*pb.Message) { func sortMessages(msgs []*pb.Message) {
sort.Slice(msgs, func(i, j int) bool { sort.Slice(msgs, func(i, j int) bool {
if msgs[i].SentAt == nil || msgs[j].SentAt == nil { if msgs[i].SentAt == "" || msgs[j].SentAt == "" {
return true return true
} }
return msgs[i].SentAt.AsTime().Before(msgs[j].SentAt.AsTime()) return handler.ParseTime(msgs[i].SentAt).Before(handler.ParseTime(msgs[j].SentAt))
}) })
} }

View File

@@ -0,0 +1,39 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
"github.com/micro/services/pkg/model"
pb "github.com/micro/services/threads/proto"
)
// List all the threads for a group
func (s *Threads) ListThreads(ctx context.Context, req *pb.ListThreadsRequest, rsp *pb.ListThreadsResponse) error {
_, ok := auth.AccountFromContext(ctx)
if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
}
// validate the request
if len(req.GroupId) == 0 {
return ErrMissingGroupID
}
var threads []*Thread
thread := &Thread{GroupID: req.GroupId}
// get all the threads
if err := model.List(ctx, thread, &threads, model.Query{}); err != nil {
logger.Errorf("Error reading thread: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// return the response
for _, thread := range threads {
rsp.Threads = append(rsp.Threads, thread.Serialize())
}
return nil
}

View File

@@ -0,0 +1,54 @@
package handler_test
import (
"testing"
"github.com/google/uuid"
"github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto"
"github.com/stretchr/testify/assert"
)
func TestListThreads(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp1 pb.CreateThreadResponse
err := h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp1)
if err != nil {
t.Fatalf("Error creating thread: %v", err)
return
}
var cRsp2 pb.CreateThreadResponse
err = h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: "FooBar", GroupId: uuid.New().String(),
}, &cRsp2)
if err != nil {
t.Fatalf("Error creating thread: %v", err)
return
}
t.Run("MissingGroupID", func(t *testing.T) {
var rsp pb.ListThreadsResponse
err := h.ListThreads(microAccountCtx(), &pb.ListThreadsRequest{}, &rsp)
assert.Equal(t, handler.ErrMissingGroupID, err)
assert.Nil(t, rsp.Threads)
})
t.Run("Valid", func(t *testing.T) {
var rsp pb.ListThreadsResponse
err := h.ListThreads(microAccountCtx(), &pb.ListThreadsRequest{
GroupId: cRsp1.Thread.GroupId,
}, &rsp)
assert.NoError(t, err)
if len(rsp.Threads) != 1 {
t.Fatalf("Expected 1 thread to be returned, got %v", len(rsp.Threads))
return
}
assertThreadsMatch(t, cRsp1.Thread, rsp.Threads[0])
})
}

View File

@@ -1,48 +0,0 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/auth"
"gorm.io/gorm"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/threads/proto"
)
// Read a conversation using its ID, can filter using group ID if provided
func (s *Threads) ReadConversation(ctx context.Context, req *pb.ReadConversationRequest, rsp *pb.ReadConversationResponse) error {
_, ok := auth.AccountFromContext(ctx)
if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
}
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
// construct the query
q := Conversation{ID: req.Id}
if req.GroupId != nil {
q.GroupID = req.GroupId.Value
}
db, err := s.GetDBConn(ctx)
if err != nil {
logger.Errorf("Error connecting to DB: %v", err)
return errors.InternalServerError("DB_ERROR", "Error connecting to DB")
}
// execute the query
var conv Conversation
if err := db.Where(&q).First(&conv).Error; err == gorm.ErrRecordNotFound {
return ErrNotFound
} else if err != nil {
logger.Errorf("Error reading conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Conversation = conv.Serialize()
return nil
}

View File

@@ -0,0 +1,46 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
"github.com/micro/services/pkg/model"
pb "github.com/micro/services/threads/proto"
)
// Read a thread using its ID, can filter using group ID if provided
func (s *Threads) ReadThread(ctx context.Context, req *pb.ReadThreadRequest, rsp *pb.ReadThreadResponse) error {
_, ok := auth.AccountFromContext(ctx)
if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
}
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
// construct the query
thread := &Thread{ID: req.Id}
var err error
if len(req.GroupId) > 0 {
thread.GroupID = req.GroupId
err = model.ReadIndex(ctx, thread)
} else {
err = model.Read(ctx, thread)
}
if err == model.ErrNotFound {
return ErrNotFound
} else if err != nil {
logger.Errorf("Error reading thread: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the response
rsp.Thread = thread.Serialize()
return nil
}

View File

@@ -7,28 +7,27 @@ import (
"github.com/micro/services/threads/handler" "github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
) )
func TestReadConversation(t *testing.T) { func TestReadThread(t *testing.T) {
h := testHandler(t) h := testHandler(t)
// seed some data // seed some data
var cRsp pb.CreateConversationResponse var cRsp pb.CreateThreadResponse
err := h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{ err := h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(), Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp) }, &cRsp)
if err != nil { if err != nil {
t.Fatalf("Error creating conversation: %v", err) t.Fatalf("Error creating thread: %v", err)
return return
} }
tt := []struct { tt := []struct {
Name string Name string
ID string ID string
GroupID *wrapperspb.StringValue GroupID string
Error error Error error
Result *pb.Conversation Result *pb.Thread
}{ }{
{ {
Name: "MissingID", Name: "MissingID",
@@ -41,28 +40,28 @@ func TestReadConversation(t *testing.T) {
}, },
{ {
Name: "FoundUsingIDOnly", Name: "FoundUsingIDOnly",
ID: cRsp.Conversation.Id, ID: cRsp.Thread.Id,
Result: cRsp.Conversation, Result: cRsp.Thread,
}, },
{ {
Name: "IncorrectGroupID", Name: "IncorrectGroupID",
ID: cRsp.Conversation.Id, ID: cRsp.Thread.Id,
Error: handler.ErrNotFound, Error: handler.ErrNotFound,
GroupID: &wrapperspb.StringValue{Value: uuid.New().String()}, GroupID: uuid.New().String(),
}, },
} }
for _, tc := range tt { for _, tc := range tt {
t.Run(tc.Name, func(t *testing.T) { t.Run(tc.Name, func(t *testing.T) {
var rsp pb.ReadConversationResponse var rsp pb.ReadThreadResponse
err := h.ReadConversation(microAccountCtx(), &pb.ReadConversationRequest{ err := h.ReadThread(microAccountCtx(), &pb.ReadThreadRequest{
Id: tc.ID, GroupId: tc.GroupID, Id: tc.ID, GroupId: tc.GroupID,
}, &rsp) }, &rsp)
assert.Equal(t, tc.Error, err) assert.Equal(t, tc.Error, err)
if tc.Result == nil { if tc.Result == nil {
assert.Nil(t, rsp.Conversation) assert.Nil(t, rsp.Thread)
} else { } else {
assertConversationsMatch(t, tc.Result, rsp.Conversation) assertThreadsMatch(t, tc.Result, rsp.Thread)
} }
}) })
} }

View File

@@ -6,54 +6,42 @@ import (
"github.com/micro/micro/v3/service/auth" "github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger" "github.com/micro/micro/v3/service/logger"
"github.com/micro/services/pkg/model"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"gorm.io/gorm"
) )
// RecentMessages returns the most recent messages in a group of conversations. By default the // RecentMessages returns the most recent messages in a group of threads. By default the
// most messages retrieved per conversation is 25, however this can be overriden using the // most messages retrieved per thread is 25, however this can be overriden using the
// limit_per_conversation option // limit_per_thread option
func (s *Threads) RecentMessages(ctx context.Context, req *pb.RecentMessagesRequest, rsp *pb.RecentMessagesResponse) error { func (s *Threads) RecentMessages(ctx context.Context, req *pb.RecentMessagesRequest, rsp *pb.RecentMessagesResponse) error {
_, ok := auth.AccountFromContext(ctx) _, ok := auth.AccountFromContext(ctx)
if !ok { if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized") errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
} }
// validate the request // validate the request
if len(req.ConversationIds) == 0 { if len(req.ThreadIds) == 0 {
return ErrMissingConversationIDs return ErrMissingThreadIDs
} }
limit := DefaultLimit limit := uint(DefaultLimit)
if req.LimitPerConversation != nil { if req.LimitPerThread > 0 {
limit = int(req.LimitPerConversation.Value) limit = uint(req.LimitPerThread)
} }
db, err := s.GetDBConn(ctx) for _, thread := range req.ThreadIds {
if err != nil { q := model.Query{Limit: limit, Order: "desc"}
logger.Errorf("Error connecting to DB: %v", err) m := &Message{ThreadID: thread}
return errors.InternalServerError("DB_ERROR", "Error connecting to DB") var messages []*Message
}
// query the database if err := model.List(ctx, m, &messages, q); err != nil {
var msgs []Message logger.Errorf("Error reading messages: %v", err)
err = db.Transaction(func(tx *gorm.DB) error { return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
for _, id := range req.ConversationIds { }
var cms []Message
if err := tx.Where(&Message{ConversationID: id}).Order("sent_at DESC").Limit(limit).Find(&cms).Error; err != nil { for _, msg := range messages {
return err rsp.Messages = append(rsp.Messages, msg.Serialize())
}
msgs = append(msgs, cms...)
} }
return nil
})
if err != nil {
logger.Errorf("Error reading messages: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
} }
// serialize the response
rsp.Messages = make([]*pb.Message, len(msgs))
for i, m := range msgs {
rsp.Messages[i] = m.Serialize()
}
return nil return nil
} }

View File

@@ -9,7 +9,6 @@ import (
"github.com/micro/services/threads/handler" "github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
) )
func TestRecentMessages(t *testing.T) { func TestRecentMessages(t *testing.T) {
@@ -20,8 +19,8 @@ func TestRecentMessages(t *testing.T) {
ids := make([]string, 3) ids := make([]string, 3)
convos := make(map[string][]*pb.Message, 3) convos := make(map[string][]*pb.Message, 3)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
var convRsp pb.CreateConversationResponse var convRsp pb.CreateThreadResponse
err := h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{ err := h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: "TestRecentMessages", GroupId: uuid.New().String(), Topic: "TestRecentMessages", GroupId: uuid.New().String(),
}, &convRsp) }, &convRsp)
assert.NoError(t, err) assert.NoError(t, err)
@@ -29,33 +28,33 @@ func TestRecentMessages(t *testing.T) {
return return
} }
convos[convRsp.Conversation.Id] = make([]*pb.Message, 50) convos[convRsp.Thread.Id] = make([]*pb.Message, 50)
ids[i] = convRsp.Conversation.Id ids[i] = convRsp.Thread.Id
for j := 0; j < 50; j++ { for j := 0; j < 50; j++ {
var rsp pb.CreateMessageResponse var rsp pb.CreateMessageResponse
err := h.CreateMessage(microAccountCtx(), &pb.CreateMessageRequest{ err := h.CreateMessage(microAccountCtx(), &pb.CreateMessageRequest{
ConversationId: convRsp.Conversation.Id, ThreadId: convRsp.Thread.Id,
AuthorId: uuid.New().String(), AuthorId: uuid.New().String(),
Text: fmt.Sprintf("Conversation %v, Message %v", i, j), Text: fmt.Sprintf("Thread %v, Message %v", i, j),
}, &rsp) }, &rsp)
assert.NoError(t, err) assert.NoError(t, err)
convos[convRsp.Conversation.Id][j] = rsp.Message convos[convRsp.Thread.Id][j] = rsp.Message
} }
} }
t.Run("MissingConversationIDs", func(t *testing.T) { t.Run("MissingThreadIDs", func(t *testing.T) {
var rsp pb.RecentMessagesResponse var rsp pb.RecentMessagesResponse
err := h.RecentMessages(microAccountCtx(), &pb.RecentMessagesRequest{}, &rsp) err := h.RecentMessages(microAccountCtx(), &pb.RecentMessagesRequest{}, &rsp)
assert.Equal(t, handler.ErrMissingConversationIDs, err) assert.Equal(t, handler.ErrMissingThreadIDs, err)
assert.Nil(t, rsp.Messages) assert.Nil(t, rsp.Messages)
}) })
t.Run("LimitSet", func(t *testing.T) { t.Run("LimitSet", func(t *testing.T) {
var rsp pb.RecentMessagesResponse var rsp pb.RecentMessagesResponse
err := h.RecentMessages(microAccountCtx(), &pb.RecentMessagesRequest{ err := h.RecentMessages(microAccountCtx(), &pb.RecentMessagesRequest{
ConversationIds: ids, ThreadIds: ids,
LimitPerConversation: &wrapperspb.Int32Value{Value: 10}, LimitPerThread: 10,
}, &rsp) }, &rsp)
assert.NoError(t, err) assert.NoError(t, err)
@@ -79,7 +78,7 @@ func TestRecentMessages(t *testing.T) {
var rsp pb.RecentMessagesResponse var rsp pb.RecentMessagesResponse
err := h.RecentMessages(microAccountCtx(), &pb.RecentMessagesRequest{ err := h.RecentMessages(microAccountCtx(), &pb.RecentMessagesRequest{
ConversationIds: reducedIDs, ThreadIds: reducedIDs,
}, &rsp) }, &rsp)
assert.NoError(t, err) assert.NoError(t, err)

View File

@@ -1,60 +0,0 @@
package handler
import (
"time"
"github.com/micro/micro/v3/service/errors"
gorm2 "github.com/micro/services/pkg/gorm"
pb "github.com/micro/services/threads/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
var (
ErrMissingID = errors.BadRequest("MISSING_ID", "Missing ID")
ErrMissingGroupID = errors.BadRequest("MISSING_GROUP_ID", "Missing GroupID")
ErrMissingTopic = errors.BadRequest("MISSING_TOPIC", "Missing Topic")
ErrMissingAuthorID = errors.BadRequest("MISSING_AUTHOR_ID", "Missing Author ID")
ErrMissingText = errors.BadRequest("MISSING_TEXT", "Missing text")
ErrMissingConversationID = errors.BadRequest("MISSING_CONVERSATION_ID", "Missing Conversation ID")
ErrMissingConversationIDs = errors.BadRequest("MISSING_CONVERSATION_IDS", "One or more Conversation IDs are required")
ErrNotFound = errors.NotFound("NOT_FOUND", "Conversation not found")
)
type Threads struct {
gorm2.Helper
Time func() time.Time
}
type Message struct {
ID string
AuthorID string
ConversationID string
Text string
SentAt time.Time
}
func (m *Message) Serialize() *pb.Message {
return &pb.Message{
Id: m.ID,
AuthorId: m.AuthorID,
ConversationId: m.ConversationID,
Text: m.Text,
SentAt: timestamppb.New(m.SentAt),
}
}
type Conversation struct {
ID string
GroupID string
Topic string
CreatedAt time.Time
}
func (c *Conversation) Serialize() *pb.Conversation {
return &pb.Conversation{
Id: c.ID,
GroupId: c.GroupID,
Topic: c.Topic,
CreatedAt: timestamppb.New(c.CreatedAt),
}
}

View File

@@ -1,51 +0,0 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
pb "github.com/micro/services/threads/proto"
"gorm.io/gorm"
)
// Update a conversations topic
func (s *Threads) UpdateConversation(ctx context.Context, req *pb.UpdateConversationRequest, rsp *pb.UpdateConversationResponse) error {
_, ok := auth.AccountFromContext(ctx)
if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
}
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
if len(req.Topic) == 0 {
return ErrMissingTopic
}
db, err := s.GetDBConn(ctx)
if err != nil {
logger.Errorf("Error connecting to DB: %v", err)
return errors.InternalServerError("DB_ERROR", "Error connecting to DB")
}
// lookup the conversation
var conv Conversation
if err := db.Where(&Conversation{ID: req.Id}).First(&conv).Error; err == gorm.ErrRecordNotFound {
return ErrNotFound
} else if err != nil {
logger.Errorf("Error reading conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// update the conversation
conv.Topic = req.Topic
if err := db.Save(&conv).Error; err != nil {
logger.Errorf("Error updating conversation: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the result
rsp.Conversation = conv.Serialize()
return nil
}

View File

@@ -1,65 +0,0 @@
package handler_test
import (
"testing"
"github.com/google/uuid"
"github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto"
"github.com/stretchr/testify/assert"
)
func TestUpdateConversation(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp pb.CreateConversationResponse
err := h.CreateConversation(microAccountCtx(), &pb.CreateConversationRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp)
if err != nil {
t.Fatalf("Error creating conversation: %v", err)
return
}
t.Run("MissingID", func(t *testing.T) {
err := h.UpdateConversation(microAccountCtx(), &pb.UpdateConversationRequest{
Topic: "NewTopic",
}, &pb.UpdateConversationResponse{})
assert.Equal(t, handler.ErrMissingID, err)
})
t.Run("MissingTopic", func(t *testing.T) {
err := h.UpdateConversation(microAccountCtx(), &pb.UpdateConversationRequest{
Id: uuid.New().String(),
}, &pb.UpdateConversationResponse{})
assert.Equal(t, handler.ErrMissingTopic, err)
})
t.Run("InvalidID", func(t *testing.T) {
err := h.UpdateConversation(microAccountCtx(), &pb.UpdateConversationRequest{
Id: uuid.New().String(),
Topic: "NewTopic",
}, &pb.UpdateConversationResponse{})
assert.Equal(t, handler.ErrNotFound, err)
})
t.Run("Valid", func(t *testing.T) {
err := h.UpdateConversation(microAccountCtx(), &pb.UpdateConversationRequest{
Id: cRsp.Conversation.Id,
Topic: "NewTopic",
}, &pb.UpdateConversationResponse{})
assert.NoError(t, err)
var rsp pb.ReadConversationResponse
err = h.ReadConversation(microAccountCtx(), &pb.ReadConversationRequest{
Id: cRsp.Conversation.Id,
}, &rsp)
assert.NoError(t, err)
if rsp.Conversation == nil {
t.Fatal("No conversation returned")
return
}
assert.Equal(t, "NewTopic", rsp.Conversation.Topic)
})
}

View File

@@ -0,0 +1,47 @@
package handler
import (
"context"
"github.com/micro/micro/v3/service/auth"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
"github.com/micro/services/pkg/model"
pb "github.com/micro/services/threads/proto"
)
// Update a threads topic
func (s *Threads) UpdateThread(ctx context.Context, req *pb.UpdateThreadRequest, rsp *pb.UpdateThreadResponse) error {
_, ok := auth.AccountFromContext(ctx)
if !ok {
errors.Unauthorized("UNAUTHORIZED", "Unauthorized")
}
// validate the request
if len(req.Id) == 0 {
return ErrMissingID
}
if len(req.Topic) == 0 {
return ErrMissingTopic
}
t := &Thread{ID: req.Id}
if err := model.Read(ctx, t); err == model.ErrNotFound {
return ErrNotFound
} else if err != nil {
logger.Errorf("Error reading thread: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// update the thread
t.Topic = req.Topic
if err := model.Update(ctx, t); err != nil {
logger.Errorf("Error updating thread: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error connecting to database")
}
// serialize the result
rsp.Thread = t.Serialize()
return nil
}

View File

@@ -0,0 +1,65 @@
package handler_test
import (
"testing"
"github.com/google/uuid"
"github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto"
"github.com/stretchr/testify/assert"
)
func TestUpdateThread(t *testing.T) {
h := testHandler(t)
// seed some data
var cRsp pb.CreateThreadResponse
err := h.CreateThread(microAccountCtx(), &pb.CreateThreadRequest{
Topic: "HelloWorld", GroupId: uuid.New().String(),
}, &cRsp)
if err != nil {
t.Fatalf("Error creating thread: %v", err)
return
}
t.Run("MissingID", func(t *testing.T) {
err := h.UpdateThread(microAccountCtx(), &pb.UpdateThreadRequest{
Topic: "NewTopic",
}, &pb.UpdateThreadResponse{})
assert.Equal(t, handler.ErrMissingID, err)
})
t.Run("MissingTopic", func(t *testing.T) {
err := h.UpdateThread(microAccountCtx(), &pb.UpdateThreadRequest{
Id: uuid.New().String(),
}, &pb.UpdateThreadResponse{})
assert.Equal(t, handler.ErrMissingTopic, err)
})
t.Run("InvalidID", func(t *testing.T) {
err := h.UpdateThread(microAccountCtx(), &pb.UpdateThreadRequest{
Id: uuid.New().String(),
Topic: "NewTopic",
}, &pb.UpdateThreadResponse{})
assert.Equal(t, handler.ErrNotFound, err)
})
t.Run("Valid", func(t *testing.T) {
err := h.UpdateThread(microAccountCtx(), &pb.UpdateThreadRequest{
Id: cRsp.Thread.Id,
Topic: "NewTopic",
}, &pb.UpdateThreadResponse{})
assert.NoError(t, err)
var rsp pb.ReadThreadResponse
err = h.ReadThread(microAccountCtx(), &pb.ReadThreadRequest{
Id: cRsp.Thread.Id,
}, &rsp)
assert.NoError(t, err)
if rsp.Thread == nil {
t.Fatal("No thread returned")
return
}
assert.Equal(t, "NewTopic", rsp.Thread.Topic)
})
}

View File

@@ -1,21 +1,15 @@
package main package main
import ( import (
"database/sql"
"time" "time"
"github.com/micro/services/threads/handler" "github.com/micro/services/threads/handler"
pb "github.com/micro/services/threads/proto" pb "github.com/micro/services/threads/proto"
"github.com/micro/micro/v3/service" "github.com/micro/micro/v3/service"
"github.com/micro/micro/v3/service/config"
"github.com/micro/micro/v3/service/logger" "github.com/micro/micro/v3/service/logger"
_ "github.com/jackc/pgx/v4/stdlib"
) )
var dbAddress = "postgresql://postgres:postgres@localhost:5432/threads?sslmode=disable"
func main() { func main() {
// Create service // Create service
srv := service.New( srv := service.New(
@@ -23,19 +17,7 @@ func main() {
service.Version("latest"), service.Version("latest"),
) )
// Connect to the database
cfg, err := config.Get("threads.database")
if err != nil {
logger.Fatalf("Error loading config: %v", err)
}
addr := cfg.String(dbAddress)
sqlDB, err := sql.Open("pgx", addr)
if err != nil {
logger.Fatalf("Failed to open connection to DB %s", err)
}
h := &handler.Threads{Time: time.Now} h := &handler.Threads{Time: time.Now}
h.DBConn(sqlDB).Migrations(&handler.Conversation{}, &handler.Message{})
// Register handler // Register handler
pb.RegisterThreadsHandler(srv.Server(), h) pb.RegisterThreadsHandler(srv.Server(), h)

File diff suppressed because it is too large Load Diff

View File

@@ -6,8 +6,6 @@ package threads
import ( import (
fmt "fmt" fmt "fmt"
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
_ "google.golang.org/protobuf/types/known/timestamppb"
_ "google.golang.org/protobuf/types/known/wrapperspb"
math "math" math "math"
) )
@@ -44,24 +42,24 @@ func NewThreadsEndpoints() []*api.Endpoint {
// Client API for Threads service // Client API for Threads service
type ThreadsService interface { type ThreadsService interface {
// Create a conversation // Create a thread
CreateConversation(ctx context.Context, in *CreateConversationRequest, opts ...client.CallOption) (*CreateConversationResponse, error) CreateThread(ctx context.Context, in *CreateThreadRequest, opts ...client.CallOption) (*CreateThreadResponse, error)
// Read a conversation using its ID, can filter using group ID if provided // Read a thread using its ID, can filter using group ID if provided
ReadConversation(ctx context.Context, in *ReadConversationRequest, opts ...client.CallOption) (*ReadConversationResponse, error) ReadThread(ctx context.Context, in *ReadThreadRequest, opts ...client.CallOption) (*ReadThreadResponse, error)
// Update a conversations topic // Update a threads topic
UpdateConversation(ctx context.Context, in *UpdateConversationRequest, opts ...client.CallOption) (*UpdateConversationResponse, error) UpdateThread(ctx context.Context, in *UpdateThreadRequest, opts ...client.CallOption) (*UpdateThreadResponse, error)
// Delete a conversation and all the messages within // Delete a thread and all the messages within
DeleteConversation(ctx context.Context, in *DeleteConversationRequest, opts ...client.CallOption) (*DeleteConversationResponse, error) DeleteThread(ctx context.Context, in *DeleteThreadRequest, opts ...client.CallOption) (*DeleteThreadResponse, error)
// List all the conversations for a group // List all the threads for a group
ListConversations(ctx context.Context, in *ListConversationsRequest, opts ...client.CallOption) (*ListConversationsResponse, error) ListThreads(ctx context.Context, in *ListThreadsRequest, opts ...client.CallOption) (*ListThreadsResponse, error)
// Create a message within a conversation // Create a message within a thread
CreateMessage(ctx context.Context, in *CreateMessageRequest, opts ...client.CallOption) (*CreateMessageResponse, error) CreateMessage(ctx context.Context, in *CreateMessageRequest, opts ...client.CallOption) (*CreateMessageResponse, error)
// List the messages within a conversation in reverse chronological order, using sent_before to // List the messages within a thread in reverse chronological order, using sent_before to
// offset as older messages need to be loaded // offset as older messages need to be loaded
ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error) ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error)
// RecentMessages returns the most recent messages in a group of conversations. By default the // RecentMessages returns the most recent messages in a group of threads. By default the
// most messages retrieved per conversation is 25, however this can be overriden using the // most messages retrieved per thread is 25, however this can be overriden using the
// limit_per_conversation option // limit_per_thread option
RecentMessages(ctx context.Context, in *RecentMessagesRequest, opts ...client.CallOption) (*RecentMessagesResponse, error) RecentMessages(ctx context.Context, in *RecentMessagesRequest, opts ...client.CallOption) (*RecentMessagesResponse, error)
} }
@@ -77,9 +75,9 @@ func NewThreadsService(name string, c client.Client) ThreadsService {
} }
} }
func (c *threadsService) CreateConversation(ctx context.Context, in *CreateConversationRequest, opts ...client.CallOption) (*CreateConversationResponse, error) { func (c *threadsService) CreateThread(ctx context.Context, in *CreateThreadRequest, opts ...client.CallOption) (*CreateThreadResponse, error) {
req := c.c.NewRequest(c.name, "Threads.CreateConversation", in) req := c.c.NewRequest(c.name, "Threads.CreateThread", in)
out := new(CreateConversationResponse) out := new(CreateThreadResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -87,9 +85,9 @@ func (c *threadsService) CreateConversation(ctx context.Context, in *CreateConve
return out, nil return out, nil
} }
func (c *threadsService) ReadConversation(ctx context.Context, in *ReadConversationRequest, opts ...client.CallOption) (*ReadConversationResponse, error) { func (c *threadsService) ReadThread(ctx context.Context, in *ReadThreadRequest, opts ...client.CallOption) (*ReadThreadResponse, error) {
req := c.c.NewRequest(c.name, "Threads.ReadConversation", in) req := c.c.NewRequest(c.name, "Threads.ReadThread", in)
out := new(ReadConversationResponse) out := new(ReadThreadResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -97,9 +95,9 @@ func (c *threadsService) ReadConversation(ctx context.Context, in *ReadConversat
return out, nil return out, nil
} }
func (c *threadsService) UpdateConversation(ctx context.Context, in *UpdateConversationRequest, opts ...client.CallOption) (*UpdateConversationResponse, error) { func (c *threadsService) UpdateThread(ctx context.Context, in *UpdateThreadRequest, opts ...client.CallOption) (*UpdateThreadResponse, error) {
req := c.c.NewRequest(c.name, "Threads.UpdateConversation", in) req := c.c.NewRequest(c.name, "Threads.UpdateThread", in)
out := new(UpdateConversationResponse) out := new(UpdateThreadResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -107,9 +105,9 @@ func (c *threadsService) UpdateConversation(ctx context.Context, in *UpdateConve
return out, nil return out, nil
} }
func (c *threadsService) DeleteConversation(ctx context.Context, in *DeleteConversationRequest, opts ...client.CallOption) (*DeleteConversationResponse, error) { func (c *threadsService) DeleteThread(ctx context.Context, in *DeleteThreadRequest, opts ...client.CallOption) (*DeleteThreadResponse, error) {
req := c.c.NewRequest(c.name, "Threads.DeleteConversation", in) req := c.c.NewRequest(c.name, "Threads.DeleteThread", in)
out := new(DeleteConversationResponse) out := new(DeleteThreadResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -117,9 +115,9 @@ func (c *threadsService) DeleteConversation(ctx context.Context, in *DeleteConve
return out, nil return out, nil
} }
func (c *threadsService) ListConversations(ctx context.Context, in *ListConversationsRequest, opts ...client.CallOption) (*ListConversationsResponse, error) { func (c *threadsService) ListThreads(ctx context.Context, in *ListThreadsRequest, opts ...client.CallOption) (*ListThreadsResponse, error) {
req := c.c.NewRequest(c.name, "Threads.ListConversations", in) req := c.c.NewRequest(c.name, "Threads.ListThreads", in)
out := new(ListConversationsResponse) out := new(ListThreadsResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -160,34 +158,34 @@ func (c *threadsService) RecentMessages(ctx context.Context, in *RecentMessagesR
// Server API for Threads service // Server API for Threads service
type ThreadsHandler interface { type ThreadsHandler interface {
// Create a conversation // Create a thread
CreateConversation(context.Context, *CreateConversationRequest, *CreateConversationResponse) error CreateThread(context.Context, *CreateThreadRequest, *CreateThreadResponse) error
// Read a conversation using its ID, can filter using group ID if provided // Read a thread using its ID, can filter using group ID if provided
ReadConversation(context.Context, *ReadConversationRequest, *ReadConversationResponse) error ReadThread(context.Context, *ReadThreadRequest, *ReadThreadResponse) error
// Update a conversations topic // Update a threads topic
UpdateConversation(context.Context, *UpdateConversationRequest, *UpdateConversationResponse) error UpdateThread(context.Context, *UpdateThreadRequest, *UpdateThreadResponse) error
// Delete a conversation and all the messages within // Delete a thread and all the messages within
DeleteConversation(context.Context, *DeleteConversationRequest, *DeleteConversationResponse) error DeleteThread(context.Context, *DeleteThreadRequest, *DeleteThreadResponse) error
// List all the conversations for a group // List all the threads for a group
ListConversations(context.Context, *ListConversationsRequest, *ListConversationsResponse) error ListThreads(context.Context, *ListThreadsRequest, *ListThreadsResponse) error
// Create a message within a conversation // Create a message within a thread
CreateMessage(context.Context, *CreateMessageRequest, *CreateMessageResponse) error CreateMessage(context.Context, *CreateMessageRequest, *CreateMessageResponse) error
// List the messages within a conversation in reverse chronological order, using sent_before to // List the messages within a thread in reverse chronological order, using sent_before to
// offset as older messages need to be loaded // offset as older messages need to be loaded
ListMessages(context.Context, *ListMessagesRequest, *ListMessagesResponse) error ListMessages(context.Context, *ListMessagesRequest, *ListMessagesResponse) error
// RecentMessages returns the most recent messages in a group of conversations. By default the // RecentMessages returns the most recent messages in a group of threads. By default the
// most messages retrieved per conversation is 25, however this can be overriden using the // most messages retrieved per thread is 25, however this can be overriden using the
// limit_per_conversation option // limit_per_thread option
RecentMessages(context.Context, *RecentMessagesRequest, *RecentMessagesResponse) error RecentMessages(context.Context, *RecentMessagesRequest, *RecentMessagesResponse) error
} }
func RegisterThreadsHandler(s server.Server, hdlr ThreadsHandler, opts ...server.HandlerOption) error { func RegisterThreadsHandler(s server.Server, hdlr ThreadsHandler, opts ...server.HandlerOption) error {
type threads interface { type threads interface {
CreateConversation(ctx context.Context, in *CreateConversationRequest, out *CreateConversationResponse) error CreateThread(ctx context.Context, in *CreateThreadRequest, out *CreateThreadResponse) error
ReadConversation(ctx context.Context, in *ReadConversationRequest, out *ReadConversationResponse) error ReadThread(ctx context.Context, in *ReadThreadRequest, out *ReadThreadResponse) error
UpdateConversation(ctx context.Context, in *UpdateConversationRequest, out *UpdateConversationResponse) error UpdateThread(ctx context.Context, in *UpdateThreadRequest, out *UpdateThreadResponse) error
DeleteConversation(ctx context.Context, in *DeleteConversationRequest, out *DeleteConversationResponse) error DeleteThread(ctx context.Context, in *DeleteThreadRequest, out *DeleteThreadResponse) error
ListConversations(ctx context.Context, in *ListConversationsRequest, out *ListConversationsResponse) error ListThreads(ctx context.Context, in *ListThreadsRequest, out *ListThreadsResponse) error
CreateMessage(ctx context.Context, in *CreateMessageRequest, out *CreateMessageResponse) error CreateMessage(ctx context.Context, in *CreateMessageRequest, out *CreateMessageResponse) error
ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error
RecentMessages(ctx context.Context, in *RecentMessagesRequest, out *RecentMessagesResponse) error RecentMessages(ctx context.Context, in *RecentMessagesRequest, out *RecentMessagesResponse) error
@@ -203,24 +201,24 @@ type threadsHandler struct {
ThreadsHandler ThreadsHandler
} }
func (h *threadsHandler) CreateConversation(ctx context.Context, in *CreateConversationRequest, out *CreateConversationResponse) error { func (h *threadsHandler) CreateThread(ctx context.Context, in *CreateThreadRequest, out *CreateThreadResponse) error {
return h.ThreadsHandler.CreateConversation(ctx, in, out) return h.ThreadsHandler.CreateThread(ctx, in, out)
} }
func (h *threadsHandler) ReadConversation(ctx context.Context, in *ReadConversationRequest, out *ReadConversationResponse) error { func (h *threadsHandler) ReadThread(ctx context.Context, in *ReadThreadRequest, out *ReadThreadResponse) error {
return h.ThreadsHandler.ReadConversation(ctx, in, out) return h.ThreadsHandler.ReadThread(ctx, in, out)
} }
func (h *threadsHandler) UpdateConversation(ctx context.Context, in *UpdateConversationRequest, out *UpdateConversationResponse) error { func (h *threadsHandler) UpdateThread(ctx context.Context, in *UpdateThreadRequest, out *UpdateThreadResponse) error {
return h.ThreadsHandler.UpdateConversation(ctx, in, out) return h.ThreadsHandler.UpdateThread(ctx, in, out)
} }
func (h *threadsHandler) DeleteConversation(ctx context.Context, in *DeleteConversationRequest, out *DeleteConversationResponse) error { func (h *threadsHandler) DeleteThread(ctx context.Context, in *DeleteThreadRequest, out *DeleteThreadResponse) error {
return h.ThreadsHandler.DeleteConversation(ctx, in, out) return h.ThreadsHandler.DeleteThread(ctx, in, out)
} }
func (h *threadsHandler) ListConversations(ctx context.Context, in *ListConversationsRequest, out *ListConversationsResponse) error { func (h *threadsHandler) ListThreads(ctx context.Context, in *ListThreadsRequest, out *ListThreadsResponse) error {
return h.ThreadsHandler.ListConversations(ctx, in, out) return h.ThreadsHandler.ListThreads(ctx, in, out)
} }
func (h *threadsHandler) CreateMessage(ctx context.Context, in *CreateMessageRequest, out *CreateMessageResponse) error { func (h *threadsHandler) CreateMessage(ctx context.Context, in *CreateMessageRequest, out *CreateMessageResponse) error {

View File

@@ -1,91 +1,90 @@
syntax = "proto3"; syntax = "proto3";
package threads; package threads;
option go_package = "./proto;threads"; option go_package = "./proto;threads";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
service Threads { service Threads {
// Create a conversation // Create a thread
rpc CreateConversation(CreateConversationRequest) returns (CreateConversationResponse); rpc CreateThread(CreateThreadRequest) returns (CreateThreadResponse);
// Read a conversation using its ID, can filter using group ID if provided // Read a thread using its ID, can filter using group ID if provided
rpc ReadConversation(ReadConversationRequest) returns (ReadConversationResponse); rpc ReadThread(ReadThreadRequest) returns (ReadThreadResponse);
// Update a conversations topic // Update a threads topic
rpc UpdateConversation(UpdateConversationRequest) returns (UpdateConversationResponse); rpc UpdateThread(UpdateThreadRequest) returns (UpdateThreadResponse);
// Delete a conversation and all the messages within // Delete a thread and all the messages within
rpc DeleteConversation(DeleteConversationRequest) returns (DeleteConversationResponse); rpc DeleteThread(DeleteThreadRequest) returns (DeleteThreadResponse);
// List all the conversations for a group // List all the threads for a group
rpc ListConversations(ListConversationsRequest) returns (ListConversationsResponse); rpc ListThreads(ListThreadsRequest) returns (ListThreadsResponse);
// Create a message within a conversation // Create a message within a thread
rpc CreateMessage(CreateMessageRequest) returns (CreateMessageResponse); rpc CreateMessage(CreateMessageRequest) returns (CreateMessageResponse);
// List the messages within a conversation in reverse chronological order, using sent_before to // List the messages within a thread in reverse chronological order, using sent_before to
// offset as older messages need to be loaded // offset as older messages need to be loaded
rpc ListMessages(ListMessagesRequest) returns (ListMessagesResponse); rpc ListMessages(ListMessagesRequest) returns (ListMessagesResponse);
// RecentMessages returns the most recent messages in a group of conversations. By default the // RecentMessages returns the most recent messages in a group of threads. By default the
// most messages retrieved per conversation is 25, however this can be overriden using the // most messages retrieved per thread is 25, however this can be overriden using the
// limit_per_conversation option // limit_per_thread option
rpc RecentMessages(RecentMessagesRequest) returns (RecentMessagesResponse); rpc RecentMessages(RecentMessagesRequest) returns (RecentMessagesResponse);
} }
message Conversation { message Thread {
string id = 1; string id = 1;
string group_id = 2; string group_id = 2;
string topic = 3; string topic = 3;
google.protobuf.Timestamp created_at = 4; string created_at = 4;
} }
message Message { message Message {
string id = 1; string id = 1;
string author_id = 2; string author_id = 2;
string conversation_id = 3; string thread_id = 3;
string text = 4; string text = 4;
google.protobuf.Timestamp sent_at = 5; string sent_at = 5;
} }
message CreateConversationRequest { message CreateThreadRequest {
string group_id = 1; string group_id = 1;
string topic = 2; string topic = 2;
} }
message CreateConversationResponse { message CreateThreadResponse {
Conversation conversation = 1; Thread thread = 1;
} }
message ReadConversationRequest { message ReadThreadRequest {
string id = 1; string id = 1;
google.protobuf.StringValue group_id = 2; string group_id = 2;
} }
message ReadConversationResponse { message ReadThreadResponse {
Conversation conversation = 1; Thread thread = 1;
} }
message ListConversationsRequest { message ListThreadsRequest {
string group_id = 1; string group_id = 1;
} }
message ListConversationsResponse { message ListThreadsResponse {
repeated Conversation conversations = 1; repeated Thread threads = 1;
} }
message UpdateConversationRequest { message UpdateThreadRequest {
string id = 1; string id = 1;
string topic = 2; string topic = 2;
} }
message UpdateConversationResponse { message UpdateThreadResponse {
Conversation conversation = 1; Thread thread = 1;
} }
message DeleteConversationRequest { message DeleteThreadRequest {
string id = 1; string id = 1;
} }
message DeleteConversationResponse {} message DeleteThreadResponse {}
message CreateMessageRequest { message CreateMessageRequest {
string id = 1; string id = 1;
string conversation_id = 2; string thread_id = 2;
string author_id = 3; string author_id = 3;
string text = 4; string text = 4;
} }
@@ -95,9 +94,10 @@ message CreateMessageResponse {
} }
message ListMessagesRequest { message ListMessagesRequest {
string conversation_id = 1; string thread_id = 1;
google.protobuf.Timestamp sent_before = 2; int64 limit = 2;
google.protobuf.Int32Value limit = 3; int64 offset = 3;
string order = 4;
} }
message ListMessagesResponse { message ListMessagesResponse {
@@ -105,8 +105,8 @@ message ListMessagesResponse {
} }
message RecentMessagesRequest { message RecentMessagesRequest {
repeated string conversation_ids = 1; repeated string thread_ids = 1;
google.protobuf.Int32Value limit_per_conversation = 2; int64 limit_per_thread = 2;
} }
message RecentMessagesResponse { message RecentMessagesResponse {