diff --git a/youtube/feed.go b/youtube/feed.go index f494e1b881..7ca7dc37ca 100644 --- a/youtube/feed.go +++ b/youtube/feed.go @@ -28,12 +28,13 @@ import ( const ( WebSubCheckInterval = time.Second * 10 - // PollInterval = time.Second * 5 // <- used for debug purposes ) func (p *Plugin) StartFeed() { p.Stop = make(chan *sync.WaitGroup) - p.runWebsubChecker() + go p.runWebsubChecker() + go p.autoSyncWebsubs() + go p.deleteOldVideos() } func (p *Plugin) StopFeed(wg *sync.WaitGroup) { @@ -54,18 +55,45 @@ func (p *Plugin) SetupClient() error { return nil } +func (p *Plugin) deleteOldVideos() { + ticker := time.NewTicker(time.Minute * 1) + // Remove videos older than 24 hours + for { + select { + case <-ticker.C: + var expiring int64 + videoCacheDays := confYoutubeVideoCacheDays.GetInt() + if videoCacheDays < 1 { + videoCacheDays = 1 + } + common.RedisPool.Do(radix.FlatCmd(&expiring, "ZREMRANGEBYSCORE", RedisKeyPublishedVideoList, "-inf", time.Now().AddDate(0, 0, -1*videoCacheDays).Unix())) + logger.Infof("Removed %d old videos", expiring) + } + } +} + +func (p *Plugin) autoSyncWebsubs() { + // force syncs all websubs from db every 24 hours in case of outages or missed updates + ticker := time.NewTicker(time.Hour * 24) + for { + select { + case <-ticker.C: + go p.syncWebSubs() + } + } +} + // keeps the subscriptions up to date by updating the ones soon to be expiring func (p *Plugin) runWebsubChecker() { go p.syncWebSubs() - - websubTicker := time.NewTicker(WebSubCheckInterval) + ticker := time.NewTicker(WebSubCheckInterval) for { select { case wg := <-p.Stop: wg.Done() return - case <-websubTicker.C: - p.checkExpiringWebsubs() + case <-ticker.C: + go p.checkExpiringWebsubs() } } } @@ -548,18 +576,21 @@ func (p *Plugin) CheckVideo(parsedVideo XMLFeed) error { return err } - lastVid, lastVidTime, err := p.getLastVidTimes(channelID) - if err != nil { - return err + videoCacheDays := confYoutubeVideoCacheDays.GetInt() + if videoCacheDays < 1 { + videoCacheDays = 1 } - - if lastVidTime.After(parsedPublishedTime) { - // wasn't a new vid + if time.Since(parsedPublishedTime) > time.Hour*24*time.Duration(videoCacheDays) { + // don't post videos older than videoCacheDays + logger.Infof("Skipped Stale video for youtube channel %s: video_id: %s", channelID, videoID) return nil } - if lastVid == videoID { - // the video was already posted and was probably just edited + mn := radix.MaybeNil{} + common.RedisPool.Do(radix.Cmd(&mn, "ZSCORE", RedisKeyPublishedVideoList, videoID)) + if !mn.Nil { + // video was already published, maybe just an update on it? + logger.Infof("Skipped Already Published video for youtube channel %s: video_id: %s", channelID, videoID) return nil } @@ -620,19 +651,14 @@ func (p *Plugin) isShortsRedirect(videoId string) bool { } func (p *Plugin) postVideo(subs []*ChannelSubscription, publishedAt time.Time, video *youtube.Video, channelID string) error { - err := common.MultipleCmds( - radix.FlatCmd(nil, "SET", KeyLastVidTime(channelID), publishedAt.Unix()), - radix.FlatCmd(nil, "SET", KeyLastVidID(channelID), video.Id), - ) + // add video to list of published videos + err := common.RedisPool.Do(radix.FlatCmd(nil, "ZADD", RedisKeyPublishedVideoList, publishedAt.Unix(), video.Id)) if err != nil { return err } contentType := video.Snippet.LiveBroadcastContent logger.Infof("Got a new video for channel %s (%s) with videoid %s (%s), of type %s and publishing to %d subscriptions", channelID, video.Snippet.ChannelTitle, video.Id, video.Snippet.Title, contentType, len(subs)) - if contentType != "live" && contentType != "none" { - return nil - } isLivestream := contentType == "live" isUpcoming := contentType == "upcoming" @@ -681,20 +707,3 @@ func (p *Plugin) getRemoveSubs(channelID string) ([]*ChannelSubscription, error) return subs, nil } - -func (p *Plugin) getLastVidTimes(channelID string) (lastVid string, lastVidTime time.Time, err error) { - // Find the last video time for this channel - var unixSeconds int64 - err = common.RedisPool.Do(radix.Cmd(&unixSeconds, "GET", KeyLastVidTime(channelID))) - - var lastProcessedVidTime time.Time - if err != nil || unixSeconds == 0 { - lastProcessedVidTime = time.Time{} - } else { - lastProcessedVidTime = time.Unix(unixSeconds, 0) - } - - var lastVidID string - err = common.RedisPool.Do(radix.Cmd(&lastVidID, "GET", KeyLastVidID(channelID))) - return lastVidID, lastProcessedVidTime, err -} diff --git a/youtube/youtube.go b/youtube/youtube.go index 2551baf65b..3f7cd8afbb 100644 --- a/youtube/youtube.go +++ b/youtube/youtube.go @@ -17,16 +17,17 @@ import ( ) const ( - RedisChannelsLockKey = "youtube_subbed_channel_lock" - - RedisKeyWebSubChannels = "youtube_registered_websub_channels" - GoogleWebsubHub = "https://pubsubhubbub.appspot.com/subscribe" + RedisChannelsLockKey = "youtube_subbed_channel_lock" + RedisKeyPublishedVideoList = "youtube_published_videos" + RedisKeyWebSubChannels = "youtube_registered_websub_channels" + GoogleWebsubHub = "https://pubsubhubbub.appspot.com/subscribe" ) var ( - confWebsubVerifytoken = config.RegisterOption("yagpdb.youtube.verify_token", "Youtube websub push verify token, set it to a random string and never change it", "asdkpoasdkpaoksdpako") - confResubBatchSize = config.RegisterOption("yagpdb.youtube.resub_batch_size", "Number of Websubs to resubscribe to concurrently", 1) - logger = common.GetPluginLogger(&Plugin{}) + confWebsubVerifytoken = config.RegisterOption("yagpdb.youtube.verify_token", "Youtube websub push verify token, set it to a random string and never change it", "asdkpoasdkpaoksdpako") + confResubBatchSize = config.RegisterOption("yagpdb.youtube.resub_batch_size", "Number of Websubs to resubscribe to concurrently", 1) + confYoutubeVideoCacheDays = config.RegisterOption("yagpdb.youtube.video_cache_duration", "Duration in days to cache youtube video data", 1) + logger = common.GetPluginLogger(&Plugin{}) ) func KeyLastVidTime(channel string) string { return "youtube_last_video_time:" + channel }