package handler import ( "context" "github.com/micro/micro/v3/service/auth" "github.com/micro/micro/v3/service/logger" pb "github.com/micro/services/streams/proto" ) func (s *Streams) Publish(ctx context.Context, req *pb.Message, rsp *pb.PublishResponse) error { // validate the request if len(req.Topic) == 0 { return ErrMissingTopic } if err := validateTopicInput(req.Topic); err != nil { return err } if len(req.Message) == 0 { return ErrMissingMessage } topic := req.Topic // in the event we have an account we use multi-tenancy acc, ok := auth.AccountFromContext(ctx) if ok { topic = fmtTopic(acc, req.Topic) } // publish the message logger.Infof("Publishing message to topic: %v", req.Topic) return s.Events.Publish(topic, req.Message) }