added better logging

This commit is contained in:
Nicolas Constant 2021-01-16 00:34:09 -05:00
parent 8f593e5591
commit 3af2ef05d9
No known key found for this signature in database
GPG key ID: 1E9F677FB01A5688
8 changed files with 36 additions and 40 deletions

View file

@ -18,7 +18,7 @@ namespace BirdsiteLive.Domain
{ {
Task<Actor> GetUser(string objectId); Task<Actor> GetUser(string objectId);
Task<HttpStatusCode> PostDataAsync<T>(T data, string targetHost, string actorUrl, string inbox = null); Task<HttpStatusCode> PostDataAsync<T>(T data, string targetHost, string actorUrl, string inbox = null);
Task<HttpStatusCode> PostNewNoteActivity(Note note, string username, string noteId, string targetHost, Task PostNewNoteActivity(Note note, string username, string noteId, string targetHost,
string targetInbox); string targetInbox);
} }
@ -46,7 +46,7 @@ namespace BirdsiteLive.Domain
} }
} }
public async Task<HttpStatusCode> PostNewNoteActivity(Note note, string username, string noteId, string targetHost, string targetInbox) public async Task PostNewNoteActivity(Note note, string username, string noteId, string targetHost, string targetInbox)
{ {
var actor = UrlFactory.GetActorUrl(_instanceSettings.Domain, username); var actor = UrlFactory.GetActorUrl(_instanceSettings.Domain, username);
var noteUri = UrlFactory.GetNoteUrl(_instanceSettings.Domain, username, noteId); var noteUri = UrlFactory.GetNoteUrl(_instanceSettings.Domain, username, noteId);
@ -67,7 +67,7 @@ namespace BirdsiteLive.Domain
apObject = note apObject = note
}; };
return await PostDataAsync(noteActivity, targetHost, actor, targetInbox); await PostDataAsync(noteActivity, targetHost, actor, targetInbox);
} }
public async Task<HttpStatusCode> PostDataAsync<T>(T data, string targetHost, string actorUrl, string inbox = null) public async Task<HttpStatusCode> PostDataAsync<T>(T data, string targetHost, string actorUrl, string inbox = null)
@ -85,7 +85,7 @@ namespace BirdsiteLive.Domain
var signature = _cryptoService.SignAndGetSignatureHeader(date, actorUrl, targetHost, digest, usedInbox); var signature = _cryptoService.SignAndGetSignatureHeader(date, actorUrl, targetHost, digest, usedInbox);
var client = new HttpClient(); var client = new HttpClient(); //TODO: remove this from here
var httpRequestMessage = new HttpRequestMessage var httpRequestMessage = new HttpRequestMessage
{ {
Method = HttpMethod.Post, Method = HttpMethod.Post,
@ -101,9 +101,8 @@ namespace BirdsiteLive.Domain
}; };
var response = await client.SendAsync(httpRequestMessage); var response = await client.SendAsync(httpRequestMessage);
response.EnsureSuccessStatusCode();
return response.StatusCode; return response.StatusCode;
} }
} }
} }

View file

@ -144,7 +144,7 @@ namespace BirdsiteLive.Domain
} }
}; };
var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject); var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject);
return result == HttpStatusCode.Accepted || result == HttpStatusCode.OK; return result == HttpStatusCode.Accepted || result == HttpStatusCode.OK; //TODO: revamp this for better error handling
} }
private string OnlyKeepRoute(string inbox, string host) private string OnlyKeepRoute(string inbox, string host)
@ -188,7 +188,7 @@ namespace BirdsiteLive.Domain
} }
}; };
var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject.apObject); var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject.apObject);
return result == HttpStatusCode.Accepted || result == HttpStatusCode.OK; return result == HttpStatusCode.Accepted || result == HttpStatusCode.OK; //TODO: revamp this for better error handling
} }
private async Task<SignatureValidationResult> ValidateSignature(string actor, string rawSig, string method, string path, string queryString, Dictionary<string, string> requestHeaders, string body) private async Task<SignatureValidationResult> ValidateSignature(string actor, string rawSig, string method, string path, string queryString, Dictionary<string, string> requestHeaders, string body)

View file

@ -7,6 +7,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" /> <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.1" /> <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.1" />
</ItemGroup> </ItemGroup>

View file

@ -5,18 +5,21 @@ using System.Threading.Tasks.Dataflow;
using BirdsiteLive.DAL.Contracts; using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models; using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts; using BirdsiteLive.Pipeline.Contracts;
using Microsoft.Extensions.Logging;
namespace BirdsiteLive.Pipeline.Processors namespace BirdsiteLive.Pipeline.Processors
{ {
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{ {
private readonly ITwitterUserDal _twitterUserDal; private readonly ITwitterUserDal _twitterUserDal;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
private const int SyncPeriod = 15; //in minutes private const int SyncPeriod = 15; //in minutes
#region Ctor #region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal) public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
{ {
_twitterUserDal = twitterUserDal; _twitterUserDal = twitterUserDal;
_logger = logger;
} }
#endregion #endregion
@ -35,8 +38,7 @@ namespace BirdsiteLive.Pipeline.Processors
} }
catch (Exception e) catch (Exception e)
{ {
Console.WriteLine(e); _logger.LogError(e, "Failing retrieving Twitter Users.");
//TODO handle error
} }
await Task.Delay(SyncPeriod * 1000 * 60, ct); await Task.Delay(SyncPeriod * 1000 * 60, ct);

View file

@ -13,6 +13,7 @@ using BirdsiteLive.Pipeline.Models;
using BirdsiteLive.Pipeline.Processors.SubTasks; using BirdsiteLive.Pipeline.Processors.SubTasks;
using BirdsiteLive.Twitter; using BirdsiteLive.Twitter;
using BirdsiteLive.Twitter.Models; using BirdsiteLive.Twitter.Models;
using Microsoft.Extensions.Logging;
using Tweetinvi.Models; using Tweetinvi.Models;
namespace BirdsiteLive.Pipeline.Processors namespace BirdsiteLive.Pipeline.Processors
@ -21,12 +22,14 @@ namespace BirdsiteLive.Pipeline.Processors
{ {
private readonly ISendTweetsToInboxTask _sendTweetsToInboxTask; private readonly ISendTweetsToInboxTask _sendTweetsToInboxTask;
private readonly ISendTweetsToSharedInboxTask _sendTweetsToSharedInbox; private readonly ISendTweetsToSharedInboxTask _sendTweetsToSharedInbox;
private readonly ILogger<SendTweetsToFollowersProcessor> _logger;
#region Ctor #region Ctor
public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox) public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, ILogger<SendTweetsToFollowersProcessor> logger)
{ {
_sendTweetsToInboxTask = sendTweetsToInboxTask; _sendTweetsToInboxTask = sendTweetsToInboxTask;
_sendTweetsToSharedInbox = sendTweetsToSharedInbox; _sendTweetsToSharedInbox = sendTweetsToSharedInbox;
_logger = logger;
} }
#endregion #endregion
@ -61,8 +64,8 @@ namespace BirdsiteLive.Pipeline.Processors
} }
catch (Exception e) catch (Exception e)
{ {
Console.WriteLine(e); var follower = followersPerInstance.First();
//TODO handle error _logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.SharedInboxRoute);
} }
} }
} }
@ -77,8 +80,7 @@ namespace BirdsiteLive.Pipeline.Processors
} }
catch (Exception e) catch (Exception e)
{ {
Console.WriteLine(e); _logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.InboxRoute);
//TODO handle error
} }
} }
} }

View file

@ -47,12 +47,8 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks
foreach (var tweet in tweetsToSend) foreach (var tweet in tweetsToSend)
{ {
var note = _statusService.GetStatus(user.Acct, tweet); var note = _statusService.GetStatus(user.Acct, tweet);
var result = await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox); await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox);
syncStatus = tweet.Id;
if (result == HttpStatusCode.Accepted || result == HttpStatusCode.OK)
syncStatus = tweet.Id;
else
throw new Exception("Posting new note activity failed");
} }
} }
finally finally

View file

@ -19,7 +19,7 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks
private readonly IStatusService _statusService; private readonly IStatusService _statusService;
private readonly IActivityPubService _activityPubService; private readonly IActivityPubService _activityPubService;
private readonly IFollowersDal _followersDal; private readonly IFollowersDal _followersDal;
#region Ctor #region Ctor
public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal) public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal)
{ {
@ -48,13 +48,8 @@ namespace BirdsiteLive.Pipeline.Processors.SubTasks
foreach (var tweet in tweetsToSend) foreach (var tweet in tweetsToSend)
{ {
var note = _statusService.GetStatus(user.Acct, tweet); var note = _statusService.GetStatus(user.Acct, tweet);
var result = await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox); syncStatus = tweet.Id;
if (result == HttpStatusCode.Accepted || result == HttpStatusCode.OK)
syncStatus = tweet.Id;
else
throw new Exception("Posting new note activity failed");
} }
} }
finally finally

View file

@ -5,6 +5,7 @@ using System.Threading.Tasks.Dataflow;
using BirdsiteLive.DAL.Models; using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts; using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models; using BirdsiteLive.Pipeline.Models;
using Microsoft.Extensions.Logging;
namespace BirdsiteLive.Pipeline namespace BirdsiteLive.Pipeline
{ {
@ -19,29 +20,31 @@ namespace BirdsiteLive.Pipeline
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor; private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor; private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor; private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
private readonly ILogger<StatusPublicationPipeline> _logger;
#region Ctor #region Ctor
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor) public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ILogger<StatusPublicationPipeline> logger)
{ {
_retrieveTweetsProcessor = retrieveTweetsProcessor; _retrieveTweetsProcessor = retrieveTweetsProcessor;
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor; _retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
_retrieveFollowersProcessor = retrieveFollowersProcessor; _retrieveFollowersProcessor = retrieveFollowersProcessor;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor; _sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
_logger = logger;
} }
#endregion #endregion
public async Task ExecuteAsync(CancellationToken ct) public async Task ExecuteAsync(CancellationToken ct)
{ {
// Create blocks // Create blocks
var twitterUsersBufferBlock = new BufferBlock<SyncTwitterUser[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct}); var twitterUsersBufferBlock = new BufferBlock<SyncTwitterUser[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveTweetsBlock = new TransformBlock<SyncTwitterUser[], UserWithTweetsToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct)); var retrieveTweetsBlock = new TransformBlock<SyncTwitterUser[], UserWithTweetsToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct));
var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct }); var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct)); var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct }); var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct}); var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
// Link pipeline // Link pipeline
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions {PropagateCompletion = true}); twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }); retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
@ -51,12 +54,10 @@ namespace BirdsiteLive.Pipeline
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct); var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct);
// Wait // Wait
await Task.WhenAny(new []{ retrieveTwitterAccountsTask , sendTweetsToFollowersBlock.Completion}); await Task.WhenAny(new[] { retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion });
var foreground = Console.ForegroundColor; var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : sendTweetsToFollowersBlock.Completion.Exception;
Console.ForegroundColor = ConsoleColor.Red; _logger.LogCritical(ex, "An error occurred, pipeline stopped");
Console.WriteLine("An error occured, pipeline stopped");
Console.ForegroundColor = foreground;
} }
} }
} }