2020-07-16 01:19:41 -04:00
|
|
|
|
using System;
|
2021-09-05 13:58:33 -04:00
|
|
|
|
using System.Runtime.CompilerServices;
|
2020-07-16 01:19:41 -04:00
|
|
|
|
using System.Threading;
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
using System.Threading.Tasks.Dataflow;
|
|
|
|
|
using BirdsiteLive.DAL.Models;
|
|
|
|
|
using BirdsiteLive.Pipeline.Contracts;
|
|
|
|
|
using BirdsiteLive.Pipeline.Models;
|
2021-01-16 00:34:09 -05:00
|
|
|
|
using Microsoft.Extensions.Logging;
|
2020-07-16 01:19:41 -04:00
|
|
|
|
|
|
|
|
|
namespace BirdsiteLive.Pipeline
|
|
|
|
|
{
|
|
|
|
|
public interface IStatusPublicationPipeline
|
|
|
|
|
{
|
|
|
|
|
Task ExecuteAsync(CancellationToken ct);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public class StatusPublicationPipeline : IStatusPublicationPipeline
|
|
|
|
|
{
|
2020-07-18 23:35:19 -04:00
|
|
|
|
private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor;
|
2020-07-16 01:19:41 -04:00
|
|
|
|
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
|
|
|
|
|
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
|
|
|
|
|
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
|
2023-02-22 11:30:02 -05:00
|
|
|
|
private readonly ISaveProgressionTask _saveProgressionTask;
|
2021-01-16 00:34:09 -05:00
|
|
|
|
private readonly ILogger<StatusPublicationPipeline> _logger;
|
|
|
|
|
|
2020-07-16 01:19:41 -04:00
|
|
|
|
#region Ctor
|
2023-02-22 11:30:02 -05:00
|
|
|
|
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionTask saveProgressionTask, ILogger<StatusPublicationPipeline> logger)
|
2020-07-16 01:19:41 -04:00
|
|
|
|
{
|
|
|
|
|
_retrieveTweetsProcessor = retrieveTweetsProcessor;
|
2020-07-18 23:35:19 -04:00
|
|
|
|
_retrieveFollowersProcessor = retrieveFollowersProcessor;
|
|
|
|
|
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
|
2023-02-22 11:30:02 -05:00
|
|
|
|
_saveProgressionTask = saveProgressionTask;
|
2023-01-01 15:18:54 -05:00
|
|
|
|
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
|
2021-01-23 18:20:13 -05:00
|
|
|
|
|
2021-01-16 00:34:09 -05:00
|
|
|
|
_logger = logger;
|
2020-07-16 01:19:41 -04:00
|
|
|
|
}
|
|
|
|
|
#endregion
|
|
|
|
|
|
|
|
|
|
public async Task ExecuteAsync(CancellationToken ct)
|
|
|
|
|
{
|
2023-01-03 14:50:05 -05:00
|
|
|
|
var standardBlockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
|
2020-07-16 01:19:41 -04:00
|
|
|
|
// Create blocks
|
2023-01-01 15:18:54 -05:00
|
|
|
|
var twitterUserToRefreshBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions
|
2021-09-05 13:58:33 -04:00
|
|
|
|
{ BoundedCapacity = 1, CancellationToken = ct });
|
2023-01-10 21:00:21 -05:00
|
|
|
|
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } );
|
2023-01-10 20:30:07 -05:00
|
|
|
|
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
2023-01-03 14:50:05 -05:00
|
|
|
|
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
|
2023-01-20 14:17:32 -05:00
|
|
|
|
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct });
|
2023-02-22 11:54:03 -05:00
|
|
|
|
var sendTweetsToFollowersBlock = new ActionBlock<UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, CancellationToken = ct, BoundedCapacity = 1 });
|
2020-07-18 23:35:19 -04:00
|
|
|
|
|
2020-07-16 01:19:41 -04:00
|
|
|
|
// Link pipeline
|
2023-01-01 15:18:54 -05:00
|
|
|
|
twitterUserToRefreshBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
|
2020-08-01 19:26:00 -04:00
|
|
|
|
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
|
|
|
|
|
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
|
|
|
|
|
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
|
|
|
|
|
retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
|
2020-07-16 01:19:41 -04:00
|
|
|
|
|
2023-03-17 15:10:53 -04:00
|
|
|
|
// Launch twitter user retriever after a little delay
|
|
|
|
|
// to give time for the Tweet cache to fill
|
|
|
|
|
await Task.Delay(30 * 1000, ct);
|
2021-09-05 13:58:33 -04:00
|
|
|
|
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUserToRefreshBufferBlock, ct);
|
2020-07-16 01:19:41 -04:00
|
|
|
|
|
|
|
|
|
// Wait
|
2023-02-22 11:30:02 -05:00
|
|
|
|
await Task.WhenAny(new[] { retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion });
|
2020-07-18 23:35:19 -04:00
|
|
|
|
|
2023-02-22 11:30:02 -05:00
|
|
|
|
var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : sendTweetsToFollowersBlock.Completion.Exception;
|
2021-01-16 00:34:09 -05:00
|
|
|
|
_logger.LogCritical(ex, "An error occurred, pipeline stopped");
|
2020-07-16 01:19:41 -04:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|