added pipeline processor to analyse user state

This commit is contained in:
Nicolas Constant 2021-09-05 13:58:33 -04:00
parent 2a4136cc8d
commit 71f6d3f3f4
No known key found for this signature in database
GPG key ID: 1E9F677FB01A5688
22 changed files with 200 additions and 54 deletions

View file

@ -13,6 +13,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\BirdsiteLive.Domain\BirdsiteLive.Domain.csproj" /> <ProjectReference Include="..\BirdsiteLive.Domain\BirdsiteLive.Domain.csproj" />
<ProjectReference Include="..\BirdsiteLive.Moderation\BirdsiteLive.Moderation.csproj" />
<ProjectReference Include="..\BirdsiteLive.Twitter\BirdsiteLive.Twitter.csproj" /> <ProjectReference Include="..\BirdsiteLive.Twitter\BirdsiteLive.Twitter.csproj" />
<ProjectReference Include="..\DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj" /> <ProjectReference Include="..\DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj" />
</ItemGroup> </ItemGroup>

View file

@ -0,0 +1,12 @@
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Models;
namespace BirdsiteLive.Pipeline.Contracts
{
public interface IRefreshTwitterUserStatusProcessor
{
Task<UserWithDataToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct);
}
}

View file

@ -7,7 +7,7 @@ namespace BirdsiteLive.Pipeline.Contracts
{ {
public interface IRetrieveFollowersProcessor public interface IRetrieveFollowersProcessor
{ {
Task<IEnumerable<UserWithTweetsToSync>> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct); Task<IEnumerable<UserWithDataToSync>> ProcessAsync(UserWithDataToSync[] userWithTweetsToSyncs, CancellationToken ct);
//IAsyncEnumerable<UserWithTweetsToSync> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct); //IAsyncEnumerable<UserWithTweetsToSync> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct);
} }
} }

View file

@ -7,6 +7,6 @@ namespace BirdsiteLive.Pipeline.Contracts
{ {
public interface IRetrieveTweetsProcessor public interface IRetrieveTweetsProcessor
{ {
Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct); Task<UserWithDataToSync[]> ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct);
} }
} }

View file

@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts
{ {
public interface ISaveProgressionProcessor public interface ISaveProgressionProcessor
{ {
Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct); Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct);
} }
} }

View file

@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts
{ {
public interface ISendTweetsToFollowersProcessor public interface ISendTweetsToFollowersProcessor
{ {
Task<UserWithTweetsToSync> ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct); Task<UserWithDataToSync> ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct);
} }
} }

View file

@ -4,10 +4,13 @@ using Tweetinvi.Models;
namespace BirdsiteLive.Pipeline.Models namespace BirdsiteLive.Pipeline.Models
{ {
public class UserWithTweetsToSync public class UserWithDataToSync
{ {
public SyncTwitterUser User { get; set; } public SyncTwitterUser User { get; set; }
public ExtractedTweet[] Tweets { get; set; } public ExtractedTweet[] Tweets { get; set; }
public Follower[] Followers { get; set; } public Follower[] Followers { get; set; }
public bool IsUserProtected { get; set; }
public bool IsUserNotRetrieved { get; set; }
} }
} }

View file

@ -0,0 +1,69 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Moderation.Actions;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
using BirdsiteLive.Twitter;
namespace BirdsiteLive.Pipeline.Processors
{
public class RefreshTwitterUserStatusProcessor : IRefreshTwitterUserStatusProcessor
{
private const int FetchingErrorCountThreshold = 10;
private readonly ICachedTwitterUserService _twitterUserService;
private readonly ITwitterUserDal _twitterUserDal;
private readonly IRemoveTwitterAccountAction _removeTwitterAccountAction;
#region Ctor
public RefreshTwitterUserStatusProcessor(ICachedTwitterUserService twitterUserService)
{
_twitterUserService = twitterUserService;
}
#endregion
public async Task<UserWithDataToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct)
{
var usersWtData = new List<UserWithDataToSync>();
foreach (var user in syncTwitterUsers)
{
var userView = _twitterUserService.GetUser(user.Acct);
if (userView == null)
{
await AnalyseFailingUserAsync(user);
}
else if (!userView.Protected)
{
var userWtData = new UserWithDataToSync
{
User = user
};
usersWtData.Add(userWtData);
}
}
return usersWtData.ToArray();
}
private async Task AnalyseFailingUserAsync(SyncTwitterUser user)
{
var dbUser = await _twitterUserDal.GetTwitterUserAsync(user.Acct);
dbUser.FetchingErrorCount++;
if (dbUser.FetchingErrorCount > FetchingErrorCountThreshold)
{
await _removeTwitterAccountAction.ProcessAsync(user);
}
else
{
await _twitterUserDal.UpdateTwitterUserAsync(dbUser);
}
// Purge
_twitterUserService.PurgeUser(user.Acct);
}
}
}

View file

@ -18,7 +18,7 @@ namespace BirdsiteLive.Pipeline.Processors
} }
#endregion #endregion
public async Task<IEnumerable<UserWithTweetsToSync>> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct) public async Task<IEnumerable<UserWithDataToSync>> ProcessAsync(UserWithDataToSync[] userWithTweetsToSyncs, CancellationToken ct)
{ {
//TODO multithread this //TODO multithread this
foreach (var user in userWithTweetsToSyncs) foreach (var user in userWithTweetsToSyncs)

View file

@ -31,33 +31,30 @@ namespace BirdsiteLive.Pipeline.Processors
} }
#endregion #endregion
public async Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct) public async Task<UserWithDataToSync[]> ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct)
{ {
var usersWtTweets = new List<UserWithTweetsToSync>(); var usersWtTweets = new List<UserWithDataToSync>();
//TODO multithread this //TODO multithread this
foreach (var user in syncTwitterUsers) foreach (var userWtData in syncTwitterUsers)
{ {
var user = userWtData.User;
var tweets = RetrieveNewTweets(user); var tweets = RetrieveNewTweets(user);
if (tweets.Length > 0 && user.LastTweetPostedId != -1) if (tweets.Length > 0 && user.LastTweetPostedId != -1)
{ {
var userWtTweets = new UserWithTweetsToSync userWtData.Tweets = tweets;
{ usersWtTweets.Add(userWtData);
User = user,
Tweets = tweets
};
usersWtTweets.Add(userWtTweets);
} }
else if (tweets.Length > 0 && user.LastTweetPostedId == -1) else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{ {
var tweetId = tweets.Last().Id; var tweetId = tweets.Last().Id;
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now); await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now);
} }
else else
{ {
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, now); await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
} }
} }

View file

@ -22,7 +22,7 @@ namespace BirdsiteLive.Pipeline.Processors
} }
#endregion #endregion
public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct) public async Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct)
{ {
try try
{ {
@ -49,7 +49,7 @@ namespace BirdsiteLive.Pipeline.Processors
var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max(); var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max();
var minimumSync = followingSyncStatuses.Min(); var minimumSync = followingSyncStatuses.Min();
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, now); await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, userWithTweetsToSync.User.FetchingErrorCount, now);
} }
catch (Exception e) catch (Exception e)
{ {

View file

@ -33,7 +33,7 @@ namespace BirdsiteLive.Pipeline.Processors
} }
#endregion #endregion
public async Task<UserWithTweetsToSync> ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct) public async Task<UserWithDataToSync> ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct)
{ {
var user = userWithTweetsToSync.User; var user = userWithTweetsToSync.User;

View file

@ -1,4 +1,5 @@
using System; using System;
using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; using System.Threading.Tasks.Dataflow;
@ -17,6 +18,7 @@ namespace BirdsiteLive.Pipeline
public class StatusPublicationPipeline : IStatusPublicationPipeline public class StatusPublicationPipeline : IStatusPublicationPipeline
{ {
private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor; private readonly IRetrieveTwitterUsersProcessor _retrieveTwitterAccountsProcessor;
private readonly IRefreshTwitterUserStatusProcessor _refreshTwitterUserStatusProcessor;
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor; private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor; private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor; private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
@ -24,13 +26,14 @@ namespace BirdsiteLive.Pipeline
private readonly ILogger<StatusPublicationPipeline> _logger; private readonly ILogger<StatusPublicationPipeline> _logger;
#region Ctor #region Ctor
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionProcessor saveProgressionProcessor, ILogger<StatusPublicationPipeline> logger) public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionProcessor saveProgressionProcessor, IRefreshTwitterUserStatusProcessor refreshTwitterUserStatusProcessor, ILogger<StatusPublicationPipeline> logger)
{ {
_retrieveTweetsProcessor = retrieveTweetsProcessor; _retrieveTweetsProcessor = retrieveTweetsProcessor;
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor; _retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
_retrieveFollowersProcessor = retrieveFollowersProcessor; _retrieveFollowersProcessor = retrieveFollowersProcessor;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor; _sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
_saveProgressionProcessor = saveProgressionProcessor; _saveProgressionProcessor = saveProgressionProcessor;
_refreshTwitterUserStatusProcessor = refreshTwitterUserStatusProcessor;
_logger = logger; _logger = logger;
} }
@ -39,16 +42,21 @@ namespace BirdsiteLive.Pipeline
public async Task ExecuteAsync(CancellationToken ct) public async Task ExecuteAsync(CancellationToken ct)
{ {
// Create blocks // Create blocks
var twitterUsersBufferBlock = new BufferBlock<SyncTwitterUser[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); var twitterUserToRefreshBufferBlock = new BufferBlock<SyncTwitterUser[]>(new DataflowBlockOptions
var retrieveTweetsBlock = new TransformBlock<SyncTwitterUser[], UserWithTweetsToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct)); { BoundedCapacity = 1, CancellationToken = ct });
var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); var twitterUserToRefreshBlock = new TransformBlock<SyncTwitterUser[], UserWithDataToSync[]>(async x => await _refreshTwitterUserStatusProcessor.ProcessAsync(x, ct));
var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); var twitterUsersBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct));
var sendTweetsToFollowersBlock = new TransformBlock<UserWithTweetsToSync, UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var saveProgressionBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
// Link pipeline // Link pipeline
twitterUserToRefreshBufferBlock.LinkTo(twitterUserToRefreshBlock, new DataflowLinkOptions { PropagateCompletion = true });
twitterUserToRefreshBlock.LinkTo(twitterUsersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true }); twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
@ -58,7 +66,7 @@ namespace BirdsiteLive.Pipeline
sendTweetsToFollowersBufferBlock.LinkTo(saveProgressionBlock, new DataflowLinkOptions { PropagateCompletion = true }); sendTweetsToFollowersBufferBlock.LinkTo(saveProgressionBlock, new DataflowLinkOptions { PropagateCompletion = true });
// Launch twitter user retriever // Launch twitter user retriever
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct); var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUserToRefreshBufferBlock, ct);
// Wait // Wait
await Task.WhenAny(new[] { retrieveTwitterAccountsTask, saveProgressionBlock.Completion }); await Task.WhenAny(new[] { retrieveTwitterAccountsTask, saveProgressionBlock.Completion });

View file

@ -49,6 +49,9 @@ namespace BirdsiteLive.Twitter
catch (Exception e) catch (Exception e)
{ {
_logger.LogError(e, "Error retrieving user {Username}", username); _logger.LogError(e, "Error retrieving user {Username}", username);
// TODO keep track of error, see where to remove user if too much errors
return null; return null;
} }

View file

@ -23,7 +23,7 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
public class DbInitializerPostgresDal : PostgresBase, IDbInitializerDal public class DbInitializerPostgresDal : PostgresBase, IDbInitializerDal
{ {
private readonly PostgresTools _tools; private readonly PostgresTools _tools;
private readonly Version _currentVersion = new Version(2, 1); private readonly Version _currentVersion = new Version(2, 2);
private const string DbVersionType = "db-version"; private const string DbVersionType = "db-version";
#region Ctor #region Ctor
@ -132,7 +132,8 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
return new[] return new[]
{ {
new Tuple<Version, Version>(new Version(1,0), new Version(2,0)), new Tuple<Version, Version>(new Version(1,0), new Version(2,0)),
new Tuple<Version, Version>(new Version(2,0), new Version(2,1)) new Tuple<Version, Version>(new Version(2,0), new Version(2,1)),
new Tuple<Version, Version>(new Version(2,1), new Version(2,2))
}; };
} }
@ -151,6 +152,11 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
var addActorId = $@"ALTER TABLE {_settings.FollowersTableName} ADD actorId VARCHAR(2048)"; var addActorId = $@"ALTER TABLE {_settings.FollowersTableName} ADD actorId VARCHAR(2048)";
await _tools.ExecuteRequestAsync(addActorId); await _tools.ExecuteRequestAsync(addActorId);
} }
else if (from == new Version(2, 1) && to == new Version(2, 2))
{
var addLastSync = $@"ALTER TABLE {_settings.TwitterUserTableName} ADD fetchingErrorCount SMALLINT";
await _tools.ExecuteRequestAsync(addLastSync);
}
else else
{ {
throw new NotImplementedException(); throw new NotImplementedException();

View file

@ -99,23 +99,28 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers
} }
} }
public async Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, DateTime lastSync) public async Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, int fetchingErrorCount, DateTime lastSync)
{ {
if(id == default) throw new ArgumentException("id"); if(id == default) throw new ArgumentException("id");
if(lastTweetPostedId == default) throw new ArgumentException("lastTweetPostedId"); if(lastTweetPostedId == default) throw new ArgumentException("lastTweetPostedId");
if(lastTweetSynchronizedForAllFollowersId == default) throw new ArgumentException("lastTweetSynchronizedForAllFollowersId"); if(lastTweetSynchronizedForAllFollowersId == default) throw new ArgumentException("lastTweetSynchronizedForAllFollowersId");
if(lastSync == default) throw new ArgumentException("lastSync"); if(lastSync == default) throw new ArgumentException("lastSync");
var query = $"UPDATE {_settings.TwitterUserTableName} SET lastTweetPostedId = @lastTweetPostedId, lastTweetSynchronizedForAllFollowersId = @lastTweetSynchronizedForAllFollowersId, lastSync = @lastSync WHERE id = @id"; var query = $"UPDATE {_settings.TwitterUserTableName} SET lastTweetPostedId = @lastTweetPostedId, lastTweetSynchronizedForAllFollowersId = @lastTweetSynchronizedForAllFollowersId, fetchingErrorCount = @fetchingErrorCount, lastSync = @lastSync WHERE id = @id";
using (var dbConnection = Connection) using (var dbConnection = Connection)
{ {
dbConnection.Open(); dbConnection.Open();
await dbConnection.QueryAsync(query, new { id, lastTweetPostedId, lastTweetSynchronizedForAllFollowersId, lastSync = lastSync.ToUniversalTime() }); await dbConnection.QueryAsync(query, new { id, lastTweetPostedId, lastTweetSynchronizedForAllFollowersId, fetchingErrorCount, lastSync = lastSync.ToUniversalTime() });
} }
} }
public async Task UpdateTwitterUserAsync(SyncTwitterUser user)
{
await UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, user.LastSync);
}
public async Task DeleteTwitterUserAsync(string acct) public async Task DeleteTwitterUserAsync(string acct)
{ {
if (string.IsNullOrWhiteSpace(acct)) throw new ArgumentException("acct"); if (string.IsNullOrWhiteSpace(acct)) throw new ArgumentException("acct");

View file

@ -11,7 +11,8 @@ namespace BirdsiteLive.DAL.Contracts
Task<SyncTwitterUser> GetTwitterUserAsync(int id); Task<SyncTwitterUser> GetTwitterUserAsync(int id);
Task<SyncTwitterUser[]> GetAllTwitterUsersAsync(int maxNumber); Task<SyncTwitterUser[]> GetAllTwitterUsersAsync(int maxNumber);
Task<SyncTwitterUser[]> GetAllTwitterUsersAsync(); Task<SyncTwitterUser[]> GetAllTwitterUsersAsync();
Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, DateTime lastSync); Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, int fetchingErrorCount, DateTime lastSync);
Task UpdateTwitterUserAsync(SyncTwitterUser user);
Task DeleteTwitterUserAsync(string acct); Task DeleteTwitterUserAsync(string acct);
Task DeleteTwitterUserAsync(int id); Task DeleteTwitterUserAsync(int id);
Task<int> GetTwitterUsersCountAsync(); Task<int> GetTwitterUsersCountAsync();

View file

@ -11,5 +11,7 @@ namespace BirdsiteLive.DAL.Models
public long LastTweetSynchronizedForAllFollowersId { get; set; } public long LastTweetSynchronizedForAllFollowersId { get; set; }
public DateTime LastSync { get; set; } public DateTime LastSync { get; set; }
public int FetchingErrorCount { get; set; } //TODO: update DAL
} }
} }

View file

@ -1,4 +1,5 @@
using System; using System;
using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Xml; using System.Xml;
using BirdsiteLive.DAL.Postgres.DataAccessLayers; using BirdsiteLive.DAL.Postgres.DataAccessLayers;
@ -47,6 +48,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
Assert.AreEqual(acct, result.Acct); Assert.AreEqual(acct, result.Acct);
Assert.AreEqual(lastTweetId, result.LastTweetPostedId); Assert.AreEqual(lastTweetId, result.LastTweetPostedId);
Assert.AreEqual(lastTweetId, result.LastTweetSynchronizedForAllFollowersId); Assert.AreEqual(lastTweetId, result.LastTweetSynchronizedForAllFollowersId);
Assert.AreEqual(0, result.FetchingErrorCount);
Assert.IsTrue(result.Id > 0); Assert.IsTrue(result.Id > 0);
} }
@ -83,13 +85,47 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
var updatedLastTweetId = 1600L; var updatedLastTweetId = 1600L;
var updatedLastSyncId = 1550L; var updatedLastSyncId = 1550L;
var now = DateTime.Now; var now = DateTime.Now;
await dal.UpdateTwitterUserAsync(result.Id, updatedLastTweetId, updatedLastSyncId, now); var errors = 15;
await dal.UpdateTwitterUserAsync(result.Id, updatedLastTweetId, updatedLastSyncId, errors, now);
result = await dal.GetTwitterUserAsync(acct); result = await dal.GetTwitterUserAsync(acct);
Assert.AreEqual(acct, result.Acct); Assert.AreEqual(acct, result.Acct);
Assert.AreEqual(updatedLastTweetId, result.LastTweetPostedId); Assert.AreEqual(updatedLastTweetId, result.LastTweetPostedId);
Assert.AreEqual(updatedLastSyncId, result.LastTweetSynchronizedForAllFollowersId); Assert.AreEqual(updatedLastSyncId, result.LastTweetSynchronizedForAllFollowersId);
Assert.AreEqual(errors, result.FetchingErrorCount);
Assert.IsTrue(Math.Abs((now.ToUniversalTime() - result.LastSync).Milliseconds) < 100);
}
[TestMethod]
public async Task CreateUpdate2AndGetUser()
{
var acct = "myid";
var lastTweetId = 1548L;
var dal = new TwitterUserPostgresDal(_settings);
await dal.CreateTwitterUserAsync(acct, lastTweetId);
var result = await dal.GetTwitterUserAsync(acct);
var updatedLastTweetId = 1600L;
var updatedLastSyncId = 1550L;
var now = DateTime.Now;
var errors = 15;
result.LastTweetPostedId = updatedLastTweetId;
result.LastTweetSynchronizedForAllFollowersId = updatedLastSyncId;
result.FetchingErrorCount = errors;
result.LastSync = now;
await dal.UpdateTwitterUserAsync(result);
result = await dal.GetTwitterUserAsync(acct);
Assert.AreEqual(acct, result.Acct);
Assert.AreEqual(updatedLastTweetId, result.LastTweetPostedId);
Assert.AreEqual(updatedLastSyncId, result.LastTweetSynchronizedForAllFollowersId);
Assert.AreEqual(errors, result.FetchingErrorCount);
Assert.IsTrue(Math.Abs((now.ToUniversalTime() - result.LastSync).Milliseconds) < 100); Assert.IsTrue(Math.Abs((now.ToUniversalTime() - result.LastSync).Milliseconds) < 100);
} }
@ -98,7 +134,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
public async Task Update_NoId() public async Task Update_NoId()
{ {
var dal = new TwitterUserPostgresDal(_settings); var dal = new TwitterUserPostgresDal(_settings);
await dal.UpdateTwitterUserAsync(default, default, default, DateTime.UtcNow); await dal.UpdateTwitterUserAsync(default, default, default, default, DateTime.UtcNow);
} }
[TestMethod] [TestMethod]
@ -106,7 +142,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
public async Task Update_NoLastTweetPostedId() public async Task Update_NoLastTweetPostedId()
{ {
var dal = new TwitterUserPostgresDal(_settings); var dal = new TwitterUserPostgresDal(_settings);
await dal.UpdateTwitterUserAsync(12, default, default, DateTime.UtcNow); await dal.UpdateTwitterUserAsync(12, default, default, default, DateTime.UtcNow);
} }
[TestMethod] [TestMethod]
@ -114,7 +150,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
public async Task Update_NoLastTweetSynchronizedForAllFollowersId() public async Task Update_NoLastTweetSynchronizedForAllFollowersId()
{ {
var dal = new TwitterUserPostgresDal(_settings); var dal = new TwitterUserPostgresDal(_settings);
await dal.UpdateTwitterUserAsync(12, 9556, default, DateTime.UtcNow); await dal.UpdateTwitterUserAsync(12, 9556, default, default, DateTime.UtcNow);
} }
[TestMethod] [TestMethod]
@ -122,7 +158,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
public async Task Update_NoLastSync() public async Task Update_NoLastSync()
{ {
var dal = new TwitterUserPostgresDal(_settings); var dal = new TwitterUserPostgresDal(_settings);
await dal.UpdateTwitterUserAsync(12, 9556, 65, default); await dal.UpdateTwitterUserAsync(12, 9556, 65, default, default);
} }
[TestMethod] [TestMethod]
@ -216,7 +252,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers
{ {
var user = allUsers[i]; var user = allUsers[i];
var date = i % 2 == 0 ? oldest : newest; var date = i % 2 == 0 ? oldest : newest;
await dal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, date); await dal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, 0, date);
} }
var result = await dal.GetAllTwitterUsersAsync(10); var result = await dal.GetAllTwitterUsersAsync(10);

View file

@ -21,16 +21,16 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var userId1 = 1; var userId1 = 1;
var userId2 = 2; var userId2 = 2;
var users = new List<UserWithTweetsToSync> var users = new List<UserWithDataToSync>
{ {
new UserWithTweetsToSync new UserWithDataToSync
{ {
User = new SyncTwitterUser User = new SyncTwitterUser
{ {
Id = userId1 Id = userId1
} }
}, },
new UserWithTweetsToSync new UserWithDataToSync
{ {
User = new SyncTwitterUser User = new SyncTwitterUser
{ {

View file

@ -41,7 +41,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
} }
}; };
var usersWithTweets = new UserWithTweetsToSync var usersWithTweets = new UserWithDataToSync
{ {
Tweets = new [] Tweets = new []
{ {
@ -65,6 +65,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
It.Is<int>(y => y == user.Id), It.Is<int>(y => y == user.Id),
It.Is<long>(y => y == tweet2.Id), It.Is<long>(y => y == tweet2.Id),
It.Is<long>(y => y == tweet2.Id), It.Is<long>(y => y == tweet2.Id),
It.Is<int>(y => y == 0),
It.IsAny<DateTime>() It.IsAny<DateTime>()
)) ))
.Returns(Task.CompletedTask); .Returns(Task.CompletedTask);
@ -107,7 +108,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
} }
}; };
var usersWithTweets = new UserWithTweetsToSync var usersWithTweets = new UserWithDataToSync
{ {
Tweets = new[] Tweets = new[]
{ {
@ -130,6 +131,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
It.Is<int>(y => y == user.Id), It.Is<int>(y => y == user.Id),
It.Is<long>(y => y == tweet3.Id), It.Is<long>(y => y == tweet3.Id),
It.Is<long>(y => y == tweet2.Id), It.Is<long>(y => y == tweet2.Id),
It.Is<int>(y => y == 0),
It.IsAny<DateTime>() It.IsAny<DateTime>()
)) ))
.Returns(Task.CompletedTask); .Returns(Task.CompletedTask);
@ -181,7 +183,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
} }
}; };
var usersWithTweets = new UserWithTweetsToSync var usersWithTweets = new UserWithDataToSync
{ {
Tweets = new[] Tweets = new[]
{ {
@ -205,6 +207,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
It.Is<int>(y => y == user.Id), It.Is<int>(y => y == user.Id),
It.Is<long>(y => y == tweet3.Id), It.Is<long>(y => y == tweet3.Id),
It.Is<long>(y => y == tweet2.Id), It.Is<long>(y => y == tweet2.Id),
It.Is<int>(y => y == 0),
It.IsAny<DateTime>() It.IsAny<DateTime>()
)) ))
.Returns(Task.CompletedTask); .Returns(Task.CompletedTask);

View file

@ -26,7 +26,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var userId2 = 3; var userId2 = 3;
var userAcct = "user"; var userAcct = "user";
var userWithTweets = new UserWithTweetsToSync() var userWithTweets = new UserWithDataToSync()
{ {
Tweets = new [] Tweets = new []
{ {
@ -93,7 +93,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var userId2 = 3; var userId2 = 3;
var userAcct = "user"; var userAcct = "user";
var userWithTweets = new UserWithTweetsToSync() var userWithTweets = new UserWithDataToSync()
{ {
Tweets = new[] Tweets = new[]
{ {
@ -163,7 +163,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var userId2 = 3; var userId2 = 3;
var userAcct = "user"; var userAcct = "user";
var userWithTweets = new UserWithTweetsToSync() var userWithTweets = new UserWithDataToSync()
{ {
Tweets = new[] Tweets = new[]
{ {
@ -237,7 +237,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var userId2 = 3; var userId2 = 3;
var userAcct = "user"; var userAcct = "user";
var userWithTweets = new UserWithTweetsToSync() var userWithTweets = new UserWithDataToSync()
{ {
Tweets = new[] Tweets = new[]
{ {
@ -306,7 +306,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var userId2 = 3; var userId2 = 3;
var userAcct = "user"; var userAcct = "user";
var userWithTweets = new UserWithTweetsToSync() var userWithTweets = new UserWithDataToSync()
{ {
Tweets = new[] Tweets = new[]
{ {
@ -375,7 +375,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors
var userId2 = 3; var userId2 = 3;
var userAcct = "user"; var userAcct = "user";
var userWithTweets = new UserWithTweetsToSync() var userWithTweets = new UserWithDataToSync()
{ {
Tweets = new[] Tweets = new[]
{ {