add create channels to stream

This commit is contained in:
Asim Aslam
2021-11-03 16:03:20 +00:00
parent 784e1a71d3
commit 5ad499c8a0
6 changed files with 340 additions and 110 deletions

View File

@@ -32,6 +32,7 @@ type Metadata struct {
type Stream struct {
Id string
Description string
Messages []*Message
Updated int64
}
@@ -50,7 +51,7 @@ type Store struct {
mtx sync.RWMutex
Streams *lru.Cache
streams map[string]int64
streams map[string]*Stream
metadatas map[string]*Metadata
}
@@ -63,14 +64,15 @@ func newStore() *Store {
Created: time.Now().UnixNano(),
Streams: lru.New(maxStreams),
Updates: make(chan *Message, 100),
streams: make(map[string]int64),
streams: make(map[string]*Stream),
metadatas: make(map[string]*Metadata),
}
}
func newStream(id string) *Stream {
func newStream(id, desc string) *Stream {
return &Stream{
Id: id,
Description: desc,
Updated: time.Now().UnixNano(),
}
}
@@ -140,6 +142,18 @@ func getMetadata(uri string) *Metadata {
return g
}
func (c *Store) CreateStream(name, description string) {
c.mtx.Lock()
ch, ok := c.streams[name]
if ok {
ch.Description = description
} else {
ch = newStream(name, description)
}
c.streams[name] = ch
c.mtx.Unlock()
}
func (c *Store) Metadata(t *Message) {
parts := strings.Split(t.Text, " ")
for _, part := range parts {
@@ -154,7 +168,7 @@ func (c *Store) Metadata(t *Message) {
}
}
func (c *Store) List() map[string]int64 {
func (c *Store) ListStreams() map[string]*Stream {
c.mtx.RLock()
streams := c.streams
c.mtx.RUnlock()
@@ -170,7 +184,7 @@ func (c *Store) Save(message *Message) {
if obj, ok := c.Streams.Get(message.Stream); ok {
stream = obj.(*Stream)
} else {
stream = newStream(message.Stream)
stream = newStream(message.Stream, "")
c.Streams.Add(message.Stream, stream)
}
@@ -270,18 +284,24 @@ func (c *Store) Retrieve(message string, streem string, direction, last, limit i
func (c *Store) Run() {
t1 := time.NewTicker(time.Hour)
t2 := time.NewTicker(time.Minute)
streams := make(map[string]int64)
streams := make(map[string]*Stream)
for {
select {
case message := <-c.Updates:
c.Save(message)
streams[message.Stream] = time.Now().UnixNano()
ch, ok := streams[message.Stream]
if !ok {
ch = newStream(message.Stream, "")
streams[message.Stream] = ch
}
ch.Updated = time.Now().UnixNano()
streams[message.Stream] = ch
go c.Metadata(message)
case <-t1.C:
now := time.Now().UnixNano()
for stream, u := range streams {
if d := now - u; d > streamTTL {
for stream, ch := range streams {
if d := now - ch.Updated; d > streamTTL {
c.Streams.Remove(stream)
delete(streams, stream)
}
@@ -301,8 +321,12 @@ func (c *Store) Run() {
}
}
func ListChannels() map[string]int64 {
return C.List()
func CreateChannel(name, description string) {
C.CreateStream(name, description)
}
func ListChannels() map[string]*Stream {
return C.ListStreams()
}
func ListMessages(channel string, limit int64) []*Message {