diff --git a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs index 84821d4..db82611 100644 --- a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs +++ b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs @@ -16,5 +16,6 @@ public int UserCacheCapacity { get; set; } public int ParallelTwitterRequests { get; set; } = 10; + public int ParallelFediversePosts { get; set; } = 10; } } diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index 02e86cf..796e472 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -28,6 +28,7 @@ namespace BirdsiteLive.Pipeline.Processors private readonly InstanceSettings _instanceSettings; private readonly ILogger _logger; private readonly IRemoveFollowerAction _removeFollowerAction; + private List _todo = new List(); #region Ctor public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, ISaveProgressionTask saveProgressionTask, IFollowersDal followersDal, ILogger logger, InstanceSettings instanceSettings, IRemoveFollowerAction removeFollowerAction) @@ -46,51 +47,56 @@ namespace BirdsiteLive.Pipeline.Processors { var user = userWithTweetsToSync.User; - // Process Shared Inbox - var followersWtSharedInbox = userWithTweetsToSync.Followers - .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) - .ToList(); - await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user); + _todo = _todo.Where(x => !x.IsCompleted).ToList(); + + var t = Task.Run( async () => + { + // Process Shared Inbox + var followersWtSharedInbox = userWithTweetsToSync.Followers + .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) + .ToList(); + await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user); - // Process Inbox - var followerWtInbox = userWithTweetsToSync.Followers - .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) - .ToList(); - await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); + // Process Inbox + var followerWtInbox = userWithTweetsToSync.Followers + .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) + .ToList(); + await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); - await _saveProgressionTask.ProcessAsync(userWithTweetsToSync, ct); + await _saveProgressionTask.ProcessAsync(userWithTweetsToSync, ct); + }); + _todo.Add(t); + + if (_todo.Count >= _instanceSettings.ParallelFediversePosts) + { + await Task.WhenAny(_todo); + } } private async Task ProcessFollowersWithSharedInboxAsync(ExtractedTweet[] tweets, List followers, SyncTwitterUser user) { var followersPerInstances = followers.GroupBy(x => x.Host); - List todo = new List(); foreach (var followersPerInstance in followersPerInstances) { - var t = Task.Run( async () => + try { - try - { - _logger.LogInformation("Sending " + tweets.Length + " tweets from user " + user.Acct + " to instance " + followersPerInstance.Key); - await _sendTweetsToSharedInbox.ExecuteAsync(tweets, user, followersPerInstance.Key, followersPerInstance.ToArray()); + _logger.LogInformation("Sending " + tweets.Length + " tweets from user " + user.Acct + " to instance " + followersPerInstance.Key); + await _sendTweetsToSharedInbox.ExecuteAsync(tweets, user, followersPerInstance.Key, followersPerInstance.ToArray()); - foreach (var f in followersPerInstance) - await ProcessWorkingUserAsync(f); - } - catch (Exception e) - { - var follower = followersPerInstance.First(); - _logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.SharedInboxRoute); + foreach (var f in followersPerInstance) + await ProcessWorkingUserAsync(f); + } + catch (Exception e) + { + var follower = followersPerInstance.First(); + _logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.SharedInboxRoute); - foreach (var f in followersPerInstance) - await ProcessFailingUserAsync(f); - } - }); - todo.Add(t); + foreach (var f in followersPerInstance) + await ProcessFailingUserAsync(f); + } } - await Task.WhenAll(todo); } private async Task ProcessFollowersWithInboxAsync(ExtractedTweet[] tweets, List followerWtInbox, SyncTwitterUser user) diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 806064d..88fbb28 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -47,7 +47,7 @@ namespace BirdsiteLive.Pipeline var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); - var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 }); + var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, CancellationToken = ct, BoundedCapacity = 1 }); // Link pipeline twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true }); diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs index 18c60a0..03e90ad 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs @@ -79,7 +79,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion @@ -157,7 +160,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var saveProgressMock = new Mock(); var removeFollowerMock = new Mock(MockBehavior.Strict); @@ -246,7 +252,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion @@ -335,7 +344,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion @@ -429,7 +441,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion @@ -506,7 +521,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion @@ -584,7 +602,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion @@ -670,7 +691,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion @@ -755,7 +779,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var settings = new InstanceSettings { - FailingFollowerCleanUpThreshold = 10 + FailingFollowerCleanUpThreshold = 10, + ParallelFediversePosts = 1 }; var removeFollowerMock = new Mock(MockBehavior.Strict); @@ -844,7 +869,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var settings = new InstanceSettings { - FailingFollowerCleanUpThreshold = 0 + FailingFollowerCleanUpThreshold = 0, + ParallelFediversePosts = 1 }; var removeFollowerMock = new Mock(MockBehavior.Strict); @@ -935,7 +961,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion @@ -1027,8 +1056,11 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); var saveProgressMock = new Mock(); - var settings = new InstanceSettings(); - + var settings = new InstanceSettings + { + ParallelFediversePosts = 1 + }; + var removeFollowerMock = new Mock(MockBehavior.Strict); #endregion