From 28fb1418199679653d7424e13de3f9d99e16e2f1 Mon Sep 17 00:00:00 2001 From: zhaoyang Date: Mon, 13 Dec 2021 21:23:01 +0800 Subject: [PATCH] feat: replace model with store (#322) --- rss/handler/crawl.go | 77 ++++++++++----- rss/handler/rss.go | 227 +++++++++++++++---------------------------- rss/main.go | 19 +++- 3 files changed, 150 insertions(+), 173 deletions(-) diff --git a/rss/handler/crawl.go b/rss/handler/crawl.go index 7feed7b..1a7e632 100644 --- a/rss/handler/crawl.go +++ b/rss/handler/crawl.go @@ -2,13 +2,15 @@ package handler import ( "crypto/md5" + "encoding/json" "fmt" - "net/url" "sync" "time" "github.com/SlyMarbo/rss" log "github.com/micro/micro/v3/service/logger" + "github.com/micro/micro/v3/service/store" + "github.com/micro/services/rss/parser" pb "github.com/micro/services/rss/proto" ) @@ -18,28 +20,53 @@ var ( rssFeeds = map[string]*rss.Feed{} ) -func (e *Rss) fetchAll() { - fs := []*pb.Feed{} - err := e.feeds.Read(e.feedsNameIndex.ToQuery(nil), &fs) +type Crawler interface { + Fetch(f *pb.Feed) error + FetchAll() +} + +type crawl struct { + store store.Store +} + +func NewCrawl(st store.Store) *crawl { + return &crawl{store: st} +} + +func generateEntryKey(feedUrl, id string) string { + return fmt.Sprintf("rss/entry/%s/%s", feedUrl, id) +} + +func (e *crawl) FetchAll() { + prefix := "rss/feed/" + records, err := e.store.Read(prefix, store.ReadPrefix()) + if err != nil { - log.Errorf("Error listing pb: %v", err) + log.Errorf("get feeds list error: %v", err) return } - if len(fs) == 0 { + + if len(records) == 0 { log.Infof("No pb to fetch") return } - for _, feed := range fs { - err = e.fetch(feed) + + for _, v := range records { + feed := pb.Feed{} + if err := json.Unmarshal(v.Value, &feed); err != nil { + log.Errorf("crawl.fetchAll json unmarshal feed error: %v", err) + continue + } + + err = e.Fetch(&feed) if err != nil { log.Errorf("Error saving post: %v", err) } } } -func (e *Rss) fetch(f *pb.Feed) error { - url := f.Url - log.Infof("Fetching address %v", url) +func (e *crawl) Fetch(f *pb.Feed) error { + log.Infof("Fetching address %v", f.Url) // see if there's an existing rss feed rssSync.RLock() @@ -51,18 +78,18 @@ func (e *Rss) fetch(f *pb.Feed) error { var err error fd, err = rss.Fetch(f.Url) if err != nil { - return fmt.Errorf("Error fetching address %v: %v", url, err) + return fmt.Errorf("error fetching address %v: %v", f.Url, err) } // save the feed rssSync.Lock() rssFeeds[f.Url] = fd rssSync.Unlock() } else { - // otherwise update the existing feed + // otherwise, update the existing feed fd.Items = []*rss.Item{} fd.Unread = 0 if err := fd.Update(); err != nil { - return fmt.Errorf("Error updating address %v: %v", url, err) + return fmt.Errorf("error updating address %v: %v", f.Url, err) } } @@ -77,13 +104,13 @@ func (e *Rss) fetch(f *pb.Feed) error { content := item.Content // if we have a parser which returns content use it - // e.g cnbc + // e.g. cnbc c, err := parser.Parse(item.Link) if err == nil && len(c) > 0 { content = c } - err = e.entries.Create(pb.Entry{ + val, err := json.Marshal(&pb.Entry{ Id: id, Title: item.Title, Summary: item.Summary, @@ -92,16 +119,22 @@ func (e *Rss) fetch(f *pb.Feed) error { Content: content, Date: item.Date.Format(time.RFC3339Nano), }) + if err != nil { - return fmt.Errorf("Error saving item: %v", err) + log.Errorf("json marshal entry error: %v", err) + continue + } + + // save + err = e.store.Write(&store.Record{ + Key: generateEntryKey(f.Url, id), + Value: val, + }) + if err != nil { + return fmt.Errorf("error saving item: %v", err) } } return nil } - -func getDomain(address string) string { - uri, _ := url.Parse(address) - return uri.Host -} diff --git a/rss/handler/rss.go b/rss/handler/rss.go index 11c9e15..e1829d9 100644 --- a/rss/handler/rss.go +++ b/rss/handler/rss.go @@ -2,76 +2,47 @@ package handler import ( "context" + "encoding/json" "fmt" "hash/fnv" - "strings" - "time" "github.com/micro/micro/v3/service/errors" log "github.com/micro/micro/v3/service/logger" - "github.com/micro/micro/v3/service/model" + "github.com/micro/micro/v3/service/store" + "github.com/micro/services/pkg/tenant" pb "github.com/micro/services/rss/proto" ) type Rss struct { - feeds model.Model - entries model.Model - feedsIdIndex model.Index - feedsNameIndex model.Index - entriesDateIndex model.Index - entriesURLIndex model.Index + store store.Store + crawl Crawler } -func idFromName(name string) string { +// feedIdFromName generates md5 id by feed's name +func feedIdFromName(name string) string { hash := fnv.New64a() hash.Write([]byte(name)) return fmt.Sprintf("%d", hash.Sum64()) } -func NewRss() *Rss { - idIndex := model.ByEquality("id") - idIndex.Order.Type = model.OrderTypeUnordered - - nameIndex := model.ByEquality("name") - nameIndex.Order.Type = model.OrderTypeUnordered - - dateIndex := model.ByEquality("date") - dateIndex.Order.Type = model.OrderTypeDesc - - entriesURLIndex := model.ByEquality("feed") - entriesURLIndex.Order.Type = model.OrderTypeDesc - entriesURLIndex.Order.FieldName = "date" - - f := &Rss{ - feeds: model.NewModel( - model.WithNamespace("feeds"), - model.WithIndexes(idIndex, nameIndex), - ), - entries: model.NewModel( - model.WithNamespace("entries"), - model.WithIndexes(dateIndex, entriesURLIndex), - ), - feedsIdIndex: idIndex, - feedsNameIndex: nameIndex, - entriesDateIndex: dateIndex, - entriesURLIndex: entriesURLIndex, +// generateFeedKey returns feed key in store +func generateFeedKey(ctx context.Context, name string) string { + tenantID, ok := tenant.FromContext(ctx) + if !ok { + tenantID = "micro" } - // register model instances - f.feeds.Register(new(pb.Feed)) - f.entries.Register(new(pb.Entry)) - - go f.crawl() - return f + return fmt.Sprintf("rss/feed/%s/%s", tenantID, feedIdFromName(name)) } -func (e *Rss) crawl() { - e.fetchAll() - tick := time.NewTicker(1 * time.Minute) - for _ = range tick.C { - e.fetchAll() +func NewRss(st store.Store, cr Crawler) *Rss { + f := &Rss{ + store: st, + crawl: cr, } + + return f } func (e *Rss) Add(ctx context.Context, req *pb.AddRequest, rsp *pb.AddResponse) error { @@ -81,24 +52,28 @@ func (e *Rss) Add(ctx context.Context, req *pb.AddRequest, rsp *pb.AddResponse) return errors.BadRequest("rss.add", "require name") } - // get the tenantID - tenantID, ok := tenant.FromContext(ctx) - if !ok { - tenantID = "micro" - } - - f := pb.Feed{ - Id: tenantID + "/" + idFromName(req.Name), + key := generateFeedKey(ctx, req.Name) + f := &pb.Feed{ + Id: key, Name: req.Name, Url: req.Url, Category: req.Category, } // create the feed - e.feeds.Create(f) + val, err := json.Marshal(f) + if err != nil { + return err + } + + if err := e.store.Write(&store.Record{Key: key, Value: val}); err != nil { + return err + } // schedule immediate fetch - go e.fetch(&f) + go func() { + _ = e.crawl.Fetch(f) + }() return nil } @@ -106,116 +81,77 @@ func (e *Rss) Add(ctx context.Context, req *pb.AddRequest, rsp *pb.AddResponse) func (e *Rss) Feed(ctx context.Context, req *pb.FeedRequest, rsp *pb.FeedResponse) error { log.Info("Received Rss.Entries request") - // get the tenantID - tenantID, ok := tenant.FromContext(ctx) - if !ok { - tenantID = "micro" + prefix := generateFeedKey(ctx, req.Name) + var records []*store.Record + var err error + + // get records with prefix + if len(req.Name) > 0 { + records, err = e.store.Read(prefix) + } else { + records, err = e.store.Read(prefix, store.ReadPrefix()) } - // feeds by url - feedUrls := map[string]bool{} - var feed *pb.Feed + if err != nil { + return err + } - if len(req.Name) > 0 { - id := tenantID + "/" + idFromName(req.Name) - q := model.QueryEquals("ID", id) - - // get the feed - if err := e.feeds.Read(q, &feed); err != nil { - return errors.InternalServerError("rss.feeds", "could not read feed") - } - - feedUrls[feed.Url] = true - } else { - // get all the feeds for a user - var feeds []*pb.Feed - q := model.QueryAll() - if err := e.feeds.Read(q, &feeds); err != nil { - return errors.InternalServerError("rss.feeds", "could not read feed") - } - for _, feed := range feeds { - if !strings.HasPrefix(feed.Id, tenantID+"/") { - continue - } - feedUrls[feed.Url] = true - } + if len(records) == 0 { + return nil } if req.Limit == 0 { req.Limit = int64(25) } - // default query all - q := e.entriesDateIndex.ToQuery(nil) - - // if the need is not nil, then use one url - if feed != nil { - q = e.entriesURLIndex.ToQuery(feed.Url) - } - - q.Limit = req.Limit - q.Offset = req.Offset - - // iterate until entries hits the limit - for len(rsp.Entries) < int(req.Limit) { - var entries []*pb.Entry - // get the entries for each - err := e.entries.Read(q, &entries) - if err != nil { - return errors.InternalServerError("rss.feeds", "could not read feed") + for _, v := range records { + // decode feed + feed := pb.Feed{} + if err := json.Unmarshal(v.Value, &feed); err != nil { + log.Errorf("json unmarshal feed error: %v", err) + continue } - // find the relevant entries - for _, entry := range entries { - // check its a url we care about - if _, ok := feedUrls[entry.Feed]; !ok { + // read entries with prefix + entryPrefix := generateEntryKey(feed.Url, "") + entries, err := e.store.Read(entryPrefix, store.ReadPrefix()) + if err != nil { + log.Errorf("read feed entry from store error: %v", err) + continue + } + + for _, val := range entries { + var entry pb.Entry + if err := json.Unmarshal(val.Value, &entry); err != nil { + log.Errorf("json unmarshal entry error: %v", err) continue } - // add the entry - rsp.Entries = append(rsp.Entries, entry) - - // once you hit the limit return - if len(rsp.Entries) == int(req.Limit) { - return nil - } + rsp.Entries = append(rsp.Entries, &entry) } - // no more entries or less than the limit - if len(entries) == 0 || len(entries) < int(req.Limit) { - return nil + if len(rsp.Entries) >= int(req.Limit) { + break } - - // increase the offset - q.Offset += q.Limit } return nil } func (e *Rss) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error { - var feeds []*pb.Feed - q := model.QueryAll() + prefix := generateFeedKey(ctx, "") + records, err := e.store.Read(prefix, store.ReadPrefix()) - // TODO: find a way to query only by tenant - err := e.feeds.Read(q, &feeds) if err != nil { - return errors.InternalServerError("rss.list", "failed to read list of feeds: %v", err) + return err } - // get the tenantID - tenantID, ok := tenant.FromContext(ctx) - if !ok { - tenantID = "micro" - } - - for _, feed := range feeds { - // filter for the tenant - if !strings.HasPrefix(feed.Id, tenantID+"/") { + for _, val := range records { + var feed = pb.Feed{} + if err := json.Unmarshal(val.Value, &feed); err != nil { continue } - - rsp.Feeds = append(rsp.Feeds, feed) + rsp.Feeds = append(rsp.Feeds, &feed) } return nil @@ -223,17 +159,8 @@ func (e *Rss) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListRespons func (e *Rss) Remove(ctx context.Context, req *pb.RemoveRequest, rsp *pb.RemoveResponse) error { if len(req.Name) == 0 { - return errors.BadRequest("rss.remove", "blank name provided") + return errors.BadRequest("rss.remove", "name is required") } - // get the tenantID - tenantID, ok := tenant.FromContext(ctx) - if !ok { - tenantID = "micro" - } - - id := tenantID + "/" + idFromName(req.Name) - - e.feeds.Delete(model.QueryEquals("ID", id)) - return nil + return e.store.Delete(generateFeedKey(ctx, req.Name)) } diff --git a/rss/main.go b/rss/main.go index 94de74c..2390872 100644 --- a/rss/main.go +++ b/rss/main.go @@ -1,8 +1,12 @@ package main import ( + "time" + "github.com/micro/micro/v3/service" "github.com/micro/micro/v3/service/logger" + "github.com/micro/micro/v3/service/store" + "github.com/micro/services/pkg/tracing" "github.com/micro/services/rss/handler" pb "github.com/micro/services/rss/proto" @@ -14,8 +18,21 @@ func main() { service.Name("rss"), ) + st := store.DefaultStore + crawl := handler.NewCrawl(st) + rss := handler.NewRss(st, crawl) + + // crawl + go func() { + crawl.FetchAll() + tick := time.NewTicker(1 * time.Minute) + for _ = range tick.C { + crawl.FetchAll() + } + }() + // Register handler - pb.RegisterRssHandler(srv.Server(), handler.NewRss()) + pb.RegisterRssHandler(srv.Server(), rss) traceCloser := tracing.SetupOpentracing("rss") defer traceCloser.Close()