From 46be9552e99d002c80205bbc04f8d3f3e662c54a Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Sat, 25 Mar 2023 15:26:11 -0400 Subject: [PATCH] pipeline refactoring --- .../ISendTweetsToFollowersProcessor.cs | 2 +- .../SendTweetsToFollowersProcessor.cs | 51 ++++++++++--------- .../StatusPublicationPipeline.cs | 10 ++-- .../SendTweetsToFollowersProcessorTests.cs | 24 ++++----- 4 files changed, 45 insertions(+), 42 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs index c188f55..eddadf5 100644 --- a/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs @@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts { public interface ISendTweetsToFollowersProcessor { - Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct); + Task ProcessAsync(UserWithDataToSync[] usersWithTweetsToSync, CancellationToken ct); } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index 80bb25a..49e3bdb 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -41,34 +41,39 @@ namespace BirdsiteLive.Pipeline.Processors } #endregion - public async Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct) + public async Task ProcessAsync(UserWithDataToSync[] usersWithTweetsToSync, CancellationToken ct) { - var user = userWithTweetsToSync.User; - - _todo = _todo.Where(x => !x.IsCompleted).ToList(); - - var t = Task.Run( async () => + foreach (var userWithTweetsToSync in usersWithTweetsToSync) { - // Process Shared Inbox - var followersWtSharedInbox = userWithTweetsToSync.Followers - .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) - .ToList(); - await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user); + var user = userWithTweetsToSync.User; - // Process Inbox - var followerWtInbox = userWithTweetsToSync.Followers - .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) - .ToList(); - await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); - }, ct); - _todo.Add(t); + _todo = _todo.Where(x => !x.IsCompleted).ToList(); + + var t = Task.Run( async () => + { + // Process Shared Inbox + var followersWtSharedInbox = userWithTweetsToSync.Followers + .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) + .ToList(); + await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user); - if (_todo.Count >= _instanceSettings.ParallelFediversePosts) - { - await Task.WhenAny(_todo); + // Process Inbox + var followerWtInbox = userWithTweetsToSync.Followers + .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) + .ToList(); + await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); + + _logger.LogInformation("Done sending " + userWithTweetsToSync.Tweets.Length + "tweets for user " + userWithTweetsToSync.User.Acct); + }, ct); + _todo.Add(t); + + if (_todo.Count >= _instanceSettings.ParallelFediversePosts) + { + await Task.WhenAny(_todo); + } + + } - - _logger.LogInformation("Done sending " + userWithTweetsToSync.Followers.Length + "tweets for user " + userWithTweetsToSync.User.Acct); } diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 97c4bf6..47ee70d 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -45,16 +45,14 @@ namespace BirdsiteLive.Pipeline { BoundedCapacity = 1, CancellationToken = ct }); var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); - var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } ); - var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); - var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); + // var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } ); + // var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); + var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); // Link pipeline twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); - retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); - retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); - retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); + retrieveTweetsBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); // Launch twitter user retriever after a little delay // to give time for the Tweet cache to fill diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs index 06d8b67..2121831 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs @@ -88,7 +88,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -170,7 +170,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -261,7 +261,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -353,7 +353,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -450,7 +450,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -530,7 +530,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -611,7 +611,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -700,7 +700,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -790,7 +790,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -880,7 +880,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -970,7 +970,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll(); @@ -1065,7 +1065,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); - await processor.ProcessAsync(userWithTweets, CancellationToken.None); + await processor.ProcessAsync(new [] { userWithTweets }, CancellationToken.None); #region Validations sendTweetsToInboxTaskMock.VerifyAll();