update events

This commit is contained in:
Asim Aslam
2021-11-02 16:31:09 +00:00
parent 636f5f1758
commit b25df3eb45
5 changed files with 113 additions and 113 deletions

View File

@@ -44,7 +44,7 @@ func NewEventEndpoints() []*api.Endpoint {
type EventService interface {
Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error)
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Event_SubscribeService, error)
Consume(ctx context.Context, in *ConsumeRequest, opts ...client.CallOption) (Event_ConsumeService, error)
Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error)
}
@@ -70,8 +70,8 @@ func (c *eventService) Publish(ctx context.Context, in *PublishRequest, opts ...
return out, nil
}
func (c *eventService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Event_SubscribeService, error) {
req := c.c.NewRequest(c.name, "Event.Subscribe", &SubscribeRequest{})
func (c *eventService) Consume(ctx context.Context, in *ConsumeRequest, opts ...client.CallOption) (Event_ConsumeService, error) {
req := c.c.NewRequest(c.name, "Event.Consume", &ConsumeRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
@@ -79,39 +79,39 @@ func (c *eventService) Subscribe(ctx context.Context, in *SubscribeRequest, opts
if err := stream.Send(in); err != nil {
return nil, err
}
return &eventServiceSubscribe{stream}, nil
return &eventServiceConsume{stream}, nil
}
type Event_SubscribeService interface {
type Event_ConsumeService interface {
Context() context.Context
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*SubscribeResponse, error)
Recv() (*ConsumeResponse, error)
}
type eventServiceSubscribe struct {
type eventServiceConsume struct {
stream client.Stream
}
func (x *eventServiceSubscribe) Close() error {
func (x *eventServiceConsume) Close() error {
return x.stream.Close()
}
func (x *eventServiceSubscribe) Context() context.Context {
func (x *eventServiceConsume) Context() context.Context {
return x.stream.Context()
}
func (x *eventServiceSubscribe) SendMsg(m interface{}) error {
func (x *eventServiceConsume) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *eventServiceSubscribe) RecvMsg(m interface{}) error {
func (x *eventServiceConsume) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *eventServiceSubscribe) Recv() (*SubscribeResponse, error) {
m := new(SubscribeResponse)
func (x *eventServiceConsume) Recv() (*ConsumeResponse, error) {
m := new(ConsumeResponse)
err := x.stream.Recv(m)
if err != nil {
return nil, err
@@ -133,14 +133,14 @@ func (c *eventService) Read(ctx context.Context, in *ReadRequest, opts ...client
type EventHandler interface {
Publish(context.Context, *PublishRequest, *PublishResponse) error
Subscribe(context.Context, *SubscribeRequest, Event_SubscribeStream) error
Consume(context.Context, *ConsumeRequest, Event_ConsumeStream) error
Read(context.Context, *ReadRequest, *ReadResponse) error
}
func RegisterEventHandler(s server.Server, hdlr EventHandler, opts ...server.HandlerOption) error {
type event interface {
Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error
Subscribe(ctx context.Context, stream server.Stream) error
Consume(ctx context.Context, stream server.Stream) error
Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error
}
type Event struct {
@@ -158,43 +158,43 @@ func (h *eventHandler) Publish(ctx context.Context, in *PublishRequest, out *Pub
return h.EventHandler.Publish(ctx, in, out)
}
func (h *eventHandler) Subscribe(ctx context.Context, stream server.Stream) error {
m := new(SubscribeRequest)
func (h *eventHandler) Consume(ctx context.Context, stream server.Stream) error {
m := new(ConsumeRequest)
if err := stream.Recv(m); err != nil {
return err
}
return h.EventHandler.Subscribe(ctx, m, &eventSubscribeStream{stream})
return h.EventHandler.Consume(ctx, m, &eventConsumeStream{stream})
}
type Event_SubscribeStream interface {
type Event_ConsumeStream interface {
Context() context.Context
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*SubscribeResponse) error
Send(*ConsumeResponse) error
}
type eventSubscribeStream struct {
type eventConsumeStream struct {
stream server.Stream
}
func (x *eventSubscribeStream) Close() error {
func (x *eventConsumeStream) Close() error {
return x.stream.Close()
}
func (x *eventSubscribeStream) Context() context.Context {
func (x *eventConsumeStream) Context() context.Context {
return x.stream.Context()
}
func (x *eventSubscribeStream) SendMsg(m interface{}) error {
func (x *eventConsumeStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *eventSubscribeStream) RecvMsg(m interface{}) error {
func (x *eventConsumeStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *eventSubscribeStream) Send(m *SubscribeResponse) error {
func (x *eventConsumeStream) Send(m *ConsumeResponse) error {
return x.stream.Send(m)
}