feat: replace model with store (#322)

This commit is contained in:
zhaoyang
2021-12-13 21:23:01 +08:00
committed by GitHub
parent 9a787d2362
commit 28fb141819
3 changed files with 150 additions and 173 deletions

View File

@@ -2,13 +2,15 @@ package handler
import ( import (
"crypto/md5" "crypto/md5"
"encoding/json"
"fmt" "fmt"
"net/url"
"sync" "sync"
"time" "time"
"github.com/SlyMarbo/rss" "github.com/SlyMarbo/rss"
log "github.com/micro/micro/v3/service/logger" log "github.com/micro/micro/v3/service/logger"
"github.com/micro/micro/v3/service/store"
"github.com/micro/services/rss/parser" "github.com/micro/services/rss/parser"
pb "github.com/micro/services/rss/proto" pb "github.com/micro/services/rss/proto"
) )
@@ -18,28 +20,53 @@ var (
rssFeeds = map[string]*rss.Feed{} rssFeeds = map[string]*rss.Feed{}
) )
func (e *Rss) fetchAll() { type Crawler interface {
fs := []*pb.Feed{} Fetch(f *pb.Feed) error
err := e.feeds.Read(e.feedsNameIndex.ToQuery(nil), &fs) FetchAll()
}
type crawl struct {
store store.Store
}
func NewCrawl(st store.Store) *crawl {
return &crawl{store: st}
}
func generateEntryKey(feedUrl, id string) string {
return fmt.Sprintf("rss/entry/%s/%s", feedUrl, id)
}
func (e *crawl) FetchAll() {
prefix := "rss/feed/"
records, err := e.store.Read(prefix, store.ReadPrefix())
if err != nil { if err != nil {
log.Errorf("Error listing pb: %v", err) log.Errorf("get feeds list error: %v", err)
return return
} }
if len(fs) == 0 {
if len(records) == 0 {
log.Infof("No pb to fetch") log.Infof("No pb to fetch")
return return
} }
for _, feed := range fs {
err = e.fetch(feed) for _, v := range records {
feed := pb.Feed{}
if err := json.Unmarshal(v.Value, &feed); err != nil {
log.Errorf("crawl.fetchAll json unmarshal feed error: %v", err)
continue
}
err = e.Fetch(&feed)
if err != nil { if err != nil {
log.Errorf("Error saving post: %v", err) log.Errorf("Error saving post: %v", err)
} }
} }
} }
func (e *Rss) fetch(f *pb.Feed) error { func (e *crawl) Fetch(f *pb.Feed) error {
url := f.Url log.Infof("Fetching address %v", f.Url)
log.Infof("Fetching address %v", url)
// see if there's an existing rss feed // see if there's an existing rss feed
rssSync.RLock() rssSync.RLock()
@@ -51,18 +78,18 @@ func (e *Rss) fetch(f *pb.Feed) error {
var err error var err error
fd, err = rss.Fetch(f.Url) fd, err = rss.Fetch(f.Url)
if err != nil { if err != nil {
return fmt.Errorf("Error fetching address %v: %v", url, err) return fmt.Errorf("error fetching address %v: %v", f.Url, err)
} }
// save the feed // save the feed
rssSync.Lock() rssSync.Lock()
rssFeeds[f.Url] = fd rssFeeds[f.Url] = fd
rssSync.Unlock() rssSync.Unlock()
} else { } else {
// otherwise update the existing feed // otherwise, update the existing feed
fd.Items = []*rss.Item{} fd.Items = []*rss.Item{}
fd.Unread = 0 fd.Unread = 0
if err := fd.Update(); err != nil { if err := fd.Update(); err != nil {
return fmt.Errorf("Error updating address %v: %v", url, err) return fmt.Errorf("error updating address %v: %v", f.Url, err)
} }
} }
@@ -77,13 +104,13 @@ func (e *Rss) fetch(f *pb.Feed) error {
content := item.Content content := item.Content
// if we have a parser which returns content use it // if we have a parser which returns content use it
// e.g cnbc // e.g. cnbc
c, err := parser.Parse(item.Link) c, err := parser.Parse(item.Link)
if err == nil && len(c) > 0 { if err == nil && len(c) > 0 {
content = c content = c
} }
err = e.entries.Create(pb.Entry{ val, err := json.Marshal(&pb.Entry{
Id: id, Id: id,
Title: item.Title, Title: item.Title,
Summary: item.Summary, Summary: item.Summary,
@@ -92,16 +119,22 @@ func (e *Rss) fetch(f *pb.Feed) error {
Content: content, Content: content,
Date: item.Date.Format(time.RFC3339Nano), Date: item.Date.Format(time.RFC3339Nano),
}) })
if err != nil { if err != nil {
return fmt.Errorf("Error saving item: %v", err) log.Errorf("json marshal entry error: %v", err)
continue
}
// save
err = e.store.Write(&store.Record{
Key: generateEntryKey(f.Url, id),
Value: val,
})
if err != nil {
return fmt.Errorf("error saving item: %v", err)
} }
} }
return nil return nil
} }
func getDomain(address string) string {
uri, _ := url.Parse(address)
return uri.Host
}

View File

@@ -2,76 +2,47 @@ package handler
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"strings"
"time"
"github.com/micro/micro/v3/service/errors" "github.com/micro/micro/v3/service/errors"
log "github.com/micro/micro/v3/service/logger" log "github.com/micro/micro/v3/service/logger"
"github.com/micro/micro/v3/service/model" "github.com/micro/micro/v3/service/store"
"github.com/micro/services/pkg/tenant" "github.com/micro/services/pkg/tenant"
pb "github.com/micro/services/rss/proto" pb "github.com/micro/services/rss/proto"
) )
type Rss struct { type Rss struct {
feeds model.Model store store.Store
entries model.Model crawl Crawler
feedsIdIndex model.Index
feedsNameIndex model.Index
entriesDateIndex model.Index
entriesURLIndex model.Index
} }
func idFromName(name string) string { // feedIdFromName generates md5 id by feed's name
func feedIdFromName(name string) string {
hash := fnv.New64a() hash := fnv.New64a()
hash.Write([]byte(name)) hash.Write([]byte(name))
return fmt.Sprintf("%d", hash.Sum64()) return fmt.Sprintf("%d", hash.Sum64())
} }
func NewRss() *Rss { // generateFeedKey returns feed key in store
idIndex := model.ByEquality("id") func generateFeedKey(ctx context.Context, name string) string {
idIndex.Order.Type = model.OrderTypeUnordered tenantID, ok := tenant.FromContext(ctx)
if !ok {
nameIndex := model.ByEquality("name") tenantID = "micro"
nameIndex.Order.Type = model.OrderTypeUnordered
dateIndex := model.ByEquality("date")
dateIndex.Order.Type = model.OrderTypeDesc
entriesURLIndex := model.ByEquality("feed")
entriesURLIndex.Order.Type = model.OrderTypeDesc
entriesURLIndex.Order.FieldName = "date"
f := &Rss{
feeds: model.NewModel(
model.WithNamespace("feeds"),
model.WithIndexes(idIndex, nameIndex),
),
entries: model.NewModel(
model.WithNamespace("entries"),
model.WithIndexes(dateIndex, entriesURLIndex),
),
feedsIdIndex: idIndex,
feedsNameIndex: nameIndex,
entriesDateIndex: dateIndex,
entriesURLIndex: entriesURLIndex,
} }
// register model instances return fmt.Sprintf("rss/feed/%s/%s", tenantID, feedIdFromName(name))
f.feeds.Register(new(pb.Feed))
f.entries.Register(new(pb.Entry))
go f.crawl()
return f
} }
func (e *Rss) crawl() { func NewRss(st store.Store, cr Crawler) *Rss {
e.fetchAll() f := &Rss{
tick := time.NewTicker(1 * time.Minute) store: st,
for _ = range tick.C { crawl: cr,
e.fetchAll()
} }
return f
} }
func (e *Rss) Add(ctx context.Context, req *pb.AddRequest, rsp *pb.AddResponse) error { func (e *Rss) Add(ctx context.Context, req *pb.AddRequest, rsp *pb.AddResponse) error {
@@ -81,24 +52,28 @@ func (e *Rss) Add(ctx context.Context, req *pb.AddRequest, rsp *pb.AddResponse)
return errors.BadRequest("rss.add", "require name") return errors.BadRequest("rss.add", "require name")
} }
// get the tenantID key := generateFeedKey(ctx, req.Name)
tenantID, ok := tenant.FromContext(ctx) f := &pb.Feed{
if !ok { Id: key,
tenantID = "micro"
}
f := pb.Feed{
Id: tenantID + "/" + idFromName(req.Name),
Name: req.Name, Name: req.Name,
Url: req.Url, Url: req.Url,
Category: req.Category, Category: req.Category,
} }
// create the feed // create the feed
e.feeds.Create(f) val, err := json.Marshal(f)
if err != nil {
return err
}
if err := e.store.Write(&store.Record{Key: key, Value: val}); err != nil {
return err
}
// schedule immediate fetch // schedule immediate fetch
go e.fetch(&f) go func() {
_ = e.crawl.Fetch(f)
}()
return nil return nil
} }
@@ -106,116 +81,77 @@ func (e *Rss) Add(ctx context.Context, req *pb.AddRequest, rsp *pb.AddResponse)
func (e *Rss) Feed(ctx context.Context, req *pb.FeedRequest, rsp *pb.FeedResponse) error { func (e *Rss) Feed(ctx context.Context, req *pb.FeedRequest, rsp *pb.FeedResponse) error {
log.Info("Received Rss.Entries request") log.Info("Received Rss.Entries request")
// get the tenantID prefix := generateFeedKey(ctx, req.Name)
tenantID, ok := tenant.FromContext(ctx) var records []*store.Record
if !ok { var err error
tenantID = "micro"
// get records with prefix
if len(req.Name) > 0 {
records, err = e.store.Read(prefix)
} else {
records, err = e.store.Read(prefix, store.ReadPrefix())
} }
// feeds by url if err != nil {
feedUrls := map[string]bool{} return err
var feed *pb.Feed }
if len(req.Name) > 0 { if len(records) == 0 {
id := tenantID + "/" + idFromName(req.Name) return nil
q := model.QueryEquals("ID", id)
// get the feed
if err := e.feeds.Read(q, &feed); err != nil {
return errors.InternalServerError("rss.feeds", "could not read feed")
}
feedUrls[feed.Url] = true
} else {
// get all the feeds for a user
var feeds []*pb.Feed
q := model.QueryAll()
if err := e.feeds.Read(q, &feeds); err != nil {
return errors.InternalServerError("rss.feeds", "could not read feed")
}
for _, feed := range feeds {
if !strings.HasPrefix(feed.Id, tenantID+"/") {
continue
}
feedUrls[feed.Url] = true
}
} }
if req.Limit == 0 { if req.Limit == 0 {
req.Limit = int64(25) req.Limit = int64(25)
} }
// default query all for _, v := range records {
q := e.entriesDateIndex.ToQuery(nil) // decode feed
feed := pb.Feed{}
// if the need is not nil, then use one url if err := json.Unmarshal(v.Value, &feed); err != nil {
if feed != nil { log.Errorf("json unmarshal feed error: %v", err)
q = e.entriesURLIndex.ToQuery(feed.Url) continue
}
q.Limit = req.Limit
q.Offset = req.Offset
// iterate until entries hits the limit
for len(rsp.Entries) < int(req.Limit) {
var entries []*pb.Entry
// get the entries for each
err := e.entries.Read(q, &entries)
if err != nil {
return errors.InternalServerError("rss.feeds", "could not read feed")
} }
// find the relevant entries // read entries with prefix
for _, entry := range entries { entryPrefix := generateEntryKey(feed.Url, "")
// check its a url we care about entries, err := e.store.Read(entryPrefix, store.ReadPrefix())
if _, ok := feedUrls[entry.Feed]; !ok { if err != nil {
log.Errorf("read feed entry from store error: %v", err)
continue
}
for _, val := range entries {
var entry pb.Entry
if err := json.Unmarshal(val.Value, &entry); err != nil {
log.Errorf("json unmarshal entry error: %v", err)
continue continue
} }
// add the entry rsp.Entries = append(rsp.Entries, &entry)
rsp.Entries = append(rsp.Entries, entry)
// once you hit the limit return
if len(rsp.Entries) == int(req.Limit) {
return nil
}
} }
// no more entries or less than the limit if len(rsp.Entries) >= int(req.Limit) {
if len(entries) == 0 || len(entries) < int(req.Limit) { break
return nil
} }
// increase the offset
q.Offset += q.Limit
} }
return nil return nil
} }
func (e *Rss) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error { func (e *Rss) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error {
var feeds []*pb.Feed prefix := generateFeedKey(ctx, "")
q := model.QueryAll() records, err := e.store.Read(prefix, store.ReadPrefix())
// TODO: find a way to query only by tenant
err := e.feeds.Read(q, &feeds)
if err != nil { if err != nil {
return errors.InternalServerError("rss.list", "failed to read list of feeds: %v", err) return err
} }
// get the tenantID for _, val := range records {
tenantID, ok := tenant.FromContext(ctx) var feed = pb.Feed{}
if !ok { if err := json.Unmarshal(val.Value, &feed); err != nil {
tenantID = "micro"
}
for _, feed := range feeds {
// filter for the tenant
if !strings.HasPrefix(feed.Id, tenantID+"/") {
continue continue
} }
rsp.Feeds = append(rsp.Feeds, &feed)
rsp.Feeds = append(rsp.Feeds, feed)
} }
return nil return nil
@@ -223,17 +159,8 @@ func (e *Rss) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListRespons
func (e *Rss) Remove(ctx context.Context, req *pb.RemoveRequest, rsp *pb.RemoveResponse) error { func (e *Rss) Remove(ctx context.Context, req *pb.RemoveRequest, rsp *pb.RemoveResponse) error {
if len(req.Name) == 0 { if len(req.Name) == 0 {
return errors.BadRequest("rss.remove", "blank name provided") return errors.BadRequest("rss.remove", "name is required")
} }
// get the tenantID return e.store.Delete(generateFeedKey(ctx, req.Name))
tenantID, ok := tenant.FromContext(ctx)
if !ok {
tenantID = "micro"
}
id := tenantID + "/" + idFromName(req.Name)
e.feeds.Delete(model.QueryEquals("ID", id))
return nil
} }

View File

@@ -1,8 +1,12 @@
package main package main
import ( import (
"time"
"github.com/micro/micro/v3/service" "github.com/micro/micro/v3/service"
"github.com/micro/micro/v3/service/logger" "github.com/micro/micro/v3/service/logger"
"github.com/micro/micro/v3/service/store"
"github.com/micro/services/pkg/tracing" "github.com/micro/services/pkg/tracing"
"github.com/micro/services/rss/handler" "github.com/micro/services/rss/handler"
pb "github.com/micro/services/rss/proto" pb "github.com/micro/services/rss/proto"
@@ -14,8 +18,21 @@ func main() {
service.Name("rss"), service.Name("rss"),
) )
st := store.DefaultStore
crawl := handler.NewCrawl(st)
rss := handler.NewRss(st, crawl)
// crawl
go func() {
crawl.FetchAll()
tick := time.NewTicker(1 * time.Minute)
for _ = range tick.C {
crawl.FetchAll()
}
}()
// Register handler // Register handler
pb.RegisterRssHandler(srv.Server(), handler.NewRss()) pb.RegisterRssHandler(srv.Server(), rss)
traceCloser := tracing.SetupOpentracing("rss") traceCloser := tracing.SetupOpentracing("rss")
defer traceCloser.Close() defer traceCloser.Close()