From 240dfd19025f183c7db11af561826db2f4510f8b Mon Sep 17 00:00:00 2001 From: Vincent Cloutier Date: Fri, 17 Mar 2023 16:14:30 -0400 Subject: [PATCH] pipeline tweaks --- src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index 045c673..5638a96 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -39,15 +39,15 @@ namespace BirdsiteLive.Pipeline public async Task ExecuteAsync(CancellationToken ct) { - var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }; + var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1, CancellationToken = ct}; // Create blocks var twitterUserToRefreshBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); - var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } ); + var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), standardBlockOptions ); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct }); - var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, CancellationToken = ct, BoundedCapacity = 1 }); + var sendTweetsToFollowersBlock = new ActionBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions); // Link pipeline twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });