diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index b7e4003..f4a48ba 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -52,24 +52,33 @@ namespace BirdsiteLive.Pipeline.Processors index++; var t = Task.Run(async () => { - var user = userWtData.User; - var tweets = await RetrieveNewTweets(user); - _logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " ); - if (tweets.Length > 0 && user.LastTweetPostedId != -1) + try { - userWtData.Tweets = tweets; - usersWtTweets.Add(userWtData); - } - else if (tweets.Length > 0 && user.LastTweetPostedId == -1) + var user = userWtData.User; + var tweets = await RetrieveNewTweets(user); + _logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " ); + if (tweets.Length > 0 && user.LastTweetPostedId != -1) + { + userWtData.Tweets = tweets; + usersWtTweets.Add(userWtData); + } + else if (tweets.Length > 0 && user.LastTweetPostedId == -1) + { + var tweetId = tweets.Last().Id; + var now = DateTime.UtcNow; + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now); + } + else + { + var now = DateTime.UtcNow; + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); + } + + } + catch(Exception e) { - var tweetId = tweets.Last().Id; - var now = DateTime.UtcNow; - await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now); - } - else - { - var now = DateTime.UtcNow; - await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); + _logger.LogError(e.Message); + } }); todo.Add(t); diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index eb7b260..f5f3e2c 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -46,7 +46,7 @@ namespace BirdsiteLive.Pipeline var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); - var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1000, CancellationToken = ct }); + var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 }); var sendTweetsToFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });