cloutier--bird.makeup/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs

74 lines
5 KiB
C#
Raw Normal View History

2020-07-16 01:19:41 -04:00
using System;
using System.Runtime.CompilerServices;
2020-07-16 01:19:41 -04:00
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
2021-01-16 00:34:09 -05:00
using Microsoft.Extensions.Logging;
2020-07-16 01:19:41 -04:00
namespace BirdsiteLive.Pipeline
{
/// <summary>
/// Contract for the status publication pipeline.
/// </summary>
public interface IStatusPublicationPipeline
{
/// <summary>
/// Runs the pipeline.
/// </summary>
/// <param name="ct">Cancellation token for the run.</param>
/// <returns>A task representing the pipeline run.</returns>
Task ExecuteAsync(CancellationToken ct);
}
/// <summary>
/// TPL Dataflow pipeline that chains four processing stages: retrieve tweets,
/// retrieve followers, send tweets to followers, and save progression.
/// The head of the pipeline is fed by the twitter users processor.
/// </summary>
public class StatusPublicationPipeline : IStatusPublicationPipeline
{
    private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor;
    private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
    private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
    private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
    private readonly ISaveProgressionProcessor _saveProgressionProcessor;
    private readonly ILogger<StatusPublicationPipeline> _logger;

    #region Ctor
    /// <summary>
    /// Creates the pipeline from its stage processors and a logger.
    /// </summary>
    /// <param name="retrieveTweetsProcessor">Stage invoked on each batch of users to retrieve tweets.</param>
    /// <param name="retrieveTwitterAccountsProcessor">Producer that posts batches of users into the head buffer.</param>
    /// <param name="retrieveFollowersProcessor">Stage that expands a batch into individual users.</param>
    /// <param name="sendTweetsToFollowersProcessor">Stage invoked per user to send tweets to followers.</param>
    /// <param name="saveProgressionProcessor">Final stage invoked per user to save progression.</param>
    /// <param name="logger">Logger used to report that the pipeline stopped.</param>
    public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionProcessor saveProgressionProcessor, ILogger<StatusPublicationPipeline> logger)
    {
        _retrieveTweetsProcessor = retrieveTweetsProcessor;
        _retrieveFollowersProcessor = retrieveFollowersProcessor;
        _sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
        _saveProgressionProcessor = saveProgressionProcessor;
        _retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;

        _logger = logger;
    }
    #endregion

    /// <summary>
    /// Builds and links the dataflow blocks, starts the twitter users producer, and
    /// waits until either the producer task or the final block completes.
    /// A critical log entry is written once that happens.
    /// </summary>
    /// <param name="ct">Cancellation token handed to every block and every processor call.</param>
    /// <returns>A task that completes when the pipeline has stopped.</returns>
    public async Task ExecuteAsync(CancellationToken ct)
    {
        // Every block observes the cancellation token so that all stages stop
        // consistently when cancellation is requested.
        var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct };

        // Completion (including faults) flows from each block to the next one.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        // Create blocks
        var twitterUserToRefreshBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
        var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1, CancellationToken = ct });
        var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
        var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
        var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct });
        var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
        var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
        var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });

        // Link pipeline
        twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, linkOptions);
        retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, linkOptions);
        retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, linkOptions);
        retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, linkOptions);
        retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, linkOptions);
        sendTweetsToFollowersBlock.LinkTo(sendTweetsToFollowersBufferBlock, linkOptions);
        sendTweetsToFollowersBufferBlock.LinkTo(saveProgressionBlock, linkOptions);

        // Launch twitter user retriever
        var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUserToRefreshBufferBlock, ct);

        // Wait until either end of the pipeline stops.
        await Task.WhenAny(new[] { retrieveTwitterAccountsTask, saveProgressionBlock.Completion });

        // Prefer the producer's exception when it faulted; otherwise use the final
        // block's. The exception is null when neither task faulted (for example on
        // cancellation); the critical entry is written in that case as well.
        var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : saveProgressionBlock.Completion.Exception;
        _logger.LogCritical(ex, "An error occurred, pipeline stopped");
    }
}
}