boundedcapacity change
This commit is contained in:
parent
43fb42727e
commit
5c75e79abc
1 changed files with 5 additions and 4 deletions
|
@ -39,16 +39,17 @@ namespace BirdsiteLive.Pipeline
|
||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken ct)
|
public async Task ExecuteAsync(CancellationToken ct)
|
||||||
{
|
{
|
||||||
|
var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
|
||||||
// Create blocks
|
// Create blocks
|
||||||
var twitterUserToRefreshBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions
|
var twitterUserToRefreshBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions
|
||||||
{ BoundedCapacity = 1, CancellationToken = ct });
|
{ BoundedCapacity = 1, CancellationToken = ct });
|
||||||
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct));
|
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions );
|
||||||
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 2, CancellationToken = ct });
|
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 2, CancellationToken = ct });
|
||||||
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
|
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
|
||||||
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct });
|
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct });
|
||||||
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
|
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 });
|
||||||
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct });
|
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct });
|
||||||
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
|
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct, BoundedCapacity = 1 });
|
||||||
|
|
||||||
// Link pipeline
|
// Link pipeline
|
||||||
twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
|
twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
|
||||||
|
|
Loading…
Add table
Reference in a new issue