From 9026273f456cb46b6358f0573c0992eec5ac8334 Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Tue, 10 Jan 2023 21:00:21 -0500 Subject: [PATCH] made tweet fetching concurrent instead --- .../Processors/RetrieveTweetsProcessor.cs | 51 ++++++++++++------- .../StatusPublicationPipeline.cs | 2 +- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index 681ee66..a50f1d1 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -33,35 +34,47 @@ namespace BirdsiteLive.Pipeline.Processors public async Task ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct) { - var usersWtTweets = new List(); + var usersWtTweets = new ConcurrentBag(); + List todo = new List(); int index = 0; foreach (var userWtData in syncTwitterUsers) { index++; - var user = userWtData.User; - var tweets = await RetrieveNewTweets(user); - _logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " ); - if (tweets.Length > 0 && user.LastTweetPostedId != -1) + + var t = Task.Run(async () => { + var user = userWtData.User; + var tweets = await RetrieveNewTweets(user); + _logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " ); + if (tweets.Length > 0 && user.LastTweetPostedId != -1) + { + userWtData.Tweets = tweets; + usersWtTweets.Add(userWtData); + } + else if (tweets.Length > 0 && user.LastTweetPostedId == -1) + { + var tweetId = tweets.Last().Id; + var now = DateTime.UtcNow; + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now); + } + else + { + var now = DateTime.UtcNow; + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); + } + }); + todo.Add(t); + if (todo.Count > 3) { - userWtData.Tweets = tweets; - usersWtTweets.Add(userWtData); - } - else if (tweets.Length > 0 && user.LastTweetPostedId == -1) - { - var tweetId = tweets.Last().Id; - var now = DateTime.UtcNow; - await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now); - } - else - { - var now = DateTime.UtcNow; - await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); + await Task.WhenAll(todo); + _logger.LogInformation(index + "/" + syncTwitterUsers.Count() ); + todo.Clear(); + //await Task.Delay(250); } - await Task.Delay(250); } + await Task.WhenAll(todo); return usersWtTweets.ToArray(); } diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index b629b7e..16a40ee 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -43,7 +43,7 @@ namespace BirdsiteLive.Pipeline // Create blocks var twitterUserToRefreshBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); - var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 2 } ); + var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });