From a4c54cd5c38061f99f8760932dc12e9fa1b085f8 Mon Sep 17 00:00:00 2001
From: Janos Dobronszki
Date: Sun, 6 Dec 2020 08:58:58 +0100
Subject: [PATCH] Fetch entries and list them on demand (#28)

* Fetch entries and list them on demand

* Fix bugs

* Bug

* Fix ordering
---
 blog/feeds/handler/crawl.go        |  75 +++++++++---------
 blog/feeds/handler/feeds.go        |  17 ++++-
 blog/feeds/proto/feeds.pb.go       | 119 ++++++++++++++++++++++++-----
 blog/feeds/proto/feeds.pb.micro.go |  17 +++++
 blog/feeds/proto/feeds.proto       |   9 +++
 5 files changed, 185 insertions(+), 52 deletions(-)

diff --git a/blog/feeds/handler/crawl.go b/blog/feeds/handler/crawl.go
index 1b8f304..127165f 100644
--- a/blog/feeds/handler/crawl.go
+++ b/blog/feeds/handler/crawl.go
@@ -25,45 +25,52 @@ func (e *Feeds) fetchAll() {
 		return
 	}
 	for _, feed := range fs {
-		log.Infof("Fetching address %v", feed.Url)
-		fd, err := rss.Fetch(feed.Url)
+		err = e.fetch(feed.Url)
 		if err != nil {
-			log.Errorf("Error fetching address %v: %v", feed.Url, err)
-			continue
-		}
-		domain := getDomain(feed.Url)
-
-		for _, item := range fd.Items {
-			id := fmt.Sprintf("%x", md5.Sum([]byte(item.ID)))
-			err = e.entries.Save(feeds.Entry{
-				Id:      id,
-				Url:     item.Link,
-				Title:   item.Title,
-				Domain:  domain,
-				Content: item.Summary,
-				Date:    item.Date.Unix(),
-			})
-			if err != nil {
-				log.Errorf("Error saving item: %v", err)
-			}
-			// @todo make this optional
-			_, err := e.postsService.Save(context.TODO(), &posts.SaveRequest{
-				Id:        id,
-				Title:     item.Title,
-				Content:   item.Content,
-				Timestamp: item.Date.Unix(),
-				Metadata: map[string]string{
-					"domain": domain,
-					"link":   item.Link,
-				},
-			})
-			if err != nil {
-				log.Errorf("Error saving post: %v", err)
-			}
+			log.Errorf("Error processing feed %v: %v", feed.Url, err)
 		}
 	}
 }
 
+func (e *Feeds) fetch(url string) error {
+	log.Infof("Fetching address %v", url)
+	fd, err := rss.Fetch(url)
+	if err != nil {
+		return fmt.Errorf("Error fetching address %v: %w", url, err)
+	}
+	domain := getDomain(url)
+
+	for _, item := range fd.Items {
+		id := fmt.Sprintf("%x", md5.Sum([]byte(item.ID)))
+		err = e.entries.Save(feeds.Entry{
+			Id:      id,
+			Url:     item.Link,
+			Title:   item.Title,
+			Domain:  domain,
+			Content: item.Summary,
+			Date:    item.Date.Unix(),
+		})
+		if err != nil {
+			return fmt.Errorf("Error saving item: %w", err)
+		}
+		// @todo make this optional
+		_, err = e.postsService.Save(context.TODO(), &posts.SaveRequest{
+			Id:        id,
+			Title:     item.Title,
+			Content:   item.Content,
+			Timestamp: item.Date.Unix(),
+			Metadata: map[string]string{
+				"domain": domain,
+				"link":   item.Link,
+			},
+		})
+		if err != nil {
+			return fmt.Errorf("Error saving post: %w", err)
+		}
+	}
+	return nil
+}
+
 func getDomain(address string) string {
 	uri, _ := url.Parse(address)
 	return uri.Host
diff --git a/blog/feeds/handler/feeds.go b/blog/feeds/handler/feeds.go
index afd7251..424d49e 100644
--- a/blog/feeds/handler/feeds.go
+++ b/blog/feeds/handler/feeds.go
@@ -19,6 +19,7 @@ type Feeds struct {
 	feedsIdIndex     model.Index
 	feedsNameIndex   model.Index
 	entriesDateIndex model.Index
+	entriesURLIndex  model.Index
 }
 
 func NewFeeds(postsService posts.PostsService) *Feeds {
@@ -32,6 +33,10 @@ func NewFeeds(postsService posts.PostsService) *Feeds {
 	dateIndex := model.ByEquality("date")
 	dateIndex.Order.Type = model.OrderTypeDesc
 
+	entriesURLIndex := model.ByEquality("url")
+	entriesURLIndex.Order.Type = model.OrderTypeDesc
+	entriesURLIndex.Order.FieldName = "date"
+
 	f := &Feeds{
 		feeds: model.New(
 			store.DefaultStore,
@@ -45,7 +50,7 @@ func NewFeeds(postsService posts.PostsService) *Feeds {
 		entries: model.New(
 			store.DefaultStore,
 			"entries",
-			model.Indexes(dateIndex),
+			model.Indexes(dateIndex, entriesURLIndex),
 			&model.ModelOptions{
 				Debug: false,
 			},
@@ -54,6 +59,7 @@ func NewFeeds(postsService posts.PostsService) *Feeds {
 		feedsIdIndex:     idIndex,
 		feedsNameIndex:   nameIndex,
 		entriesDateIndex: dateIndex,
+		entriesURLIndex:  entriesURLIndex,
 	}
 
 	go f.crawl()
@@ -76,3 +82,12 @@ func (e *Feeds) New(ctx context.Context, req *feeds.NewRequest, rsp *feeds.NewRe
 	})
 	return nil
 }
+
+func (e *Feeds) Entries(ctx context.Context, req *feeds.EntriesRequest, rsp *feeds.EntriesResponse) error {
+	log.Info("Received Feeds.Entries request")
+	err := e.fetch(req.Url)
+	if err != nil {
+		return err
+	}
+	return e.entries.List(e.entriesURLIndex.ToQuery(req.Url), &rsp.Entries)
+}
diff --git a/blog/feeds/proto/feeds.pb.go b/blog/feeds/proto/feeds.pb.go
index 2f189d8..bf3bc02 100644
--- a/blog/feeds/proto/feeds.pb.go
+++ b/blog/feeds/proto/feeds.pb.go
@@ -226,30 +226,115 @@ func (m *NewResponse) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_NewResponse proto.InternalMessageInfo
 
+type EntriesRequest struct {
+	Url                  string   `protobuf:"bytes,1,opt,name=url,proto3" json:"url,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *EntriesRequest) Reset()         { *m = EntriesRequest{} }
+func (m *EntriesRequest) String() string { return proto.CompactTextString(m) }
+func (*EntriesRequest) ProtoMessage()    {}
+func (*EntriesRequest) Descriptor() ([]byte, []int) {
+	return fileDescriptor_dd517c38176c13bf, []int{4}
+}
+
+func (m *EntriesRequest) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_EntriesRequest.Unmarshal(m, b)
+}
+func (m *EntriesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_EntriesRequest.Marshal(b, m, deterministic)
+}
+func (m *EntriesRequest) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_EntriesRequest.Merge(m, src)
+}
+func (m *EntriesRequest) XXX_Size() int {
+	return xxx_messageInfo_EntriesRequest.Size(m)
+}
+func (m *EntriesRequest) XXX_DiscardUnknown() {
+	xxx_messageInfo_EntriesRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_EntriesRequest proto.InternalMessageInfo
+
+func (m *EntriesRequest) GetUrl() string {
+	if m != nil {
+		return m.Url
+	}
+	return ""
+}
+
+type EntriesResponse struct {
+	Entries              []*Entry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *EntriesResponse) Reset()         { *m = EntriesResponse{} }
+func (m *EntriesResponse) String() string { return proto.CompactTextString(m) }
+func (*EntriesResponse) ProtoMessage()    {}
+func (*EntriesResponse) Descriptor() ([]byte, []int) {
+	return fileDescriptor_dd517c38176c13bf, []int{5}
+}
+
+func (m *EntriesResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_EntriesResponse.Unmarshal(m, b)
+}
+func (m *EntriesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_EntriesResponse.Marshal(b, m, deterministic)
+}
+func (m *EntriesResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_EntriesResponse.Merge(m, src)
+}
+func (m *EntriesResponse) XXX_Size() int {
+	return xxx_messageInfo_EntriesResponse.Size(m)
+}
+func (m *EntriesResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_EntriesResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_EntriesResponse proto.InternalMessageInfo
+
+func (m *EntriesResponse) GetEntries() []*Entry {
+	if m != nil {
+		return m.Entries
+	}
+	return nil
+}
+
 func init() {
 	proto.RegisterType((*Feed)(nil), "feeds.Feed")
 	proto.RegisterType((*Entry)(nil), "feeds.Entry")
 	proto.RegisterType((*NewRequest)(nil), "feeds.NewRequest")
 	proto.RegisterType((*NewResponse)(nil), "feeds.NewResponse")
+	proto.RegisterType((*EntriesRequest)(nil), "feeds.EntriesRequest")
+	proto.RegisterType((*EntriesResponse)(nil), "feeds.EntriesResponse")
 }
 
-func init() { proto.RegisterFile("proto/feeds.proto", fileDescriptor_dd517c38176c13bf) }
+func init() {
+	proto.RegisterFile("proto/feeds.proto", fileDescriptor_dd517c38176c13bf)
+}
 
 var fileDescriptor_dd517c38176c13bf = []byte{
-	// 233 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0x31, 0x4f, 0xc3, 0x30,
-	0x10, 0x85, 0x71, 0x1c, 0x07, 0x71, 0x55, 0x11, 0x3d, 0x21, 0x64, 0x31, 0x55, 0x9e, 0x3a, 0xa0,
-	0x20, 0x95, 0x81, 0x81, 0x0d, 0x09, 0xc6, 0x0e, 0x19, 0xd9, 0x02, 0x3e, 0x24, 0x4b, 0xad, 0x5d,
-	0xe2, 0xab, 0x2a, 0x7e, 0x00, 0xff, 0x1b, 0xe5, 0x92, 0x00, 0x23, 0xdb, 0xfb, 0x9e, 0xef, 0x9e,
-	0xce, 0x0f, 0x16, 0xfb, 0x2e, 0x71, 0xba, 0x7d, 0x27, 0xf2, 0xb9, 0x16, 0x8d, 0x46, 0xc0, 0xdd,
-	0x40, 0xf9, 0x4c, 0xe4, 0x11, 0xa1, 0x8c, 0xed, 0x8e, 0xac, 0x5a, 0xaa, 0xd5, 0x59, 0x23, 0x1a,
-	0x2f, 0x40, 0x1f, 0xba, 0xad, 0x2d, 0xc4, 0xea, 0xa5, 0xfb, 0x52, 0x60, 0x9e, 0x22, 0x77, 0x9f,
-	0x78, 0x0e, 0x45, 0xf0, 0xe3, 0x74, 0x11, 0x3c, 0x5e, 0x41, 0xe5, 0xd3, 0xae, 0x0d, 0x71, 0x1c,
-	0x1f, 0x69, 0xca, 0xd0, 0x3f, 0x19, 0x78, 0x09, 0x86, 0x03, 0x6f, 0xc9, 0x96, 0xe2, 0x0d, 0x80,
-	0x16, 0x4e, 0xdf, 0x52, 0x64, 0x8a, 0x6c, 0x8d, 0xf8, 0x13, 0xf6, 0x97, 0xf9, 0x96, 0xc9, 0x56,
-	0x4b, 0xb5, 0xd2, 0x8d, 0x68, 0xb7, 0x06, 0xd8, 0xd0, 0xb1, 0xa1, 0x8f, 0x03, 0x65, 0xfe, 0xe7,
-	0xed, 0x73, 0x98, 0xc9, 0x4e, 0xde, 0xa7, 0x98, 0x69, 0x7d, 0x0f, 0xa6, 0xff, 0x78, 0xc6, 0x1a,
-	0xf4, 0x86, 0x8e, 0xb8, 0xa8, 0x87, 0x76, 0x7e, 0x73, 0xaf, 0xf1, 0xaf, 0x35, 0xac, 0xb9, 0x93,
-	0xc7, 0xf9, 0xcb, 0x4c, 0x1a, 0x7c, 0x90, 0xc7, 0xd7, 0x4a, 0xe0, 0xee, 0x3b, 0x00, 0x00, 0xff,
-	0xff, 0xb0, 0x77, 0x9c, 0xb9, 0x63, 0x01, 0x00, 0x00,
+	// 282 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x51, 0x3d, 0x4f, 0xc3, 0x30,
+	0x10, 0x25, 0xcd, 0x97, 0xb8, 0xd0, 0x42, 0x4f, 0x50, 0x59, 0x9d, 0x50, 0x06, 0xd4, 0x01, 0x05,
+	0x29, 0x4c, 0xc0, 0x86, 0x04, 0x23, 0x43, 0x46, 0xb6, 0x40, 0x0e, 0x29, 0x52, 0x6b, 0x97, 0xd8,
+	0x15, 0xe2, 0x07, 0xf0, 0xbf, 0x71, 0x2e, 0x76, 0x53, 0x31, 0xb1, 0xbd, 0xf7, 0xf2, 0xde, 0xdd,
+	0xcb, 0x19, 0xe6, 0xdb, 0x4e, 0x19, 0x75, 0xf3, 0x41, 0xd4, 0xe8, 0x82, 0x31, 0xc6, 0x4c, 0xf2,
+	0x6b, 0x88, 0x9e, 0x2d, 0x40, 0x84, 0x48, 0xd6, 0x1b, 0x12, 0xc1, 0x65, 0xb0, 0x3a, 0xae, 0x18,
+	0xe3, 0x19, 0x84, 0xbb, 0x6e, 0x2d, 0x26, 0x2c, 0xf5, 0x30, 0xff, 0x09, 0x20, 0x7e, 0x92, 0xa6,
+	0xfb, 0xc6, 0x19, 0x4c, 0xda, 0xc6, 0xb9, 0x2d, 0xc2, 0x05, 0x24, 0x8d, 0xda, 0xd4, 0xad, 0x74,
+	0x76, 0xc7, 0xfc, 0x8c, 0x70, 0x3f, 0x03, 0xcf, 0x21, 0x36, 0xad, 0x59, 0x93, 0x88, 0x58, 0x1b,
+	0x08, 0x0a, 0x48, 0xdf, 0x95, 0x34, 0x24, 0x8d, 0x88, 0x59, 0xf7, 0xb4, 0x6f, 0xd6, 0xd4, 0x86,
+	0x44, 0x62, 0xe5, 0xb0, 0x62, 0x9c, 0x97, 0x00, 0x2f, 0xf4, 0x55, 0xd1, 0xe7, 0x8e, 0xb4, 0xf9,
+	0x67, 0xf7, 0x29, 0x64, 0x9c, 0xd1, 0x5b, 0x25, 0x35, 0xe5, 0x39, 0xcc, 0xfa, 0x3f, 0x69, 0x49,
+	0xfb, 0x31, 0x2e, 0x12, 0x8c, 0x91, 0x3b, 0x38, 0xdd, 0x7b, 0x86, 0x18, 0x5e, 0x41, 0x4a, 0x83,
+	0x64, 0x8d, 0xe1, 0x2a, 0x2b, 0x4f, 0x8a, 0xe1, 0xaa, 0x7c, 0x96, 0xca, 0x7f, 0x2c, 0x35, 0xc4,
+	0xfd, 0x5d, 0x35, 0x16, 0x10, 0xda, 0xb5, 0x38, 0x77, 0xb6, 0xb1, 0xf6, 0x12, 0x0f, 0x25, 0xd7,
+	0xea, 0x08, 0xef, 0x21, 0x75, 0x3b, 0xf1, 0xe2, 0x60, 0xf4, 0xd8, 0x73, 0xb9, 0xf8, 0x2b, 0xfb,
+	0xec, 0xe3, 0xf4, 0x35, 0xe3, 0xc7, 0x7d, 0x60, 0xc3, 0x5b, 0xc2, 0xe4, 0xf6, 0x37, 0x00, 0x00,
+	0xff, 0xff, 0x66, 0x49, 0x14, 0x45, 0xfe, 0x01, 0x00, 0x00,
 }
diff --git a/blog/feeds/proto/feeds.pb.micro.go b/blog/feeds/proto/feeds.pb.micro.go
index 147537f..347b26f 100644
--- a/blog/feeds/proto/feeds.pb.micro.go
+++ b/blog/feeds/proto/feeds.pb.micro.go
@@ -43,6 +43,7 @@ func NewFeedsEndpoints() []*api.Endpoint {
 
 type FeedsService interface {
 	New(ctx context.Context, in *NewRequest, opts ...client.CallOption) (*NewResponse, error)
+	Entries(ctx context.Context, in *EntriesRequest, opts ...client.CallOption) (*EntriesResponse, error)
 }
 
 type feedsService struct {
@@ -67,15 +68,27 @@ func (c *feedsService) New(ctx context.Context, in *NewRequest, opts ...client.C
 	return out, nil
 }
 
+func (c *feedsService) Entries(ctx context.Context, in *EntriesRequest, opts ...client.CallOption) (*EntriesResponse, error) {
+	req := c.c.NewRequest(c.name, "Feeds.Entries", in)
+	out := new(EntriesResponse)
+	err := c.c.Call(ctx, req, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
 // Server API for Feeds service
 
 type FeedsHandler interface {
 	New(context.Context, *NewRequest, *NewResponse) error
+	Entries(context.Context, *EntriesRequest, *EntriesResponse) error
 }
 
 func RegisterFeedsHandler(s server.Server, hdlr FeedsHandler, opts ...server.HandlerOption) error {
 	type feeds interface {
 		New(ctx context.Context, in *NewRequest, out *NewResponse) error
+		Entries(ctx context.Context, in *EntriesRequest, out *EntriesResponse) error
 	}
 	type Feeds struct {
 		feeds
@@ -91,3 +104,7 @@ type feedsHandler struct {
 func (h *feedsHandler) New(ctx context.Context, in *NewRequest, out *NewResponse) error {
 	return h.FeedsHandler.New(ctx, in, out)
 }
+
+func (h *feedsHandler) Entries(ctx context.Context, in *EntriesRequest, out *EntriesResponse) error {
+	return h.FeedsHandler.Entries(ctx, in, out)
+}
diff --git a/blog/feeds/proto/feeds.proto b/blog/feeds/proto/feeds.proto
index e962ac3..6e06415 100644
--- a/blog/feeds/proto/feeds.proto
+++ b/blog/feeds/proto/feeds.proto
@@ -6,6 +6,7 @@ option go_package = "proto;feeds";
 
 service Feeds {
 	rpc New(NewRequest) returns (NewResponse) {}
+	rpc Entries(EntriesRequest) returns (EntriesResponse) {}
 }
 
 message Feed {
@@ -31,3 +32,11 @@ message NewRequest {
 
 message NewResponse {
 }
+
+message EntriesRequest {
+	string url = 1;
+}
+
+message EntriesResponse {
+	repeated Entry entries = 1;
+}
\ No newline at end of file