mirror of
https://github.com/kevin-DL/services.git
synced 2026-01-13 11:35:26 +00:00
add rss service (#117)
* add rss service * git ignore rss * update readme * go fmt * maintain map by url * make rss service multi-tenant
This commit is contained in:
109
rss/handler/crawl.go
Normal file
109
rss/handler/crawl.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/SlyMarbo/rss"
|
||||
log "github.com/micro/micro/v3/service/logger"
|
||||
"github.com/micro/services/rss/parser"
|
||||
pb "github.com/micro/services/rss/proto"
|
||||
)
|
||||
|
||||
var (
|
||||
rssSync sync.RWMutex
|
||||
rssFeeds = map[string]*rss.Feed{}
|
||||
)
|
||||
|
||||
func (e *Rss) fetchAll() {
|
||||
fs := []*pb.Feed{}
|
||||
err := e.feeds.Read(e.feedsNameIndex.ToQuery(nil), &fs)
|
||||
if err != nil {
|
||||
log.Errorf("Error listing pb: %v", err)
|
||||
return
|
||||
}
|
||||
if len(fs) == 0 {
|
||||
log.Infof("No pb to fetch")
|
||||
return
|
||||
}
|
||||
for _, feed := range fs {
|
||||
err = e.fetch(feed)
|
||||
if err != nil {
|
||||
log.Errorf("Error saving post: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Rss) fetch(f *pb.Feed) error {
|
||||
url := f.Url
|
||||
log.Infof("Fetching address %v", url)
|
||||
|
||||
// see if there's an existing rss feed
|
||||
rssSync.RLock()
|
||||
fd, ok := rssFeeds[f.Url]
|
||||
rssSync.RUnlock()
|
||||
|
||||
if !ok {
|
||||
// create a new one if it doesn't exist
|
||||
var err error
|
||||
fd, err = rss.Fetch(f.Url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error fetching address %v: %v", url, err)
|
||||
}
|
||||
// save the feed
|
||||
rssSync.Lock()
|
||||
rssFeeds[f.Url] = fd
|
||||
rssSync.Unlock()
|
||||
} else {
|
||||
// otherwise update the existing feed
|
||||
fd.Items = []*rss.Item{}
|
||||
fd.Unread = 0
|
||||
if err := fd.Update(); err != nil {
|
||||
return fmt.Errorf("Error updating address %v: %v", url, err)
|
||||
}
|
||||
}
|
||||
|
||||
// set the refresh time
|
||||
fd.Refresh = time.Now()
|
||||
domain := getDomain(url)
|
||||
|
||||
// range over the feed and save the items
|
||||
for _, item := range fd.Items {
|
||||
id := fmt.Sprintf("%x", md5.Sum([]byte(item.ID)))
|
||||
|
||||
// check if content exists
|
||||
content := item.Content
|
||||
|
||||
// if we have a parser which returns content use it
|
||||
// e.g cnbc
|
||||
c, err := parser.Parse(item.Link)
|
||||
if err == nil && len(c) > 0 {
|
||||
content = c
|
||||
}
|
||||
|
||||
err = e.entries.Create(pb.Entry{
|
||||
Id: id,
|
||||
Title: item.Title,
|
||||
Summary: item.Summary,
|
||||
Url: item.Link,
|
||||
Domain: domain,
|
||||
Content: content,
|
||||
Date: item.Date.Unix(),
|
||||
Category: f.Category,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error saving item: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getDomain returns the host component of address (including the port when
// one is present). It returns the empty string when address cannot be parsed
// as a URL.
func getDomain(address string) string {
	uri, err := url.Parse(address)
	if err != nil {
		// url.Parse yields a nil *URL on failure; dereferencing it would panic
		return ""
	}
	return uri.Host
}
|
||||
174
rss/handler/rss.go
Normal file
174
rss/handler/rss.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/micro/micro/v3/service/errors"
|
||||
log "github.com/micro/micro/v3/service/logger"
|
||||
"github.com/micro/micro/v3/service/model"
|
||||
"github.com/micro/services/pkg/tenant"
|
||||
pb "github.com/micro/services/rss/proto"
|
||||
)
|
||||
|
||||
type Rss struct {
|
||||
feeds model.Model
|
||||
entries model.Model
|
||||
feedsIdIndex model.Index
|
||||
feedsNameIndex model.Index
|
||||
entriesDateIndex model.Index
|
||||
entriesURLIndex model.Index
|
||||
}
|
||||
|
||||
// idFromName derives an identifier from a feed name: the 64-bit FNV-1a hash
// of the name's bytes, rendered as an unsigned decimal string.
func idFromName(name string) string {
	hasher := fnv.New64a()
	// the result is discarded exactly as before; only the digest is used
	_, _ = hasher.Write([]byte(name))
	sum := hasher.Sum64()
	return fmt.Sprint(sum)
}
|
||||
|
||||
func NewRss() *Rss {
|
||||
idIndex := model.ByEquality("id")
|
||||
idIndex.Order.Type = model.OrderTypeUnordered
|
||||
|
||||
nameIndex := model.ByEquality("name")
|
||||
nameIndex.Order.Type = model.OrderTypeUnordered
|
||||
|
||||
dateIndex := model.ByEquality("date")
|
||||
dateIndex.Order.Type = model.OrderTypeDesc
|
||||
|
||||
entriesURLIndex := model.ByEquality("url")
|
||||
entriesURLIndex.Order.Type = model.OrderTypeDesc
|
||||
entriesURLIndex.Order.FieldName = "date"
|
||||
|
||||
f := &Rss{
|
||||
feeds: model.NewModel(
|
||||
model.WithNamespace("feeds"),
|
||||
model.WithIndexes(idIndex, nameIndex),
|
||||
),
|
||||
entries: model.NewModel(
|
||||
model.WithNamespace("entries"),
|
||||
model.WithIndexes(dateIndex, entriesURLIndex),
|
||||
),
|
||||
feedsIdIndex: idIndex,
|
||||
feedsNameIndex: nameIndex,
|
||||
entriesDateIndex: dateIndex,
|
||||
entriesURLIndex: entriesURLIndex,
|
||||
}
|
||||
|
||||
// register model instances
|
||||
f.feeds.Register(new(pb.Feed))
|
||||
f.entries.Register(new(pb.Entry))
|
||||
|
||||
go f.crawl()
|
||||
return f
|
||||
}
|
||||
|
||||
func (e *Rss) crawl() {
|
||||
e.fetchAll()
|
||||
tick := time.NewTicker(1 * time.Minute)
|
||||
for _ = range tick.C {
|
||||
e.fetchAll()
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Rss) Add(ctx context.Context, req *pb.AddRequest, rsp *pb.AddResponse) error {
|
||||
log.Info("Received Rss.Add request")
|
||||
|
||||
if len(req.Name) == 0 {
|
||||
return errors.BadRequest("rss.add", "require name")
|
||||
}
|
||||
|
||||
// get the tenantID
|
||||
tenantID, ok := tenant.FromContext(ctx)
|
||||
if !ok {
|
||||
tenantID = "micro"
|
||||
}
|
||||
|
||||
f := pb.Feed{
|
||||
Id: tenantID + "/" + idFromName(req.Name),
|
||||
Name: req.Name,
|
||||
Url: req.Url,
|
||||
Category: req.Category,
|
||||
}
|
||||
|
||||
// create the feed
|
||||
e.feeds.Create(f)
|
||||
|
||||
// schedule immediate fetch
|
||||
go e.fetch(&f)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Rss) Feed(ctx context.Context, req *pb.FeedRequest, rsp *pb.FeedResponse) error {
|
||||
log.Info("Received Rss.Entries request")
|
||||
if len(req.Name) == 0 {
|
||||
return errors.BadRequest("rss.feed", "missing feed name")
|
||||
}
|
||||
|
||||
// get the tenantID
|
||||
tenantID, ok := tenant.FromContext(ctx)
|
||||
if !ok {
|
||||
tenantID = "micro"
|
||||
}
|
||||
|
||||
feed := new(pb.Feed)
|
||||
id := tenantID + "/" + idFromName(req.Name)
|
||||
q := model.QueryEquals("ID", id)
|
||||
|
||||
// get the feed
|
||||
if err := e.feeds.Read(q, feed); err != nil {
|
||||
return errors.InternalServerError("rss.feeds", "could not read feed")
|
||||
}
|
||||
|
||||
// get the entries for each
|
||||
return e.entries.Read(e.entriesURLIndex.ToQuery(feed.Url), &rsp.Entries)
|
||||
}
|
||||
|
||||
func (e *Rss) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error {
|
||||
var feeds []*pb.Feed
|
||||
q := model.QueryAll()
|
||||
|
||||
// TODO: find a way to query only by tenant
|
||||
err := e.feeds.Read(q, &feeds)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("rss.list", "failed to read list of feeds: %v", err)
|
||||
}
|
||||
|
||||
// get the tenantID
|
||||
tenantID, ok := tenant.FromContext(ctx)
|
||||
if !ok {
|
||||
tenantID = "micro"
|
||||
}
|
||||
|
||||
for _, feed := range feeds {
|
||||
// filter for the tenant
|
||||
if !strings.HasPrefix(feed.Id, tenantID+"/") {
|
||||
continue
|
||||
}
|
||||
|
||||
rsp.Feeds = append(rsp.Feeds, feed)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Rss) Remove(ctx context.Context, req *pb.RemoveRequest, rsp *pb.RemoveResponse) error {
|
||||
if len(req.Name) == 0 {
|
||||
return errors.BadRequest("rss.remove", "blank name provided")
|
||||
}
|
||||
|
||||
// get the tenantID
|
||||
tenantID, ok := tenant.FromContext(ctx)
|
||||
if !ok {
|
||||
tenantID = "micro"
|
||||
}
|
||||
|
||||
id := tenantID + "/" + idFromName(req.Name)
|
||||
|
||||
e.feeds.Delete(model.QueryEquals("ID", id))
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user