diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index ef20cad..ffcf9a9 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -45,7 +46,8 @@ namespace BirdsiteLive.Pipeline.Processors else if (tweets.Length > 0 && user.LastTweetPostedId == -1) { var tweetId = tweets.Last().Id; - await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId); + var now = DateTime.UtcNow; + await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now); } } diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index f556831..847e23c 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using BirdsiteLive.Common.Extensions; +using BirdsiteLive.Common.Settings; using BirdsiteLive.DAL.Contracts; using BirdsiteLive.DAL.Models; using BirdsiteLive.Pipeline.Contracts; @@ -15,12 +16,16 @@ namespace BirdsiteLive.Pipeline.Processors { private readonly ITwitterUserDal _twitterUserDal; private readonly ILogger _logger; + private readonly InstanceSettings _instanceSettings; + public int WaitFactor = 1000 * 60; //1 min + private int StartUpWarming = 4; #region Ctor - public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger logger) + public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, InstanceSettings instanceSettings, ILogger logger) { _twitterUserDal = twitterUserDal; + _instanceSettings = instanceSettings; _logger = logger; } #endregion @@ -33,7 +38,11 @@ namespace BirdsiteLive.Pipeline.Processors try { - var users = await _twitterUserDal.GetAllTwitterUsersAsync(); + var maxUsers = StartUpWarming > 0 + ? _instanceSettings.MaxUsersCapacity / 4 + : _instanceSettings.MaxUsersCapacity; + StartUpWarming--; + var users = await _twitterUserDal.GetAllTwitterUsersAsync(maxUsers); var userCount = users.Any() ? users.Length : 1; var splitNumber = (int) Math.Ceiling(userCount / 15d); diff --git a/src/BirdsiteLive.Pipeline/Processors/SaveProgressionProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/SaveProgressionProcessor.cs index 5b305e7..c7cbc36 100644 --- a/src/BirdsiteLive.Pipeline/Processors/SaveProgressionProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/SaveProgressionProcessor.cs @@ -1,4 +1,5 @@ -using System.Linq; +using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using BirdsiteLive.DAL.Contracts; @@ -23,7 +24,8 @@ namespace BirdsiteLive.Pipeline.Processors var userId = userWithTweetsToSync.User.Id; var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max(); var minimumSync = userWithTweetsToSync.Followers.Select(x => x.FollowingsSyncStatus[userId]).Min(); - await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync); + var now = DateTime.UtcNow; + await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, now); } } } \ No newline at end of file diff --git a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs index 0cb811f..afbf7d1 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs @@ -62,15 +62,15 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers } } - public async Task GetAllTwitterUsersAsync() + public async Task GetAllTwitterUsersAsync(int maxNumber) { - var query = $"SELECT * FROM {_settings.TwitterUserTableName}"; + var query = $"SELECT * FROM {_settings.TwitterUserTableName} ORDER BY lastSync ASC LIMIT @maxNumber"; using (var dbConnection = Connection) { dbConnection.Open(); - var result = await dbConnection.QueryAsync(query); + var result = await dbConnection.QueryAsync(query, new { maxNumber }); return result.ToArray(); } } diff --git a/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs index 65d9697..1fa8127 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs @@ -8,7 +8,7 @@ namespace BirdsiteLive.DAL.Contracts { Task CreateTwitterUserAsync(string acct, long lastTweetPostedId); Task GetTwitterUserAsync(string acct); - Task GetAllTwitterUsersAsync(); + Task GetAllTwitterUsersAsync(int maxNumber); Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, DateTime lastSync); Task DeleteTwitterUserAsync(string acct); Task GetTwitterUsersCountAsync(); diff --git a/src/Tests/BirdsiteLive.DAL.Postgres.Tests/DataAccessLayers/TwitterUserPostgresDalTests.cs b/src/Tests/BirdsiteLive.DAL.Postgres.Tests/DataAccessLayers/TwitterUserPostgresDalTests.cs index 5fa600c..6a1e96b 100644 --- a/src/Tests/BirdsiteLive.DAL.Postgres.Tests/DataAccessLayers/TwitterUserPostgresDalTests.cs +++ b/src/Tests/BirdsiteLive.DAL.Postgres.Tests/DataAccessLayers/TwitterUserPostgresDalTests.cs @@ -111,7 +111,7 @@ namespace BirdsiteLive.DAL.Postgres.Tests.DataAccessLayers await dal.CreateTwitterUserAsync(acct, lastTweetId); } - var result = await dal.GetAllTwitterUsersAsync(); + var result = await dal.GetAllTwitterUsersAsync(1000); Assert.AreEqual(1000, result.Length); Assert.IsFalse(result[0].Id == default); Assert.IsFalse(result[0].Acct == default); diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTweetsProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTweetsProcessorTests.cs index 2bf5d74..d66c2f7 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTweetsProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTweetsProcessorTests.cs @@ -1,3 +1,4 @@ +using System; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -54,7 +55,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .Setup(x => x.UpdateTwitterUserAsync( It.Is(y => y == user1.Id), It.Is(y => y == tweets.Last().Id), - It.Is(y => y == tweets.Last().Id) + It.Is(y => y == tweets.Last().Id), + It.IsAny() )) .Returns(Task.CompletedTask); #endregion diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs index 12c5682..6323889 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; +using BirdsiteLive.Common.Settings; using BirdsiteLive.DAL.Contracts; using BirdsiteLive.DAL.Models; using BirdsiteLive.Pipeline.Processors; @@ -26,18 +27,23 @@ namespace BirdsiteLive.Pipeline.Tests.Processors new SyncTwitterUser(), new SyncTwitterUser(), }; + var settings = new InstanceSettings + { + MaxUsersCapacity = 10 + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock - .Setup(x => x.GetAllTwitterUsersAsync()) + .Setup(x => x.GetAllTwitterUsersAsync( + It.Is(y => y == settings.MaxUsersCapacity))) .ReturnsAsync(users); var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object); processor.WaitFactor = 10; processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -60,19 +66,25 @@ namespace BirdsiteLive.Pipeline.Tests.Processors for (var i = 0; i < 30; i++) users.Add(new SyncTwitterUser()); + + var settings = new InstanceSettings + { + MaxUsersCapacity = 100 + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock - .SetupSequence(x => x.GetAllTwitterUsersAsync()) + .SetupSequence(x => x.GetAllTwitterUsersAsync( + It.Is(y => y == settings.MaxUsersCapacity))) .ReturnsAsync(users.ToArray()) .ReturnsAsync(new SyncTwitterUser[0]); var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object); processor.WaitFactor = 2; processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -95,19 +107,25 @@ namespace BirdsiteLive.Pipeline.Tests.Processors for (var i = 0; i < 31; i++) users.Add(new SyncTwitterUser()); + + var settings = new InstanceSettings + { + MaxUsersCapacity = 10 + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock - .SetupSequence(x => x.GetAllTwitterUsersAsync()) + .SetupSequence(x => x.GetAllTwitterUsersAsync( + It.Is(y => y == settings.MaxUsersCapacity))) .ReturnsAsync(users.ToArray()) .ReturnsAsync(new SyncTwitterUser[0]); var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object); processor.WaitFactor = 2; processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -126,18 +144,24 @@ namespace BirdsiteLive.Pipeline.Tests.Processors { #region Stubs var buffer = new BufferBlock(); + + var settings = new InstanceSettings + { + MaxUsersCapacity = 10 + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock - .Setup(x => x.GetAllTwitterUsersAsync()) + .Setup(x => x.GetAllTwitterUsersAsync( + It.Is(y => y == settings.MaxUsersCapacity))) .ReturnsAsync(new SyncTwitterUser[0]); var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object); processor.WaitFactor = 1; processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -154,18 +178,24 @@ namespace BirdsiteLive.Pipeline.Tests.Processors { #region Stubs var buffer = new BufferBlock(); + + var settings = new InstanceSettings + { + MaxUsersCapacity = 10 + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock - .Setup(x => x.GetAllTwitterUsersAsync()) + .Setup(x => x.GetAllTwitterUsersAsync( + It.Is(y => y == settings.MaxUsersCapacity))) .Returns(async () => await DelayFaultedTask(new Exception())); var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object); processor.WaitFactor = 10; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -185,6 +215,11 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var buffer = new BufferBlock(); var canTokenS = new CancellationTokenSource(); canTokenS.Cancel(); + + var settings = new InstanceSettings + { + MaxUsersCapacity = 10 + }; #endregion #region Mocks @@ -192,7 +227,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, settings, loggerMock.Object); processor.WaitFactor = 1; await processor.GetTwitterUsersAsync(buffer, canTokenS.Token); } diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SaveProgressionProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SaveProgressionProcessorTests.cs index d3880e6..b2a99b9 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SaveProgressionProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/SaveProgressionProcessorTests.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using BirdsiteLive.DAL.Contracts; @@ -60,7 +61,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .Setup(x => x.UpdateTwitterUserAsync( It.Is(y => y == user.Id), It.Is(y => y == tweet2.Id), - It.Is(y => y == tweet2.Id) + It.Is(y => y == tweet2.Id), + It.IsAny() )) .Returns(Task.CompletedTask); #endregion @@ -123,7 +125,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .Setup(x => x.UpdateTwitterUserAsync( It.Is(y => y == user.Id), It.Is(y => y == tweet3.Id), - It.Is(y => y == tweet2.Id) + It.Is(y => y == tweet2.Id), + It.IsAny() )) .Returns(Task.CompletedTask); #endregion @@ -194,7 +197,8 @@ namespace BirdsiteLive.Pipeline.Tests.Processors .Setup(x => x.UpdateTwitterUserAsync( It.Is(y => y == user.Id), It.Is(y => y == tweet3.Id), - It.Is(y => y == tweet2.Id) + It.Is(y => y == tweet2.Id), + It.IsAny() )) .Returns(Task.CompletedTask); #endregion