Files
services/test/events/main.go

155 lines
3.9 KiB
Go

package main
import (
"sync"
"time"
"github.com/micro/micro/v3/service"
"github.com/micro/micro/v3/service/events"
goevents "github.com/micro/micro/v3/service/events"
"github.com/micro/micro/v3/service/logger"
)
type testPayload struct {
Message string
}
var (
payload = testPayload{Message: "HelloWorld"}
metadata = map[string]string{"foo": "bar"}
)
func main() {
srv := service.New()
srv.Init()
go func() { // test 1, ordinary sub with autoack
evChan, err := events.Subscribe("test1")
if err != nil {
logger.Fatalf("Error creating subscriber: %v", err)
}
// Is there a race condition here with publish coming straight after subscribe? Seems to be according to this
// test so sleep for a bit to wait for nats to register subscription properly
time.Sleep(2 * time.Second)
logger.Infof("TEST1: publishing event")
err = events.Publish("test1", payload, goevents.WithMetadata(metadata))
if err != nil {
logger.Errorf("TEST1: Error publishing event: %v", err)
return
}
logger.Infof("TEST1: waiting for event")
event, ok := <-evChan
if !ok {
logger.Error("TEST1: Channel closed")
return
}
if event.Topic != "test1" {
logger.Errorf("TEST1: Incorrect topic: %v", event.Topic)
return
}
if event.Metadata == nil || event.Metadata["foo"] != metadata["foo"] {
logger.Errorf("TEST1: Incorrect metadata: %v", event.Metadata)
return
}
var result testPayload
if err := event.Unmarshal(&result); err != nil {
logger.Errorf("TEST1: Error decoding result: %v. Payload was %v", err, string(event.Payload))
return
}
if result.Message != payload.Message {
logger.Errorf("TEST1: Incorrect message: %v", result.Message)
return
}
logger.Infof("TEST1: Finished ok")
}()
go func() { // test 2, sub with manual ack
evChan, err := events.Subscribe("test2", goevents.WithAutoAck(false, 5*time.Second))
if err != nil {
logger.Errorf("TEST2: Error creating subscriber: %v", err)
return
}
// Is there a race condition here with publish coming straight after subscribe? Seems to be according to this
// test so sleep for a bit to wait for nats to register subscription properly
time.Sleep(2 * time.Second)
logger.Infof("TEST2: publishing event")
err = events.Publish("test2", payload, goevents.WithMetadata(metadata))
if err != nil {
logger.Errorf("TEST2: Error publishing event: %v", err)
return
}
logger.Infof("TEST2: waiting for event")
event, ok := <-evChan
if !ok {
logger.Errorf("TEST2: Channel closed")
return
}
if event.Topic != "test2" {
logger.Errorf("TEST2: Incorrect topic: %v", event.Topic)
return
}
if event.Metadata == nil || event.Metadata["foo"] != metadata["foo"] {
logger.Errorf("TEST2: Incorrect metadata: %v", event.Metadata)
return
}
var result testPayload
if err := event.Unmarshal(&result); err != nil {
logger.Errorf("TEST2: Error decoding result: %v. Payload was %v", err, string(event.Payload))
return
}
if result.Message != payload.Message {
logger.Errorf("TEST2: Incorrect message: %v", result.Message)
return
}
id := event.ID
// nack the event to put it back on the queue
logger.Infof("TEST2: Nacking the event")
event.Nack()
logger.Infof("TEST2: waiting for event")
select {
case event := <-evChan:
if event.ID != id {
logger.Errorf("Unexpected event received, expected %s, received %s", id, event.ID)
return
}
logger.Infof("TEST2: Acking the event")
event.Ack()
case <-time.After(10 * time.Second):
logger.Errorf("Timed out waiting for event")
return
}
// we've acked so should receive nothing else
logger.Infof("TEST2: Waiting for timeout")
select {
case event := <-evChan:
logger.Errorf("Unexpected event received %s", event.ID)
return
case <-time.After(10 * time.Second):
}
logger.Infof("TEST2: Finished ok")
}()
wg := sync.WaitGroup{}
wg.Add(1)
// wait indefinitely so this only runs once
wg.Wait()
}