From 18e5e61f7987efcef1f0fef487d51aee3cb14f71 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 2 Nov 2021 13:47:29 +0000 Subject: [PATCH] add mq --- mq/.gitignore | 2 + mq/.t.swp | Bin 0 -> 12288 bytes mq/Dockerfile | 3 + mq/Makefile | 28 +++ mq/README.md | 6 + mq/examples.json | 25 +++ mq/generate.go | 3 + mq/handler/mq.go | 81 +++++++++ mq/main.go | 24 +++ mq/micro.mu | 1 + mq/proto/mq.pb.go | 367 ++++++++++++++++++++++++++++++++++++++++ mq/proto/mq.pb.micro.go | 186 ++++++++++++++++++++ mq/proto/mq.proto | 35 ++++ mq/publicapi.json | 6 + 14 files changed, 767 insertions(+) create mode 100644 mq/.gitignore create mode 100644 mq/.t.swp create mode 100644 mq/Dockerfile create mode 100644 mq/Makefile create mode 100644 mq/README.md create mode 100644 mq/examples.json create mode 100644 mq/generate.go create mode 100644 mq/handler/mq.go create mode 100644 mq/main.go create mode 100644 mq/micro.mu create mode 100644 mq/proto/mq.pb.go create mode 100644 mq/proto/mq.pb.micro.go create mode 100644 mq/proto/mq.proto create mode 100644 mq/publicapi.json diff --git a/mq/.gitignore b/mq/.gitignore new file mode 100644 index 0000000..0a6a66d --- /dev/null +++ b/mq/.gitignore @@ -0,0 +1,2 @@ + +mq diff --git a/mq/.t.swp b/mq/.t.swp new file mode 100644 index 0000000000000000000000000000000000000000..6d05ff811618155ae4567c0736d6d0817ffbd290 GIT binary patch literal 12288 zcmeI&u};G<5C-5Y3ljpt3mBNGTUZby0we||lOn+2rd= zovD}TlT)@X96hG!Opv*z&7V9kl~&y5Y@^SeO0^1(*_4jycIB!xxQ=eFyEQJd=cEdg S3}y0{z03tuNgvfp#q google.protobuf.Struct + 4, // 1: mq.SubscribeResponse.message:type_name -> google.protobuf.Struct + 0, // 2: mq.MQ.Publish:input_type -> mq.PublishRequest + 2, // 3: mq.MQ.Subscribe:input_type -> mq.SubscribeRequest + 1, // 4: mq.MQ.Publish:output_type -> mq.PublishResponse + 3, // 5: mq.MQ.Subscribe:output_type -> mq.SubscribeResponse + 4, // [4:6] is the sub-list for method output_type + 2, // [2:4] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_proto_mq_proto_init() } +func file_proto_mq_proto_init() { + if File_proto_mq_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_mq_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_mq_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_mq_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_mq_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_mq_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_mq_proto_goTypes, + DependencyIndexes: file_proto_mq_proto_depIdxs, + MessageInfos: file_proto_mq_proto_msgTypes, + }.Build() + File_proto_mq_proto = out.File + file_proto_mq_proto_rawDesc = nil + file_proto_mq_proto_goTypes = nil + file_proto_mq_proto_depIdxs = nil +} diff --git a/mq/proto/mq.pb.micro.go b/mq/proto/mq.pb.micro.go new file mode 100644 index 0000000..fc3e30d --- /dev/null +++ b/mq/proto/mq.pb.micro.go @@ -0,0 +1,186 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: proto/mq.proto + +package mq + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + _ "google.golang.org/protobuf/types/known/structpb" + math "math" +) + +import ( + context "context" + api "github.com/micro/micro/v3/service/api" + client "github.com/micro/micro/v3/service/client" + server "github.com/micro/micro/v3/service/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ api.Endpoint +var _ context.Context +var _ client.Option +var _ server.Option + +// Api Endpoints for MQ service + +func NewMQEndpoints() []*api.Endpoint { + return []*api.Endpoint{} +} + +// Client API for MQ service + +type MQService interface { + Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (MQ_SubscribeService, error) +} + +type mQService struct { + c client.Client + name string +} + +func NewMQService(name string, c client.Client) MQService { + return &mQService{ + c: c, + name: name, + } +} + +func (c *mQService) Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) { + req := c.c.NewRequest(c.name, "MQ.Publish", in) + out := new(PublishResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *mQService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (MQ_SubscribeService, error) { + req := c.c.NewRequest(c.name, "MQ.Subscribe", &SubscribeRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return &mQServiceSubscribe{stream}, nil +} + +type MQ_SubscribeService interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*SubscribeResponse, error) +} + +type mQServiceSubscribe struct { + stream client.Stream +} + +func (x *mQServiceSubscribe) Close() error { + return x.stream.Close() +} + +func (x *mQServiceSubscribe) Context() context.Context { + return x.stream.Context() +} + +func (x *mQServiceSubscribe) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *mQServiceSubscribe) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *mQServiceSubscribe) Recv() (*SubscribeResponse, error) { + m := new(SubscribeResponse) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for MQ service + +type MQHandler interface { + Publish(context.Context, *PublishRequest, *PublishResponse) error + Subscribe(context.Context, *SubscribeRequest, MQ_SubscribeStream) error +} + +func RegisterMQHandler(s server.Server, hdlr MQHandler, opts ...server.HandlerOption) error { + type mQ interface { + Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error + Subscribe(ctx context.Context, stream server.Stream) error + } + type MQ struct { + mQ + } + h := &mQHandler{hdlr} + return s.Handle(s.NewHandler(&MQ{h}, opts...)) +} + +type mQHandler struct { + MQHandler +} + +func (h *mQHandler) Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error { + return h.MQHandler.Publish(ctx, in, out) +} + +func (h *mQHandler) Subscribe(ctx context.Context, stream server.Stream) error { + m := new(SubscribeRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.MQHandler.Subscribe(ctx, m, &mQSubscribeStream{stream}) +} + +type MQ_SubscribeStream interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*SubscribeResponse) error +} + +type mQSubscribeStream struct { + stream server.Stream +} + +func (x *mQSubscribeStream) Close() error { + return x.stream.Close() +} + +func (x *mQSubscribeStream) Context() context.Context { + return x.stream.Context() +} + +func (x *mQSubscribeStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *mQSubscribeStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *mQSubscribeStream) Send(m *SubscribeResponse) error { + return x.stream.Send(m) +} diff --git a/mq/proto/mq.proto b/mq/proto/mq.proto new file mode 100644 index 0000000..ab93ac0 --- /dev/null +++ b/mq/proto/mq.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +package mq; + +option go_package = "./proto;mq"; +import "google/protobuf/struct.proto"; + +service MQ { + rpc Publish(PublishRequest) returns (PublishResponse) {} + rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {} +} + +// Publish a message to the mq. Specify a topic to group messages for a specific topic. +message PublishRequest { + // The topic to publish to + string topic = 1; + // The json message to publish + google.protobuf.Struct message = 2; +} + +message PublishResponse {} + +// Subscribe to messages for a given topic. +message SubscribeRequest { + // The topic to subscribe to + string topic = 1; +} + +// A blocking mq will be returned in response. +message SubscribeResponse { + // The topic subscribed to + string topic = 1; + // The next json message on the topic + google.protobuf.Struct message = 2; +} diff --git a/mq/publicapi.json b/mq/publicapi.json new file mode 100644 index 0000000..f156901 --- /dev/null +++ b/mq/publicapi.json @@ -0,0 +1,6 @@ +{ + "name": "mq", + "icon": "🌊", + "category": "messaging", + "display_name": "MQ" +}