services/rss/handler/crawl.go

package handler

import (
	"crypto/md5"
	"fmt"
	"net/url"
	"sync"
	"time"

	"github.com/SlyMarbo/rss"
	log "github.com/micro/micro/v3/service/logger"
	"github.com/micro/services/rss/parser"
	pb "github.com/micro/services/rss/proto"
)
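
// rssFeeds caches the parsed feed for each URL so subsequent crawls can
// call Update on the existing feed instead of re-fetching it from scratch.
// rssSync guards the map for concurrent access.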
var (
	rssSync  sync.RWMutex
	rssFeeds = map[string]*rss.Feed{}
)
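
// fetchAll loads every stored feed and crawls each one in turn.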
func (e *Rss) fetchAll() {
	fs := []*pb.Feed{}
	err := e.feeds.Read(e.feedsNameIndex.ToQuery(nil), &fs)
	if err != nil {
		log.Errorf("Error listing feeds: %v", err)
		return
	}
	if len(fs) == 0 {
		log.Infof("No feeds to fetch")
		return
	}
	for _, feed := range fs {
		err = e.fetch(feed)
		if err != nil {
			log.Errorf("Error fetching feed %v: %v", feed.Url, err)
		}
	}
}
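
// fetch crawls a single feed, reusing a cached rss.Feed where possible,
// parses its items and stores them as entries.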
func (e *Rss) fetch(f *pb.Feed) error {
	url := f.Url
	log.Infof("Fetching address %v", url)

	// see if there's an existing rss feed
	rssSync.RLock()
	fd, ok := rssFeeds[f.Url]
	rssSync.RUnlock()

	if !ok {
		// create a new one if it doesn't exist
		var err error
		fd, err = rss.Fetch(f.Url)
		if err != nil {
			return fmt.Errorf("Error fetching address %v: %v", url, err)
		}
		// save the feed
		rssSync.Lock()
		rssFeeds[f.Url] = fd
		rssSync.Unlock()
	} else {
		// otherwise update the existing feed
		fd.Items = []*rss.Item{}
		fd.Unread = 0
		if err := fd.Update(); err != nil {
			return fmt.Errorf("Error updating address %v: %v", url, err)
		}
	}
	// set the refresh time
	fd.Refresh = time.Now()

	// range over the feed and save the items
	for _, item := range fd.Items {
		id := fmt.Sprintf("%x", md5.Sum([]byte(item.ID)))

		// check if content exists
		content := item.Content

		// if we have a parser which returns content use it
		// e.g. cnbc
		c, err := parser.Parse(item.Link)
		if err == nil && len(c) > 0 {
			content = c
		}

		err = e.entries.Create(&pb.Entry{
			Id:      id,
			Title:   item.Title,
			Summary: item.Summary,
			Feed:    f.Url,
			Link:    item.Link,
			Content: content,
			Date:    item.Date.Format(time.RFC3339Nano),
		})
		if err != nil {
			return fmt.Errorf("Error saving item: %v", err)
		}
	}

	return nil
}
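
// getDomain returns the host portion of the given address, or an empty
// string if the address cannot be parsed.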
func getDomain(address string) string {
	uri, err := url.Parse(address)
	if err != nil {
		return ""
	}
	return uri.Host
}