now fetching twitter feed in parallel
This commit is contained in:
parent
702bb3b042
commit
d1018881ec
3 changed files with 6 additions and 5 deletions
|
@ -59,7 +59,7 @@ namespace BirdsiteLive.Pipeline.Processors
|
||||||
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
|
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
//await Task.Delay(150);
|
await Task.Delay(250);
|
||||||
}
|
}
|
||||||
|
|
||||||
return usersWtTweets.ToArray();
|
return usersWtTweets.ToArray();
|
||||||
|
|
|
@ -18,6 +18,7 @@ namespace BirdsiteLive.Pipeline.Processors
|
||||||
private readonly ITwitterUserDal _twitterUserDal;
|
private readonly ITwitterUserDal _twitterUserDal;
|
||||||
private readonly IMaxUsersNumberProvider _maxUsersNumberProvider;
|
private readonly IMaxUsersNumberProvider _maxUsersNumberProvider;
|
||||||
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
|
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
|
||||||
|
private static Random rng = new Random();
|
||||||
|
|
||||||
public int WaitFactor = 1000 * 60; //1 min
|
public int WaitFactor = 1000 * 60; //1 min
|
||||||
|
|
||||||
|
@ -41,7 +42,7 @@ namespace BirdsiteLive.Pipeline.Processors
|
||||||
var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(500);
|
var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(500);
|
||||||
|
|
||||||
var userCount = users.Any() ? Math.Min(users.Length, 25) : 1;
|
var userCount = users.Any() ? Math.Min(users.Length, 25) : 1;
|
||||||
var splitUsers = users.Split(userCount).ToList();
|
var splitUsers = users.OrderBy(a => rng.Next()).ToArray().Split(userCount).ToList();
|
||||||
|
|
||||||
foreach (var u in splitUsers)
|
foreach (var u in splitUsers)
|
||||||
{
|
{
|
||||||
|
|
|
@ -43,11 +43,11 @@ namespace BirdsiteLive.Pipeline
|
||||||
// Create blocks
|
// Create blocks
|
||||||
var twitterUserToRefreshBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions
|
var twitterUserToRefreshBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions
|
||||||
{ BoundedCapacity = 1, CancellationToken = ct });
|
{ BoundedCapacity = 1, CancellationToken = ct });
|
||||||
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions );
|
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 2 } );
|
||||||
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct });
|
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
||||||
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
|
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
|
||||||
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
||||||
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 });
|
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
|
||||||
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
||||||
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 });
|
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 });
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue