diff --git a/stream/.gitignore b/stream/.gitignore deleted file mode 100644 index 38037a6..0000000 --- a/stream/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ - -stream diff --git a/stream/Dockerfile b/stream/Dockerfile deleted file mode 100644 index 4a0bce6..0000000 --- a/stream/Dockerfile +++ /dev/null @@ -1,3 +0,0 @@ -FROM alpine -ADD stream /stream -ENTRYPOINT [ "/stream" ] diff --git a/stream/Makefile b/stream/Makefile deleted file mode 100644 index 49c574d..0000000 --- a/stream/Makefile +++ /dev/null @@ -1,28 +0,0 @@ - -GOPATH:=$(shell go env GOPATH) -.PHONY: init -init: - go get -u github.com/golang/protobuf/proto - go get -u github.com/golang/protobuf/protoc-gen-go - go get github.com/micro/micro/v3/cmd/protoc-gen-micro - go get github.com/micro/micro/v3/cmd/protoc-gen-openapi - -.PHONY: api -api: - protoc --openapi_out=. --proto_path=. proto/stream.proto - -.PHONY: proto -proto: - protoc --proto_path=. --micro_out=. --go_out=:. proto/stream.proto - -.PHONY: build -build: - go build -o stream *.go - -.PHONY: test -test: - go test -v ./... -cover - -.PHONY: docker -docker: - docker build . -t stream:latest diff --git a/stream/README.md b/stream/README.md deleted file mode 100644 index bb83a34..0000000 --- a/stream/README.md +++ /dev/null @@ -1,6 +0,0 @@ -Publish and subscribe to messages - -# Stream Service - -Send messages to a stream and broadcast to multiple subscribers. Group by topic -and rapidly blast fire-and-forget messages to anyone listening. diff --git a/stream/examples.json b/stream/examples.json deleted file mode 100644 index e2e8e7e..0000000 --- a/stream/examples.json +++ /dev/null @@ -1,25 +0,0 @@ - -{ - "publish": [{ - "title": "Publish a message", - "description": "Publish a message to a topic on the stream", - "run_check": true, - "request": { - "topic": "events", - "message": {"id": "1", "type": "signup", "user": "john"} - }, - "response": {} - }], - "subscribe": [{ - "title": "Subscribe to a topic", - "description": "Subscribe to messages on a given topic from the stream", - "run_check": false, - "request": { - "topic": "events" - }, - "response": { - "topic": "events", - "message": {"id": "1", "type": "signup", "user": "john"} - } - }] -} diff --git a/stream/generate.go b/stream/generate.go deleted file mode 100644 index 7d9db91..0000000 --- a/stream/generate.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - -//go:generate make proto diff --git a/stream/handler/stream.go b/stream/handler/stream.go deleted file mode 100644 index 6299b5f..0000000 --- a/stream/handler/stream.go +++ /dev/null @@ -1,81 +0,0 @@ -package handler - -import ( - "context" - "encoding/json" - "fmt" - "path" - - "github.com/asim/mq/broker" - "github.com/micro/micro/v3/service/errors" - log "github.com/micro/micro/v3/service/logger" - "github.com/micro/services/pkg/tenant" - pb "github.com/micro/services/stream/proto" - "google.golang.org/protobuf/types/known/structpb" -) - -type Stream struct{} - -func (s *Stream) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.PublishResponse) error { - if len(req.Topic) == 0 { - return errors.BadRequest("stream.publish", "topic is blank") - } - - // get the tenant - id, ok := tenant.FromContext(ctx) - if !ok { - id = "default" - } - - // create tenant based topics - topic := path.Join("stream", id, req.Topic) - - // marshal the data - b, _ := json.Marshal(req.Message.AsMap()) - - log.Infof("Tenant %v publishing to %v\n", id, req.Topic) - - // publish the message - broker.Publish(topic, b) - - return nil -} - -func (s *Stream) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Stream_SubscribeStream) error { - if len(req.Topic) == 0 { - return errors.BadRequest("stream.publish", "topic is blank") - } - - id, ok := tenant.FromContext(ctx) - if !ok { - id = "default" - } - - // create tenant based topics - topic := path.Join("stream", id, req.Topic) - - log.Infof("Tenant %v subscribing to %v\n", id, req.Topic) - - sub, err := broker.Subscribe(topic) - if err != nil { - return errors.InternalServerError("stream.subscribe", "failed to subscribe to stream") - } - defer broker.Unsubscribe(req.Topic, sub) - - // range over the messages until the subscriber is closed - for msg := range sub { - fmt.Println("got message, sending") - // unmarshal the message into a struct - d := &structpb.Struct{} - d.UnmarshalJSON(msg) - - if err := stream.Send(&pb.SubscribeResponse{ - Topic: req.Topic, - Message: d, - }); err != nil { - return err - } - } - - return nil -} diff --git a/stream/main.go b/stream/main.go deleted file mode 100644 index 1c8e28b..0000000 --- a/stream/main.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -import ( - "github.com/micro/micro/v3/service" - "github.com/micro/micro/v3/service/logger" - "github.com/micro/services/stream/handler" - pb "github.com/micro/services/stream/proto" -) - -func main() { - // Create service - srv := service.New( - service.Name("stream"), - service.Version("latest"), - ) - - // Register handler - pb.RegisterStreamHandler(srv.Server(), new(handler.Stream)) - - // Run service - if err := srv.Run(); err != nil { - logger.Fatal(err) - } -} diff --git a/stream/micro.mu b/stream/micro.mu deleted file mode 100644 index b1ea404..0000000 --- a/stream/micro.mu +++ /dev/null @@ -1 +0,0 @@ -service stream diff --git a/stream/proto/stream.pb.go b/stream/proto/stream.pb.go deleted file mode 100644 index 0dd9f11..0000000 --- a/stream/proto/stream.pb.go +++ /dev/null @@ -1,369 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.26.0 -// protoc v3.15.6 -// source: proto/stream.proto - -package stream - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - structpb "google.golang.org/protobuf/types/known/structpb" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// Publish a message to the stream. Specify a topic to group messages for a specific topic. -type PublishRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // The topic to publish to - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - // The json message to publish - Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` -} - -func (x *PublishRequest) Reset() { - *x = PublishRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PublishRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PublishRequest) ProtoMessage() {} - -func (x *PublishRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. -func (*PublishRequest) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{0} -} - -func (x *PublishRequest) GetTopic() string { - if x != nil { - return x.Topic - } - return "" -} - -func (x *PublishRequest) GetMessage() *structpb.Struct { - if x != nil { - return x.Message - } - return nil -} - -type PublishResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *PublishResponse) Reset() { - *x = PublishResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PublishResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PublishResponse) ProtoMessage() {} - -func (x *PublishResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. -func (*PublishResponse) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{1} -} - -// Subscribe to messages for a given topic. -type SubscribeRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // The topic to subscribe to - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` -} - -func (x *SubscribeRequest) Reset() { - *x = SubscribeRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SubscribeRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SubscribeRequest) ProtoMessage() {} - -func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. -func (*SubscribeRequest) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{2} -} - -func (x *SubscribeRequest) GetTopic() string { - if x != nil { - return x.Topic - } - return "" -} - -// A blocking stream will be returned in response. -type SubscribeResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // The topic subscribed to - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - // The next json message on the topic - Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` -} - -func (x *SubscribeResponse) Reset() { - *x = SubscribeResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_stream_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SubscribeResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SubscribeResponse) ProtoMessage() {} - -func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_stream_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. -func (*SubscribeResponse) Descriptor() ([]byte, []int) { - return file_proto_stream_proto_rawDescGZIP(), []int{3} -} - -func (x *SubscribeResponse) GetTopic() string { - if x != nil { - return x.Topic - } - return "" -} - -func (x *SubscribeResponse) GetMessage() *structpb.Struct { - if x != nil { - return x.Message - } - return nil -} - -var File_proto_stream_proto protoreflect.FileDescriptor - -var file_proto_stream_proto_rawDesc = []byte{ - 0x0a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x1a, 0x1c, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, - 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x0e, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, - 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, - 0x69, 0x63, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x28, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, - 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, - 0x69, 0x63, 0x22, 0x5c, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x31, 0x0a, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x32, 0x8c, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x3c, 0x0a, 0x07, 0x50, - 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x16, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, - 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, - 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x44, 0x0a, 0x09, 0x53, 0x75, 0x62, - 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x18, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x19, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, - 0x10, 0x5a, 0x0e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_proto_stream_proto_rawDescOnce sync.Once - file_proto_stream_proto_rawDescData = file_proto_stream_proto_rawDesc -) - -func file_proto_stream_proto_rawDescGZIP() []byte { - file_proto_stream_proto_rawDescOnce.Do(func() { - file_proto_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_stream_proto_rawDescData) - }) - return file_proto_stream_proto_rawDescData -} - -var file_proto_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_proto_stream_proto_goTypes = []interface{}{ - (*PublishRequest)(nil), // 0: stream.PublishRequest - (*PublishResponse)(nil), // 1: stream.PublishResponse - (*SubscribeRequest)(nil), // 2: stream.SubscribeRequest - (*SubscribeResponse)(nil), // 3: stream.SubscribeResponse - (*structpb.Struct)(nil), // 4: google.protobuf.Struct -} -var file_proto_stream_proto_depIdxs = []int32{ - 4, // 0: stream.PublishRequest.message:type_name -> google.protobuf.Struct - 4, // 1: stream.SubscribeResponse.message:type_name -> google.protobuf.Struct - 0, // 2: stream.Stream.Publish:input_type -> stream.PublishRequest - 2, // 3: stream.Stream.Subscribe:input_type -> stream.SubscribeRequest - 1, // 4: stream.Stream.Publish:output_type -> stream.PublishResponse - 3, // 5: stream.Stream.Subscribe:output_type -> stream.SubscribeResponse - 4, // [4:6] is the sub-list for method output_type - 2, // [2:4] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name -} - -func init() { file_proto_stream_proto_init() } -func file_proto_stream_proto_init() { - if File_proto_stream_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_proto_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_proto_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_proto_stream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_proto_stream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SubscribeResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_proto_stream_proto_rawDesc, - NumEnums: 0, - NumMessages: 4, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_proto_stream_proto_goTypes, - DependencyIndexes: file_proto_stream_proto_depIdxs, - MessageInfos: file_proto_stream_proto_msgTypes, - }.Build() - File_proto_stream_proto = out.File - file_proto_stream_proto_rawDesc = nil - file_proto_stream_proto_goTypes = nil - file_proto_stream_proto_depIdxs = nil -} diff --git a/stream/proto/stream.pb.micro.go b/stream/proto/stream.pb.micro.go deleted file mode 100644 index fbb0fe8..0000000 --- a/stream/proto/stream.pb.micro.go +++ /dev/null @@ -1,186 +0,0 @@ -// Code generated by protoc-gen-micro. DO NOT EDIT. -// source: proto/stream.proto - -package stream - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - _ "google.golang.org/protobuf/types/known/structpb" - math "math" -) - -import ( - context "context" - api "github.com/micro/micro/v3/service/api" - client "github.com/micro/micro/v3/service/client" - server "github.com/micro/micro/v3/service/server" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -// Reference imports to suppress errors if they are not otherwise used. -var _ api.Endpoint -var _ context.Context -var _ client.Option -var _ server.Option - -// Api Endpoints for Stream service - -func NewStreamEndpoints() []*api.Endpoint { - return []*api.Endpoint{} -} - -// Client API for Stream service - -type StreamService interface { - Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) - Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Stream_SubscribeService, error) -} - -type streamService struct { - c client.Client - name string -} - -func NewStreamService(name string, c client.Client) StreamService { - return &streamService{ - c: c, - name: name, - } -} - -func (c *streamService) Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) { - req := c.c.NewRequest(c.name, "Stream.Publish", in) - out := new(PublishResponse) - err := c.c.Call(ctx, req, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *streamService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Stream_SubscribeService, error) { - req := c.c.NewRequest(c.name, "Stream.Subscribe", &SubscribeRequest{}) - stream, err := c.c.Stream(ctx, req, opts...) - if err != nil { - return nil, err - } - if err := stream.Send(in); err != nil { - return nil, err - } - return &streamServiceSubscribe{stream}, nil -} - -type Stream_SubscribeService interface { - Context() context.Context - SendMsg(interface{}) error - RecvMsg(interface{}) error - Close() error - Recv() (*SubscribeResponse, error) -} - -type streamServiceSubscribe struct { - stream client.Stream -} - -func (x *streamServiceSubscribe) Close() error { - return x.stream.Close() -} - -func (x *streamServiceSubscribe) Context() context.Context { - return x.stream.Context() -} - -func (x *streamServiceSubscribe) SendMsg(m interface{}) error { - return x.stream.Send(m) -} - -func (x *streamServiceSubscribe) RecvMsg(m interface{}) error { - return x.stream.Recv(m) -} - -func (x *streamServiceSubscribe) Recv() (*SubscribeResponse, error) { - m := new(SubscribeResponse) - err := x.stream.Recv(m) - if err != nil { - return nil, err - } - return m, nil -} - -// Server API for Stream service - -type StreamHandler interface { - Publish(context.Context, *PublishRequest, *PublishResponse) error - Subscribe(context.Context, *SubscribeRequest, Stream_SubscribeStream) error -} - -func RegisterStreamHandler(s server.Server, hdlr StreamHandler, opts ...server.HandlerOption) error { - type stream interface { - Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error - Subscribe(ctx context.Context, stream server.Stream) error - } - type Stream struct { - stream - } - h := &streamHandler{hdlr} - return s.Handle(s.NewHandler(&Stream{h}, opts...)) -} - -type streamHandler struct { - StreamHandler -} - -func (h *streamHandler) Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error { - return h.StreamHandler.Publish(ctx, in, out) -} - -func (h *streamHandler) Subscribe(ctx context.Context, stream server.Stream) error { - m := new(SubscribeRequest) - if err := stream.Recv(m); err != nil { - return err - } - return h.StreamHandler.Subscribe(ctx, m, &streamSubscribeStream{stream}) -} - -type Stream_SubscribeStream interface { - Context() context.Context - SendMsg(interface{}) error - RecvMsg(interface{}) error - Close() error - Send(*SubscribeResponse) error -} - -type streamSubscribeStream struct { - stream server.Stream -} - -func (x *streamSubscribeStream) Close() error { - return x.stream.Close() -} - -func (x *streamSubscribeStream) Context() context.Context { - return x.stream.Context() -} - -func (x *streamSubscribeStream) SendMsg(m interface{}) error { - return x.stream.Send(m) -} - -func (x *streamSubscribeStream) RecvMsg(m interface{}) error { - return x.stream.Recv(m) -} - -func (x *streamSubscribeStream) Send(m *SubscribeResponse) error { - return x.stream.Send(m) -} diff --git a/stream/proto/stream.proto b/stream/proto/stream.proto deleted file mode 100644 index 06b36e1..0000000 --- a/stream/proto/stream.proto +++ /dev/null @@ -1,35 +0,0 @@ -syntax = "proto3"; - -package stream; - -option go_package = "./proto;stream"; -import "google/protobuf/struct.proto"; - -service Stream { - rpc Publish(PublishRequest) returns (PublishResponse) {} - rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {} -} - -// Publish a message to the stream. Specify a topic to group messages for a specific topic. -message PublishRequest { - // The topic to publish to - string topic = 1; - // The json message to publish - google.protobuf.Struct message = 2; -} - -message PublishResponse {} - -// Subscribe to messages for a given topic. -message SubscribeRequest { - // The topic to subscribe to - string topic = 1; -} - -// A blocking stream will be returned in response. -message SubscribeResponse { - // The topic subscribed to - string topic = 1; - // The next json message on the topic - google.protobuf.Struct message = 2; -} diff --git a/stream/publicapi.json b/stream/publicapi.json deleted file mode 100644 index f0f05f0..0000000 --- a/stream/publicapi.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "stream", - "icon": "🌊", - "category": "messaging", - "display_name": "Stream" -}