fix crash
This commit is contained in:
parent
c702357cc1
commit
bd46afa350
2 changed files with 26 additions and 17 deletions
|
@ -52,24 +52,33 @@ namespace BirdsiteLive.Pipeline.Processors
|
||||||
index++;
|
index++;
|
||||||
|
|
||||||
var t = Task.Run(async () => {
|
var t = Task.Run(async () => {
|
||||||
var user = userWtData.User;
|
try
|
||||||
var tweets = await RetrieveNewTweets(user);
|
|
||||||
_logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " );
|
|
||||||
if (tweets.Length > 0 && user.LastTweetPostedId != -1)
|
|
||||||
{
|
{
|
||||||
userWtData.Tweets = tweets;
|
var user = userWtData.User;
|
||||||
usersWtTweets.Add(userWtData);
|
var tweets = await RetrieveNewTweets(user);
|
||||||
}
|
_logger.LogInformation(index + "/" + syncTwitterUsers.Count() + " Got " + tweets.Length + " tweets from user " + user.Acct + " " );
|
||||||
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
|
if (tweets.Length > 0 && user.LastTweetPostedId != -1)
|
||||||
|
{
|
||||||
|
userWtData.Tweets = tweets;
|
||||||
|
usersWtTweets.Add(userWtData);
|
||||||
|
}
|
||||||
|
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
|
||||||
|
{
|
||||||
|
var tweetId = tweets.Last().Id;
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
catch(Exception e)
|
||||||
{
|
{
|
||||||
var tweetId = tweets.Last().Id;
|
_logger.LogError(e.Message);
|
||||||
var now = DateTime.UtcNow;
|
|
||||||
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var now = DateTime.UtcNow;
|
|
||||||
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
todo.Add(t);
|
todo.Add(t);
|
||||||
|
|
|
@ -46,7 +46,7 @@ namespace BirdsiteLive.Pipeline
|
||||||
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } );
|
var retrieveTweetsBlock = new TransformBlock<UserWithDataToSync[], UserWithDataToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 } );
|
||||||
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
var retrieveTweetsBufferBlock = new BufferBlock<UserWithDataToSync[]>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
||||||
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
|
var retrieveFollowersBlock = new TransformManyBlock<UserWithDataToSync[], UserWithDataToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct), standardBlockOptions);
|
||||||
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 1000, CancellationToken = ct });
|
var retrieveFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 500, CancellationToken = ct });
|
||||||
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
|
var sendTweetsToFollowersBlock = new TransformBlock<UserWithDataToSync, UserWithDataToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
|
||||||
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithDataToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
|
||||||
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
|
var saveProgressionBlock = new ActionBlock<UserWithDataToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct, BoundedCapacity = 1 });
|
||||||
|
|
Loading…
Add table
Reference in a new issue