diff --git a/src/BirdsiteLive.ActivityPub/BirdsiteLive.ActivityPub.csproj b/src/BirdsiteLive.ActivityPub/BirdsiteLive.ActivityPub.csproj index 0230f61..e046b1c 100644 --- a/src/BirdsiteLive.ActivityPub/BirdsiteLive.ActivityPub.csproj +++ b/src/BirdsiteLive.ActivityPub/BirdsiteLive.ActivityPub.csproj @@ -7,7 +7,6 @@ - diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs index ec2c53b..232e812 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs @@ -35,7 +35,6 @@ namespace BirdsiteLive.Pipeline.Processors { var usersWtTweets = new List(); - //TODO multithread this int index = 0; foreach (var userWtData in syncTwitterUsers) { @@ -60,7 +59,7 @@ namespace BirdsiteLive.Pipeline.Processors await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now); } - await Task.Delay(150); + //await Task.Delay(150); } return usersWtTweets.ToArray(); diff --git a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs index 6683f24..dc5f96d 100644 --- a/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs +++ b/src/BirdsiteLive.Pipeline/Processors/RetrieveTwitterUsersProcessor.cs @@ -41,8 +41,8 @@ namespace BirdsiteLive.Pipeline.Processors //var users = await _twitterUserDal.GetAllTwitterUsersAsync(50); //var splitUsers = users.Split(25).ToList(); - var maxUsersNumber = await _maxUsersNumberProvider.GetMaxUsersNumberAsync(); - var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(maxUsersNumber); + //var maxUsersNumber = await _maxUsersNumberProvider.GetMaxUsersNumberAsync(); + var users = await _twitterUserDal.GetAllTwitterUsersWithFollowersAsync(1000); var userCount = users.Any() ? Math.Min(users.Length, 25) : 1; //var splitNumber = (int) Math.Ceiling(userCount / 15d); diff --git a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs index a61412d..52778a2 100644 --- a/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs +++ b/src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs @@ -49,9 +49,9 @@ namespace BirdsiteLive.Pipeline var retrieveTweetsBlock = new TransformBlock(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct)); var retrieveTweetsBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 2, CancellationToken = ct }); var retrieveFollowersBlock = new TransformManyBlock(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); - var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 10, CancellationToken = ct }); + var retrieveFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct }); var sendTweetsToFollowersBlock = new TransformBlock(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); - var sendTweetsToFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 10, CancellationToken = ct }); + var sendTweetsToFollowersBufferBlock = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = ct }); var saveProgressionBlock = new ActionBlock(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct }); // Link pipeline diff --git a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs index b859b6d..b4f02e0 100644 --- a/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs +++ b/src/DataAccessLayers/BirdsiteLive.DAL.Postgres/DataAccessLayers/TwitterUserPostgresDal.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using BirdsiteLive.DAL.Contracts; @@ -110,11 +111,29 @@ namespace BirdsiteLive.DAL.Postgres.DataAccessLayers { var query = "SELECT * FROM (SELECT unnest(followings) as follow FROM followers GROUP BY follow) AS f INNER JOIN twitter_users ON f.follow=twitter_users.id ORDER BY lastSync ASC NULLS FIRST LIMIT @maxNumber"; - using (var dbConnection = Connection) + await using var connection = DataSource.CreateConnection(); + await connection.OpenAsync(); + await using var command = new NpgsqlCommand(query, connection) { + Parameters = { new() { Value = maxNumber}} + }; + var reader = await command.ExecuteReaderAsync(); + var results = new List(); + while (await reader.ReadAsync()) { - var result = await dbConnection.QueryAsync(query, new { maxNumber }); - return result.ToArray(); + results.Add(new SyncTwitterUser + { + Id = reader["id"] as int? ?? default, + Acct = reader["acct"] as string, + TwitterUserId = reader["twitterUserId"] as long? ?? default, + LastTweetPostedId = reader["lastTweetPostedId"] as long? ?? default, + LastTweetSynchronizedForAllFollowersId = reader["lastTweetSynchronizedForAllFollowersId"] as long? ?? default, + LastSync = reader["lastSync"] as DateTime? ?? default, + FetchingErrorCount = reader["fetchingErrorCount"] as int? ?? default, + } + ); + } + return results.ToArray(); } public async Task GetAllTwitterUsersAsync(int maxNumber) diff --git a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs index 9459cb4..833e287 100644 --- a/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs +++ b/src/Tests/BirdsiteLive.Pipeline.Tests/Processors/RetrieveTwitterUsersProcessorTests.cs @@ -53,7 +53,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors await Task.WhenAny(t, Task.Delay(50)); #region Validations - maxUsersNumberProviderMock.VerifyAll(); twitterUserDalMock.VerifyAll(); Assert.IsTrue(0 < buffer.Count); buffer.TryReceive(out var result); @@ -100,7 +99,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors await Task.WhenAny(t, Task.Delay(300)); #region Validations - maxUsersNumberProviderMock.VerifyAll(); twitterUserDalMock.VerifyAll(); Assert.IsTrue(0 < buffer.Count); buffer.TryReceive(out var result); @@ -147,7 +145,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors await Task.WhenAny(t, Task.Delay(5000)); #region Validations - maxUsersNumberProviderMock.VerifyAll(); twitterUserDalMock.VerifyAll(); Assert.IsTrue(0 < buffer.Count); buffer.TryReceive(out var result); @@ -186,7 +183,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors await Task.WhenAny(t, Task.Delay(50)); #region Validations - maxUsersNumberProviderMock.VerifyAll(); twitterUserDalMock.VerifyAll(); Assert.AreEqual(0, buffer.Count); #endregion @@ -223,7 +219,6 @@ namespace BirdsiteLive.Pipeline.Tests.Processors await Task.WhenAny(t, Task.Delay(50)); #region Validations - maxUsersNumberProviderMock.VerifyAll(); twitterUserDalMock.VerifyAll(); Assert.AreEqual(0, buffer.Count); #endregion