diff --git a/location/domain/domain.go b/location/domain/domain.go index 03a1e93..d54e5c4 100644 --- a/location/domain/domain.go +++ b/location/domain/domain.go @@ -1,16 +1,21 @@ package domain import ( + "context" "sync" geo "github.com/hailocab/go-geoindex" "github.com/micro/micro/v3/service/errors" common "github.com/micro/services/location/proto" + "github.com/micro/services/pkg/tenant" ) var ( mtx sync.RWMutex - defaultIndex = geo.NewPointsIndex(geo.Km(0.5)) + defaultIndex = geo.NewPointsIndex(geo.Meters(100)) + + // index per tenant + indexes = map[string]*geo.PointsIndex{} ) type Entity struct { @@ -45,6 +50,23 @@ func (e *Entity) ToProto() *common.Entity { } } +func getIndex(ctx context.Context) *geo.PointsIndex { + tenantId, ok := tenant.FromContext(ctx) + if !ok { + //return default index + return defaultIndex + } + + // get the index + index, ok := indexes[tenantId] + if !ok { + index = geo.NewPointsIndex(geo.Meters(100)) + indexes[tenantId] = index + } + + return index +} + func ProtoToEntity(e *common.Entity) *Entity { return &Entity{ ID: e.Id, @@ -55,11 +77,14 @@ func ProtoToEntity(e *common.Entity) *Entity { } } -func Read(id string) (*Entity, error) { +func Read(ctx context.Context, id string) (*Entity, error) { mtx.RLock() defer mtx.RUnlock() - p := defaultIndex.Get(id) + // get the index + index := getIndex(ctx) + + p := index.Get(id) if p == nil { return nil, errors.NotFound("location.read", "Not found") } @@ -72,17 +97,22 @@ func Read(id string) (*Entity, error) { return entity, nil } -func Save(e *Entity) { +func Save(ctx context.Context, e *Entity) { mtx.Lock() - defaultIndex.Add(e) + // get the index + index := getIndex(ctx) + index.Add(e) mtx.Unlock() } -func Search(typ string, entity *Entity, radius float64, numEntities int) []*Entity { +func Search(ctx context.Context, typ string, entity *Entity, radius float64, numEntities int) []*Entity { mtx.RLock() defer mtx.RUnlock() - points := defaultIndex.KNearest(entity, numEntities, geo.Meters(radius), func(p geo.Point) bool { + // get the index + index := getIndex(ctx) + + points := index.KNearest(entity, numEntities, geo.Meters(radius), func(p geo.Point) bool { e, ok := p.(*Entity) if !ok || e.Type != typ { return false diff --git a/location/handler/handler.go b/location/handler/handler.go index aff4bdf..fd216e7 100644 --- a/location/handler/handler.go +++ b/location/handler/handler.go @@ -22,7 +22,7 @@ func (l *Location) Read(ctx context.Context, req *loc.ReadRequest, rsp *loc.Read return errors.BadRequest("location.read", "Require Id") } - entity, err := domain.Read(id) + entity, err := domain.Read(ctx, id) if err != nil { return err } @@ -41,7 +41,12 @@ func (l *Location) Save(ctx context.Context, req *loc.SaveRequest, rsp *loc.Save return errors.BadRequest("location.save", "Require location") } + // immediate save + domain.Save(ctx, domain.ProtoToEntity(entity)) + + // publish the event so other copies of location service can save it p := service.NewEvent(subscriber.Topic) + if err := p.Publish(ctx, entity); err != nil { return errors.InternalServerError("location.save", err.Error()) } @@ -57,7 +62,7 @@ func (l *Location) Search(ctx context.Context, req *loc.SearchRequest, rsp *loc. Longitude: req.Center.Longitude, } - entities := domain.Search(req.Type, entity, req.Radius, int(req.NumEntities)) + entities := domain.Search(ctx, req.Type, entity, req.Radius, int(req.NumEntities)) for _, e := range entities { rsp.Entities = append(rsp.Entities, e.ToProto()) diff --git a/location/subscriber/subscriber.go b/location/subscriber/subscriber.go index dc14a7a..53b05de 100644 --- a/location/subscriber/subscriber.go +++ b/location/subscriber/subscriber.go @@ -16,6 +16,6 @@ type Location struct{} func (g *Location) Handle(ctx context.Context, e *proto.Entity) error { log.Printf("Saving entity ID %s", e.Id) - domain.Save(domain.ProtoToEntity(e)) + domain.Save(ctx, domain.ProtoToEntity(e)) return nil }