diff --git a/event/examples.json b/event/examples.json index d201e56..5889fd5 100644 --- a/event/examples.json +++ b/event/examples.json @@ -1,8 +1,8 @@ { "publish": [{ - "title": "Publish a message", - "description": "Publish a message to a topic", + "title": "Publish an event", + "description": "Publish an event to a topic", "run_check": true, "request": { "topic": "user", @@ -10,9 +10,9 @@ }, "response": {} }], - "subscribe": [{ - "title": "Subscribe to a topic", - "description": "Subscribe to messages on a given topic", + "consume": [{ + "title": "Consume from a topic", + "description": "Consume events from a given topic", "run_check": false, "request": { "topic": "user" diff --git a/event/handler/event.go b/event/handler/event.go index 77793a9..a752607 100644 --- a/event/handler/event.go +++ b/event/handler/event.go @@ -36,7 +36,7 @@ func (s *Event) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Pub return events.Publish(topic, req.Message.AsMap()) } -func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Event_SubscribeStream) error { +func (s *Event) Consume(ctx context.Context, req *pb.ConsumeRequest, stream pb.Event_ConsumeStream) error { if len(req.Topic) == 0 { return errors.BadRequest("event.publish", "topic is blank") } @@ -77,7 +77,7 @@ func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream d := &structpb.Struct{} d.UnmarshalJSON(msg.Payload) - if err := stream.Send(&pb.SubscribeResponse{ + if err := stream.Send(&pb.ConsumeResponse{ Topic: req.Topic, Id: msg.ID, Timestamp: msg.Timestamp.Format(time.RFC3339Nano), diff --git a/event/proto/event.pb.go b/event/proto/event.pb.go index 6243681..c0be590 100644 --- a/event/proto/event.pb.go +++ b/event/proto/event.pb.go @@ -87,7 +87,7 @@ func (x *Ev) GetMessage() *structpb.Struct { return nil } -// Publish a message to the event stream. +// Publish a event to the event stream. type PublishRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -183,8 +183,8 @@ func (*PublishResponse) Descriptor() ([]byte, []int) { return file_proto_event_proto_rawDescGZIP(), []int{2} } -// Subscribe to messages for a given topic. -type SubscribeRequest struct { +// Consume events from a given topic. +type ConsumeRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -197,8 +197,8 @@ type SubscribeRequest struct { Offset string `protobuf:"bytes,3,opt,name=offset,proto3" json:"offset,omitempty"` } -func (x *SubscribeRequest) Reset() { - *x = SubscribeRequest{} +func (x *ConsumeRequest) Reset() { + *x = ConsumeRequest{} if protoimpl.UnsafeEnabled { mi := &file_proto_event_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -206,13 +206,13 @@ func (x *SubscribeRequest) Reset() { } } -func (x *SubscribeRequest) String() string { +func (x *ConsumeRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SubscribeRequest) ProtoMessage() {} +func (*ConsumeRequest) ProtoMessage() {} -func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { +func (x *ConsumeRequest) ProtoReflect() protoreflect.Message { mi := &file_proto_event_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -224,26 +224,26 @@ func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. -func (*SubscribeRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use ConsumeRequest.ProtoReflect.Descriptor instead. +func (*ConsumeRequest) Descriptor() ([]byte, []int) { return file_proto_event_proto_rawDescGZIP(), []int{3} } -func (x *SubscribeRequest) GetTopic() string { +func (x *ConsumeRequest) GetTopic() string { if x != nil { return x.Topic } return "" } -func (x *SubscribeRequest) GetGroup() string { +func (x *ConsumeRequest) GetGroup() string { if x != nil { return x.Group } return "" } -func (x *SubscribeRequest) GetOffset() string { +func (x *ConsumeRequest) GetOffset() string { if x != nil { return x.Offset } @@ -251,7 +251,7 @@ func (x *SubscribeRequest) GetOffset() string { } // A blocking event will be returned in response. -type SubscribeResponse struct { +type ConsumeResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -266,8 +266,8 @@ type SubscribeResponse struct { Message *structpb.Struct `protobuf:"bytes,4,opt,name=message,proto3" json:"message,omitempty"` } -func (x *SubscribeResponse) Reset() { - *x = SubscribeResponse{} +func (x *ConsumeResponse) Reset() { + *x = ConsumeResponse{} if protoimpl.UnsafeEnabled { mi := &file_proto_event_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -275,13 +275,13 @@ func (x *SubscribeResponse) Reset() { } } -func (x *SubscribeResponse) String() string { +func (x *ConsumeResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SubscribeResponse) ProtoMessage() {} +func (*ConsumeResponse) ProtoMessage() {} -func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { +func (x *ConsumeResponse) ProtoReflect() protoreflect.Message { mi := &file_proto_event_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -293,33 +293,33 @@ func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. -func (*SubscribeResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use ConsumeResponse.ProtoReflect.Descriptor instead. +func (*ConsumeResponse) Descriptor() ([]byte, []int) { return file_proto_event_proto_rawDescGZIP(), []int{4} } -func (x *SubscribeResponse) GetTopic() string { +func (x *ConsumeResponse) GetTopic() string { if x != nil { return x.Topic } return "" } -func (x *SubscribeResponse) GetId() string { +func (x *ConsumeResponse) GetId() string { if x != nil { return x.Id } return "" } -func (x *SubscribeResponse) GetTimestamp() string { +func (x *ConsumeResponse) GetTimestamp() string { if x != nil { return x.Timestamp } return "" } -func (x *SubscribeResponse) GetMessage() *structpb.Struct { +func (x *ConsumeResponse) GetMessage() *structpb.Struct { if x != nil { return x.Message } @@ -460,43 +460,43 @@ var file_proto_event_proto_rawDesc = []byte{ 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x56, 0x0a, - 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x16, 0x0a, - 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, - 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x8a, 0x01, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, - 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, - 0x63, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, - 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, - 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x22, 0x51, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x16, 0x0a, - 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x6f, - 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x31, 0x0a, 0x0c, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, - 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x45, 0x76, - 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x32, 0xba, 0x01, 0x0a, 0x05, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x12, 0x3a, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x15, 0x2e, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x75, 0x62, - 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x42, - 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x17, 0x2e, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x75, 0x62, - 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x30, 0x01, 0x12, 0x31, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x12, 0x12, 0x2e, 0x65, 0x76, 0x65, - 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, - 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x3b, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x54, 0x0a, + 0x0e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x6f, + 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x66, 0x66, + 0x73, 0x65, 0x74, 0x22, 0x88, 0x01, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1c, 0x0a, + 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x31, 0x0a, 0x07, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, + 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x51, + 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, + 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, + 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, + 0x74, 0x22, 0x31, 0x0a, 0x0c, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x21, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x09, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x45, 0x76, 0x52, 0x06, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x32, 0xb4, 0x01, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3a, + 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x15, 0x2e, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x16, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x07, 0x43, 0x6f, + 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x12, 0x15, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x43, 0x6f, + 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x31, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, + 0x12, 0x12, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x61, + 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -513,25 +513,25 @@ func file_proto_event_proto_rawDescGZIP() []byte { var file_proto_event_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_proto_event_proto_goTypes = []interface{}{ - (*Ev)(nil), // 0: event.Ev - (*PublishRequest)(nil), // 1: event.PublishRequest - (*PublishResponse)(nil), // 2: event.PublishResponse - (*SubscribeRequest)(nil), // 3: event.SubscribeRequest - (*SubscribeResponse)(nil), // 4: event.SubscribeResponse - (*ReadRequest)(nil), // 5: event.ReadRequest - (*ReadResponse)(nil), // 6: event.ReadResponse - (*structpb.Struct)(nil), // 7: google.protobuf.Struct + (*Ev)(nil), // 0: event.Ev + (*PublishRequest)(nil), // 1: event.PublishRequest + (*PublishResponse)(nil), // 2: event.PublishResponse + (*ConsumeRequest)(nil), // 3: event.ConsumeRequest + (*ConsumeResponse)(nil), // 4: event.ConsumeResponse + (*ReadRequest)(nil), // 5: event.ReadRequest + (*ReadResponse)(nil), // 6: event.ReadResponse + (*structpb.Struct)(nil), // 7: google.protobuf.Struct } var file_proto_event_proto_depIdxs = []int32{ 7, // 0: event.Ev.message:type_name -> google.protobuf.Struct 7, // 1: event.PublishRequest.message:type_name -> google.protobuf.Struct - 7, // 2: event.SubscribeResponse.message:type_name -> google.protobuf.Struct + 7, // 2: event.ConsumeResponse.message:type_name -> google.protobuf.Struct 0, // 3: event.ReadResponse.events:type_name -> event.Ev 1, // 4: event.Event.Publish:input_type -> event.PublishRequest - 3, // 5: event.Event.Subscribe:input_type -> event.SubscribeRequest + 3, // 5: event.Event.Consume:input_type -> event.ConsumeRequest 5, // 6: event.Event.Read:input_type -> event.ReadRequest 2, // 7: event.Event.Publish:output_type -> event.PublishResponse - 4, // 8: event.Event.Subscribe:output_type -> event.SubscribeResponse + 4, // 8: event.Event.Consume:output_type -> event.ConsumeResponse 6, // 9: event.Event.Read:output_type -> event.ReadResponse 7, // [7:10] is the sub-list for method output_type 4, // [4:7] is the sub-list for method input_type @@ -583,7 +583,7 @@ func file_proto_event_proto_init() { } } file_proto_event_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeRequest); i { + switch v := v.(*ConsumeRequest); i { case 0: return &v.state case 1: @@ -595,7 +595,7 @@ func file_proto_event_proto_init() { } } file_proto_event_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeResponse); i { + switch v := v.(*ConsumeResponse); i { case 0: return &v.state case 1: diff --git a/event/proto/event.pb.micro.go b/event/proto/event.pb.micro.go index 7b62afd..357e551 100644 --- a/event/proto/event.pb.micro.go +++ b/event/proto/event.pb.micro.go @@ -44,7 +44,7 @@ func NewEventEndpoints() []*api.Endpoint { type EventService interface { Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) - Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Event_SubscribeService, error) + Consume(ctx context.Context, in *ConsumeRequest, opts ...client.CallOption) (Event_ConsumeService, error) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) } @@ -70,8 +70,8 @@ func (c *eventService) Publish(ctx context.Context, in *PublishRequest, opts ... return out, nil } -func (c *eventService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Event_SubscribeService, error) { - req := c.c.NewRequest(c.name, "Event.Subscribe", &SubscribeRequest{}) +func (c *eventService) Consume(ctx context.Context, in *ConsumeRequest, opts ...client.CallOption) (Event_ConsumeService, error) { + req := c.c.NewRequest(c.name, "Event.Consume", &ConsumeRequest{}) stream, err := c.c.Stream(ctx, req, opts...) if err != nil { return nil, err @@ -79,39 +79,39 @@ func (c *eventService) Subscribe(ctx context.Context, in *SubscribeRequest, opts if err := stream.Send(in); err != nil { return nil, err } - return &eventServiceSubscribe{stream}, nil + return &eventServiceConsume{stream}, nil } -type Event_SubscribeService interface { +type Event_ConsumeService interface { Context() context.Context SendMsg(interface{}) error RecvMsg(interface{}) error Close() error - Recv() (*SubscribeResponse, error) + Recv() (*ConsumeResponse, error) } -type eventServiceSubscribe struct { +type eventServiceConsume struct { stream client.Stream } -func (x *eventServiceSubscribe) Close() error { +func (x *eventServiceConsume) Close() error { return x.stream.Close() } -func (x *eventServiceSubscribe) Context() context.Context { +func (x *eventServiceConsume) Context() context.Context { return x.stream.Context() } -func (x *eventServiceSubscribe) SendMsg(m interface{}) error { +func (x *eventServiceConsume) SendMsg(m interface{}) error { return x.stream.Send(m) } -func (x *eventServiceSubscribe) RecvMsg(m interface{}) error { +func (x *eventServiceConsume) RecvMsg(m interface{}) error { return x.stream.Recv(m) } -func (x *eventServiceSubscribe) Recv() (*SubscribeResponse, error) { - m := new(SubscribeResponse) +func (x *eventServiceConsume) Recv() (*ConsumeResponse, error) { + m := new(ConsumeResponse) err := x.stream.Recv(m) if err != nil { return nil, err @@ -133,14 +133,14 @@ func (c *eventService) Read(ctx context.Context, in *ReadRequest, opts ...client type EventHandler interface { Publish(context.Context, *PublishRequest, *PublishResponse) error - Subscribe(context.Context, *SubscribeRequest, Event_SubscribeStream) error + Consume(context.Context, *ConsumeRequest, Event_ConsumeStream) error Read(context.Context, *ReadRequest, *ReadResponse) error } func RegisterEventHandler(s server.Server, hdlr EventHandler, opts ...server.HandlerOption) error { type event interface { Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error - Subscribe(ctx context.Context, stream server.Stream) error + Consume(ctx context.Context, stream server.Stream) error Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error } type Event struct { @@ -158,43 +158,43 @@ func (h *eventHandler) Publish(ctx context.Context, in *PublishRequest, out *Pub return h.EventHandler.Publish(ctx, in, out) } -func (h *eventHandler) Subscribe(ctx context.Context, stream server.Stream) error { - m := new(SubscribeRequest) +func (h *eventHandler) Consume(ctx context.Context, stream server.Stream) error { + m := new(ConsumeRequest) if err := stream.Recv(m); err != nil { return err } - return h.EventHandler.Subscribe(ctx, m, &eventSubscribeStream{stream}) + return h.EventHandler.Consume(ctx, m, &eventConsumeStream{stream}) } -type Event_SubscribeStream interface { +type Event_ConsumeStream interface { Context() context.Context SendMsg(interface{}) error RecvMsg(interface{}) error Close() error - Send(*SubscribeResponse) error + Send(*ConsumeResponse) error } -type eventSubscribeStream struct { +type eventConsumeStream struct { stream server.Stream } -func (x *eventSubscribeStream) Close() error { +func (x *eventConsumeStream) Close() error { return x.stream.Close() } -func (x *eventSubscribeStream) Context() context.Context { +func (x *eventConsumeStream) Context() context.Context { return x.stream.Context() } -func (x *eventSubscribeStream) SendMsg(m interface{}) error { +func (x *eventConsumeStream) SendMsg(m interface{}) error { return x.stream.Send(m) } -func (x *eventSubscribeStream) RecvMsg(m interface{}) error { +func (x *eventConsumeStream) RecvMsg(m interface{}) error { return x.stream.Recv(m) } -func (x *eventSubscribeStream) Send(m *SubscribeResponse) error { +func (x *eventConsumeStream) Send(m *ConsumeResponse) error { return x.stream.Send(m) } diff --git a/event/proto/event.proto b/event/proto/event.proto index 5546be0..2a65fe5 100644 --- a/event/proto/event.proto +++ b/event/proto/event.proto @@ -7,7 +7,7 @@ import "google/protobuf/struct.proto"; service Event { rpc Publish(PublishRequest) returns (PublishResponse) {} - rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {} + rpc Consume(ConsumeRequest) returns (stream ConsumeResponse) {} rpc Read(ReadRequest) returns (ReadResponse) {} } @@ -20,7 +20,7 @@ message Ev { google.protobuf.Struct message = 3; } -// Publish a message to the event stream. +// Publish a event to the event stream. message PublishRequest { // The topic to publish to string topic = 1; @@ -30,8 +30,8 @@ message PublishRequest { message PublishResponse {} -// Subscribe to messages for a given topic. -message SubscribeRequest { +// Consume events from a given topic. +message ConsumeRequest { // The topic to subscribe to string topic = 1; // Optional group for the subscription @@ -41,7 +41,7 @@ message SubscribeRequest { } // A blocking event will be returned in response. -message SubscribeResponse { +message ConsumeResponse { // The topic subscribed to string topic = 1; // Unique message id