mirror of
https://github.com/kevin-DL/services.git
synced 2026-01-18 13:45:09 +00:00
reship events
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/micro/micro/v3/service/errors"
|
||||
"github.com/micro/micro/v3/service/events"
|
||||
@@ -52,9 +53,17 @@ func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream
|
||||
|
||||
// check if a group os provided
|
||||
opts := []events.ConsumeOption{}
|
||||
offset := time.Now()
|
||||
if len(req.Group) > 0 {
|
||||
opts = append(opts, events.WithGroup(req.Group))
|
||||
}
|
||||
if len(req.Offset) > 0 {
|
||||
t, err := time.Parse(time.RFC3339Nano, req.Offset)
|
||||
if err == nil {
|
||||
offset = t
|
||||
}
|
||||
}
|
||||
opts = append(opts, events.WithOffset(offset))
|
||||
|
||||
sub, err := events.Consume(topic, opts...)
|
||||
if err != nil {
|
||||
@@ -69,8 +78,10 @@ func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream
|
||||
d.UnmarshalJSON(msg.Payload)
|
||||
|
||||
if err := stream.Send(&pb.SubscribeResponse{
|
||||
Topic: req.Topic,
|
||||
Message: d,
|
||||
Topic: req.Topic,
|
||||
Id: msg.ID,
|
||||
Timestamp: msg.Timestamp.Format(time.RFC3339Nano),
|
||||
Message: d,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -78,3 +89,48 @@ func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Event) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error {
|
||||
if len(req.Topic) == 0 {
|
||||
return errors.BadRequest("event.read", "topic is blank")
|
||||
}
|
||||
|
||||
id, ok := tenant.FromContext(ctx)
|
||||
if !ok {
|
||||
id = "default"
|
||||
}
|
||||
|
||||
// create tenant based topics
|
||||
topic := path.Join("event", id, req.Topic)
|
||||
|
||||
log.Infof("Tenant %v reading %v\n", id, req.Topic)
|
||||
limit := uint(25)
|
||||
offset := uint(0)
|
||||
|
||||
if req.Limit > 0 {
|
||||
limit = uint(req.Limit)
|
||||
}
|
||||
|
||||
if req.Offset > 0 {
|
||||
offset = uint(req.Offset)
|
||||
}
|
||||
|
||||
events, err := events.Read(topic, events.ReadLimit(limit), events.ReadOffset(offset))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, ev := range events {
|
||||
// unmarshal the message into a struct
|
||||
d := &structpb.Struct{}
|
||||
d.UnmarshalJSON(ev.Payload)
|
||||
|
||||
rsp.Events = append(rsp.Events, &pb.Ev{
|
||||
Id: ev.ID,
|
||||
Timestamp: ev.Timestamp.Format(time.RFC3339Nano),
|
||||
Message: d,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user