|
8 | 8 | import java.io.IOException;
|
9 | 9 | import java.lang.System.Logger.Level;
|
10 | 10 | import java.nio.file.Files;
|
| 11 | +import java.nio.file.Path; |
11 | 12 | import java.util.List;
|
12 | 13 | import java.util.Objects;
|
13 | 14 | import java.util.concurrent.CopyOnWriteArrayList;
|
@@ -44,6 +45,8 @@ public final class BlocksFilesHistoricPlugin implements BlockProviderPlugin, Blo
|
44 | 45 | private final CopyOnWriteArrayList<LongRange> inProgressZipRanges = new CopyOnWriteArrayList<>();
|
45 | 46 | /** Running total of bytes stored in the historic tier */
|
46 | 47 | private final AtomicLong totalBytesStored = new AtomicLong(0);
|
| 48 | + /** The config used for this plugin */ |
| 49 | + private FilesHistoricConfig config; |
47 | 50 | /** The Storage Retention Policy Threshold */
|
48 | 51 | private long retentionPolicyThreshold;
|
49 | 52 |
|
@@ -74,7 +77,7 @@ public List<Class<? extends Record>> configDataTypes() {
|
74 | 77 | @Override
|
75 | 78 | public void init(final BlockNodeContext context, final ServiceBuilder serviceBuilder) {
|
76 | 79 | this.context = Objects.requireNonNull(context);
|
77 |
| - final FilesHistoricConfig config = context.configuration().getConfigData(FilesHistoricConfig.class); |
| 80 | + this.config = context.configuration().getConfigData(FilesHistoricConfig.class); |
78 | 81 | this.retentionPolicyThreshold = context.storageRetentionPolicyThreshold();
|
79 | 82 | // Initialize metrics
|
80 | 83 | initMetrics(context.metrics());
|
@@ -198,6 +201,11 @@ public BlockRangeSet availableBlocks() {
|
198 | 201 | public void handlePersisted(PersistedNotification notification) {
|
199 | 202 | if (notification.blockProviderPriority() > defaultPriority()) {
|
200 | 203 | attemptZipping();
|
| 204 | + // @todo(1268) like in the recents plugin, should we cleanup only if the priority check passes? |
| 205 | + // here I think yes, because we only get new data if the check passes, so there is no issue |
| 206 | + // to keep within the bounds of the retention policy threshold. But for the recents plugin, |
| 207 | + // maybe we need to think about it since it gets new data via the verification notification. |
| 208 | + cleanup(); |
201 | 209 | } // todo this is not enough of an assertion that the blocks will be coming from the right place
|
202 | 210 | // as notifications are async and things can happen, when we get the accessors later, we should
|
203 | 211 | // be able to get accessors only from places that have higher priority than us. We should probably
|
@@ -226,6 +234,32 @@ private void attemptZipping() {
|
226 | 234 | }
|
227 | 235 | }
|
228 | 236 |
|
| 237 | + private void cleanup() { |
| 238 | + final long totalStored = availableBlocks.size(); |
| 239 | + long excess = totalStored - retentionPolicyThreshold; |
| 240 | + while (excess >= numberOfBlocksPerZipFile) { |
| 241 | + // if we have passed the above check, we can delete at least one zip file |
| 242 | + // we assume there are no gaps in the zips, the number of blocks per zip file |
| 243 | + // setting is not possible to change after starting the system for the first time, |
| 244 | + // also the number of blocks per zip file is a power of ten, so we can safely |
| 245 | + // say that whatever the range is, it will always start/end with a predictable number |
| 246 | + // e.g. 0-9, 10-19, 20-29 (batch 10s) or 10_000-19_999, 20_000-29_999 (batch 1000s) etc. |
| 247 | + // depending on the setting |
| 248 | + final long minBlockNumberStored = availableBlocks.min(); |
| 249 | + // no need to compute existing below, we need the path to the zip file, we do not need to |
| 250 | + // check if it exists, moreover we do not need to know actual block compression type. |
| 251 | + final Path zipToDelete = |
| 252 | + BlockPath.computeBlockPath(config, minBlockNumberStored).zipFilePath(); |
| 253 | + try { |
| 254 | + Files.deleteIfExists(zipToDelete); |
| 255 | + } catch (final IOException e) { |
| 256 | + throw new RuntimeException(e); |
| 257 | + // @todo(1268) what to do here if we cannot delete the zip file? |
| 258 | + } |
| 259 | + excess -= numberOfBlocksPerZipFile; |
| 260 | + } |
| 261 | + } |
| 262 | + |
229 | 263 | /**
|
230 | 264 | * Start moving a batch of blocks to a zip file in background as long as batch is not already in progress or queued
|
231 | 265 | * to be started.
|
|
0 commit comments