cloutier--bird.makeup/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs

63 lines
3.5 KiB
C#
Raw Normal View History

2020-07-16 01:19:41 -04:00
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline
{
public interface IStatusPublicationPipeline
{
Task ExecuteAsync(CancellationToken ct);
}
public class StatusPublicationPipeline : IStatusPublicationPipeline
{
2020-07-18 23:35:19 -04:00
private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor;
2020-07-16 01:19:41 -04:00
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
#region Ctor
2020-07-18 23:35:19 -04:00
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor)
2020-07-16 01:19:41 -04:00
{
_retrieveTweetsProcessor = retrieveTweetsProcessor;
2020-07-18 23:35:19 -04:00
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
_retrieveFollowersProcessor = retrieveFollowersProcessor;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
2020-07-16 01:19:41 -04:00
}
#endregion
public async Task ExecuteAsync(CancellationToken ct)
{
// Create blocks
var twitterUsersBufferBlock = new BufferBlock<SyncTwitterUser[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct});
var retrieveTweetsBlock = new TransformBlock<SyncTwitterUser[], UserWithTweetsToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct));
var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct});
2020-07-18 23:35:19 -04:00
2020-07-16 01:19:41 -04:00
// Link pipeline
2020-07-18 23:35:19 -04:00
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions {PropagateCompletion = true});
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock);
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock);
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock);
retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock);
2020-07-16 01:19:41 -04:00
// Launch twitter user retriever
2020-07-18 23:35:19 -04:00
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct);
2020-07-16 01:19:41 -04:00
// Wait
2020-07-18 23:35:19 -04:00
await Task.WhenAll(retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion);
var foreground = Console.ForegroundColor;
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("An error occured, pipeline stopped");
Console.ForegroundColor = foreground;
2020-07-16 01:19:41 -04:00
}
}
}