Add tootctl domains crawl (#9809)

This commit is contained in:
Eugen Rochko 2019-01-15 09:24:35 +01:00 committed by GitHub
parent 6cfb357940
commit ee5e24807f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 2 deletions

View file

@ -145,3 +145,5 @@ group :production do
gem 'lograge', '~> 0.10'
gem 'redis-rails', '~> 5.0'
end
gem 'concurrent-ruby', require: false

View file

@ -231,7 +231,7 @@ GEM
fuubar (2.3.2)
rspec-core (~> 3.0)
ruby-progressbar (~> 1.4)
get_process_mem (0.2.2)
get_process_mem (0.2.3)
globalid (0.4.1)
activesupport (>= 4.2.0)
goldfinger (2.1.0)
@ -674,6 +674,7 @@ DEPENDENCIES
chewy (~> 5.0)
cld3 (~> 3.2.3)
climate_control (~> 0.2)
concurrent-ruby
derailed_benchmarks
devise (~> 4.5)
devise-two-factor (~> 3.0)
@ -773,4 +774,4 @@ RUBY VERSION
ruby 2.5.3p105
BUNDLED WITH
1.16.6
1.17.3

View file

@ -1,5 +1,6 @@
# frozen_string_literal: true
require 'concurrent'
require_relative '../../config/boot'
require_relative '../../config/environment'
require_relative 'cli_helper'
@ -32,5 +33,122 @@ module Mastodon
say
say("Removed #{removed} accounts#{dry_run}", :green)
end
option :concurrency, type: :numeric, default: 50, aliases: [:c]
option :silent, type: :boolean, default: false, aliases: [:s]
option :format, type: :string, default: 'summary', aliases: [:f]
desc 'crawl [START]', 'Crawl all known peers, optionally beginning at START'
long_desc <<-LONG_DESC
Crawl the fediverse by using the Mastodon REST API endpoints that expose
all known peers, and collect statistics from those peers, as long as those
peers support those API endpoints. When no START is given, the command uses
this server's own database of known peers to seed the crawl.
The --concurrency (-c) option controls the number of threads performing HTTP
requests at the same time. More threads means the crawl may complete faster.
The --silent (-s) option controls progress output.
The --format (-f) option controls how the data is displayed at the end. By
default (`summary`), a summary of the statistics is returned. The other options
are `domains`, which returns a newline-delimited list of all discovered peers,
and `json`, which dumps all the aggregated data raw.
LONG_DESC
def crawl(start = nil)
stats = Concurrent::Hash.new
processed = Concurrent::AtomicFixnum.new(0)
failed = Concurrent::AtomicFixnum.new(0)
start_at = Time.now.to_f
seed = start ? [start] : Account.remote.domains
pool = Concurrent::ThreadPoolExecutor.new(min_threads: 0, max_threads: options[:concurrency], idletime: 10, auto_terminate: true, max_queue: 0)
work_unit = ->(domain) do
next if stats.key?(domain)
stats[domain] = nil
processed.increment
begin
Request.new(:get, "https://#{domain}/api/v1/instance").perform do |res|
next unless res.code == 200
stats[domain] = Oj.load(res.to_s)
end
Request.new(:get, "https://#{domain}/api/v1/instance/peers").perform do |res|
next unless res.code == 200
Oj.load(res.to_s).reject { |peer| stats.key?(peer) }.each do |peer|
pool.post(peer, &work_unit)
end
end
Request.new(:get, "https://#{domain}/api/v1/instance/activity").perform do |res|
next unless res.code == 200
stats[domain]['activity'] = Oj.load(res.to_s)
end
say('.', :green, false) unless options[:silent]
rescue StandardError
failed.increment
say('.', :red, false) unless options[:silent]
end
end
seed.each do |domain|
pool.post(domain, &work_unit)
end
sleep 20
sleep 20 until pool.queue_length.zero?
pool.shutdown
pool.wait_for_termination(20)
ensure
pool.shutdown
say unless options[:silent]
case options[:format]
when 'summary'
stats_to_summary(stats, processed, failed, start_at)
when 'domains'
stats_to_domains(stats)
when 'json'
stats_to_json(stats)
end
end
private
def stats_to_summary(stats, processed, failed, start_at)
stats.compact!
total_domains = stats.size
total_users = stats.reduce(0) { |sum, (_key, val)| val.is_a?(Hash) && val['stats'].is_a?(Hash) ? sum + val['stats']['user_count'].to_i : sum }
total_active = stats.reduce(0) { |sum, (_key, val)| val.is_a?(Hash) && val['activity'].is_a?(Array) && val['activity'].size > 2 && val['activity'][1].is_a?(Hash) ? sum + val['activity'][1]['logins'].to_i : sum }
total_joined = stats.reduce(0) { |sum, (_key, val)| val.is_a?(Hash) && val['activity'].is_a?(Array) && val['activity'].size > 2 && val['activity'][1].is_a?(Hash) ? sum + val['activity'][1]['registrations'].to_i : sum }
say("Visited #{processed.value} domains, #{failed.value} failed (#{(Time.now.to_f - start_at).round}s elapsed)", :green)
say("Total servers: #{total_domains}", :green)
say("Total registered: #{total_users}", :green)
say("Total active last week: #{total_active}", :green)
say("Total joined last week: #{total_joined}", :green)
end
def stats_to_domains(stats)
say(stats.keys.join("\n"))
end
def stats_to_json(stats)
totals.each_key do |domain|
if totals[domain].is_a?(Hash)
totals[domain]['activity'] = stats[domain]
else
totals.delete(domain)
end
end
say(Oj.dump(totals))
end
end
end