showing inbound activities in frontend via redis pub/sub

This commit is contained in:
Sebastian Jambor 2022-12-06 19:37:33 +01:00
parent e17b6aa162
commit ddcfbc4745
8 changed files with 79 additions and 21 deletions

View file

@ -1,4 +1,5 @@
# frozen_string_literal: true
require 'redis'
class ActivityPub::InboxesController < ActivityPub::BaseController
include SignatureVerification
@ -71,6 +72,13 @@ class ActivityPub::InboxesController < ActivityPub::BaseController
end
def process_payload
event = ActivityLogEvent.new
event.type = 'inbound'
event.path = request.path
event.data = Oj.load(body, mode: :strict)
Redis.new.publish('activity_log', Oj.dump(event))
ActivityPub::ProcessingWorker.perform_async(signed_request_actor.id, body, @account&.id, signed_request_actor.class.name)
end
end

View file

@ -3,17 +3,8 @@
class Api::V1::ActivityLogController < Api::BaseController
include ActionController::Live
# before_action -> { doorkeeper_authorize! :read, :'read:lists' }, only: [:index, :show]
# before_action -> { doorkeeper_authorize! :write, :'write:lists' }, except: [:index, :show]
# before_action :require_user!
# before_action :set_list, except: [:index, :create]
#
#
#
class Foo
attr_accessor :test
end
# before_action -> { doorkeeper_authorize! :read }, only: [:show]
before_action :require_user!
rescue_from ArgumentError do |e|
render json: { error: e.to_s }, status: 422
@ -30,14 +21,21 @@ class Api::V1::ActivityLogController < Api::BaseController
sse = SSE.new(response.stream)
# id = current_account.local_username_and_domain
id = current_account.username
begin
100.times {
f = Foo.new
f.test = "xxx"
sse.write f.to_json
sleep 1
}
ActivityLogger.register(id, sse)
while true
event = ActivityLogEvent.new
event.type = "keep-alive"
ActivityLogger.log(id, event)
sleep 10
end
ensure
ActivityLogger.unregister(id)
Rails.logger.error "closed log"
sse.close
end
end

View file

@ -48,7 +48,7 @@ class Lists extends ImmutablePureComponent {
}
render () {
return <div>{this.state.logs.map(x => (<div>{JSON.stringify(x)}</div>))}</div>;
return <div>{this.state.logs.filter(x => x.type !== 'keep-alive').map(x => (<pre>{JSON.stringify(x, null, 2)}</pre>))}</div>;
}
}

View file

@ -0,0 +1,5 @@
# frozen_string_literal: true
class ActivityLogEvent
attr_accessor :type, :path, :data
end

View file

@ -0,0 +1,24 @@
# frozen_string_literal: true
class ActivityLogger
@@loggers = Hash.new
def self.register(id, sse)
Rails.logger.error "registering id #{id}"
@@loggers[id] = sse
end
def self.unregister(id)
Rails.logger.error "unregistering id #{id}"
@@loggers.delete(id)
end
def self.log(id, event)
Rails.logger.error "logging id #{id}, event #{event.type} #{event.data}, loggers #{@@loggers}"
if @@loggers[id]
Rails.logger.error "logged"
@@loggers[id].write event
end
end
end

View file

@ -6,9 +6,6 @@ class ActivityPub::ProcessingWorker
sidekiq_options queue: 'ingress', backtrace: true, retry: 8
def perform(actor_id, body, delivered_to_account_id = nil, actor_type = 'Account')
Rails.logger.error "=====INGRESS===== #{body}"
case actor_type
when 'Account'
actor = Account.find_by(id: actor_id)

View file

@ -1,3 +1,5 @@
require_relative '../lib/activity_subscriber'
persistent_timeout ENV.fetch('PERSISTENT_TIMEOUT') { 20 }.to_i
max_threads_count = ENV.fetch('MAX_THREADS') { 5 }.to_i
@ -19,6 +21,10 @@ on_worker_boot do
ActiveSupport.on_load(:active_record) do
ActiveRecord::Base.establish_connection
end
Thread.new {
ActivitySubscriber.new.start
}
end
plugin :tmp_restart

View file

@ -0,0 +1,20 @@
# frozen_string_literal: true
require 'redis'
class ActivitySubscriber
def start
redis = Redis.new
redis.subscribe('activity_log') do |on|
on.message do |channel, message|
json = Oj.load(message, mode: :strict)
event = ActivityLogEvent.new
event.type = json['type']
event.path = json['path']
event.data = json['data']
ActivityLogger.log('admin', event)
end
end
end
end