Files
services/places/handler/places.go
2021-03-18 17:21:41 +00:00

178 lines
4.9 KiB
Go

package handler
import (
"context"
"sync"
"time"
"github.com/google/uuid"
geo "github.com/hailocab/go-geoindex"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
"gorm.io/gorm"
"github.com/micro/micro/v3/service/errors"
"github.com/micro/micro/v3/service/logger"
"github.com/micro/services/places/model"
pb "github.com/micro/services/places/proto"
)
var (
ErrMissingPlaces = errors.BadRequest("MISSING_LOCATIONS", "One or more places are required")
ErrMissingLatitude = errors.BadRequest("MISSING_LATITUDE", "Latitude is required")
ErrMissingLongitude = errors.BadRequest("MISSING_LONGITUDE", "Longitude is required")
ErrMissingID = errors.BadRequest("MISSING_ID", "Place ID is required")
ErrMissingIDs = errors.BadRequest("MISSING_IDS", "One or more Place IDs are required")
ErrMissingBefore = errors.BadRequest("MISSING_BEFORE", "Before timestamp is required")
ErrMissingAfter = errors.BadRequest("MISSING_AFTER", "After timestamp is required")
ErrMissingRadius = errors.BadRequest("MISSING_RADIUS", "Radius is required")
)
type Places struct {
sync.RWMutex
Geoindex *geo.PointsIndex
DB *gorm.DB
}
// Save a set of places
func (l *Places) Save(ctx context.Context, req *pb.SaveRequest, rsp *pb.SaveResponse) error {
// validate the request
if len(req.Places) == 0 {
return ErrMissingPlaces
}
for _, l := range req.Places {
if l.Latitude == nil {
return ErrMissingLatitude
}
if l.Longitude == nil {
return ErrMissingLongitude
}
if len(l.Id) == 0 {
return ErrMissingID
}
}
// construct the database objects
ls := make([]*model.Location, len(req.Places))
for i, lc := range req.Places {
loc := &model.Location{
ID: uuid.New().String(),
PlaceID: lc.Id,
Latitude: lc.Latitude.Value,
Longitude: lc.Longitude.Value,
}
if lc.Timestamp != nil {
loc.Timestamp = lc.Timestamp.AsTime()
} else {
loc.Timestamp = time.Now()
}
ls[i] = loc
}
// write to the database
if err := l.DB.Create(ls).Error; err != nil {
logger.Errorf("Error writing to the database: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error writing to the database")
}
// write to the geoindex
l.Lock()
defer l.Unlock()
for _, lc := range ls {
l.Geoindex.Add(lc)
}
return nil
}
// Last places for a set of users
func (l *Places) Last(ctx context.Context, req *pb.LastRequest, rsp *pb.ListResponse) error {
// validate the request
if req.Ids == nil {
return ErrMissingIDs
}
// query the database
q := l.DB.Raw("SELECT DISTINCT ON (place_id) place_id, timestamp, latitude, longitude FROM locations WHERE place_id IN (?) ORDER BY place_id, timestamp DESC", req.Ids)
var locs []*model.Location
if err := q.Find(&locs).Error; err != nil {
logger.Errorf("Error reading from the database: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error reading from the database")
}
// serialize the result
rsp.Places = serializePlaces(locs)
return nil
}
// Near returns the places near a point
func (l *Places) Near(ctx context.Context, req *pb.NearRequest, rsp *pb.ListResponse) error {
// validate the request
if req.Latitude == nil {
return ErrMissingLatitude
}
if req.Longitude == nil {
return ErrMissingLongitude
}
if req.Radius == nil {
return ErrMissingRadius
}
// query the geoindex
l.RLock()
p := geo.NewGeoPoint("query", req.Latitude.Value, req.Longitude.Value)
result := l.Geoindex.PointsWithin(p, geo.Km(req.Radius.Value), func(p geo.Point) bool {
return true
})
l.RUnlock()
// serialize the result
locs := make([]*model.Location, len(result))
for i, r := range result {
locs[i] = r.(*model.Location)
}
rsp.Places = serializePlaces(locs)
return nil
}
// Read places for a group of users between two points in time
func (l *Places) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ListResponse) error {
// validate the request
if len(req.Ids) == 0 {
return ErrMissingIDs
}
if req.Before == nil {
return ErrMissingBefore
}
if req.After == nil {
return ErrMissingAfter
}
// construct the request
q := l.DB.Model(&model.Location{})
q = q.Order("timestamp ASC")
q = q.Where("place_id IN (?) AND timestamp > ? AND timestamp < ?", req.Ids, req.After.AsTime(), req.Before.AsTime())
var locs []*model.Location
if err := q.Find(&locs).Error; err != nil {
logger.Errorf("Error reading from the database: %v", err)
return errors.InternalServerError("DATABASE_ERROR", "Error reading from the database")
}
// serialize the result
rsp.Places = serializePlaces(locs)
return nil
}
func serializePlaces(locs []*model.Location) []*pb.Location {
rsp := make([]*pb.Location, len(locs))
for i, l := range locs {
rsp[i] = &pb.Location{
Id: l.PlaceID,
Latitude: &wrapperspb.DoubleValue{Value: l.Latitude},
Longitude: &wrapperspb.DoubleValue{Value: l.Longitude},
Timestamp: timestamppb.New(l.Timestamp),
}
}
return rsp
}