fix account sync update issue

This commit is contained in:
Nicolas Constant 2021-01-23 18:20:13 -05:00
parent 99714330ee
commit c048786b96
No known key found for this signature in database
GPG key ID: 1E9F677FB01A5688
2 changed files with 16 additions and 4 deletions

View file

@ -49,6 +49,11 @@ namespace BirdsiteLive.Pipeline.Processors
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now); await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now);
} }
else
{
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, now);
}
} }
return usersWtTweets.ToArray(); return usersWtTweets.ToArray();

View file

@ -20,15 +20,18 @@ namespace BirdsiteLive.Pipeline
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor; private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor; private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor; private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
private readonly ISaveProgressionProcessor _saveProgressionProcessor;
private readonly ILogger<StatusPublicationPipeline> _logger; private readonly ILogger<StatusPublicationPipeline> _logger;
#region Ctor #region Ctor
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ILogger<StatusPublicationPipeline> logger) public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionProcessor saveProgressionProcessor, ILogger<StatusPublicationPipeline> logger)
{ {
_retrieveTweetsProcessor = retrieveTweetsProcessor; _retrieveTweetsProcessor = retrieveTweetsProcessor;
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor; _retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
_retrieveFollowersProcessor = retrieveFollowersProcessor; _retrieveFollowersProcessor = retrieveFollowersProcessor;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor; _sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
_saveProgressionProcessor = saveProgressionProcessor;
_logger = logger; _logger = logger;
} }
#endregion #endregion
@ -41,7 +44,9 @@ namespace BirdsiteLive.Pipeline
var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); var sendTweetsToFollowersBlock = new TransformBlock<UserWithTweetsToSync, UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var saveProgressionBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
// Link pipeline // Link pipeline
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true }); twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
@ -49,14 +54,16 @@ namespace BirdsiteLive.Pipeline
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
sendTweetsToFollowersBlock.LinkTo(sendTweetsToFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
sendTweetsToFollowersBufferBlock.LinkTo(saveProgressionBlock, new DataflowLinkOptions { PropagateCompletion = true });
// Launch twitter user retriever // Launch twitter user retriever
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct); var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct);
// Wait // Wait
await Task.WhenAny(new[] { retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion }); await Task.WhenAny(new[] { retrieveTwitterAccountsTask, saveProgressionBlock.Completion });
var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : sendTweetsToFollowersBlock.Completion.Exception; var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : saveProgressionBlock.Completion.Exception;
_logger.LogCritical(ex, "An error occurred, pipeline stopped"); _logger.LogCritical(ex, "An error occurred, pipeline stopped");
} }
} }