diff --git a/event/.gitignore b/event/.gitignore new file mode 100644 index 0000000..e75d4b6 --- /dev/null +++ b/event/.gitignore @@ -0,0 +1,2 @@ + +event diff --git a/event/Dockerfile b/event/Dockerfile new file mode 100644 index 0000000..0028018 --- /dev/null +++ b/event/Dockerfile @@ -0,0 +1,3 @@ +FROM alpine +ADD event /event +ENTRYPOINT [ "/event" ] diff --git a/event/Makefile b/event/Makefile new file mode 100644 index 0000000..0b2b93f --- /dev/null +++ b/event/Makefile @@ -0,0 +1,28 @@ + +GOPATH:=$(shell go env GOPATH) +.PHONY: init +init: + go get -u github.com/golang/protobuf/proto + go get -u github.com/golang/protobuf/protoc-gen-go + go get github.com/micro/micro/v3/cmd/protoc-gen-micro + go get github.com/micro/micro/v3/cmd/protoc-gen-openapi + +.PHONY: api +api: + protoc --openapi_out=. --proto_path=. proto/event.proto + +.PHONY: proto +proto: + protoc --proto_path=. --micro_out=. --go_out=:. proto/event.proto + +.PHONY: build +build: + go build -o event *.go + +.PHONY: test +test: + go test -v ./... -cover + +.PHONY: docker +docker: + docker build . -t event:latest diff --git a/event/README.md b/event/README.md new file mode 100644 index 0000000..d82d985 --- /dev/null +++ b/event/README.md @@ -0,0 +1,6 @@ +An event stream + +# Event Service + +Publish and subscribe to messages on an event stream. Group messages by topic and asynchronously +notify listeners of new events occuring in real time. diff --git a/event/examples.json b/event/examples.json new file mode 100644 index 0000000..f92df7f --- /dev/null +++ b/event/examples.json @@ -0,0 +1,25 @@ + +{ + "publish": [{ + "title": "Publish a message", + "description": "Publish a message to a topic", + "run_check": true, + "request": { + "topic": "user", + "message": {"id": "1", "type": "signup", "user": "john"} + }, + "response": {} + }], + "subscribe": [{ + "title": "Subscribe to a topic", + "description": "Subscribe to messages on a given topic", + "run_check": false, + "request": { + "topic": "user" + }, + "response": { + "topic": "events", + "message": {"id": "1", "type": "signup", "user": "john"} + } + }] +} diff --git a/event/generate.go b/event/generate.go new file mode 100644 index 0000000..7d9db91 --- /dev/null +++ b/event/generate.go @@ -0,0 +1,3 @@ +package main + +//go:generate make proto diff --git a/event/handler/event.go b/event/handler/event.go new file mode 100644 index 0000000..3abb425 --- /dev/null +++ b/event/handler/event.go @@ -0,0 +1,80 @@ +package handler + +import ( + "context" + "fmt" + "path" + + "github.com/micro/micro/v3/service/errors" + "github.com/micro/micro/v3/service/events" + log "github.com/micro/micro/v3/service/logger" + pb "github.com/micro/services/event/proto" + "github.com/micro/services/pkg/tenant" + "google.golang.org/protobuf/types/known/structpb" +) + +type Event struct{} + +func (s *Event) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.PublishResponse) error { + if len(req.Topic) == 0 { + return errors.BadRequest("event.publish", "topic is blank") + } + + // get the tenant + id, ok := tenant.FromContext(ctx) + if !ok { + id = "default" + } + + // create tenant based topics + topic := path.Join("event", id, req.Topic) + + log.Infof("Tenant %v publishing to %v\n", id, req.Topic) + + // publish the message + return events.Publish(topic, req.Message.AsMap()) +} + +func (s *Event) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Event_SubscribeStream) error { + if len(req.Topic) == 0 { + return errors.BadRequest("event.publish", "topic is blank") + } + + id, ok := tenant.FromContext(ctx) + if !ok { + id = "default" + } + + // create tenant based topics + topic := path.Join("event", id, req.Topic) + + log.Infof("Tenant %v subscribing to %v\n", id, req.Topic) + + // check if a group os provided + opts := []events.ConsumeOption{} + if len(req.Group) > 0 { + opts = append(opts, events.WithGroup(req.Group)) + } + + sub, err := events.Consume(topic, opts...) + if err != nil { + return errors.InternalServerError("event.subscribe", "failed to subscribe to event") + } + + // range over the messages until the subscriber is closed + for msg := range sub { + fmt.Println("got message, sending") + // unmarshal the message into a struct + d := &structpb.Struct{} + d.UnmarshalJSON(msg.Payload) + + if err := stream.Send(&pb.SubscribeResponse{ + Topic: req.Topic, + Message: d, + }); err != nil { + return err + } + } + + return nil +} diff --git a/event/main.go b/event/main.go new file mode 100644 index 0000000..352203e --- /dev/null +++ b/event/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "github.com/micro/micro/v3/service" + "github.com/micro/micro/v3/service/logger" + "github.com/micro/services/event/handler" + pb "github.com/micro/services/event/proto" +) + +func main() { + // Create service + srv := service.New( + service.Name("event"), + service.Version("latest"), + ) + + // Register handler + pb.RegisterEventHandler(srv.Server(), new(handler.Event)) + + // Run service + if err := srv.Run(); err != nil { + logger.Fatal(err) + } +} diff --git a/event/micro.mu b/event/micro.mu new file mode 100644 index 0000000..60b00ce --- /dev/null +++ b/event/micro.mu @@ -0,0 +1 @@ +service event diff --git a/event/proto/event.pb.go b/event/proto/event.pb.go new file mode 100644 index 0000000..b2ba09b --- /dev/null +++ b/event/proto/event.pb.go @@ -0,0 +1,379 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.15.6 +// source: proto/event.proto + +package event + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + structpb "google.golang.org/protobuf/types/known/structpb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Publish a message to the event. Specify a topic to group messages for a specific topic. +type PublishRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The topic to publish to + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // The json message to publish + Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *PublishRequest) Reset() { + *x = PublishRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_event_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishRequest) ProtoMessage() {} + +func (x *PublishRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_event_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. +func (*PublishRequest) Descriptor() ([]byte, []int) { + return file_proto_event_proto_rawDescGZIP(), []int{0} +} + +func (x *PublishRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +func (x *PublishRequest) GetMessage() *structpb.Struct { + if x != nil { + return x.Message + } + return nil +} + +type PublishResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *PublishResponse) Reset() { + *x = PublishResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_event_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishResponse) ProtoMessage() {} + +func (x *PublishResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_event_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. +func (*PublishResponse) Descriptor() ([]byte, []int) { + return file_proto_event_proto_rawDescGZIP(), []int{1} +} + +// Subscribe to messages for a given topic. +type SubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The topic to subscribe to + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // Optional group for the subscription + Group string `protobuf:"bytes,2,opt,name=group,proto3" json:"group,omitempty"` +} + +func (x *SubscribeRequest) Reset() { + *x = SubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_event_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRequest) ProtoMessage() {} + +func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_event_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return file_proto_event_proto_rawDescGZIP(), []int{2} +} + +func (x *SubscribeRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +func (x *SubscribeRequest) GetGroup() string { + if x != nil { + return x.Group + } + return "" +} + +// A blocking event will be returned in response. +type SubscribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The topic subscribed to + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // The next json message on the topic + Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *SubscribeResponse) Reset() { + *x = SubscribeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_event_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeResponse) ProtoMessage() {} + +func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_event_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. +func (*SubscribeResponse) Descriptor() ([]byte, []int) { + return file_proto_event_proto_rawDescGZIP(), []int{3} +} + +func (x *SubscribeResponse) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +func (x *SubscribeResponse) GetMessage() *structpb.Struct { + if x != nil { + return x.Message + } + return nil +} + +var File_proto_event_proto protoreflect.FileDescriptor + +var file_proto_event_proto_rawDesc = []byte{ + 0x0a, 0x11, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, + 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, + 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x3e, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x22, 0x5c, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, + 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, + 0x63, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x32, 0x87, 0x01, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3a, + 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x15, 0x2e, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x16, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x42, 0x0a, 0x09, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x17, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, + 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x18, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x0f, + 0x5a, 0x0d, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_event_proto_rawDescOnce sync.Once + file_proto_event_proto_rawDescData = file_proto_event_proto_rawDesc +) + +func file_proto_event_proto_rawDescGZIP() []byte { + file_proto_event_proto_rawDescOnce.Do(func() { + file_proto_event_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_event_proto_rawDescData) + }) + return file_proto_event_proto_rawDescData +} + +var file_proto_event_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_proto_event_proto_goTypes = []interface{}{ + (*PublishRequest)(nil), // 0: event.PublishRequest + (*PublishResponse)(nil), // 1: event.PublishResponse + (*SubscribeRequest)(nil), // 2: event.SubscribeRequest + (*SubscribeResponse)(nil), // 3: event.SubscribeResponse + (*structpb.Struct)(nil), // 4: google.protobuf.Struct +} +var file_proto_event_proto_depIdxs = []int32{ + 4, // 0: event.PublishRequest.message:type_name -> google.protobuf.Struct + 4, // 1: event.SubscribeResponse.message:type_name -> google.protobuf.Struct + 0, // 2: event.Event.Publish:input_type -> event.PublishRequest + 2, // 3: event.Event.Subscribe:input_type -> event.SubscribeRequest + 1, // 4: event.Event.Publish:output_type -> event.PublishResponse + 3, // 5: event.Event.Subscribe:output_type -> event.SubscribeResponse + 4, // [4:6] is the sub-list for method output_type + 2, // [2:4] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_proto_event_proto_init() } +func file_proto_event_proto_init() { + if File_proto_event_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_event_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_event_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_event_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_event_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_event_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_event_proto_goTypes, + DependencyIndexes: file_proto_event_proto_depIdxs, + MessageInfos: file_proto_event_proto_msgTypes, + }.Build() + File_proto_event_proto = out.File + file_proto_event_proto_rawDesc = nil + file_proto_event_proto_goTypes = nil + file_proto_event_proto_depIdxs = nil +} diff --git a/event/proto/event.pb.micro.go b/event/proto/event.pb.micro.go new file mode 100644 index 0000000..87dcaca --- /dev/null +++ b/event/proto/event.pb.micro.go @@ -0,0 +1,186 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: proto/event.proto + +package event + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + _ "google.golang.org/protobuf/types/known/structpb" + math "math" +) + +import ( + context "context" + api "github.com/micro/micro/v3/service/api" + client "github.com/micro/micro/v3/service/client" + server "github.com/micro/micro/v3/service/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ api.Endpoint +var _ context.Context +var _ client.Option +var _ server.Option + +// Api Endpoints for Event service + +func NewEventEndpoints() []*api.Endpoint { + return []*api.Endpoint{} +} + +// Client API for Event service + +type EventService interface { + Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Event_SubscribeService, error) +} + +type eventService struct { + c client.Client + name string +} + +func NewEventService(name string, c client.Client) EventService { + return &eventService{ + c: c, + name: name, + } +} + +func (c *eventService) Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) { + req := c.c.NewRequest(c.name, "Event.Publish", in) + out := new(PublishResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *eventService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Event_SubscribeService, error) { + req := c.c.NewRequest(c.name, "Event.Subscribe", &SubscribeRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return &eventServiceSubscribe{stream}, nil +} + +type Event_SubscribeService interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*SubscribeResponse, error) +} + +type eventServiceSubscribe struct { + stream client.Stream +} + +func (x *eventServiceSubscribe) Close() error { + return x.stream.Close() +} + +func (x *eventServiceSubscribe) Context() context.Context { + return x.stream.Context() +} + +func (x *eventServiceSubscribe) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *eventServiceSubscribe) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *eventServiceSubscribe) Recv() (*SubscribeResponse, error) { + m := new(SubscribeResponse) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Event service + +type EventHandler interface { + Publish(context.Context, *PublishRequest, *PublishResponse) error + Subscribe(context.Context, *SubscribeRequest, Event_SubscribeStream) error +} + +func RegisterEventHandler(s server.Server, hdlr EventHandler, opts ...server.HandlerOption) error { + type event interface { + Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error + Subscribe(ctx context.Context, stream server.Stream) error + } + type Event struct { + event + } + h := &eventHandler{hdlr} + return s.Handle(s.NewHandler(&Event{h}, opts...)) +} + +type eventHandler struct { + EventHandler +} + +func (h *eventHandler) Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error { + return h.EventHandler.Publish(ctx, in, out) +} + +func (h *eventHandler) Subscribe(ctx context.Context, stream server.Stream) error { + m := new(SubscribeRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.EventHandler.Subscribe(ctx, m, &eventSubscribeStream{stream}) +} + +type Event_SubscribeStream interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*SubscribeResponse) error +} + +type eventSubscribeStream struct { + stream server.Stream +} + +func (x *eventSubscribeStream) Close() error { + return x.stream.Close() +} + +func (x *eventSubscribeStream) Context() context.Context { + return x.stream.Context() +} + +func (x *eventSubscribeStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *eventSubscribeStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *eventSubscribeStream) Send(m *SubscribeResponse) error { + return x.stream.Send(m) +} diff --git a/event/proto/event.proto b/event/proto/event.proto new file mode 100644 index 0000000..57109ef --- /dev/null +++ b/event/proto/event.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +package event; + +option go_package = "./proto;event"; +import "google/protobuf/struct.proto"; + +service Event { + rpc Publish(PublishRequest) returns (PublishResponse) {} + rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {} +} + +// Publish a message to the event. Specify a topic to group messages for a specific topic. +message PublishRequest { + // The topic to publish to + string topic = 1; + // The json message to publish + google.protobuf.Struct message = 2; +} + +message PublishResponse {} + +// Subscribe to messages for a given topic. +message SubscribeRequest { + // The topic to subscribe to + string topic = 1; + // Optional group for the subscription + string group = 2; +} + +// A blocking event will be returned in response. +message SubscribeResponse { + // The topic subscribed to + string topic = 1; + // The next json message on the topic + google.protobuf.Struct message = 2; +} diff --git a/event/publicapi.json b/event/publicapi.json new file mode 100644 index 0000000..1064a8a --- /dev/null +++ b/event/publicapi.json @@ -0,0 +1,6 @@ +{ + "name": "event", + "icon": "🌊", + "category": "messaging", + "display_name": "Events" +}