diff --git a/event/examples.json b/event/examples.json index f92df7f..d201e56 100644 --- a/event/examples.json +++ b/event/examples.json @@ -21,5 +21,18 @@ "topic": "events", "message": {"id": "1", "type": "signup", "user": "john"} } + }], + "read": [{ + "title": "Read events on a topic", + "description": "Read historic events sent to a topic", + "run_check": false, + "request": { + "topic": "user" + }, + "response": { + "events": [ + {"id": "123e4567-e89b-12d3-a456-426652340000", "message": {"id": "1", "type": "signup", "user": "john"}} + ] + } }] } diff --git a/event/handler/event.go b/event/handler/event.go index 3abb425..77793a9 100644 --- a/event/handler/event.go +++ b/event/handler/event.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path" + "time" "github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/events" @@ -52,9 +53,17 @@ func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream // check if a group os provided opts := []events.ConsumeOption{} + offset := time.Now() if len(req.Group) > 0 { opts = append(opts, events.WithGroup(req.Group)) } + if len(req.Offset) > 0 { + t, err := time.Parse(time.RFC3339Nano, req.Offset) + if err == nil { + offset = t + } + } + opts = append(opts, events.WithOffset(offset)) sub, err := events.Consume(topic, opts...) if err != nil { @@ -69,8 +78,10 @@ func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream d.UnmarshalJSON(msg.Payload) if err := stream.Send(&pb.SubscribeResponse{ - Topic: req.Topic, - Message: d, + Topic: req.Topic, + Id: msg.ID, + Timestamp: msg.Timestamp.Format(time.RFC3339Nano), + Message: d, }); err != nil { return err } @@ -78,3 +89,48 @@ func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream return nil } + +func (s *Event) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error { + if len(req.Topic) == 0 { + return errors.BadRequest("event.read", "topic is blank") + } + + id, ok := tenant.FromContext(ctx) + if !ok { + id = "default" + } + + // create tenant based topics + topic := path.Join("event", id, req.Topic) + + log.Infof("Tenant %v reading %v\n", id, req.Topic) + limit := uint(25) + offset := uint(0) + + if req.Limit > 0 { + limit = uint(req.Limit) + } + + if req.Offset > 0 { + offset = uint(req.Offset) + } + + events, err := events.Read(topic, events.ReadLimit(limit), events.ReadOffset(offset)) + if err != nil { + return err + } + + for _, ev := range events { + // unmarshal the message into a struct + d := &structpb.Struct{} + d.UnmarshalJSON(ev.Payload) + + rsp.Events = append(rsp.Events, &pb.Ev{ + Id: ev.ID, + Timestamp: ev.Timestamp.Format(time.RFC3339Nano), + Message: d, + }) + } + + return nil +} diff --git a/event/proto/event.pb.go b/event/proto/event.pb.go index b2ba09b..6243681 100644 --- a/event/proto/event.pb.go +++ b/event/proto/event.pb.go @@ -21,7 +21,73 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// Publish a message to the event. Specify a topic to group messages for a specific topic. +type Ev struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // event id + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // event timestamp + Timestamp string `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // event message + Message *structpb.Struct `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *Ev) Reset() { + *x = Ev{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_event_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Ev) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Ev) ProtoMessage() {} + +func (x *Ev) ProtoReflect() protoreflect.Message { + mi := &file_proto_event_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Ev.ProtoReflect.Descriptor instead. +func (*Ev) Descriptor() ([]byte, []int) { + return file_proto_event_proto_rawDescGZIP(), []int{0} +} + +func (x *Ev) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Ev) GetTimestamp() string { + if x != nil { + return x.Timestamp + } + return "" +} + +func (x *Ev) GetMessage() *structpb.Struct { + if x != nil { + return x.Message + } + return nil +} + +// Publish a message to the event stream. type PublishRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -30,13 +96,13 @@ type PublishRequest struct { // The topic to publish to Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` // The json message to publish - Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + Message *structpb.Struct `protobuf:"bytes,4,opt,name=message,proto3" json:"message,omitempty"` } func (x *PublishRequest) Reset() { *x = PublishRequest{} if protoimpl.UnsafeEnabled { - mi := &file_proto_event_proto_msgTypes[0] + mi := &file_proto_event_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -49,7 +115,7 @@ func (x *PublishRequest) String() string { func (*PublishRequest) ProtoMessage() {} func (x *PublishRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_event_proto_msgTypes[0] + mi := &file_proto_event_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -62,7 +128,7 @@ func (x *PublishRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. func (*PublishRequest) Descriptor() ([]byte, []int) { - return file_proto_event_proto_rawDescGZIP(), []int{0} + return file_proto_event_proto_rawDescGZIP(), []int{1} } func (x *PublishRequest) GetTopic() string { @@ -88,7 +154,7 @@ type PublishResponse struct { func (x *PublishResponse) Reset() { *x = PublishResponse{} if protoimpl.UnsafeEnabled { - mi := &file_proto_event_proto_msgTypes[1] + mi := &file_proto_event_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -101,7 +167,7 @@ func (x *PublishResponse) String() string { func (*PublishResponse) ProtoMessage() {} func (x *PublishResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_event_proto_msgTypes[1] + mi := &file_proto_event_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -114,7 +180,7 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. func (*PublishResponse) Descriptor() ([]byte, []int) { - return file_proto_event_proto_rawDescGZIP(), []int{1} + return file_proto_event_proto_rawDescGZIP(), []int{2} } // Subscribe to messages for a given topic. @@ -127,12 +193,14 @@ type SubscribeRequest struct { Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` // Optional group for the subscription Group string `protobuf:"bytes,2,opt,name=group,proto3" json:"group,omitempty"` + // Optional offset to read from e.g "2006-01-02T15:04:05.999Z07:00" + Offset string `protobuf:"bytes,3,opt,name=offset,proto3" json:"offset,omitempty"` } func (x *SubscribeRequest) Reset() { *x = SubscribeRequest{} if protoimpl.UnsafeEnabled { - mi := &file_proto_event_proto_msgTypes[2] + mi := &file_proto_event_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -145,7 +213,7 @@ func (x *SubscribeRequest) String() string { func (*SubscribeRequest) ProtoMessage() {} func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_event_proto_msgTypes[2] + mi := &file_proto_event_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -158,7 +226,7 @@ func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. func (*SubscribeRequest) Descriptor() ([]byte, []int) { - return file_proto_event_proto_rawDescGZIP(), []int{2} + return file_proto_event_proto_rawDescGZIP(), []int{3} } func (x *SubscribeRequest) GetTopic() string { @@ -175,6 +243,13 @@ func (x *SubscribeRequest) GetGroup() string { return "" } +func (x *SubscribeRequest) GetOffset() string { + if x != nil { + return x.Offset + } + return "" +} + // A blocking event will be returned in response. type SubscribeResponse struct { state protoimpl.MessageState @@ -183,14 +258,18 @@ type SubscribeResponse struct { // The topic subscribed to Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // Unique message id + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + // Timestamp of publishing + Timestamp string `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // The next json message on the topic - Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + Message *structpb.Struct `protobuf:"bytes,4,opt,name=message,proto3" json:"message,omitempty"` } func (x *SubscribeResponse) Reset() { *x = SubscribeResponse{} if protoimpl.UnsafeEnabled { - mi := &file_proto_event_proto_msgTypes[3] + mi := &file_proto_event_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -203,7 +282,7 @@ func (x *SubscribeResponse) String() string { func (*SubscribeResponse) ProtoMessage() {} func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_event_proto_msgTypes[3] + mi := &file_proto_event_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -216,7 +295,7 @@ func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. func (*SubscribeResponse) Descriptor() ([]byte, []int) { - return file_proto_event_proto_rawDescGZIP(), []int{3} + return file_proto_event_proto_rawDescGZIP(), []int{4} } func (x *SubscribeResponse) GetTopic() string { @@ -226,6 +305,20 @@ func (x *SubscribeResponse) GetTopic() string { return "" } +func (x *SubscribeResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *SubscribeResponse) GetTimestamp() string { + if x != nil { + return x.Timestamp + } + return "" +} + func (x *SubscribeResponse) GetMessage() *structpb.Struct { if x != nil { return x.Message @@ -233,40 +326,177 @@ func (x *SubscribeResponse) GetMessage() *structpb.Struct { return nil } +// Read stored events +type ReadRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // topic to read from + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // number of events to read; default 25 + Limit int32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` + // offset for the events; default 0 + Offset int32 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` +} + +func (x *ReadRequest) Reset() { + *x = ReadRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_event_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadRequest) ProtoMessage() {} + +func (x *ReadRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_event_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadRequest.ProtoReflect.Descriptor instead. +func (*ReadRequest) Descriptor() ([]byte, []int) { + return file_proto_event_proto_rawDescGZIP(), []int{5} +} + +func (x *ReadRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +func (x *ReadRequest) GetLimit() int32 { + if x != nil { + return x.Limit + } + return 0 +} + +func (x *ReadRequest) GetOffset() int32 { + if x != nil { + return x.Offset + } + return 0 +} + +type ReadResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // the events + Events []*Ev `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` +} + +func (x *ReadResponse) Reset() { + *x = ReadResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_event_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadResponse) ProtoMessage() {} + +func (x *ReadResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_event_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadResponse.ProtoReflect.Descriptor instead. +func (*ReadResponse) Descriptor() ([]byte, []int) { + return file_proto_event_proto_rawDescGZIP(), []int{6} +} + +func (x *ReadResponse) GetEvents() []*Ev { + if x != nil { + return x.Events + } + return nil +} + var File_proto_event_proto protoreflect.FileDescriptor var file_proto_event_proto_rawDesc = []byte{ 0x0a, 0x11, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, - 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, - 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, - 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x3e, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, - 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x22, 0x5c, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x65, 0x0a, 0x02, 0x45, 0x76, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1c, + 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x31, 0x0a, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, + 0x59, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, + 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x75, + 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x56, 0x0a, + 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x16, 0x0a, + 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, + 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x8a, 0x01, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, - 0x63, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x32, 0x87, 0x01, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3a, - 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x15, 0x2e, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x16, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x42, 0x0a, 0x09, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x17, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x18, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, - 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x0f, - 0x5a, 0x0d, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, + 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x22, 0x51, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x16, 0x0a, + 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x6f, + 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x31, 0x0a, 0x0c, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x45, 0x76, + 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x32, 0xba, 0x01, 0x0a, 0x05, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x12, 0x3a, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x15, 0x2e, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x75, 0x62, + 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x42, + 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x17, 0x2e, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x30, 0x01, 0x12, 0x31, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x12, 0x12, 0x2e, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, + 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x3b, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -281,26 +511,33 @@ func file_proto_event_proto_rawDescGZIP() []byte { return file_proto_event_proto_rawDescData } -var file_proto_event_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_proto_event_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_proto_event_proto_goTypes = []interface{}{ - (*PublishRequest)(nil), // 0: event.PublishRequest - (*PublishResponse)(nil), // 1: event.PublishResponse - (*SubscribeRequest)(nil), // 2: event.SubscribeRequest - (*SubscribeResponse)(nil), // 3: event.SubscribeResponse - (*structpb.Struct)(nil), // 4: google.protobuf.Struct + (*Ev)(nil), // 0: event.Ev + (*PublishRequest)(nil), // 1: event.PublishRequest + (*PublishResponse)(nil), // 2: event.PublishResponse + (*SubscribeRequest)(nil), // 3: event.SubscribeRequest + (*SubscribeResponse)(nil), // 4: event.SubscribeResponse + (*ReadRequest)(nil), // 5: event.ReadRequest + (*ReadResponse)(nil), // 6: event.ReadResponse + (*structpb.Struct)(nil), // 7: google.protobuf.Struct } var file_proto_event_proto_depIdxs = []int32{ - 4, // 0: event.PublishRequest.message:type_name -> google.protobuf.Struct - 4, // 1: event.SubscribeResponse.message:type_name -> google.protobuf.Struct - 0, // 2: event.Event.Publish:input_type -> event.PublishRequest - 2, // 3: event.Event.Subscribe:input_type -> event.SubscribeRequest - 1, // 4: event.Event.Publish:output_type -> event.PublishResponse - 3, // 5: event.Event.Subscribe:output_type -> event.SubscribeResponse - 4, // [4:6] is the sub-list for method output_type - 2, // [2:4] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 7, // 0: event.Ev.message:type_name -> google.protobuf.Struct + 7, // 1: event.PublishRequest.message:type_name -> google.protobuf.Struct + 7, // 2: event.SubscribeResponse.message:type_name -> google.protobuf.Struct + 0, // 3: event.ReadResponse.events:type_name -> event.Ev + 1, // 4: event.Event.Publish:input_type -> event.PublishRequest + 3, // 5: event.Event.Subscribe:input_type -> event.SubscribeRequest + 5, // 6: event.Event.Read:input_type -> event.ReadRequest + 2, // 7: event.Event.Publish:output_type -> event.PublishResponse + 4, // 8: event.Event.Subscribe:output_type -> event.SubscribeResponse + 6, // 9: event.Event.Read:output_type -> event.ReadResponse + 7, // [7:10] is the sub-list for method output_type + 4, // [4:7] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_proto_event_proto_init() } @@ -310,7 +547,7 @@ func file_proto_event_proto_init() { } if !protoimpl.UnsafeEnabled { file_proto_event_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest); i { + switch v := v.(*Ev); i { case 0: return &v.state case 1: @@ -322,7 +559,7 @@ func file_proto_event_proto_init() { } } file_proto_event_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishResponse); i { + switch v := v.(*PublishRequest); i { case 0: return &v.state case 1: @@ -334,7 +571,7 @@ func file_proto_event_proto_init() { } } file_proto_event_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeRequest); i { + switch v := v.(*PublishResponse); i { case 0: return &v.state case 1: @@ -346,6 +583,18 @@ func file_proto_event_proto_init() { } } file_proto_event_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_event_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeResponse); i { case 0: return &v.state @@ -357,6 +606,30 @@ func file_proto_event_proto_init() { return nil } } + file_proto_event_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_event_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -364,7 +637,7 @@ func file_proto_event_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_event_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, diff --git a/event/proto/event.pb.micro.go b/event/proto/event.pb.micro.go index 87dcaca..7b62afd 100644 --- a/event/proto/event.pb.micro.go +++ b/event/proto/event.pb.micro.go @@ -45,6 +45,7 @@ func NewEventEndpoints() []*api.Endpoint { type EventService interface { Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Event_SubscribeService, error) + Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) } type eventService struct { @@ -118,17 +119,29 @@ func (x *eventServiceSubscribe) Recv() (*SubscribeResponse, error) { return m, nil } +func (c *eventService) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) { + req := c.c.NewRequest(c.name, "Event.Read", in) + out := new(ReadResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // Server API for Event service type EventHandler interface { Publish(context.Context, *PublishRequest, *PublishResponse) error Subscribe(context.Context, *SubscribeRequest, Event_SubscribeStream) error + Read(context.Context, *ReadRequest, *ReadResponse) error } func RegisterEventHandler(s server.Server, hdlr EventHandler, opts ...server.HandlerOption) error { type event interface { Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error Subscribe(ctx context.Context, stream server.Stream) error + Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error } type Event struct { event @@ -184,3 +197,7 @@ func (x *eventSubscribeStream) RecvMsg(m interface{}) error { func (x *eventSubscribeStream) Send(m *SubscribeResponse) error { return x.stream.Send(m) } + +func (h *eventHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error { + return h.EventHandler.Read(ctx, in, out) +} diff --git a/event/proto/event.proto b/event/proto/event.proto index 5681328..5546be0 100644 --- a/event/proto/event.proto +++ b/event/proto/event.proto @@ -8,6 +8,16 @@ import "google/protobuf/struct.proto"; service Event { rpc Publish(PublishRequest) returns (PublishResponse) {} rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {} + rpc Read(ReadRequest) returns (ReadResponse) {} +} + +message Ev { + // event id + string id = 1; + // event timestamp + string timestamp = 2; + // event message + google.protobuf.Struct message = 3; } // Publish a message to the event stream. @@ -15,7 +25,7 @@ message PublishRequest { // The topic to publish to string topic = 1; // The json message to publish - google.protobuf.Struct message = 2; + google.protobuf.Struct message = 4; } message PublishResponse {} @@ -26,12 +36,33 @@ message SubscribeRequest { string topic = 1; // Optional group for the subscription string group = 2; + // Optional offset to read from e.g "2006-01-02T15:04:05.999Z07:00" + string offset = 3; } // A blocking event will be returned in response. message SubscribeResponse { // The topic subscribed to string topic = 1; + // Unique message id + string id = 2; + // Timestamp of publishing + string timestamp = 3; // The next json message on the topic - google.protobuf.Struct message = 2; + google.protobuf.Struct message = 4; +} + +// Read stored events +message ReadRequest { + // topic to read from + string topic = 1; + // number of events to read; default 25 + int32 limit = 2; + // offset for the events; default 0 + int32 offset = 3; +} + +message ReadResponse { + // the events + repeated Ev events = 1; }