diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index af721d6..31b4ea4 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -15,15 +15,17 @@ namespace BirdsiteLive.Pipeline.Processors public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor { private readonly ITwitterUserDal _twitterUserDal; + private readonly IFollowersDal _followersDal; private readonly ILogger _logger; private static Random rng = new Random(); public int WaitFactor = 1000 * 60; //1 min #region Ctor - public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger logger) + public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IFollowersDal followersDal, ILogger logger) { _twitterUserDal = twitterUserDal; + _followersDal = followersDal; _logger = logger; } #endregion @@ -44,7 +46,11 @@ namespace BirdsiteLive.Pipeline.Processors foreach (var u in splitUsers) { ct.ThrowIfCancellationRequested(); - UserWithDataToSync[] toSync = u.Select(x => new UserWithDataToSync { User = x }).ToArray(); + UserWithDataToSync[] toSync = await Task.WhenAll( + u.Select(async x => new UserWithDataToSync + { User = x, Followers = await _followersDal.GetFollowersAsync(x.Id) } + ) + ); await twitterUsersBufferBlock.SendAsync(toSync, ct); } diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index 630d158..80bb25a 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -49,10 +49,6 @@ namespace BirdsiteLive.Pipeline.Processors var t = Task.Run( async () => { - if (userWithTweetsToSync.Followers is null || userWithTweetsToSync.Followers.Length == 0) - { - userWithTweetsToSync.Followers = await _followersDal.GetFollowersAsync(user.Id); - } // Process Shared Inbox var followersWtSharedInbox = userWithTweetsToSync.Followers .Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute)) diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs index 244c85f..a9cddd3 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs @@ -38,10 +38,16 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => true))) .ReturnsAsync(users); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); + #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 10; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -79,10 +85,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .ReturnsAsync(new SyncTwitterUser[0]) .ReturnsAsync(new SyncTwitterUser[0]); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 2; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -120,10 +131,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .ReturnsAsync(new SyncTwitterUser[0]) .ReturnsAsync(new SyncTwitterUser[0]); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 2; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -154,10 +170,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => true))) .ReturnsAsync(new SyncTwitterUser[0]); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 1; var t =processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -185,10 +206,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => true))) .Returns(async () => await DelayFaultedTask(new Exception())); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 10; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -215,10 +241,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); + var followersDalMock = new Mock(MockBehavior.Strict); + followersDalMock + .Setup(x => x.GetFollowersAsync(It.Is(x => true))) + .ReturnsAsync(new Follower[] {}); + var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); processor.WaitFactor = 1; await processor.GetTwitterUsersAsync(buffer, canTokenS.Token); }