mirror of
https://github.com/kevin-DL/services.git
synced 2026-01-11 19:04:35 +00:00
make location service multi-tenant (#123)
* make location service multi-tenant * meters
This commit is contained in:
@@ -1,16 +1,21 @@
|
|||||||
package domain
|
package domain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
geo "github.com/hailocab/go-geoindex"
|
geo "github.com/hailocab/go-geoindex"
|
||||||
"github.com/micro/micro/v3/service/errors"
|
"github.com/micro/micro/v3/service/errors"
|
||||||
common "github.com/micro/services/location/proto"
|
common "github.com/micro/services/location/proto"
|
||||||
|
"github.com/micro/services/pkg/tenant"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
defaultIndex = geo.NewPointsIndex(geo.Km(0.5))
|
defaultIndex = geo.NewPointsIndex(geo.Meters(100))
|
||||||
|
|
||||||
|
// index per tenant
|
||||||
|
indexes = map[string]*geo.PointsIndex{}
|
||||||
)
|
)
|
||||||
|
|
||||||
type Entity struct {
|
type Entity struct {
|
||||||
@@ -45,6 +50,23 @@ func (e *Entity) ToProto() *common.Entity {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getIndex(ctx context.Context) *geo.PointsIndex {
|
||||||
|
tenantId, ok := tenant.FromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
//return default index
|
||||||
|
return defaultIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the index
|
||||||
|
index, ok := indexes[tenantId]
|
||||||
|
if !ok {
|
||||||
|
index = geo.NewPointsIndex(geo.Meters(100))
|
||||||
|
indexes[tenantId] = index
|
||||||
|
}
|
||||||
|
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
|
||||||
func ProtoToEntity(e *common.Entity) *Entity {
|
func ProtoToEntity(e *common.Entity) *Entity {
|
||||||
return &Entity{
|
return &Entity{
|
||||||
ID: e.Id,
|
ID: e.Id,
|
||||||
@@ -55,11 +77,14 @@ func ProtoToEntity(e *common.Entity) *Entity {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Read(id string) (*Entity, error) {
|
func Read(ctx context.Context, id string) (*Entity, error) {
|
||||||
mtx.RLock()
|
mtx.RLock()
|
||||||
defer mtx.RUnlock()
|
defer mtx.RUnlock()
|
||||||
|
|
||||||
p := defaultIndex.Get(id)
|
// get the index
|
||||||
|
index := getIndex(ctx)
|
||||||
|
|
||||||
|
p := index.Get(id)
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return nil, errors.NotFound("location.read", "Not found")
|
return nil, errors.NotFound("location.read", "Not found")
|
||||||
}
|
}
|
||||||
@@ -72,17 +97,22 @@ func Read(id string) (*Entity, error) {
|
|||||||
return entity, nil
|
return entity, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Save(e *Entity) {
|
func Save(ctx context.Context, e *Entity) {
|
||||||
mtx.Lock()
|
mtx.Lock()
|
||||||
defaultIndex.Add(e)
|
// get the index
|
||||||
|
index := getIndex(ctx)
|
||||||
|
index.Add(e)
|
||||||
mtx.Unlock()
|
mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func Search(typ string, entity *Entity, radius float64, numEntities int) []*Entity {
|
func Search(ctx context.Context, typ string, entity *Entity, radius float64, numEntities int) []*Entity {
|
||||||
mtx.RLock()
|
mtx.RLock()
|
||||||
defer mtx.RUnlock()
|
defer mtx.RUnlock()
|
||||||
|
|
||||||
points := defaultIndex.KNearest(entity, numEntities, geo.Meters(radius), func(p geo.Point) bool {
|
// get the index
|
||||||
|
index := getIndex(ctx)
|
||||||
|
|
||||||
|
points := index.KNearest(entity, numEntities, geo.Meters(radius), func(p geo.Point) bool {
|
||||||
e, ok := p.(*Entity)
|
e, ok := p.(*Entity)
|
||||||
if !ok || e.Type != typ {
|
if !ok || e.Type != typ {
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ func (l *Location) Read(ctx context.Context, req *loc.ReadRequest, rsp *loc.Read
|
|||||||
return errors.BadRequest("location.read", "Require Id")
|
return errors.BadRequest("location.read", "Require Id")
|
||||||
}
|
}
|
||||||
|
|
||||||
entity, err := domain.Read(id)
|
entity, err := domain.Read(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -41,7 +41,12 @@ func (l *Location) Save(ctx context.Context, req *loc.SaveRequest, rsp *loc.Save
|
|||||||
return errors.BadRequest("location.save", "Require location")
|
return errors.BadRequest("location.save", "Require location")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// immediate save
|
||||||
|
domain.Save(ctx, domain.ProtoToEntity(entity))
|
||||||
|
|
||||||
|
// publish the event so other copies of location service can save it
|
||||||
p := service.NewEvent(subscriber.Topic)
|
p := service.NewEvent(subscriber.Topic)
|
||||||
|
|
||||||
if err := p.Publish(ctx, entity); err != nil {
|
if err := p.Publish(ctx, entity); err != nil {
|
||||||
return errors.InternalServerError("location.save", err.Error())
|
return errors.InternalServerError("location.save", err.Error())
|
||||||
}
|
}
|
||||||
@@ -57,7 +62,7 @@ func (l *Location) Search(ctx context.Context, req *loc.SearchRequest, rsp *loc.
|
|||||||
Longitude: req.Center.Longitude,
|
Longitude: req.Center.Longitude,
|
||||||
}
|
}
|
||||||
|
|
||||||
entities := domain.Search(req.Type, entity, req.Radius, int(req.NumEntities))
|
entities := domain.Search(ctx, req.Type, entity, req.Radius, int(req.NumEntities))
|
||||||
|
|
||||||
for _, e := range entities {
|
for _, e := range entities {
|
||||||
rsp.Entities = append(rsp.Entities, e.ToProto())
|
rsp.Entities = append(rsp.Entities, e.ToProto())
|
||||||
|
|||||||
@@ -16,6 +16,6 @@ type Location struct{}
|
|||||||
|
|
||||||
func (g *Location) Handle(ctx context.Context, e *proto.Entity) error {
|
func (g *Location) Handle(ctx context.Context, e *proto.Entity) error {
|
||||||
log.Printf("Saving entity ID %s", e.Id)
|
log.Printf("Saving entity ID %s", e.Id)
|
||||||
domain.Save(domain.ProtoToEntity(e))
|
domain.Save(ctx, domain.ProtoToEntity(e))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user