moved followers retrieval 2
This commit is contained in:
parent
8d6851c639
commit
9551c735ea
3 changed files with 45 additions and 12 deletions
|
@ -15,15 +15,17 @@ namespace BirdsiteLive.Pipeline.Processors
|
||||||
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
|
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
|
||||||
{
|
{
|
||||||
private readonly ITwitterUserDal _twitterUserDal;
|
private readonly ITwitterUserDal _twitterUserDal;
|
||||||
|
private readonly IFollowersDal _followersDal;
|
||||||
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
|
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
|
||||||
private static Random rng = new Random();
|
private static Random rng = new Random();
|
||||||
|
|
||||||
public int WaitFactor = 1000 * 60; //1 min
|
public int WaitFactor = 1000 * 60; //1 min
|
||||||
|
|
||||||
#region Ctor
|
#region Ctor
|
||||||
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
|
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IFollowersDal followersDal, ILogger<RetrieveTwitterUsersProcessor> logger)
|
||||||
{
|
{
|
||||||
_twitterUserDal = twitterUserDal;
|
_twitterUserDal = twitterUserDal;
|
||||||
|
_followersDal = followersDal;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
#endregion
|
#endregion
|
||||||
|
@ -44,7 +46,11 @@ namespace BirdsiteLive.Pipeline.Processors
|
||||||
foreach (var u in splitUsers)
|
foreach (var u in splitUsers)
|
||||||
{
|
{
|
||||||
ct.ThrowIfCancellationRequested();
|
ct.ThrowIfCancellationRequested();
|
||||||
UserWithDataToSync[] toSync = u.Select(x => new UserWithDataToSync { User = x }).ToArray();
|
UserWithDataToSync[] toSync = await Task.WhenAll(
|
||||||
|
u.Select(async x => new UserWithDataToSync
|
||||||
|
{ User = x, Followers = await _followersDal.GetFollowersAsync(x.Id) }
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
await twitterUsersBufferBlock.SendAsync(toSync, ct);
|
await twitterUsersBufferBlock.SendAsync(toSync, ct);
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,10 +49,6 @@ namespace BirdsiteLive.Pipeline.Processors
|
||||||
|
|
||||||
var t = Task.Run( async () =>
|
var t = Task.Run( async () =>
|
||||||
{
|
{
|
||||||
if (userWithTweetsToSync.Followers is null || userWithTweetsToSync.Followers.Length == 0)
|
|
||||||
{
|
|
||||||
userWithTweetsToSync.Followers = await _followersDal.GetFollowersAsync(user.Id);
|
|
||||||
}
|
|
||||||
// Process Shared Inbox
|
// Process Shared Inbox
|
||||||
var followersWtSharedInbox = userWithTweetsToSync.Followers
|
var followersWtSharedInbox = userWithTweetsToSync.Followers
|
||||||
.Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
|
.Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
|
||||||
|
|
|
@ -38,10 +38,16 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
|
||||||
It.Is<int>(y => true)))
|
It.Is<int>(y => true)))
|
||||||
.ReturnsAsync(users);
|
.ReturnsAsync(users);
|
||||||
|
|
||||||
|
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
|
||||||
|
followersDalMock
|
||||||
|
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
|
||||||
|
.ReturnsAsync(new Follower[] {});
|
||||||
|
|
||||||
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
|
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
|
||||||
processor.WaitFactor = 10;
|
processor.WaitFactor = 10;
|
||||||
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
||||||
|
|
||||||
|
@ -79,10 +85,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
|
||||||
.ReturnsAsync(new SyncTwitterUser[0])
|
.ReturnsAsync(new SyncTwitterUser[0])
|
||||||
.ReturnsAsync(new SyncTwitterUser[0]);
|
.ReturnsAsync(new SyncTwitterUser[0]);
|
||||||
|
|
||||||
|
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
|
||||||
|
followersDalMock
|
||||||
|
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
|
||||||
|
.ReturnsAsync(new Follower[] {});
|
||||||
|
|
||||||
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
|
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
|
||||||
processor.WaitFactor = 2;
|
processor.WaitFactor = 2;
|
||||||
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
||||||
|
|
||||||
|
@ -120,10 +131,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
|
||||||
.ReturnsAsync(new SyncTwitterUser[0])
|
.ReturnsAsync(new SyncTwitterUser[0])
|
||||||
.ReturnsAsync(new SyncTwitterUser[0]);
|
.ReturnsAsync(new SyncTwitterUser[0]);
|
||||||
|
|
||||||
|
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
|
||||||
|
followersDalMock
|
||||||
|
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
|
||||||
|
.ReturnsAsync(new Follower[] {});
|
||||||
|
|
||||||
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
|
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
|
||||||
processor.WaitFactor = 2;
|
processor.WaitFactor = 2;
|
||||||
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
||||||
|
|
||||||
|
@ -154,10 +170,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
|
||||||
It.Is<int>(y => true)))
|
It.Is<int>(y => true)))
|
||||||
.ReturnsAsync(new SyncTwitterUser[0]);
|
.ReturnsAsync(new SyncTwitterUser[0]);
|
||||||
|
|
||||||
|
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
|
||||||
|
followersDalMock
|
||||||
|
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
|
||||||
|
.ReturnsAsync(new Follower[] {});
|
||||||
|
|
||||||
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
|
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
|
||||||
processor.WaitFactor = 1;
|
processor.WaitFactor = 1;
|
||||||
var t =processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
var t =processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
||||||
|
|
||||||
|
@ -185,10 +206,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
|
||||||
It.Is<int>(y => true)))
|
It.Is<int>(y => true)))
|
||||||
.Returns(async () => await DelayFaultedTask<SyncTwitterUser[]>(new Exception()));
|
.Returns(async () => await DelayFaultedTask<SyncTwitterUser[]>(new Exception()));
|
||||||
|
|
||||||
|
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
|
||||||
|
followersDalMock
|
||||||
|
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
|
||||||
|
.ReturnsAsync(new Follower[] {});
|
||||||
|
|
||||||
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
|
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
|
||||||
processor.WaitFactor = 10;
|
processor.WaitFactor = 10;
|
||||||
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None);
|
||||||
|
|
||||||
|
@ -215,10 +241,15 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
|
||||||
#region Mocks
|
#region Mocks
|
||||||
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
|
var twitterUserDalMock = new Mock<ITwitterUserDal>(MockBehavior.Strict);
|
||||||
|
|
||||||
|
var followersDalMock = new Mock<IFollowersDal>(MockBehavior.Strict);
|
||||||
|
followersDalMock
|
||||||
|
.Setup(x => x.GetFollowersAsync(It.Is<int>(x => true)))
|
||||||
|
.ReturnsAsync(new Follower[] {});
|
||||||
|
|
||||||
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
var loggerMock = new Mock<ILogger<RetrieveTwitterUsersProcessor>>();
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object);
|
var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object);
|
||||||
processor.WaitFactor = 1;
|
processor.WaitFactor = 1;
|
||||||
await processor.GetTwitterUsersAsync(buffer, canTokenS.Token);
|
await processor.GetTwitterUsersAsync(buffer, canTokenS.Token);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue