diff --git a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs index 56ae0b6..1505793 100644 --- a/src/BirdsiteLive.Common/Settings/InstanceSettings.cs +++ b/src/BirdsiteLive.Common/Settings/InstanceSettings.cs @@ -18,6 +18,9 @@ public int TweetCacheCapacity { get; set; } = 20_000; // "AAAAAAAAAAAAAAAAAAAAAPYXBAAAAAAACLXUNDekMxqa8h%2F40K4moUkGsoc%3DTYfbDKbT3jJPCEVnMYqilB28NHfOPqkca3qaAxGfsyKCs0wRbw" public string TwitterBearerToken { get; set; } = "AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA"; + public int m { get; set; } = 1; + public int n_start { get; set; } = 0; + public int n_end { get; set; } = 1; public int ParallelTwitterRequests { get; set; } = 10; public int ParallelFediversePosts { get; set; } = 10; } diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index 31b4ea4..39a6400 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -16,16 +16,18 @@ namespace BirdsiteLive.Pipeline.Processors { private readonly ITwitterUserDal _twitterUserDal; private readonly IFollowersDal _followersDal; + private readonly InstanceSettings _instanceSettings; private readonly ILogger _logger; private static Random rng = new Random(); public int WaitFactor = 1000 * 60; //1 min #region Ctor - public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IFollowersDal followersDal, ILogger logger) + public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IFollowersDal followersDal, InstanceSettings instanceSettings, ILogger logger) { _twitterUserDal = twitterUserDal; _followersDal = followersDal; + _instanceSettings = instanceSettings; _logger = logger; } #endregion @@ -38,7 +40,7 @@ namespace BirdsiteLive.Pipeline.Processors try { - var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(2000); + var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(2000, _instanceSettings.n_start, _instanceSettings.n_end, _instanceSettings.m); var userCount = users.Any() ? Math.Min(users.Length, 200) : 1; var splitUsers = users.OrderBy(a => rng.Next()).ToArray().Split(userCount).ToList(); diff --git a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs index 8b06e85..b8f2047 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs @@ -118,14 +118,20 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers } } - public async Task GetAllTwitterUsersWithFollowersAsync(int maxNumber) + public async Task GetAllTwitterUsersWithFollowersAsync(int maxNumber, int nStart, int nEnd, int m) { - var query = "SELECT * FROM (SELECT unnest(followings) as follow FROM followers GROUP BY follow) AS f INNER JOIN twitter_users ON f.follow=twitter_users.id ORDER BY lastSync ASC NULLS FIRST LIMIT $1"; + var query = "SELECT * FROM (SELECT unnest(followings) as follow FROM followers GROUP BY follow) AS f INNER JOIN twitter_users ON f.follow=twitter_users.id WHERE mod(id, $2) >= $3 AND mod(id, $2) <= $4 ORDER BY lastSync ASC NULLS FIRST LIMIT $1"; await using var connection = DataSource.CreateConnection(); await connection.OpenAsync(); await using var command = new NpgsqlCommand(query, connection) { - Parameters = { new() { Value = maxNumber}} + Parameters = + { + new() { Value = maxNumber}, + new() { Value = m}, + new() { Value = nStart}, + new() { Value = nEnd} + } }; var reader = await command.ExecuteReaderAsync(); var results = new List(); diff --git a/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs index 7126b98..9e82b74 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL/Contracts/ITwitterUserDal.cs @@ -9,7 +9,7 @@ namespace BirdsiteLive.DAL.Contracts Task CreateTwitterUserAsync(string acct, long lastTweetPostedId); Task GetTwitterUserAsync(string acct); Task GetTwitterUserAsync(int id); - Task GetAllTwitterUsersWithFollowersAsync(int maxNumber); + Task GetAllTwitterUsersWithFollowersAsync(int maxNumber, int nStart, int nEnd, int m); Task GetAllTwitterUsersAsync(int maxNumber); Task GetAllTwitterUsersAsync(); Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, int fetchingErrorCount, DateTime lastSync); diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs index a9cddd3..bcd954d 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs @@ -29,12 +29,19 @@ namespace BirdsiteLive.Pipeline.Tests.Processors new SyncTwitterUser(), }; var maxUsers = 1000; + var instanceSettings = new InstanceSettings() + { + n_start = 1, + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock .Setup(x => x.GetAllTwitterUsersWithFollowersAsync( + It.Is(y => true), + It.Is(y => true), + It.Is(y => true), It.Is(y => true))) .ReturnsAsync(users); @@ -47,7 +54,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object); processor.WaitFactor = 10; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -72,12 +79,19 @@ namespace BirdsiteLive.Pipeline.Tests.Processors users.Add(new SyncTwitterUser()); var maxUsers = 1000; + var instanceSettings = new InstanceSettings() + { + n_start = 1, + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock .SetupSequence(x => x.GetAllTwitterUsersWithFollowersAsync( + It.Is(y => true), + It.Is(y => true), + It.Is(y => true), It.Is(y => true))) .ReturnsAsync(users.ToArray()) .ReturnsAsync(new SyncTwitterUser[0]) @@ -93,7 +107,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object); processor.WaitFactor = 2; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -118,12 +132,19 @@ namespace BirdsiteLive.Pipeline.Tests.Processors users.Add(new SyncTwitterUser()); var maxUsers = 1000; + var instanceSettings = new InstanceSettings() + { + n_start = 1, + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock .SetupSequence(x => x.GetAllTwitterUsersWithFollowersAsync( + It.Is(y => true), + It.Is(y => true), + It.Is(y => true), It.Is(y => true))) .ReturnsAsync(users.ToArray()) .ReturnsAsync(new SyncTwitterUser[0]) @@ -139,7 +160,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object); processor.WaitFactor = 2; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -160,6 +181,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var buffer = new BufferBlock(); var maxUsers = 1000; + var instanceSettings = new InstanceSettings() + { + n_start = 1, + }; #endregion #region Mocks @@ -167,6 +192,9 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock .Setup(x => x.GetAllTwitterUsersWithFollowersAsync( + It.Is(y => true), + It.Is(y => true), + It.Is(y => true), It.Is(y => true))) .ReturnsAsync(new SyncTwitterUser[0]); @@ -178,7 +206,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object); processor.WaitFactor = 1; var t =processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -197,12 +225,19 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var buffer = new BufferBlock(); var maxUsers = 1000; + var instanceSettings = new InstanceSettings() + { + n_start = 1, + }; #endregion #region Mocks var twitterUserDalMock = new Mock(MockBehavior.Strict); twitterUserDalMock .Setup(x => x.GetAllTwitterUsersWithFollowersAsync( + It.Is(y => true), + It.Is(y => true), + It.Is(y => true), It.Is(y => true))) .Returns(async () => await DelayFaultedTask(new Exception())); @@ -214,7 +249,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object); processor.WaitFactor = 10; var t = processor.GetTwitterUsersAsync(buffer, CancellationToken.None); @@ -236,6 +271,10 @@ namespace BirdsiteLive.Pipeline.Tests.Processors canTokenS.Cancel(); var maxUsers = 1000; + var instanceSettings = new InstanceSettings() + { + n_start = 1, + }; #endregion #region Mocks @@ -249,7 +288,7 @@ namespace BirdsiteLive.Pipeline.Tests.Processors var loggerMock = new Mock>(); #endregion - var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, loggerMock.Object); + var processor = new RetrieveTwitterUsersProcessor(twitterUserDalMock.Object, followersDalMock.Object, instanceSettings, loggerMock.Object); processor.WaitFactor = 1; await processor.GetTwitterUsersAsync(buffer, canTokenS.Token); }