Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

source "https://rubygems.org"
source 'https://rubygems.org'

git_source(:github) { |repo_name| "https://github.com/#{repo_name}" }
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ PLATFORMS
DEPENDENCIES

BUNDLED WITH
2.1.4
2.3.15
182 changes: 99 additions & 83 deletions src/main.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require "fileutils"
require "json"
require "logger"
require "open3"
require "optparse"
require "ostruct"
require "securerandom"
require "set"
require 'fileutils'
require 'json'
require 'logger'
require 'open3'
require 'ostruct'
require 'securerandom'
require 'set'

class Repository
MIN_GC_BLOB_AGE = ENV.fetch("MIN_GC_BLOB_AGE", "86400").to_i.clamp(0..)
MIN_GC_BLOB_AGE = ENV.fetch('MIN_GC_BLOB_AGE', '86400').to_i.clamp(0..)

attr_reader :manifests

Expand All @@ -36,7 +35,7 @@ def set_manifest_blob(name:, blob:)

def add_mention(blob:, from:)
# Clean the hash
blob = blob[7...] if blob.start_with?("sha256:")
blob = blob[7...] if blob.start_with?('sha256:')
raise "Bad blob #{blob}" unless /\A[0-9a-f]{64}\z/.match?(blob)

# Mark it
Expand All @@ -49,75 +48,92 @@ def add_mention(blob:, from:)
blobdata.mentioned_by << from
end

def orphaned_blobs(min_age: MIN_GC_BLOB_AGE)
time_cutoff = Time.now - min_age
@blobs.filter { |k, v| v.mentioned_by.empty? && v.mtime < time_cutoff }
def filenames_to_delete(dry_run:, min_age: MIN_GC_BLOB_AGE)
orphaned_blobs(min_age: min_age).tap do |filtered_blobs|
yield filtered_blobs.values.map(&:filename)
@blobs.delete_if { |k, _v| filtered_blobs.key?(k) } unless dry_run
end
end

def puts_summary
puts "Blobs: #{@blobs.count}"
puts "Total size: #{@blobs.values.map(&:size).sum} bytes"
puts "Blobs: #{humanize(@blobs.count)}"
puts "Total size: #{humanize(@blobs.values.map(&:size).sum)} bytes"
puts "Manifests: #{@manifests.count}"
puts "Orphaned blobs: #{orphaned_blobs.count}"
puts "Orphaned blob total size: #{orphaned_blobs.values.map(&:size).sum} bytes"
orphaned_blobs.tap do |filtered_blobs|
puts "Orphaned blobs: #{filtered_blobs.count}"
puts "Orphaned blob total size: #{filtered_blobs.values.map(&:size).sum} bytes"
end
end

private

def orphaned_blobs(min_age: MIN_GC_BLOB_AGE)
time_cutoff = Time.now - min_age
@blobs.filter { |_k, v| v.mentioned_by.empty? && v.mtime < time_cutoff }
end

def humanize(number)
number.to_s.reverse.gsub(/...(?=.)/, '\&,').reverse
end
end

class TrowGarbageCollector
POLL_INTERVAL = ENV.fetch("POLL_INTERVAL", "3600").to_i.clamp(1..)
TROW_NAMESPACE = ENV.fetch("TROW_NAMESPACE", "trow").freeze
TROW_POD = ENV.fetch("TROW_POD", "trow-0").freeze
POLL_INTERVAL = ENV.fetch('POLL_INTERVAL', '3600').to_i.clamp(1..)
TROW_NAMESPACE = ENV.fetch('TROW_NAMESPACE', 'trow').freeze
TROW_POD = ENV.fetch('TROW_POD', 'trow-0').freeze

def initialize
STDOUT.sync = true
@log = Logger.new(STDOUT)
$stdout.sync = true
@log = Logger.new($stdout)
@repo = Repository.new(log: @log)
@dry_run = (ENV.fetch('DRY_RUN', 'false') =~ /^(true|y|yes|1)$/i)
end

def trow_exec(*cmd_array, stdin_data: nil)
# Runs a command within the TROW_POD pod and captures standard output.
cmds = ["kubectl", "exec", "-n", TROW_NAMESPACE, TROW_POD, (["", nil].include?(stdin_data) ? nil : "-i"), "--"].compact + cmd_array
cmds = ['kubectl', 'exec', '-n', TROW_NAMESPACE, TROW_POD, (['', nil].include?(stdin_data) ? nil : '-i'), '--'].compact + cmd_array

stdout, stderr, status = Open3.capture3(*cmds, stdin_data: stdin_data)
OpenStruct.new(stdout: stdout, stderr: stderr, status: status)
end

def puts_df_data
result = trow_exec("df", "-h", "/data")
result = trow_exec('df', '-h', '/data')
raise "Error: #{result.status}" unless result.status.success?

puts ""
puts ''
puts result.stdout
puts ""
puts ''
end

def fetch_file_sizes
result = trow_exec("/bin/bash", "-c", "find /data/ -type f -print0 | xargs -0 -n 1 stat -c '%s %Y %n'")
result = trow_exec('/bin/bash', '-c', "find /data/ -type f -print0 | xargs --no-run-if-empty -0 -n 1 stat -c '%s %Y %n'")
raise "Error: #{result.status}" unless result.status.success?

result.stdout.strip.split("\n").each do |line|
line_sp = line.gsub(/\s+/, " ").split(" ")
line_sp = line.gsub(/\s+/, ' ').split
raise "Bad line '#{line_sp}'" unless line_sp.count == 3

fsize, fmtime, fname = line_sp
fsize = Integer(fsize)
fmtime = Time.at(Integer(fmtime))

if fname.start_with?("/data/blobs/sha256/")
blobname = fname[("/data/blobs/sha256/".length)...]
if fname.start_with?('/data/blobs/sha256/')
blobname = fname[('/data/blobs/sha256/'.length)...]
@repo.add_blob(name: blobname, filename: fname, size: fsize, mtime: fmtime)
elsif fname.start_with?("/data/manifests/")
elsif fname.start_with?('/data/manifests/')
@repo.add_manifest(name: fname)
end
end
end

def fetch_manifests
@_manifest_tmpdir = File.join("/dev/shm", SecureRandom.hex)
@_manifest_tmpdir = File.join('/dev/shm', SecureRandom.hex)
FileUtils.mkdir(@_manifest_tmpdir)

Open3.pipeline(["kubectl", "exec", "-n", TROW_NAMESPACE, TROW_POD, "--", "/bin/bash", "-c", "find /data/manifests/ -type f -print0 | xargs -0 tar -cf -"],
["tar", "-C", @_manifest_tmpdir, "-xf", "-"])
statuses = Open3.pipeline(['kubectl', 'exec', '-n', TROW_NAMESPACE, TROW_POD, '--', '/bin/bash', '-c', 'find /data/manifests/ -type f -print0 | xargs --no-run-if-empty -0 tar -cf -'],
['tar', '-C', @_manifest_tmpdir, '-xf', '-'])
raise 'Failed to fetch manifests' if statuses.reject(&:success?).any?
end

def parse_manifest(manifest_name:, contents:)
Expand All @@ -126,9 +142,9 @@ def parse_manifest(manifest_name:, contents:)
@log.info "Parsing manifest #{manifest_name}"

contents.each do |line|
next unless line.start_with?("sha256:")
next unless line.start_with?('sha256:')

hash = line[7...].split(" ")[0]
hash = line[7...].split[0]
raise "Bad hash #{hash}" unless /\A[0-9a-f]{64}\z/.match?(hash)

@repo.set_manifest_blob(name: manifest_name, blob: hash)
Expand All @@ -137,14 +153,11 @@ def parse_manifest(manifest_name:, contents:)
end

def parse_manifests
Dir.glob(File.join(@_manifest_tmpdir, "**/*")).filter { |fn| File.file?(fn) }.each do |filename|
if filename.start_with?(@_manifest_tmpdir)
manifest_name = filename[@_manifest_tmpdir.length...]
else
raise "Weird filename #{filename}"
end
Dir.glob(File.join(@_manifest_tmpdir, '**/*')).filter { |fn| File.file?(fn) }.each do |filename|
raise "Weird filename #{filename}" unless filename.start_with?(@_manifest_tmpdir)

raise "Weird manifest name #{manifest_name}" unless manifest_name.start_with?("/data/manifests/")
manifest_name = filename[@_manifest_tmpdir.length...]
raise "Weird manifest name #{manifest_name}" unless manifest_name.start_with?('/data/manifests/')

parse_manifest(manifest_name: manifest_name, contents: File.read(filename))
end
Expand All @@ -155,47 +168,49 @@ def delete_manifest_tmpdir
end

def fetch_jsons
@_jsons_tmpdir = File.join("/dev/shm", SecureRandom.hex)
@_jsons_tmpdir = File.join('/dev/shm', SecureRandom.hex)
FileUtils.mkdir(@_jsons_tmpdir)

jsons_filenames = @repo.manifests.map { |manifest_name, manifest_data|
jsons_filenames = @repo.manifests.map do |manifest_name, manifest_data|
blob = manifest_data.blob
raise "No blob found for manifest #{manifest_name}. Terminating!" if blob.nil?

blob
}.uniq
end.uniq
if jsons_filenames.empty?
@log.info "No JSONs to retrieve. Skipping"
@log.info 'No JSONs to retrieve. Skipping'
return
end

@log.info "Trying to fetch: #{jsons_filenames}"

stdin_data = jsons_filenames.join("\0")

cmd1 = ["kubectl", "exec", "-n", TROW_NAMESPACE, TROW_POD, "-i", "--", "/bin/bash", "-c", "xargs -0 tar -C /data/blobs/sha256/ -cf -"]
cmd2 = ["tar", "-C", @_jsons_tmpdir, "-xf", "-"]
cmd1 = ['kubectl', 'exec', '-n', TROW_NAMESPACE, TROW_POD, '-i', '--', '/bin/bash', '-c', 'xargs -0 tar -C /data/blobs/sha256/ -cf -']
cmd2 = ['tar', '-C', @_jsons_tmpdir, '-xf', '-']
Open3.pipeline_w(cmd1, cmd2) do |first_stdin, wait_threads|
first_stdin.write(stdin_data)
first_stdin.close
wait_threads.each(&:join)
raise 'Failed to fetch JSONs' if wait_threads.map(&:value).reject(&:success?).any?
end
end

def parse_json(blob:, contents:)
d = JSON.parse(contents)
raise "Bad JSON" unless d.is_a?(Hash)
raise "Bad mediaType" unless d.dig("mediaType") == "application/vnd.docker.distribution.manifest.v2+json"
raise "Can't find config" unless d["config"].is_a?(Hash)
raise "Can't find layers" unless d["layers"].is_a?(Array)

@repo.add_mention(blob: d.dig("config", "digest"), from: blob)
d["layers"].each do |layer|
@repo.add_mention(blob: layer["digest"], from: blob)
raise 'Bad JSON' unless d.is_a?(Hash)
raise 'Bad mediaType' unless d['mediaType'] == 'application/vnd.docker.distribution.manifest.v2+json'
raise "Can't find config" unless d['config'].is_a?(Hash)
raise "Can't find layers" unless d['layers'].is_a?(Array)

@repo.add_mention(blob: d.dig('config', 'digest'), from: blob)
d['layers'].each do |layer|
@repo.add_mention(blob: layer['digest'], from: blob)
end
end

def parse_jsons
Dir.glob(File.join(@_jsons_tmpdir, "**/*")).filter { |fn| File.file?(fn) }.each do |filename|
Dir.glob(File.join(@_jsons_tmpdir, '**/*')).filter { |fn| File.file?(fn) }.each do |filename|
parse_json(blob: File.basename(filename), contents: File.read(filename))
end
end
Expand All @@ -204,30 +219,21 @@ def delete_jsons_tmpdir
FileUtils.rm_r(@_jsons_tmpdir)
end

def delete_orphaned_blobs(dry_run: false)
@log.info "Deleting orphaned blobs..."
filenames_to_delete = @repo.orphaned_blobs.values.map(&:filename)
if filenames_to_delete.empty?
@log.info "No files to delete. Skipping."
return
end

stdin_data = filenames_to_delete.join("\0")
def delete_orphaned_blobs
@log.info 'Deleting orphaned blobs...'
@repo.filenames_to_delete(dry_run: dry_run) do |filenames|
if filenames.empty?
@log.info 'No files to delete. Skipping.'
break
end
result = trow_exec('/bin/bash', '-c', delete_cmd(dry_run: dry_run), stdin_data: filenames.join("\0"))
raise "Error: #{result.status}" unless result.status.success?

if dry_run
@log.info("DRY RUN -- not actually deleting")
cmd = "xargs -0 -n 1 stat -c '%s %Y %n'"
else
cmd = "xargs -0 -n 1 rm"
puts result.stdout
end

result = trow_exec("/bin/bash", "-c", cmd, stdin_data: stdin_data)
raise "Error: #{result.status}" unless result.status.success?

puts result.stdout
end

def garbage_collect(dry_run: false)
def garbage_collect
puts_df_data

# PHASE 1: Look at /data/blobs/ and /data/manifests/
Expand All @@ -246,21 +252,31 @@ def garbage_collect(dry_run: false)
@repo.puts_summary

# PHASE 4: Actually delete orphaned blobs
delete_orphaned_blobs(dry_run: dry_run)
delete_orphaned_blobs

puts_df_data
end

def main_loop
OptionParser.new { |opts|
}.parse!

@log.info("Starting main_loop with #{POLL_INTERVAL}s polling interval.")
loop do
garbage_collect(dry_run: false) # FIXME
garbage_collect
sleep(POLL_INTERVAL)
end
end

private

attr_reader :dry_run

def delete_cmd(dry_run:)
cmd = 'rm'
if dry_run
@log.info('DRY RUN -- not actually deleting')
cmd = "stat -c '%s %Y %n'"
end
"xargs -0 -n 1 #{cmd}"
end
end

TrowGarbageCollector.new.main_loop if $PROGRAM_NAME == __FILE__