update stream to be about social ephemeral messaging (#255)

This commit is contained in:
Asim Aslam
2021-11-03 14:41:49 +00:00
committed by GitHub
parent 06e7c0f242
commit 8827af19f5
9 changed files with 964 additions and 274 deletions

View File

@@ -1,6 +1,10 @@
Publish and subscribe to messages Ephemeral messaging service
# Stream Service # Stream Service
Send messages to a stream and broadcast to multiple subscribers. Group by topic The stream service provides ephemeral message streams for building chat applications,
and rapidly blast fire-and-forget messages to anyone listening. feeds and timelines. Simply send messages to a channel and know they'll expire
after 24 hours. Streams are limited to 1000 messages and 1000 streams in total.
Max message size is 512 characters.
Messages with links are automatically populated with metadata related to site.

348
stream/domain/domain.go Normal file
View File

@@ -0,0 +1,348 @@
package domain
import (
"errors"
"net/url"
"strings"
"sync"
"time"
"github.com/PuerkitoBio/goquery"
"github.com/golang/groupcache/lru"
"github.com/google/uuid"
)
const (
defaultStream = "_"
maxMessageSize = 512
maxMessages = 1000
maxStreams = 1000
streamTTL = 8.64e13
)
type Metadata struct {
Created int64
Title string
Description string
Type string
Image string
Url string
Site string
}
type Stream struct {
Id string
Messages []*Message
Updated int64
}
type Message struct {
Id string
Text string
Created int64 `json:",string"`
Stream string
Metadata *Metadata
}
type Store struct {
Created int64
Updates chan *Message
mtx sync.RWMutex
Streams *lru.Cache
streams map[string]int64
metadatas map[string]*Metadata
}
var (
C = newStore()
)
func newStore() *Store {
return &Store{
Created: time.Now().UnixNano(),
Streams: lru.New(maxStreams),
Updates: make(chan *Message, 100),
streams: make(map[string]int64),
metadatas: make(map[string]*Metadata),
}
}
func newStream(id string) *Stream {
return &Stream{
Id: id,
Updated: time.Now().UnixNano(),
}
}
func newMessage(text, stream string) *Message {
return &Message{
Id: uuid.New().String(),
Text: text,
Created: time.Now().UnixNano(),
Stream: stream,
}
}
func getMetadata(uri string) *Metadata {
u, err := url.Parse(uri)
if err != nil {
return nil
}
d, err := goquery.NewDocument(u.String())
if err != nil {
return nil
}
g := &Metadata{
Created: time.Now().UnixNano(),
}
for _, node := range d.Find("meta").Nodes {
if len(node.Attr) < 2 {
continue
}
p := strings.Split(node.Attr[0].Val, ":")
if len(p) < 2 || (p[0] != "twitter" && p[0] != "og") {
continue
}
switch p[1] {
case "site_name":
g.Site = node.Attr[1].Val
case "site":
if len(g.Site) == 0 {
g.Site = node.Attr[1].Val
}
case "title":
g.Title = node.Attr[1].Val
case "description":
g.Description = node.Attr[1].Val
case "card", "type":
g.Type = node.Attr[1].Val
case "url":
g.Url = node.Attr[1].Val
case "image":
if len(p) > 2 && p[2] == "src" {
g.Image = node.Attr[1].Val
} else if len(g.Image) == 0 {
g.Image = node.Attr[1].Val
}
}
}
if len(g.Type) == 0 || len(g.Image) == 0 || len(g.Title) == 0 || len(g.Url) == 0 {
return nil
}
return g
}
func (c *Store) Metadata(t *Message) {
parts := strings.Split(t.Text, " ")
for _, part := range parts {
g := getMetadata(part)
if g == nil {
continue
}
c.mtx.Lock()
c.metadatas[t.Id] = g
c.mtx.Unlock()
return
}
}
func (c *Store) List() map[string]int64 {
c.mtx.RLock()
streams := c.streams
c.mtx.RUnlock()
return streams
}
func (c *Store) Save(message *Message) {
c.mtx.Lock()
defer c.mtx.Unlock()
var stream *Stream
if obj, ok := c.Streams.Get(message.Stream); ok {
stream = obj.(*Stream)
} else {
stream = newStream(message.Stream)
c.Streams.Add(message.Stream, stream)
}
stream.Messages = append(stream.Messages, message)
if len(stream.Messages) > maxMessages {
stream.Messages = stream.Messages[1:]
}
stream.Updated = time.Now().UnixNano()
}
func (c *Store) Retrieve(message string, streem string, direction, last, limit int64) []*Message {
c.mtx.RLock()
defer c.mtx.RUnlock()
var stream *Stream
if message, ok := c.Streams.Get(streem); ok {
stream = message.(*Stream)
} else {
return []*Message{}
}
if len(message) == 0 {
var messages []*Message
if limit <= 0 {
return messages
}
li := int(limit)
// go back in time
if direction < 0 {
for i := len(stream.Messages) - 1; i >= 0; i-- {
if len(messages) >= li {
return messages
}
message := stream.Messages[i]
if message.Created < last {
if g, ok := c.metadatas[message.Id]; ok {
tc := *message
tc.Metadata = g
messages = append(messages, &tc)
} else {
messages = append(messages, message)
}
}
}
return messages
}
start := 0
if len(stream.Messages) > li {
start = len(stream.Messages) - li
}
for i := start; i < len(stream.Messages); i++ {
if len(messages) >= li {
return messages
}
message := stream.Messages[i]
if message.Created > last {
if g, ok := c.metadatas[message.Id]; ok {
tc := *message
tc.Metadata = g
messages = append(messages, &tc)
} else {
messages = append(messages, message)
}
}
}
return messages
}
// retrieve one
for _, t := range stream.Messages {
var messages []*Message
if message == t.Id {
if g, ok := c.metadatas[t.Id]; ok {
tc := *t
tc.Metadata = g
messages = append(messages, &tc)
} else {
messages = append(messages, t)
}
return messages
}
}
return []*Message{}
}
func (c *Store) Run() {
t1 := time.NewTicker(time.Hour)
t2 := time.NewTicker(time.Minute)
streams := make(map[string]int64)
for {
select {
case message := <-c.Updates:
c.Save(message)
streams[message.Stream] = time.Now().UnixNano()
go c.Metadata(message)
case <-t1.C:
now := time.Now().UnixNano()
for stream, u := range streams {
if d := now - u; d > streamTTL {
c.Streams.Remove(stream)
delete(streams, stream)
}
}
c.mtx.Lock()
for metadata, g := range c.metadatas {
if d := now - g.Created; d > streamTTL {
delete(c.metadatas, metadata)
}
}
c.mtx.Unlock()
case <-t2.C:
c.mtx.Lock()
c.streams = streams
c.mtx.Unlock()
}
}
}
func ListChannels() map[string]int64 {
return C.List()
}
func ListMessages(channel string, limit int64) []*Message {
message := ""
last := int64(0)
direction := int64(1)
if limit <= 0 {
limit = 25
}
// default stream
if len(channel) == 0 {
channel = defaultStream
}
return C.Retrieve(message, channel, direction, last, limit)
}
func SendMessage(channel, message string) error {
// default stream
if len(channel) == 0 {
channel = defaultStream
}
// default length
if len(message) > maxMessageSize {
message = message[:maxMessageSize]
}
select {
case C.Updates <- newMessage(message, channel):
case <-time.After(time.Second):
return errors.New("timed out creating message")
}
return nil
}
// TODO: streams per user
func Setup() {
go C.Run()
}

View File

@@ -1,25 +1,46 @@
{ {
"publish": [{ "sendMessage": [{
"title": "Publish a message", "title": "Send a message",
"description": "Publish a message to a topic on the stream", "description": "Send a message to a channel",
"run_check": true, "run_check": true,
"request": { "request": {
"topic": "events", "channel": "general",
"message": {"id": "1", "type": "signup", "user": "john"} "text": "Hey checkout this tweet https://twitter.com/m3oservices/status/1455291054295498752"
}, },
"response": {} "response": {}
}], }],
"subscribe": [{ "listMessages": [{
"title": "Subscribe to a topic", "title": "List messages",
"description": "Subscribe to messages on a given topic from the stream", "description": "List messages for a channel",
"run_check": false, "run_check": false,
"request": { "request": {
"topic": "events" "channel": "general"
}, },
"response": { "response": {
"topic": "events", "messages": [
"message": {"id": "1", "type": "signup", "user": "john"} {
"id": "e6099dca-22af-440e-bdbf-e14525af9824",
"text": "Hey checkout this tweet https://twitter.com/m3oservices/status/1455291054295498752",
"timestamp": "2021-11-03T14:34:40.333401738Z",
"channel": "general",
"metadata": {}
}
]
}
}],
"listChannels": [{
"title": "List channels",
"description": "List all the channels",
"run_check": true,
"request": {},
"response": {
"channels": [
{
"name": "general",
"last_active": "2021-11-03T14:35:07.594972213Z"
}
]
} }
}] }]
} }

View File

@@ -2,23 +2,29 @@ package handler
import ( import (
"context" "context"
"encoding/json"
"fmt"
"path" "path"
"strings"
"time"
"github.com/asim/mq/broker"
"github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/errors"
log "github.com/micro/micro/v3/service/logger"
"github.com/micro/services/pkg/tenant" "github.com/micro/services/pkg/tenant"
"github.com/micro/services/stream/domain"
pb "github.com/micro/services/stream/proto" pb "github.com/micro/services/stream/proto"
"google.golang.org/protobuf/types/known/structpb"
) )
type Stream struct{} type Stream struct{}
func (s *Stream) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.PublishResponse) error { func New() *Stream {
if len(req.Topic) == 0 { domain.Setup()
return errors.BadRequest("stream.publish", "topic is blank") return &Stream{}
}
func (s *Stream) SendMessage(ctx context.Context, req *pb.SendMessageRequest, rsp *pb.SendMessageResponse) error {
if len(req.Channel) == 0 {
return errors.BadRequest("stream.sendmessage", "channel is blank")
}
if len(req.Text) == 0 {
return errors.BadRequest("stream.sendmessage", "message is blank")
} }
// get the tenant // get the tenant
@@ -27,23 +33,23 @@ func (s *Stream) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Pu
id = "default" id = "default"
} }
// create tenant based topics // create tenant based channels
topic := path.Join("stream", id, req.Topic) channel := path.Join(id, req.Channel)
// marshal the data // sendmessage the message
b, _ := json.Marshal(req.Message.AsMap()) if err := domain.SendMessage(channel, req.Text); err != nil {
return errors.InternalServerError("stream.sendmessage", err.Error())
log.Infof("Tenant %v publishing to %v\n", id, req.Topic) }
// publish the message
broker.Publish(topic, b)
return nil return nil
} }
func (s *Stream) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Stream_SubscribeStream) error { func (s *Stream) ListMessages(ctx context.Context, req *pb.ListMessagesRequest, rsp *pb.ListMessagesResponse) error {
if len(req.Topic) == 0 { if len(req.Channel) == 0 {
return errors.BadRequest("stream.publish", "topic is blank") return errors.BadRequest("stream.sendmessage", "channel is blank")
}
if req.Limit <= 0 {
req.Limit = 25
} }
id, ok := tenant.FromContext(ctx) id, ok := tenant.FromContext(ctx)
@@ -51,30 +57,53 @@ func (s *Stream) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream
id = "default" id = "default"
} }
// create tenant based topics // create tenant based channels
topic := path.Join("stream", id, req.Topic) channel := path.Join(id, req.Channel)
rsp.Channel = req.Channel
log.Infof("Tenant %v subscribing to %v\n", id, req.Topic) for _, message := range domain.ListMessages(channel, int64(req.Limit)) {
metadata := map[string]string{}
sub, err := broker.Subscribe(topic) if message.Metadata != nil {
if err != nil { metadata["created"] = time.Unix(0, message.Metadata.Created).Format(time.RFC3339Nano)
return errors.InternalServerError("stream.subscribe", "failed to subscribe to stream") metadata["title"] = message.Metadata.Title
} metadata["description"] = message.Metadata.Description
defer broker.Unsubscribe(req.Topic, sub) metadata["type"] = message.Metadata.Type
metadata["image"] = message.Metadata.Image
// range over the messages until the subscriber is closed metadata["url"] = message.Metadata.Url
for msg := range sub { metadata["site"] = message.Metadata.Site
fmt.Println("got message, sending")
// unmarshal the message into a struct
d := &structpb.Struct{}
d.UnmarshalJSON(msg)
if err := stream.Send(&pb.SubscribeResponse{
Topic: req.Topic,
Message: d,
}); err != nil {
return err
} }
rsp.Messages = append(rsp.Messages, &pb.Message{
Id: message.Id,
Text: message.Text,
Timestamp: time.Unix(0, message.Created).Format(time.RFC3339Nano),
Channel: req.Channel,
Metadata: metadata,
})
}
return nil
}
func (s *Stream) ListChannels(ctx context.Context, req *pb.ListChannelsRequest, rsp *pb.ListChannelsResponse) error {
// get the tenant
id, ok := tenant.FromContext(ctx)
if !ok {
id = "default"
}
for channel, active := range domain.ListChannels() {
if !strings.HasPrefix(channel, id+"/") {
continue
}
channel = strings.TrimPrefix(channel, id+"/")
rsp.Channels = append(rsp.Channels, &pb.Channel{
Name: channel,
LastActive: time.Unix(0, active).Format(time.RFC3339Nano),
})
} }
return nil return nil

View File

@@ -15,7 +15,7 @@ func main() {
) )
// Register handler // Register handler
pb.RegisterStreamHandler(srv.Server(), new(handler.Stream)) pb.RegisterStreamHandler(srv.Server(), handler.New())
// Run service // Run service
if err := srv.Run(); err != nil { if err := srv.Run(); err != nil {

View File

@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.26.0 // protoc-gen-go v1.27.1
// protoc v3.15.6 // protoc v3.15.6
// source: proto/stream.proto // source: proto/stream.proto
@@ -9,7 +9,6 @@ package stream
import ( import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
structpb "google.golang.org/protobuf/types/known/structpb"
reflect "reflect" reflect "reflect"
sync "sync" sync "sync"
) )
@@ -21,20 +20,25 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
) )
// Publish a message to the stream. Specify a topic to group messages for a specific topic. type Message struct {
type PublishRequest struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
// The topic to publish to // id of the message
Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// The json message to publish // text of the message
Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"`
// time of message creation
Timestamp string `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// the channel name
Channel string `protobuf:"bytes,4,opt,name=channel,proto3" json:"channel,omitempty"`
// the associated metadata
Metadata map[string]string `protobuf:"bytes,5,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
} }
func (x *PublishRequest) Reset() { func (x *Message) Reset() {
*x = PublishRequest{} *x = Message{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_stream_proto_msgTypes[0] mi := &file_proto_stream_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -42,13 +46,13 @@ func (x *PublishRequest) Reset() {
} }
} }
func (x *PublishRequest) String() string { func (x *Message) String() string {
return protoimpl.X.MessageStringOf(x) return protoimpl.X.MessageStringOf(x)
} }
func (*PublishRequest) ProtoMessage() {} func (*Message) ProtoMessage() {}
func (x *PublishRequest) ProtoReflect() protoreflect.Message { func (x *Message) ProtoReflect() protoreflect.Message {
mi := &file_proto_stream_proto_msgTypes[0] mi := &file_proto_stream_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -60,33 +64,59 @@ func (x *PublishRequest) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x) return mi.MessageOf(x)
} }
// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. // Deprecated: Use Message.ProtoReflect.Descriptor instead.
func (*PublishRequest) Descriptor() ([]byte, []int) { func (*Message) Descriptor() ([]byte, []int) {
return file_proto_stream_proto_rawDescGZIP(), []int{0} return file_proto_stream_proto_rawDescGZIP(), []int{0}
} }
func (x *PublishRequest) GetTopic() string { func (x *Message) GetId() string {
if x != nil { if x != nil {
return x.Topic return x.Id
} }
return "" return ""
} }
func (x *PublishRequest) GetMessage() *structpb.Struct { func (x *Message) GetText() string {
if x != nil { if x != nil {
return x.Message return x.Text
}
return ""
}
func (x *Message) GetTimestamp() string {
if x != nil {
return x.Timestamp
}
return ""
}
func (x *Message) GetChannel() string {
if x != nil {
return x.Channel
}
return ""
}
func (x *Message) GetMetadata() map[string]string {
if x != nil {
return x.Metadata
} }
return nil return nil
} }
type PublishResponse struct { type Channel struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
// name of the channel
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
// last activity time
LastActive string `protobuf:"bytes,2,opt,name=last_active,json=lastActive,proto3" json:"last_active,omitempty"`
} }
func (x *PublishResponse) Reset() { func (x *Channel) Reset() {
*x = PublishResponse{} *x = Channel{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_stream_proto_msgTypes[1] mi := &file_proto_stream_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -94,13 +124,13 @@ func (x *PublishResponse) Reset() {
} }
} }
func (x *PublishResponse) String() string { func (x *Channel) String() string {
return protoimpl.X.MessageStringOf(x) return protoimpl.X.MessageStringOf(x)
} }
func (*PublishResponse) ProtoMessage() {} func (*Channel) ProtoMessage() {}
func (x *PublishResponse) ProtoReflect() protoreflect.Message { func (x *Channel) ProtoReflect() protoreflect.Message {
mi := &file_proto_stream_proto_msgTypes[1] mi := &file_proto_stream_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -112,23 +142,39 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x) return mi.MessageOf(x)
} }
// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. // Deprecated: Use Channel.ProtoReflect.Descriptor instead.
func (*PublishResponse) Descriptor() ([]byte, []int) { func (*Channel) Descriptor() ([]byte, []int) {
return file_proto_stream_proto_rawDescGZIP(), []int{1} return file_proto_stream_proto_rawDescGZIP(), []int{1}
} }
// Subscribe to messages for a given topic. func (x *Channel) GetName() string {
type SubscribeRequest struct { if x != nil {
return x.Name
}
return ""
}
func (x *Channel) GetLastActive() string {
if x != nil {
return x.LastActive
}
return ""
}
// SendMessage a message to the stream.
type SendMessageRequest struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
// The topic to subscribe to // The channel to send to
Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"`
// The message text to send
Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"`
} }
func (x *SubscribeRequest) Reset() { func (x *SendMessageRequest) Reset() {
*x = SubscribeRequest{} *x = SendMessageRequest{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_stream_proto_msgTypes[2] mi := &file_proto_stream_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -136,13 +182,13 @@ func (x *SubscribeRequest) Reset() {
} }
} }
func (x *SubscribeRequest) String() string { func (x *SendMessageRequest) String() string {
return protoimpl.X.MessageStringOf(x) return protoimpl.X.MessageStringOf(x)
} }
func (*SubscribeRequest) ProtoMessage() {} func (*SendMessageRequest) ProtoMessage() {}
func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { func (x *SendMessageRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_stream_proto_msgTypes[2] mi := &file_proto_stream_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -154,32 +200,33 @@ func (x *SubscribeRequest) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x) return mi.MessageOf(x)
} }
// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. // Deprecated: Use SendMessageRequest.ProtoReflect.Descriptor instead.
func (*SubscribeRequest) Descriptor() ([]byte, []int) { func (*SendMessageRequest) Descriptor() ([]byte, []int) {
return file_proto_stream_proto_rawDescGZIP(), []int{2} return file_proto_stream_proto_rawDescGZIP(), []int{2}
} }
func (x *SubscribeRequest) GetTopic() string { func (x *SendMessageRequest) GetChannel() string {
if x != nil { if x != nil {
return x.Topic return x.Channel
} }
return "" return ""
} }
// A blocking stream will be returned in response. func (x *SendMessageRequest) GetText() string {
type SubscribeResponse struct { if x != nil {
return x.Text
}
return ""
}
type SendMessageResponse struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
// The topic subscribed to
Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
// The next json message on the topic
Message *structpb.Struct `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
} }
func (x *SubscribeResponse) Reset() { func (x *SendMessageResponse) Reset() {
*x = SubscribeResponse{} *x = SendMessageResponse{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_stream_proto_msgTypes[3] mi := &file_proto_stream_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -187,13 +234,13 @@ func (x *SubscribeResponse) Reset() {
} }
} }
func (x *SubscribeResponse) String() string { func (x *SendMessageResponse) String() string {
return protoimpl.X.MessageStringOf(x) return protoimpl.X.MessageStringOf(x)
} }
func (*SubscribeResponse) ProtoMessage() {} func (*SendMessageResponse) ProtoMessage() {}
func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { func (x *SendMessageResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_stream_proto_msgTypes[3] mi := &file_proto_stream_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -205,21 +252,208 @@ func (x *SubscribeResponse) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x) return mi.MessageOf(x)
} }
// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. // Deprecated: Use SendMessageResponse.ProtoReflect.Descriptor instead.
func (*SubscribeResponse) Descriptor() ([]byte, []int) { func (*SendMessageResponse) Descriptor() ([]byte, []int) {
return file_proto_stream_proto_rawDescGZIP(), []int{3} return file_proto_stream_proto_rawDescGZIP(), []int{3}
} }
func (x *SubscribeResponse) GetTopic() string { // List all the active channels
type ListChannelsRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *ListChannelsRequest) Reset() {
*x = ListChannelsRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_stream_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListChannelsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListChannelsRequest) ProtoMessage() {}
func (x *ListChannelsRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_stream_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListChannelsRequest.ProtoReflect.Descriptor instead.
func (*ListChannelsRequest) Descriptor() ([]byte, []int) {
return file_proto_stream_proto_rawDescGZIP(), []int{4}
}
type ListChannelsResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Channels []*Channel `protobuf:"bytes,1,rep,name=channels,proto3" json:"channels,omitempty"`
}
func (x *ListChannelsResponse) Reset() {
*x = ListChannelsResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_stream_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListChannelsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListChannelsResponse) ProtoMessage() {}
func (x *ListChannelsResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_stream_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListChannelsResponse.ProtoReflect.Descriptor instead.
func (*ListChannelsResponse) Descriptor() ([]byte, []int) {
return file_proto_stream_proto_rawDescGZIP(), []int{5}
}
func (x *ListChannelsResponse) GetChannels() []*Channel {
if x != nil { if x != nil {
return x.Topic return x.Channels
}
return nil
}
// List messages for a given channel
type ListMessagesRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// The channel to subscribe to
Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"`
// number of message to return
Limit int32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"`
}
func (x *ListMessagesRequest) Reset() {
*x = ListMessagesRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_stream_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListMessagesRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListMessagesRequest) ProtoMessage() {}
func (x *ListMessagesRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_stream_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListMessagesRequest.ProtoReflect.Descriptor instead.
func (*ListMessagesRequest) Descriptor() ([]byte, []int) {
return file_proto_stream_proto_rawDescGZIP(), []int{6}
}
func (x *ListMessagesRequest) GetChannel() string {
if x != nil {
return x.Channel
} }
return "" return ""
} }
func (x *SubscribeResponse) GetMessage() *structpb.Struct { func (x *ListMessagesRequest) GetLimit() int32 {
if x != nil { if x != nil {
return x.Message return x.Limit
}
return 0
}
type ListMessagesResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// The channel subscribed to
Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"`
// Messages are returned in reverse order; latest first
Messages []*Message `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"`
}
func (x *ListMessagesResponse) Reset() {
*x = ListMessagesResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_stream_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ListMessagesResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListMessagesResponse) ProtoMessage() {}
func (x *ListMessagesResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_stream_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListMessagesResponse.ProtoReflect.Descriptor instead.
func (*ListMessagesResponse) Descriptor() ([]byte, []int) {
return file_proto_stream_proto_rawDescGZIP(), []int{7}
}
func (x *ListMessagesResponse) GetChannel() string {
if x != nil {
return x.Channel
}
return ""
}
func (x *ListMessagesResponse) GetMessages() []*Message {
if x != nil {
return x.Messages
} }
return nil return nil
} }
@@ -228,35 +462,64 @@ var File_proto_stream_proto protoreflect.FileDescriptor
var file_proto_stream_proto_rawDesc = []byte{ var file_proto_stream_proto_rawDesc = []byte{
0x0a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x0a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x1a, 0x1c, 0x67, 0x6f, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x22, 0xdd, 0x01, 0x0a,
0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x0e, 0x50, 0x75, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74,
0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x1c, 0x0a, 0x09,
0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x69, 0x63, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68,
0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x39, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x28, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a,
0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79,
0x69, 0x63, 0x22, 0x5c, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x31, 0x0a, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x3e, 0x0a, 0x07,
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18,
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x6c,
0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x61, 0x73, 0x74, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x32, 0x8c, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x3c, 0x0a, 0x07, 0x50, 0x52, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x42, 0x0a, 0x12,
0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x16, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20,
0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x44, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x74, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74,
0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x18, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x22, 0x15, 0x0a, 0x13, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x43,
0x1a, 0x19, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x43,
0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65,
0x10, 0x5a, 0x0e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2b, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65,
0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x08, 0x63, 0x68, 0x61, 0x6e, 0x6e,
0x65, 0x6c, 0x73, 0x22, 0x45, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68,
0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61,
0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20,
0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x5d, 0x0a, 0x14, 0x4c, 0x69,
0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x2b, 0x0a, 0x08,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f,
0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52,
0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x32, 0xec, 0x01, 0x0a, 0x06, 0x53, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x12, 0x48, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x65, 0x6e,
0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x1b, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b,
0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x1b,
0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x4c,
0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x12, 0x1b, 0x2e, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c,
0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x10, 0x5a, 0x0e, 0x2e, 0x2f, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
} }
var ( var (
@@ -271,26 +534,33 @@ func file_proto_stream_proto_rawDescGZIP() []byte {
return file_proto_stream_proto_rawDescData return file_proto_stream_proto_rawDescData
} }
var file_proto_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_proto_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_proto_stream_proto_goTypes = []interface{}{ var file_proto_stream_proto_goTypes = []interface{}{
(*PublishRequest)(nil), // 0: stream.PublishRequest (*Message)(nil), // 0: stream.Message
(*PublishResponse)(nil), // 1: stream.PublishResponse (*Channel)(nil), // 1: stream.Channel
(*SubscribeRequest)(nil), // 2: stream.SubscribeRequest (*SendMessageRequest)(nil), // 2: stream.SendMessageRequest
(*SubscribeResponse)(nil), // 3: stream.SubscribeResponse (*SendMessageResponse)(nil), // 3: stream.SendMessageResponse
(*structpb.Struct)(nil), // 4: google.protobuf.Struct (*ListChannelsRequest)(nil), // 4: stream.ListChannelsRequest
(*ListChannelsResponse)(nil), // 5: stream.ListChannelsResponse
(*ListMessagesRequest)(nil), // 6: stream.ListMessagesRequest
(*ListMessagesResponse)(nil), // 7: stream.ListMessagesResponse
nil, // 8: stream.Message.MetadataEntry
} }
var file_proto_stream_proto_depIdxs = []int32{ var file_proto_stream_proto_depIdxs = []int32{
4, // 0: stream.PublishRequest.message:type_name -> google.protobuf.Struct 8, // 0: stream.Message.metadata:type_name -> stream.Message.MetadataEntry
4, // 1: stream.SubscribeResponse.message:type_name -> google.protobuf.Struct 1, // 1: stream.ListChannelsResponse.channels:type_name -> stream.Channel
0, // 2: stream.Stream.Publish:input_type -> stream.PublishRequest 0, // 2: stream.ListMessagesResponse.messages:type_name -> stream.Message
2, // 3: stream.Stream.Subscribe:input_type -> stream.SubscribeRequest 2, // 3: stream.Stream.SendMessage:input_type -> stream.SendMessageRequest
1, // 4: stream.Stream.Publish:output_type -> stream.PublishResponse 6, // 4: stream.Stream.ListMessages:input_type -> stream.ListMessagesRequest
3, // 5: stream.Stream.Subscribe:output_type -> stream.SubscribeResponse 4, // 5: stream.Stream.ListChannels:input_type -> stream.ListChannelsRequest
4, // [4:6] is the sub-list for method output_type 3, // 6: stream.Stream.SendMessage:output_type -> stream.SendMessageResponse
2, // [2:4] is the sub-list for method input_type 7, // 7: stream.Stream.ListMessages:output_type -> stream.ListMessagesResponse
2, // [2:2] is the sub-list for extension type_name 5, // 8: stream.Stream.ListChannels:output_type -> stream.ListChannelsResponse
2, // [2:2] is the sub-list for extension extendee 6, // [6:9] is the sub-list for method output_type
0, // [0:2] is the sub-list for field type_name 3, // [3:6] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
} }
func init() { file_proto_stream_proto_init() } func init() { file_proto_stream_proto_init() }
@@ -300,7 +570,7 @@ func file_proto_stream_proto_init() {
} }
if !protoimpl.UnsafeEnabled { if !protoimpl.UnsafeEnabled {
file_proto_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { file_proto_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PublishRequest); i { switch v := v.(*Message); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -312,7 +582,7 @@ func file_proto_stream_proto_init() {
} }
} }
file_proto_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { file_proto_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PublishResponse); i { switch v := v.(*Channel); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -324,7 +594,7 @@ func file_proto_stream_proto_init() {
} }
} }
file_proto_stream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { file_proto_stream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeRequest); i { switch v := v.(*SendMessageRequest); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -336,7 +606,55 @@ func file_proto_stream_proto_init() {
} }
} }
file_proto_stream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { file_proto_stream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeResponse); i { switch v := v.(*SendMessageResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_stream_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListChannelsRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_stream_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListChannelsResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_stream_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListMessagesRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_stream_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListMessagesResponse); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -354,7 +672,7 @@ func file_proto_stream_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_stream_proto_rawDesc, RawDescriptor: file_proto_stream_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 4, NumMessages: 9,
NumExtensions: 0, NumExtensions: 0,
NumServices: 1, NumServices: 1,
}, },

View File

@@ -6,7 +6,6 @@ package stream
import ( import (
fmt "fmt" fmt "fmt"
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
_ "google.golang.org/protobuf/types/known/structpb"
math "math" math "math"
) )
@@ -43,8 +42,9 @@ func NewStreamEndpoints() []*api.Endpoint {
// Client API for Stream service // Client API for Stream service
type StreamService interface { type StreamService interface {
Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) SendMessage(ctx context.Context, in *SendMessageRequest, opts ...client.CallOption) (*SendMessageResponse, error)
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Stream_SubscribeService, error) ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error)
ListChannels(ctx context.Context, in *ListChannelsRequest, opts ...client.CallOption) (*ListChannelsResponse, error)
} }
type streamService struct { type streamService struct {
@@ -59,9 +59,9 @@ func NewStreamService(name string, c client.Client) StreamService {
} }
} }
func (c *streamService) Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*PublishResponse, error) { func (c *streamService) SendMessage(ctx context.Context, in *SendMessageRequest, opts ...client.CallOption) (*SendMessageResponse, error) {
req := c.c.NewRequest(c.name, "Stream.Publish", in) req := c.c.NewRequest(c.name, "Stream.SendMessage", in)
out := new(PublishResponse) out := new(SendMessageResponse)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -69,66 +69,39 @@ func (c *streamService) Publish(ctx context.Context, in *PublishRequest, opts ..
return out, nil return out, nil
} }
func (c *streamService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Stream_SubscribeService, error) { func (c *streamService) ListMessages(ctx context.Context, in *ListMessagesRequest, opts ...client.CallOption) (*ListMessagesResponse, error) {
req := c.c.NewRequest(c.name, "Stream.Subscribe", &SubscribeRequest{}) req := c.c.NewRequest(c.name, "Stream.ListMessages", in)
stream, err := c.c.Stream(ctx, req, opts...) out := new(ListMessagesResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := stream.Send(in); err != nil { return out, nil
return nil, err
}
return &streamServiceSubscribe{stream}, nil
} }
type Stream_SubscribeService interface { func (c *streamService) ListChannels(ctx context.Context, in *ListChannelsRequest, opts ...client.CallOption) (*ListChannelsResponse, error) {
Context() context.Context req := c.c.NewRequest(c.name, "Stream.ListChannels", in)
SendMsg(interface{}) error out := new(ListChannelsResponse)
RecvMsg(interface{}) error err := c.c.Call(ctx, req, out, opts...)
Close() error
Recv() (*SubscribeResponse, error)
}
type streamServiceSubscribe struct {
stream client.Stream
}
func (x *streamServiceSubscribe) Close() error {
return x.stream.Close()
}
func (x *streamServiceSubscribe) Context() context.Context {
return x.stream.Context()
}
func (x *streamServiceSubscribe) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *streamServiceSubscribe) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *streamServiceSubscribe) Recv() (*SubscribeResponse, error) {
m := new(SubscribeResponse)
err := x.stream.Recv(m)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return m, nil return out, nil
} }
// Server API for Stream service // Server API for Stream service
type StreamHandler interface { type StreamHandler interface {
Publish(context.Context, *PublishRequest, *PublishResponse) error SendMessage(context.Context, *SendMessageRequest, *SendMessageResponse) error
Subscribe(context.Context, *SubscribeRequest, Stream_SubscribeStream) error ListMessages(context.Context, *ListMessagesRequest, *ListMessagesResponse) error
ListChannels(context.Context, *ListChannelsRequest, *ListChannelsResponse) error
} }
func RegisterStreamHandler(s server.Server, hdlr StreamHandler, opts ...server.HandlerOption) error { func RegisterStreamHandler(s server.Server, hdlr StreamHandler, opts ...server.HandlerOption) error {
type stream interface { type stream interface {
Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error SendMessage(ctx context.Context, in *SendMessageRequest, out *SendMessageResponse) error
Subscribe(ctx context.Context, stream server.Stream) error ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error
ListChannels(ctx context.Context, in *ListChannelsRequest, out *ListChannelsResponse) error
} }
type Stream struct { type Stream struct {
stream stream
@@ -141,46 +114,14 @@ type streamHandler struct {
StreamHandler StreamHandler
} }
func (h *streamHandler) Publish(ctx context.Context, in *PublishRequest, out *PublishResponse) error { func (h *streamHandler) SendMessage(ctx context.Context, in *SendMessageRequest, out *SendMessageResponse) error {
return h.StreamHandler.Publish(ctx, in, out) return h.StreamHandler.SendMessage(ctx, in, out)
} }
func (h *streamHandler) Subscribe(ctx context.Context, stream server.Stream) error { func (h *streamHandler) ListMessages(ctx context.Context, in *ListMessagesRequest, out *ListMessagesResponse) error {
m := new(SubscribeRequest) return h.StreamHandler.ListMessages(ctx, in, out)
if err := stream.Recv(m); err != nil {
return err
}
return h.StreamHandler.Subscribe(ctx, m, &streamSubscribeStream{stream})
} }
type Stream_SubscribeStream interface { func (h *streamHandler) ListChannels(ctx context.Context, in *ListChannelsRequest, out *ListChannelsResponse) error {
Context() context.Context return h.StreamHandler.ListChannels(ctx, in, out)
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*SubscribeResponse) error
}
type streamSubscribeStream struct {
stream server.Stream
}
func (x *streamSubscribeStream) Close() error {
return x.stream.Close()
}
func (x *streamSubscribeStream) Context() context.Context {
return x.stream.Context()
}
func (x *streamSubscribeStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *streamSubscribeStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *streamSubscribeStream) Send(m *SubscribeResponse) error {
return x.stream.Send(m)
} }

View File

@@ -3,33 +3,62 @@ syntax = "proto3";
package stream; package stream;
option go_package = "./proto;stream"; option go_package = "./proto;stream";
import "google/protobuf/struct.proto";
service Stream { service Stream {
rpc Publish(PublishRequest) returns (PublishResponse) {} rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {} rpc ListMessages(ListMessagesRequest) returns (ListMessagesResponse) {}
rpc ListChannels(ListChannelsRequest) returns (ListChannelsResponse) {}
} }
// Publish a message to the stream. Specify a topic to group messages for a specific topic. message Message {
message PublishRequest { // id of the message
// The topic to publish to string id = 1;
string topic = 1; // text of the message
// The json message to publish string text = 2;
google.protobuf.Struct message = 2; // time of message creation
string timestamp = 3;
// the channel name
string channel = 4;
// the associated metadata
map<string,string> metadata = 5;
} }
message PublishResponse {} message Channel {
// name of the channel
// Subscribe to messages for a given topic. string name = 1;
message SubscribeRequest { // last activity time
// The topic to subscribe to string last_active = 2;
string topic = 1;
} }
// A blocking stream will be returned in response. // SendMessage a message to the stream.
message SubscribeResponse { message SendMessageRequest {
// The topic subscribed to // The channel to send to
string topic = 1; string channel = 1;
// The next json message on the topic // The message text to send
google.protobuf.Struct message = 2; string text = 2;
}
message SendMessageResponse {}
// List all the active channels
message ListChannelsRequest {
}
message ListChannelsResponse {
repeated Channel channels = 1;
}
// List messages for a given channel
message ListMessagesRequest {
// The channel to subscribe to
string channel = 1;
// number of message to return
int32 limit = 2;
}
message ListMessagesResponse {
// The channel subscribed to
string channel = 1;
// Messages are chronological order
repeated Message messages = 2;
} }

View File

@@ -1,6 +1,6 @@
{ {
"name": "stream", "name": "stream",
"icon": "🌊", "icon": "🌊",
"category": "messaging", "category": "social",
"display_name": "Stream" "display_name": "Stream"
} }