pipeline refactoring

This commit is contained in:
Vincent Cloutier 2023-03-25 15:26:11 -04:00
parent 9551c735ea
commit 46be9552e9
4 changed files with 45 additions and 42 deletions

View file

@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts
{ {
public interface ISendTweetsToFollowersProcessor public interface ISendTweetsToFollowersProcessor
{ {
Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct); Task ProcessAsync(UserWithDataToSync[] usersWithTweetsToSync, CancellationToken ct);
} }
} }

View file

@ -41,34 +41,39 @@ namespace BirdsiteLive.Pipeline.Processors
} }
#endregion #endregion
public async Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct) public async Task ProcessAsync(UserWithDataToSync[] usersWithTweetsToSync, CancellationToken ct)
{ {
var user = userWithTweetsToSync.User; foreach (var userWithTweetsToSync in usersWithTweetsToSync)
_todo = _todo.Where(x => !x.IsCompleted).ToList();
var t = Task.Run( async () =>
{ {
// Process Shared Inbox var user = userWithTweetsToSync.User;
var followersWtSharedInbox = userWithTweetsToSync.Followers
.Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user);
// Process Inbox _todo = _todo.Where(x => !x.IsCompleted).ToList();
var followerWtInbox = userWithTweetsToSync.Followers
.Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute)) var t = Task.Run( async () =>
.ToList(); {
await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user); // Process Shared Inbox
}, ct); var followersWtSharedInbox = userWithTweetsToSync.Followers
_todo.Add(t); .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user);
if (_todo.Count >= _instanceSettings.ParallelFediversePosts) // Process Inbox
{ var followerWtInbox = userWithTweetsToSync.Followers
await Task.WhenAny(_todo); .Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user);
_logger.LogInformation("Done sending " + userWithTweetsToSync.Tweets.Length + "tweets for user " + userWithTweetsToSync.User.Acct);
}, ct);
_todo.Add(t);
if (_todo.Count >= _instanceSettings.ParallelFediversePosts)
{
await Task.WhenAny(_todo);
}
} }
_logger.LogInformation("Done sending " + userWithTweetsToSync.Followers.Length + "tweets for user " + userWithTweetsToSync.User.Acct);
} }

View file

@ -45,16 +45,14 @@ namespace BirdsiteLive.Pipeline
{ BoundedCapacity = 1, CancellationToken = ct }); { BoundedCapacity = 1, CancellationToken = ct });
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions ); var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions );
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } ); // var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } );
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); // var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); var sendTweetsToFollowersBlock = new ActionBlock<UserWithDataToSync[]>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
// Link pipeline // Link pipeline
twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true }); twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
// Launch twitter user retriever after a little delay // Launch twitter user retriever after a little delay
// to give time for the Tweet cache to fill // to give time for the Tweet cache to fill

View file

@ -88,7 +88,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -170,7 +170,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -261,7 +261,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new[] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -353,7 +353,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -450,7 +450,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -530,7 +530,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -611,7 +611,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -700,7 +700,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -790,7 +790,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -880,7 +880,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -970,7 +970,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] {userWithTweets}, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();
@ -1065,7 +1065,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
#endregion #endregion
var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object); var processor = new SendTweetsToFollowersProcessor(sendTweetsToInboxTaskMock.Object, sendTweetsToSharedInboxTaskMock.Object, followersDalMock.Object, loggerMock.Object, settings, removeFollowerMock.Object);
await processor.ProcessAsync(userWithTweets, CancellationToken.None); await processor.ProcessAsync(new [] { userWithTweets }, CancellationToken.None);
#region Validations #region Validations
sendTweetsToInboxTaskMock.VerifyAll(); sendTweetsToInboxTaskMock.VerifyAll();