cloutier--bird.makeup/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs

73 lines
2.7 KiB
C#
Raw Normal View History

2020-07-18 23:35:19 -04:00
using System;
using System.Linq;
2020-07-18 23:35:19 -04:00
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Common.Extensions;
2021-01-22 21:23:27 -05:00
using BirdsiteLive.Common.Settings;
2020-07-18 23:35:19 -04:00
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
2021-01-16 00:34:09 -05:00
using Microsoft.Extensions.Logging;
2020-07-18 23:35:19 -04:00
namespace BirdsiteLive.Pipeline.Processors
{
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{
private readonly ITwitterUserDal _twitterUserDal;
2021-01-16 00:34:09 -05:00
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
2021-01-22 21:23:27 -05:00
private readonly InstanceSettings _instanceSettings;
public int WaitFactor = 1000 * 60; //1 min
2020-07-18 23:35:19 -04:00
#region Ctor
2021-01-22 21:23:27 -05:00
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, InstanceSettings instanceSettings, ILogger<RetrieveTwitterUsersProcessor> logger)
2020-07-18 23:35:19 -04:00
{
_twitterUserDal = twitterUserDal;
2021-01-22 21:23:27 -05:00
_instanceSettings = instanceSettings;
2021-01-16 00:34:09 -05:00
_logger = logger;
2020-07-18 23:35:19 -04:00
}
#endregion
public async Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUsersBufferBlock, CancellationToken ct)
{
var totalUsers = await _twitterUserDal.GetTwitterUsersCountAsync();
var warmUpMaxCapacity = _instanceSettings.MaxUsersCapacity / 4;
var warmUpIterations = warmUpMaxCapacity == 0 ? 0 : (int) (totalUsers / (float) warmUpMaxCapacity);
for (; ; )
2020-07-18 23:35:19 -04:00
{
ct.ThrowIfCancellationRequested();
try
{
var maxUsers = warmUpIterations > 0
2021-01-22 21:23:27 -05:00
? _instanceSettings.MaxUsersCapacity / 4
: _instanceSettings.MaxUsersCapacity;
warmUpIterations--;
2021-01-22 21:23:27 -05:00
var users = await _twitterUserDal.GetAllTwitterUsersAsync(maxUsers);
2020-11-18 22:49:44 -05:00
var userCount = users.Any() ? users.Length : 1;
var splitNumber = (int) Math.Ceiling(userCount / 15d);
var splitUsers = users.Split(splitNumber).ToList();
foreach (var u in splitUsers)
{
ct.ThrowIfCancellationRequested();
await twitterUsersBufferBlock.SendAsync(u.ToArray(), ct);
await Task.Delay(WaitFactor, ct);
}
var splitCount = splitUsers.Count();
if (splitCount < 15) await Task.Delay((15 - splitCount) * WaitFactor, ct);
2020-07-18 23:35:19 -04:00
}
catch (Exception e)
{
2021-01-16 00:34:09 -05:00
_logger.LogError(e, "Failing retrieving Twitter Users.");
2020-07-18 23:35:19 -04:00
}
}
}
}
}