From 71f6d3f3f457605967ab5cda1b2b49eed413c0a3 Mon Sep 17 00:00:00 2001 From: Nicolas Constant Date: Sun, 5 Sep 2021 13:58:33 -0400 Subject: [PATCH] added pipeline processor to analyse user state --- .../BirdsiteLive.Pipeline.csproj | 1 + .../IRefreshTwitterUserStatusProcessor.cs | 12 ++++ .../Contracts/IRetrieveFollowersProcessor.cs | 2 +- .../Contracts/IRetrieveTweetsProcessor.cs | 2 +- .../Contracts/ISaveProgressionProcessor.cs | 2 +- .../ISendTweetsToFollowersProcessor.cs | 2 +- .../Models/UserWithTweetsToSync.cs | 5 +- .../RefreshTwitterUserStatusProcessor.cs | 69 +++++++++++++++++++ .../Processors/RetrieveFollowersProcessor.cs | 2 +- .../Processors/RetrieveTweetsProcessor.cs | 19 +++-- .../Processors/SaveProgressionProcessor.cs | 4 +- .../SendTweetsToFollowersProcessor.cs | 2 +- .../StatusPublicationPipeline.cs | 28 +++++--- .../TwitterUserService.cs | 3 + .../DbInitializerPostgresDal.cs | 10 ++- .../TwitterUserPostgresDal.cs | 11 ++- .../Contracts/ITwitterUserDal.cs | 3 +- .../Models/SyncTwitterUser.cs | 2 + .../TwitterUserPostgresDalTests.cs | 48 +++++++++++-- .../RetrieveFollowersProcessorTests.cs | 6 +- .../SaveProgressionProcessorTests.cs | 9 ++- .../SendTweetsToFollowersProcessorTests.cs | 12 ++-- 22 files changed, 200 insertions(+), 54 deletions(-) create mode 100644 src/BirdsiteLive.Pipeline/Contracts/IRefreshTwitterUserStatusProcessor.cs create mode 100644 src/BirdsiteLive.Pipeline/Processors/RefreshTwitterUserStatusProcessor.cs diff --git a/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj b/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj index 884af18..8601b19 100644 --- a/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj +++ b/src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj @@ -13,6 +13,7 @@ + diff --git a/src/BirdsiteLive.Pipeline/Contracts/IRefreshTwitterUserStatusProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/IRefreshTwitterUserStatusProcessor.cs new file mode 100644 index 0000000..9f20e59 --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Contracts/IRefreshTwitterUserStatusProcessor.cs @@ -0,0 +1,12 @@ +using System.Threading; +using System.Threading.Tasks; +using BirdsiteLive.DAL.Models; +using BirdsiteLive.Pipeline.Models; + +namespace BirdsiteLive.Pipeline.Contracts +{ + public interface IRefreshTwitterUserStatusProcessor + { + Task ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct); + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs index e0d45dc..a9ef35c 100644 --- a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveFollowersProcessor.cs @@ -7,7 +7,7 @@ namespace BirdsiteLive.Pipeline.Contracts { public interface IRetrieveFollowersProcessor { - Task> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct); + Task> ProcessAsync(UserWithDataToSync[] userWithTweetsToSyncs, CancellationToken ct); //IAsyncEnumerable ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct); } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTweetsProcessor.cs index 451f1d1..49712c2 100644 --- a/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Contracts/IRetrieveTweetsProcessor.cs @@ -7,6 +7,6 @@ namespace BirdsiteLive.Pipeline.Contracts { public interface IRetrieveTweetsProcessor { - Task ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct); + Task ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct); } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Contracts/ISaveProgressionProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/ISaveProgressionProcessor.cs index 02efaef..6b1c9ba 100644 --- a/src/BirdsiteLive.Pipeline/Contracts/ISaveProgressionProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Contracts/ISaveProgressionProcessor.cs @@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts { public interface ISaveProgressionProcessor { - Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct); + Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct); } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs index 6d55957..33db423 100644 --- a/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Contracts/ISendTweetsToFollowersProcessor.cs @@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts { public interface ISendTweetsToFollowersProcessor { - Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct); + Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct); } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Models/UserWithTweetsToSync.cs b/src/BirdsiteLive.Pipeline/Models/UserWithTweetsToSync.cs index 57810c7..e889e9b 100644 --- a/src/BirdsiteLive.Pipeline/Models/UserWithTweetsToSync.cs +++ b/src/BirdsiteLive.Pipeline/Models/UserWithTweetsToSync.cs @@ -4,10 +4,13 @@ using Tweetinvi.Models; namespace BirdsiteLive.Pipeline.Models { - public class UserWithTweetsToSync + public class UserWithDataToSync { public SyncTwitterUser User { get; set; } public ExtractedTweet[] Tweets { get; set; } public Follower[] Followers { get; set; } + + public bool IsUserProtected { get; set; } + public bool IsUserNotRetrieved { get; set; } } } \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/RefreshTwitterUserStatusProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RefreshTwitterUserStatusProcessor.cs new file mode 100644 index 0000000..a2c78ff --- /dev/null +++ b/src/BirdsiteLive.Pipeline/Processors/RefreshTwitterUserStatusProcessor.cs @@ -0,0 +1,69 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using BirdsiteLive.DAL.Contracts; +using BirdsiteLive.DAL.Models; +using BirdsiteLive.Moderation.Actions; +using BirdsiteLive.Pipeline.Contracts; +using BirdsiteLive.Pipeline.Models; +using BirdsiteLive.Twitter; + +namespace BirdsiteLive.Pipeline.Processors +{ + public class RefreshTwitterUserStatusProcessor : IRefreshTwitterUserStatusProcessor + { + private const int FetchingErrorCountThreshold = 10; + private readonly ICachedTwitterUserService _twitterUserService; + private readonly ITwitterUserDal _twitterUserDal; + private readonly IRemoveTwitterAccountAction _removeTwitterAccountAction; + + #region Ctor + public RefreshTwitterUserStatusProcessor(ICachedTwitterUserService twitterUserService) + { + _twitterUserService = twitterUserService; + } + #endregion + + public async Task ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct) + { + var usersWtData = new List(); + + foreach (var user in syncTwitterUsers) + { + var userView = _twitterUserService.GetUser(user.Acct); + if (userView == null) + { + await AnalyseFailingUserAsync(user); + } + else if (!userView.Protected) + { + var userWtData = new UserWithDataToSync + { + User = user + }; + usersWtData.Add(userWtData); + } + } + + return usersWtData.ToArray(); + } + + private async Task AnalyseFailingUserAsync(SyncTwitterUser user) + { + var dbUser = await _twitterUserDal.GetTwitterUserAsync(user.Acct); + dbUser.FetchingErrorCount++; + + if (dbUser.FetchingErrorCount > FetchingErrorCountThreshold) + { + await _removeTwitterAccountAction.ProcessAsync(user); + } + else + { + await _twitterUserDal.UpdateTwitterUserAsync(dbUser); + } + + // Purge + _twitterUserService.PurgeUser(user.Acct); + } + } +} \ No newline at end of file diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs index 4b2f150..57e3e49 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveFollowersProcessor.cs @@ -18,7 +18,7 @@ namespace BirdsiteLive.Pipeline.Processors } #endregion - public async Task> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct) + public async Task> ProcessAsync(UserWithDataToSync[] userWithTweetsToSyncs, CancellationToken ct) { //TODO multithread this foreach (var user in userWithTweetsToSyncs) diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index bb5e026..096d720 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -31,33 +31,30 @@ namespace BirdsiteLive.Pipeline.Processors } #endregion - public async Task ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct) + public async Task ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct) { - var usersWtTweets = new List(); + var usersWtTweets = new List(); //TODO multithread this - foreach (var user in syncTwitterUsers) + foreach (var userWtData in syncTwitterUsers) { + var user = userWtData.User; var tweets = RetrieveNewTweets(user); if (tweets.Length > 0 && user.LastTweetPostedId != -1) { - var userWtTweets = new UserWithTweetsToSync - { - User = user, - Tweets = tweets - }; - usersWtTweets.Add(userWtTweets); + userWtData.Tweets = tweets; + usersWtTweets.Add(userWtData); } else if (tweets.Length > 0 && user.LastTweetPostedId == -1) { var tweetId = tweets.Last().Id; var now = DateTime.UtcNow; - await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now); + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now); } else { var now = DateTime.UtcNow; - await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, now); + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); } } diff --git a/src/BirdsiteLive.Pipeline/Processors/SaveProgressionProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SaveProgressionProcessor.cs index c2f3ff5..1437255 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SaveProgressionProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SaveProgressionProcessor.cs @@ -22,7 +22,7 @@ namespace BirdsiteLive.Pipeline.Processors } #endregion - public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct) + public async Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct) { try { @@ -49,7 +49,7 @@ namespace BirdsiteLive.Pipeline.Processors var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max(); var minimumSync = followingSyncStatuses.Min(); var now = DateTime.UtcNow; - await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, now); + await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, userWithTweetsToSync.User.FetchingErrorCount, now); } catch (Exception e) { diff --git a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs index afdb00e..cb1efb6 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SendTweetsToFollowersProcessor.cs @@ -33,7 +33,7 @@ namespace BirdsiteLive.Pipeline.Processors } #endregion - public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct) + public async Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct) { var user = userWithTweetsToSync.User; diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index d2436f0..c6917e7 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -1,4 +1,5 @@ using System; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -17,6 +18,7 @@ namespace BirdsiteLive.Pipeline public class StatusPublicationPipeline : IStatusPublicationPipeline { private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor; + private readonly IRefreshTwitterUserStatusProcessor _refreshTwitterUserStatusProcessor; private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor; private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor; private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor; @@ -24,13 +26,14 @@ namespace BirdsiteLive.Pipeline private readonly ILogger _logger; #region Ctor - public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionProcessor saveProgressionProcessor, ILogger logger) + public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionProcessor saveProgressionProcessor, IRefreshTwitterUserStatusProcessor refreshTwitterUserStatusProcessor, ILogger logger) { _retrieveTweetsProcessor = retrieveTweetsProcessor; _retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor; _retrieveFollowersProcessor = retrieveFollowersProcessor; _sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor; _saveProgressionProcessor = saveProgressionProcessor; + _refreshTwitterUserStatusProcessor = refreshTwitterUserStatusProcessor; _logger = logger; } @@ -39,16 +42,21 @@ namespace BirdsiteLive.Pipeline public async Task ExecuteAsync(CancellationToken ct) { // Create blocks - var twitterUsersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); - var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct)); - var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); - var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); - var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); - var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); - var sendTweetsToFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); - var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); + var twitterUserToRefreshBufferBlock = new BufferBlock(new DataflowBlockOptions + { BoundedCapacity = 1, CancellationToken = ct }); + var twitterUserToRefreshBlock = new TransformBlock(async x => await _refreshTwitterUserStatusProcessor.ProcessAsync(x, ct)); + var twitterUsersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); + var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct)); + var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); + var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); + var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); + var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); + var sendTweetsToFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); + var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); // Link pipeline + twitterUserToRefreshBufferBlock.LinkTo(twitterUserToRefreshBlock, new DataflowLinkOptions { PropagateCompletion = true }); + twitterUserToRefreshBlock.LinkTo(twitterUsersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); @@ -58,7 +66,7 @@ namespace BirdsiteLive.Pipeline sendTweetsToFollowersBufferBlock.LinkTo(saveProgressionBlock, new DataflowLinkOptions { PropagateCompletion = true }); // Launch twitter user retriever - var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct); + var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUserToRefreshBufferBlock, ct); // Wait await Task.WhenAny(new[] { retrieveTwitterAccountsTask, saveProgressionBlock.Completion }); diff --git a/src/BirdsiteLive.Twitter/TwitterUserService.cs b/src/BirdsiteLive.Twitter/TwitterUserService.cs index 2370cea..6a27dc1 100644 --- a/src/BirdsiteLive.Twitter/TwitterUserService.cs +++ b/src/BirdsiteLive.Twitter/TwitterUserService.cs @@ -49,6 +49,9 @@ namespace BirdsiteLive.Twitter catch (Exception e) { _logger.LogError(e, "Error retrieving user {Username}", username); + + // TODO keep track of error, see where to remove user if too much errors + return null; } diff --git a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/DbInitializerPostgresDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/DbInitializerPostgresDal.cs index 814578e..0d656a7 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/DbInitializerPostgresDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/DbInitializerPostgresDal.cs @@ -23,7 +23,7 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers public class DbInitializerPostgresDal : PostgresBase, IDbInitializerDal { private readonly PostgresTools _tools; - private readonly Version _currentVersion = new Version(2, 1); + private readonly Version _currentVersion = new Version(2, 2); private const string DbVersionType = "db-version"; #region Ctor @@ -132,7 +132,8 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers return new[] { new Tuple(new Version(1,0), new Version(2,0)), - new Tuple(new Version(2,0), new Version(2,1)) + new Tuple(new Version(2,0), new Version(2,1)), + new Tuple(new Version(2,1), new Version(2,2)) }; } @@ -151,6 +152,11 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers var addActorId = $@"ALTER TABLE {_settings.FollowersTableName} ADD actorId VARCHAR(2048)"; await _tools.ExecuteRequestAsync(addActorId); } + else if (from == new Version(2, 1) && to == new Version(2, 2)) + { + var addLastSync = $@"ALTER TABLE {_settings.TwitterUserTableName} ADD fetchingErrorCount SMALLINT"; + await _tools.ExecuteRequestAsync(addLastSync); + } else { throw new NotImplementedException(); diff --git a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs index 855df5e..506848c 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs @@ -99,23 +99,28 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers } } - public async Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, DateTime lastSync) + public async Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, int fetchingErrorCount, DateTime lastSync) { if(id == default) throw new ArgumentException("id"); if(lastTweetPostedId == default) throw new ArgumentException("lastTweetPostedId"); if(lastTweetSynchronizedForAllFollowersId == default) throw new ArgumentException("lastTweetSynchronizedForAllFollowersId"); if(lastSync == default) throw new ArgumentException("lastSync"); - var query = $"UPDATE {_settings.TwitterUserTableName} SET lastTweetPostedId = @lastTweetPostedId, lastTweetSynchronizedForAllFollowersId = @lastTweetSynchronizedForAllFollowersId, lastSync = @lastSync WHERE id = @id"; + var query = $"UPDATE {_settings.TwitterUserTableName} SET lastTweetPostedId = @lastTweetPostedId, lastTweetSynchronizedForAllFollowersId = @lastTweetSynchronizedForAllFollowersId, fetchingErrorCount = @fetchingErrorCount, lastSync = @lastSync WHERE id = @id"; using (var dbConnection = Connection) { dbConnection.Open(); - await dbConnection.QueryAsync(query, new { id, lastTweetPostedId, lastTweetSynchronizedForAllFollowersId, lastSync = lastSync.ToUniversalTime() }); + await dbConnection.QueryAsync(query, new { id, lastTweetPostedId, lastTweetSynchronizedForAllFollowersId, fetchingErrorCount, lastSync = lastSync.ToUniversalTime() }); } } + public async Task UpdateTwitterUserAsync(SyncTwitterUser user) + { + await UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, user.LastSync); + } + public async Task DeleteTwitterUserAsync(string acct) { if (string.IsNullOrWhiteSpace(acct)) throw new ArgumentException("acct"); diff --git a/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs index cfa422a..eb6602f 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs @@ -11,7 +11,8 @@ namespace BirdsiteLive.DAL.Contracts Task GetTwitterUserAsync(int id); Task GetAllTwitterUsersAsync(int maxNumber); Task GetAllTwitterUsersAsync(); - Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, DateTime lastSync); + Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, int fetchingErrorCount, DateTime lastSync); + Task UpdateTwitterUserAsync(SyncTwitterUser user); Task DeleteTwitterUserAsync(string acct); Task DeleteTwitterUserAsync(int id); Task GetTwitterUsersCountAsync(); diff --git a/src/DataAccessLayers/BirdsiteLive.DAL/Models/SyncTwitterUser.cs b/src/DataAccessLayers/BirdsiteLive.DAL/Models/SyncTwitterUser.cs index 59be0a5..8b18ba1 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL/Models/SyncTwitterUser.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL/Models/SyncTwitterUser.cs @@ -11,5 +11,7 @@ namespace BirdsiteLive.DAL.Models public long LastTweetSynchronizedForAllFollowersId { get; set; } public DateTime LastSync { get; set; } + + public int FetchingErrorCount { get; set; } //TODO: update DAL } } \ No newline at end of file diff --git a/src/Tests/BirdsiteLive.DAL.Postgres.Tests/DataAccessLayers/TwitterUserPostgresDalTests.cs b/src/Tests/BirdsiteLive.DAL.Postgres.Tests/DataAccessLayers/TwitterUserPostgresDalTests.cs index 0cf3ca1..3a742c8 100644 --- a/src/Tests/BirdsiteLive.DAL.Postgres.Tests/DataAccessLayers/TwitterUserPostgresDalTests.cs +++ b/src/Tests/BirdsiteLive.DAL.Postgres.Tests/DataAccessLayers/TwitterUserPostgresDalTests.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; using System.Xml; using BirdsiteLive.DAL.Postgres.DataAccessLayers; @@ -47,6 +48,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers Assert.AreEqual(acct, result.Acct); Assert.AreEqual(lastTweetId, result.LastTweetPostedId); Assert.AreEqual(lastTweetId, result.LastTweetSynchronizedForAllFollowersId); + Assert.AreEqual(0, result.FetchingErrorCount); Assert.IsTrue(result.Id > 0); } @@ -83,13 +85,47 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers var updatedLastTweetId = 1600L; var updatedLastSyncId = 1550L; var now = DateTime.Now; - await dal.UpdateTwitterUserAsync(result.Id, updatedLastTweetId, updatedLastSyncId, now); + var errors = 15; + await dal.UpdateTwitterUserAsync(result.Id, updatedLastTweetId, updatedLastSyncId, errors, now); result = await dal.GetTwitterUserAsync(acct); Assert.AreEqual(acct, result.Acct); Assert.AreEqual(updatedLastTweetId, result.LastTweetPostedId); Assert.AreEqual(updatedLastSyncId, result.LastTweetSynchronizedForAllFollowersId); + Assert.AreEqual(errors, result.FetchingErrorCount); + Assert.IsTrue(Math.Abs((now.ToUniversalTime() - result.LastSync).Milliseconds) < 100); + } + + [TestMethod] + public async Task CreateUpdate2AndGetUser() + { + var acct = "myid"; + var lastTweetId = 1548L; + + var dal = new TwitterUserPostgresDal(_settings); + + await dal.CreateTwitterUserAsync(acct, lastTweetId); + var result = await dal.GetTwitterUserAsync(acct); + + + var updatedLastTweetId = 1600L; + var updatedLastSyncId = 1550L; + var now = DateTime.Now; + var errors = 15; + + result.LastTweetPostedId = updatedLastTweetId; + result.LastTweetSynchronizedForAllFollowersId = updatedLastSyncId; + result.FetchingErrorCount = errors; + result.LastSync = now; + await dal.UpdateTwitterUserAsync(result); + + result = await dal.GetTwitterUserAsync(acct); + + Assert.AreEqual(acct, result.Acct); + Assert.AreEqual(updatedLastTweetId, result.LastTweetPostedId); + Assert.AreEqual(updatedLastSyncId, result.LastTweetSynchronizedForAllFollowersId); + Assert.AreEqual(errors, result.FetchingErrorCount); Assert.IsTrue(Math.Abs((now.ToUniversalTime() - result.LastSync).Milliseconds) < 100); } @@ -98,7 +134,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers public async Task Update_NoId() { var dal = new TwitterUserPostgresDal(_settings); - await dal.UpdateTwitterUserAsync(default, default, default, DateTime.UtcNow); + await dal.UpdateTwitterUserAsync(default, default, default, default, DateTime.UtcNow); } [TestMethod] @@ -106,7 +142,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers public async Task Update_NoLastTweetPostedId() { var dal = new TwitterUserPostgresDal(_settings); - await dal.UpdateTwitterUserAsync(12, default, default, DateTime.UtcNow); + await dal.UpdateTwitterUserAsync(12, default, default, default, DateTime.UtcNow); } [TestMethod] @@ -114,7 +150,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers public async Task Update_NoLastTweetSynchronizedForAllFollowersId() { var dal = new TwitterUserPostgresDal(_settings); - await dal.UpdateTwitterUserAsync(12, 9556, default, DateTime.UtcNow); + await dal.UpdateTwitterUserAsync(12, 9556, default, default, DateTime.UtcNow); } [TestMethod] @@ -122,7 +158,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers public async Task Update_NoLastSync() { var dal = new TwitterUserPostgresDal(_settings); - await dal.UpdateTwitterUserAsync(12, 9556, 65, default); + await dal.UpdateTwitterUserAsync(12, 9556, 65, default, default); } [TestMethod] @@ -216,7 +252,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers { var user = allUsers[i]; var date = i % 2 == 0 ? oldest : newest; - await dal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, date); + await dal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, 0, date); } var result = await dal.GetAllTwitterUsersAsync(10); diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveFollowersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveFollowersProcessorTests.cs index 98a86bf..4679259 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveFollowersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveFollowersProcessorTests.cs @@ -21,16 +21,16 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var userId1 = 1; var userId2 = 2; - var users = new List + var users = new List { - new UserWithTweetsToSync + new UserWithDataToSync { User = new SyncTwitterUser { Id = userId1 } }, - new UserWithTweetsToSync + new UserWithDataToSync { User = new SyncTwitterUser { diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SaveProgressionProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SaveProgressionProcessorTests.cs index c95eed6..4587071 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SaveProgressionProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SaveProgressionProcessorTests.cs @@ -41,7 +41,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors } }; - var usersWithTweets = new UserWithTweetsToSync + var usersWithTweets = new UserWithDataToSync { Tweets = new [] { @@ -65,6 +65,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => y == user.Id), It.Is(y => y == tweet2.Id), It.Is(y => y == tweet2.Id), + It.Is(y => y == 0), It.IsAny() )) .Returns(Task.CompletedTask); @@ -107,7 +108,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors } }; - var usersWithTweets = new UserWithTweetsToSync + var usersWithTweets = new UserWithDataToSync { Tweets = new[] { @@ -130,6 +131,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => y == user.Id), It.Is(y => y == tweet3.Id), It.Is(y => y == tweet2.Id), + It.Is(y => y == 0), It.IsAny() )) .Returns(Task.CompletedTask); @@ -181,7 +183,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors } }; - var usersWithTweets = new UserWithTweetsToSync + var usersWithTweets = new UserWithDataToSync { Tweets = new[] { @@ -205,6 +207,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors It.Is(y => y == user.Id), It.Is(y => y == tweet3.Id), It.Is(y => y == tweet2.Id), + It.Is(y => y == 0), It.IsAny() )) .Returns(Task.CompletedTask); diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs index ad35c3e..7715342 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SendTweetsToFollowersProcessorTests.cs @@ -26,7 +26,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var userId2 = 3; var userAcct = "user"; - var userWithTweets = new UserWithTweetsToSync() + var userWithTweets = new UserWithDataToSync() { Tweets = new [] { @@ -93,7 +93,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var userId2 = 3; var userAcct = "user"; - var userWithTweets = new UserWithTweetsToSync() + var userWithTweets = new UserWithDataToSync() { Tweets = new[] { @@ -163,7 +163,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var userId2 = 3; var userAcct = "user"; - var userWithTweets = new UserWithTweetsToSync() + var userWithTweets = new UserWithDataToSync() { Tweets = new[] { @@ -237,7 +237,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var userId2 = 3; var userAcct = "user"; - var userWithTweets = new UserWithTweetsToSync() + var userWithTweets = new UserWithDataToSync() { Tweets = new[] { @@ -306,7 +306,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var userId2 = 3; var userAcct = "user"; - var userWithTweets = new UserWithTweetsToSync() + var userWithTweets = new UserWithDataToSync() { Tweets = new[] { @@ -375,7 +375,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var userId2 = 3; var userAcct = "user"; - var userWithTweets = new UserWithTweetsToSync() + var userWithTweets = new UserWithDataToSync() { Tweets = new[] {