Fetch entries and list them on demand (#28)

* Fetch entries and list them on demand

* Fix bugs

* Bug

* Fix ordering
This commit is contained in:
Janos Dobronszki
2020-12-06 08:58:58 +01:00
committed by GitHub
parent 9a24c74081
commit a4c54cd5c3
5 changed files with 185 additions and 52 deletions

View File

@@ -25,45 +25,52 @@ func (e *Feeds) fetchAll() {
return return
} }
for _, feed := range fs { for _, feed := range fs {
log.Infof("Fetching address %v", feed.Url) err = e.fetch(feed.Url)
fd, err := rss.Fetch(feed.Url)
if err != nil { if err != nil {
log.Errorf("Error fetching address %v: %v", feed.Url, err) log.Errorf("Error saving post: %v", err)
continue
}
domain := getDomain(feed.Url)
for _, item := range fd.Items {
id := fmt.Sprintf("%x", md5.Sum([]byte(item.ID)))
err = e.entries.Save(feeds.Entry{
Id: id,
Url: item.Link,
Title: item.Title,
Domain: domain,
Content: item.Summary,
Date: item.Date.Unix(),
})
if err != nil {
log.Errorf("Error saving item: %v", err)
}
// @todo make this optional
_, err := e.postsService.Save(context.TODO(), &posts.SaveRequest{
Id: id,
Title: item.Title,
Content: item.Content,
Timestamp: item.Date.Unix(),
Metadata: map[string]string{
"domain": domain,
"link": item.Link,
},
})
if err != nil {
log.Errorf("Error saving post: %v", err)
}
} }
} }
} }
func (e *Feeds) fetch(url string) error {
log.Infof("Fetching address %v", url)
fd, err := rss.Fetch(url)
if err != nil {
return fmt.Errorf("Error fetching address %v: %v", url, err)
}
domain := getDomain(url)
for _, item := range fd.Items {
id := fmt.Sprintf("%x", md5.Sum([]byte(item.ID)))
err = e.entries.Save(feeds.Entry{
Id: id,
Url: item.Link,
Title: item.Title,
Domain: domain,
Content: item.Summary,
Date: item.Date.Unix(),
})
if err != nil {
return fmt.Errorf("Error saving item: %v", err)
}
// @todo make this optional
_, err := e.postsService.Save(context.TODO(), &posts.SaveRequest{
Id: id,
Title: item.Title,
Content: item.Content,
Timestamp: item.Date.Unix(),
Metadata: map[string]string{
"domain": domain,
"link": item.Link,
},
})
if err != nil {
return err
}
}
return nil
}
func getDomain(address string) string { func getDomain(address string) string {
uri, _ := url.Parse(address) uri, _ := url.Parse(address)
return uri.Host return uri.Host

View File

@@ -19,6 +19,7 @@ type Feeds struct {
feedsIdIndex model.Index feedsIdIndex model.Index
feedsNameIndex model.Index feedsNameIndex model.Index
entriesDateIndex model.Index entriesDateIndex model.Index
entriesURLIndex model.Index
} }
func NewFeeds(postsService posts.PostsService) *Feeds { func NewFeeds(postsService posts.PostsService) *Feeds {
@@ -32,6 +33,10 @@ func NewFeeds(postsService posts.PostsService) *Feeds {
dateIndex := model.ByEquality("date") dateIndex := model.ByEquality("date")
dateIndex.Order.Type = model.OrderTypeDesc dateIndex.Order.Type = model.OrderTypeDesc
entriesURLIndex := model.ByEquality("url")
entriesURLIndex.Order.Type = model.OrderTypeDesc
entriesURLIndex.Order.FieldName = "date"
f := &Feeds{ f := &Feeds{
feeds: model.New( feeds: model.New(
store.DefaultStore, store.DefaultStore,
@@ -45,7 +50,7 @@ func NewFeeds(postsService posts.PostsService) *Feeds {
entries: model.New( entries: model.New(
store.DefaultStore, store.DefaultStore,
"entries", "entries",
model.Indexes(dateIndex), model.Indexes(dateIndex, entriesURLIndex),
&model.ModelOptions{ &model.ModelOptions{
Debug: false, Debug: false,
}, },
@@ -54,6 +59,7 @@ func NewFeeds(postsService posts.PostsService) *Feeds {
feedsIdIndex: idIndex, feedsIdIndex: idIndex,
feedsNameIndex: nameIndex, feedsNameIndex: nameIndex,
entriesDateIndex: dateIndex, entriesDateIndex: dateIndex,
entriesURLIndex: entriesURLIndex,
} }
go f.crawl() go f.crawl()
@@ -76,3 +82,12 @@ func (e *Feeds) New(ctx context.Context, req *feeds.NewRequest, rsp *feeds.NewRe
}) })
return nil return nil
} }
func (e *Feeds) Entries(ctx context.Context, req *feeds.EntriesRequest, rsp *feeds.EntriesResponse) error {
log.Info("Received Feeds.New request")
err := e.fetch(req.Url)
if err != nil {
return err
}
return e.entries.List(e.entriesURLIndex.ToQuery(req.Url), &rsp.Entries)
}

View File

@@ -226,30 +226,115 @@ func (m *NewResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_NewResponse proto.InternalMessageInfo var xxx_messageInfo_NewResponse proto.InternalMessageInfo
type EntriesRequest struct {
Url string `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *EntriesRequest) Reset() { *m = EntriesRequest{} }
func (m *EntriesRequest) String() string { return proto.CompactTextString(m) }
func (*EntriesRequest) ProtoMessage() {}
func (*EntriesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_dd517c38176c13bf, []int{4}
}
func (m *EntriesRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_EntriesRequest.Unmarshal(m, b)
}
func (m *EntriesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_EntriesRequest.Marshal(b, m, deterministic)
}
func (m *EntriesRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_EntriesRequest.Merge(m, src)
}
func (m *EntriesRequest) XXX_Size() int {
return xxx_messageInfo_EntriesRequest.Size(m)
}
func (m *EntriesRequest) XXX_DiscardUnknown() {
xxx_messageInfo_EntriesRequest.DiscardUnknown(m)
}
var xxx_messageInfo_EntriesRequest proto.InternalMessageInfo
func (m *EntriesRequest) GetUrl() string {
if m != nil {
return m.Url
}
return ""
}
type EntriesResponse struct {
Entries []*Entry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *EntriesResponse) Reset() { *m = EntriesResponse{} }
func (m *EntriesResponse) String() string { return proto.CompactTextString(m) }
func (*EntriesResponse) ProtoMessage() {}
func (*EntriesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_dd517c38176c13bf, []int{5}
}
func (m *EntriesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_EntriesResponse.Unmarshal(m, b)
}
func (m *EntriesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_EntriesResponse.Marshal(b, m, deterministic)
}
func (m *EntriesResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_EntriesResponse.Merge(m, src)
}
func (m *EntriesResponse) XXX_Size() int {
return xxx_messageInfo_EntriesResponse.Size(m)
}
func (m *EntriesResponse) XXX_DiscardUnknown() {
xxx_messageInfo_EntriesResponse.DiscardUnknown(m)
}
var xxx_messageInfo_EntriesResponse proto.InternalMessageInfo
func (m *EntriesResponse) GetEntries() []*Entry {
if m != nil {
return m.Entries
}
return nil
}
func init() { func init() {
proto.RegisterType((*Feed)(nil), "feeds.Feed") proto.RegisterType((*Feed)(nil), "feeds.Feed")
proto.RegisterType((*Entry)(nil), "feeds.Entry") proto.RegisterType((*Entry)(nil), "feeds.Entry")
proto.RegisterType((*NewRequest)(nil), "feeds.NewRequest") proto.RegisterType((*NewRequest)(nil), "feeds.NewRequest")
proto.RegisterType((*NewResponse)(nil), "feeds.NewResponse") proto.RegisterType((*NewResponse)(nil), "feeds.NewResponse")
proto.RegisterType((*EntriesRequest)(nil), "feeds.EntriesRequest")
proto.RegisterType((*EntriesResponse)(nil), "feeds.EntriesResponse")
} }
func init() { proto.RegisterFile("proto/feeds.proto", fileDescriptor_dd517c38176c13bf) } func init() {
proto.RegisterFile("proto/feeds.proto", fileDescriptor_dd517c38176c13bf)
}
var fileDescriptor_dd517c38176c13bf = []byte{ var fileDescriptor_dd517c38176c13bf = []byte{
// 233 bytes of a gzipped FileDescriptorProto // 282 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0x31, 0x4f, 0xc3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x51, 0x3d, 0x4f, 0xc3, 0x30,
0x10, 0x85, 0x71, 0x1c, 0x07, 0x71, 0x55, 0x11, 0x3d, 0x21, 0x64, 0x31, 0x55, 0x9e, 0x3a, 0xa0, 0x10, 0x25, 0xcd, 0x97, 0xb8, 0xd0, 0x42, 0x4f, 0x50, 0x59, 0x9d, 0x50, 0x06, 0xd4, 0x01, 0x05,
0x20, 0x95, 0x81, 0x81, 0x0d, 0x09, 0xc6, 0x0e, 0x19, 0xd9, 0x02, 0x3e, 0x24, 0x4b, 0xad, 0x5d, 0x29, 0x4c, 0xc0, 0x86, 0x04, 0x23, 0x43, 0x46, 0xb6, 0x40, 0x0e, 0x29, 0x52, 0x6b, 0x97, 0xd8,
0xe2, 0xab, 0x2a, 0x7e, 0x00, 0xff, 0x1b, 0xe5, 0x92, 0x00, 0x23, 0xdb, 0xfb, 0x9e, 0xef, 0x9e, 0x15, 0xe2, 0x07, 0xf0, 0xbf, 0x71, 0x2e, 0x76, 0x53, 0x31, 0xb1, 0xbd, 0xf7, 0xf2, 0xde, 0xdd,
0xce, 0x0f, 0x16, 0xfb, 0x2e, 0x71, 0xba, 0x7d, 0x27, 0xf2, 0xb9, 0x16, 0x8d, 0x46, 0xc0, 0xdd, 0xcb, 0x19, 0xe6, 0xdb, 0x4e, 0x19, 0x75, 0xf3, 0x41, 0xd4, 0xe8, 0x82, 0x31, 0xc6, 0x4c, 0xf2,
0x40, 0xf9, 0x4c, 0xe4, 0x11, 0xa1, 0x8c, 0xed, 0x8e, 0xac, 0x5a, 0xaa, 0xd5, 0x59, 0x23, 0x1a, 0x6b, 0x88, 0x9e, 0x2d, 0x40, 0x84, 0x48, 0xd6, 0x1b, 0x12, 0xc1, 0x65, 0xb0, 0x3a, 0xae, 0x18,
0x2f, 0x40, 0x1f, 0xba, 0xad, 0x2d, 0xc4, 0xea, 0xa5, 0xfb, 0x52, 0x60, 0x9e, 0x22, 0x77, 0x9f, 0xe3, 0x19, 0x84, 0xbb, 0x6e, 0x2d, 0x26, 0x2c, 0xf5, 0x30, 0xff, 0x09, 0x20, 0x7e, 0x92, 0xa6,
0x78, 0x0e, 0x45, 0xf0, 0xe3, 0x74, 0x11, 0x3c, 0x5e, 0x41, 0xe5, 0xd3, 0xae, 0x0d, 0x71, 0x1c, 0xfb, 0xc6, 0x19, 0x4c, 0xda, 0xc6, 0xb9, 0x2d, 0xc2, 0x05, 0x24, 0x8d, 0xda, 0xd4, 0xad, 0x74,
0x1f, 0x69, 0xca, 0xd0, 0x3f, 0x19, 0x78, 0x09, 0x86, 0x03, 0x6f, 0xc9, 0x96, 0xe2, 0x0d, 0x80, 0x76, 0xc7, 0xfc, 0x8c, 0x70, 0x3f, 0x03, 0xcf, 0x21, 0x36, 0xad, 0x59, 0x93, 0x88, 0x58, 0x1b,
0x16, 0x4e, 0xdf, 0x52, 0x64, 0x8a, 0x6c, 0x8d, 0xf8, 0x13, 0xf6, 0x97, 0xf9, 0x96, 0xc9, 0x56, 0x08, 0x0a, 0x48, 0xdf, 0x95, 0x34, 0x24, 0x8d, 0x88, 0x59, 0xf7, 0xb4, 0x6f, 0xd6, 0xd4, 0x86,
0x4b, 0xb5, 0xd2, 0x8d, 0x68, 0xb7, 0x06, 0xd8, 0xd0, 0xb1, 0xa1, 0x8f, 0x03, 0x65, 0xfe, 0xe7, 0x44, 0x62, 0xe5, 0xb0, 0x62, 0x9c, 0x97, 0x00, 0x2f, 0xf4, 0x55, 0xd1, 0xe7, 0x8e, 0xb4, 0xf9,
0xed, 0x73, 0x98, 0xc9, 0x4e, 0xde, 0xa7, 0x98, 0x69, 0x7d, 0x0f, 0xa6, 0xff, 0x78, 0xc6, 0x1a, 0x67, 0xf7, 0x29, 0x64, 0x9c, 0xd1, 0x5b, 0x25, 0x35, 0xe5, 0x39, 0xcc, 0xfa, 0x3f, 0x69, 0x49,
0xf4, 0x86, 0x8e, 0xb8, 0xa8, 0x87, 0x76, 0x7e, 0x73, 0xaf, 0xf1, 0xaf, 0x35, 0xac, 0xb9, 0x93, 0xfb, 0x31, 0x2e, 0x12, 0x8c, 0x91, 0x3b, 0x38, 0xdd, 0x7b, 0x86, 0x18, 0x5e, 0x41, 0x4a, 0x83,
0xc7, 0xf9, 0xcb, 0x4c, 0x1a, 0x7c, 0x90, 0xc7, 0xd7, 0x4a, 0xe0, 0xee, 0x3b, 0x00, 0x00, 0xff, 0x64, 0x8d, 0xe1, 0x2a, 0x2b, 0x4f, 0x8a, 0xe1, 0xaa, 0x7c, 0x96, 0xca, 0x7f, 0x2c, 0x35, 0xc4,
0xff, 0xb0, 0x77, 0x9c, 0xb9, 0x63, 0x01, 0x00, 0x00, 0xfd, 0x5d, 0x35, 0x16, 0x10, 0xda, 0xb5, 0x38, 0x77, 0xb6, 0xb1, 0xf6, 0x12, 0x0f, 0x25, 0xd7,
0xea, 0x08, 0xef, 0x21, 0x75, 0x3b, 0xf1, 0xe2, 0x60, 0xf4, 0xd8, 0x73, 0xb9, 0xf8, 0x2b, 0xfb,
0xec, 0xe3, 0xf4, 0x35, 0xe3, 0xc7, 0x7d, 0x60, 0xc3, 0x5b, 0xc2, 0xe4, 0xf6, 0x37, 0x00, 0x00,
0xff, 0xff, 0x66, 0x49, 0x14, 0x45, 0xfe, 0x01, 0x00, 0x00,
} }

View File

@@ -43,6 +43,7 @@ func NewFeedsEndpoints() []*api.Endpoint {
type FeedsService interface { type FeedsService interface {
New(ctx context.Context, in *NewRequest, opts ...client.CallOption) (*NewResponse, error) New(ctx context.Context, in *NewRequest, opts ...client.CallOption) (*NewResponse, error)
Entries(ctx context.Context, in *EntriesRequest, opts ...client.CallOption) (*EntriesResponse, error)
} }
type feedsService struct { type feedsService struct {
@@ -67,15 +68,27 @@ func (c *feedsService) New(ctx context.Context, in *NewRequest, opts ...client.C
return out, nil return out, nil
} }
func (c *feedsService) Entries(ctx context.Context, in *EntriesRequest, opts ...client.CallOption) (*EntriesResponse, error) {
req := c.c.NewRequest(c.name, "Feeds.Entries", in)
out := new(EntriesResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Feeds service // Server API for Feeds service
type FeedsHandler interface { type FeedsHandler interface {
New(context.Context, *NewRequest, *NewResponse) error New(context.Context, *NewRequest, *NewResponse) error
Entries(context.Context, *EntriesRequest, *EntriesResponse) error
} }
func RegisterFeedsHandler(s server.Server, hdlr FeedsHandler, opts ...server.HandlerOption) error { func RegisterFeedsHandler(s server.Server, hdlr FeedsHandler, opts ...server.HandlerOption) error {
type feeds interface { type feeds interface {
New(ctx context.Context, in *NewRequest, out *NewResponse) error New(ctx context.Context, in *NewRequest, out *NewResponse) error
Entries(ctx context.Context, in *EntriesRequest, out *EntriesResponse) error
} }
type Feeds struct { type Feeds struct {
feeds feeds
@@ -91,3 +104,7 @@ type feedsHandler struct {
func (h *feedsHandler) New(ctx context.Context, in *NewRequest, out *NewResponse) error { func (h *feedsHandler) New(ctx context.Context, in *NewRequest, out *NewResponse) error {
return h.FeedsHandler.New(ctx, in, out) return h.FeedsHandler.New(ctx, in, out)
} }
func (h *feedsHandler) Entries(ctx context.Context, in *EntriesRequest, out *EntriesResponse) error {
return h.FeedsHandler.Entries(ctx, in, out)
}

View File

@@ -6,6 +6,7 @@ option go_package = "proto;feeds";
service Feeds { service Feeds {
rpc New(NewRequest) returns (NewResponse) {} rpc New(NewRequest) returns (NewResponse) {}
rpc Entries(EntriesRequest) returns (EntriesResponse) {}
} }
message Feed { message Feed {
@@ -31,3 +32,11 @@ message NewRequest {
message NewResponse { message NewResponse {
} }
message EntriesRequest {
string url = 1;
}
message EntriesResponse {
repeated Entry entries = 1;
}