2020-07-16 01:19:41 -04:00
using System ;
2021-09-05 13:58:33 -04:00
using System.Runtime.CompilerServices ;
2020-07-16 01:19:41 -04:00
using System.Threading ;
using System.Threading.Tasks ;
using System.Threading.Tasks.Dataflow ;
using BirdsiteLive.DAL.Models ;
using BirdsiteLive.Pipeline.Contracts ;
using BirdsiteLive.Pipeline.Models ;
2021-01-16 00:34:09 -05:00
using Microsoft.Extensions.Logging ;
2020-07-16 01:19:41 -04:00
namespace BirdsiteLive.Pipeline
{
public interface IStatusPublicationPipeline
{
Task ExecuteAsync ( CancellationToken ct ) ;
}
public class StatusPublicationPipeline : IStatusPublicationPipeline
{
2020-07-18 23:35:19 -04:00
private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor ;
2020-07-16 01:19:41 -04:00
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor ;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor ;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor ;
2023-02-22 11:30:02 -05:00
private readonly ISaveProgressionTask _saveProgressionTask ;
2021-01-16 00:34:09 -05:00
private readonly ILogger < StatusPublicationPipeline > _logger ;
2020-07-16 01:19:41 -04:00
#region Ctor
2023-02-22 11:30:02 -05:00
public StatusPublicationPipeline ( IRetrieveTweetsProcessor retrieveTweetsProcessor , IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor , IRetrieveFollowersProcessor retrieveFollowersProcessor , ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor , ISaveProgressionTask saveProgressionTask , ILogger < StatusPublicationPipeline > logger )
2020-07-16 01:19:41 -04:00
{
_retrieveTweetsProcessor = retrieveTweetsProcessor ;
2020-07-18 23:35:19 -04:00
_retrieveFollowersProcessor = retrieveFollowersProcessor ;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor ;
2023-02-22 11:30:02 -05:00
_saveProgressionTask = saveProgressionTask ;
2023-01-01 15:18:54 -05:00
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor ;
2021-01-23 18:20:13 -05:00
2021-01-16 00:34:09 -05:00
_logger = logger ;
2020-07-16 01:19:41 -04:00
}
#endregion
public async Task ExecuteAsync ( CancellationToken ct )
{
2023-03-17 16:14:30 -04:00
var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 , MaxDegreeOfParallelism = 1 , CancellationToken = ct } ;
2020-07-16 01:19:41 -04:00
// Create blocks
2023-01-01 15:18:54 -05:00
var twitterUserToRefreshBufferBlock = new BufferBlock < UserWithDataToSync [ ] > ( new DataflowBlockOptions
2021-09-05 13:58:33 -04:00
{ BoundedCapacity = 1 , CancellationToken = ct } ) ;
2023-03-17 16:14:30 -04:00
var retrieveTweetsBlock = new TransformBlock < UserWithDataToSync [ ] , UserWithDataToSync [ ] > ( async x = > await _retrieveTweetsProcessor . ProcessAsync ( x , ct ) , standardBlockOptions ) ;
2023-03-29 19:03:22 -04:00
var retrieveTweetsBufferBlock = new BufferBlock < UserWithDataToSync [ ] > ( new DataflowBlockOptions { BoundedCapacity = 2 , CancellationToken = ct } ) ;
2023-03-25 15:26:11 -04:00
// var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1 } );
// var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock < UserWithDataToSync [ ] > ( async x = > await _sendTweetsToFollowersProcessor . ProcessAsync ( x , ct ) , standardBlockOptions ) ;
2020-07-18 23:35:19 -04:00
2020-07-16 01:19:41 -04:00
// Link pipeline
2023-01-01 15:18:54 -05:00
twitterUserToRefreshBufferBlock . LinkTo ( retrieveTweetsBlock , new DataflowLinkOptions { PropagateCompletion = true } ) ;
2020-08-01 19:26:00 -04:00
retrieveTweetsBlock . LinkTo ( retrieveTweetsBufferBlock , new DataflowLinkOptions { PropagateCompletion = true } ) ;
2023-03-25 15:26:11 -04:00
retrieveTweetsBufferBlock . LinkTo ( sendTweetsToFollowersBlock , new DataflowLinkOptions { PropagateCompletion = true } ) ;
2020-07-16 01:19:41 -04:00
2023-03-17 15:10:53 -04:00
// Launch twitter user retriever after a little delay
// to give time for the Tweet cache to fill
await Task . Delay ( 30 * 1000 , ct ) ;
2021-09-05 13:58:33 -04:00
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor . GetTwitterUsersAsync ( twitterUserToRefreshBufferBlock , ct ) ;
2020-07-16 01:19:41 -04:00
// Wait
2023-02-22 11:30:02 -05:00
await Task . WhenAny ( new [ ] { retrieveTwitterAccountsTask , sendTweetsToFollowersBlock . Completion } ) ;
2020-07-18 23:35:19 -04:00
2023-02-22 11:30:02 -05:00
var ex = retrieveTwitterAccountsTask . IsFaulted ? retrieveTwitterAccountsTask . Exception : sendTweetsToFollowersBlock . Completion . Exception ;
2021-01-16 00:34:09 -05:00
_logger . LogCritical ( ex , "An error occurred, pipeline stopped" ) ;
2020-07-16 01:19:41 -04:00
}
}
}