Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions ydb/core/base/ut/memory_stats_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,12 @@ Y_UNIT_TEST_SUITE (TMemoryStatsAggregator) {
"AnonRss: 36 CGroupLimit: 66 MemTotal: 65 MemAvailable: 85 AllocatedMemory: 156 AllocatorCachesMemory: 186 HardLimit: 145 SoftLimit: 165 TargetUtilization: 185 ExternalConsumption: 194 SharedCacheConsumption: 336 SharedCacheLimit: 366 MemTableConsumption: 396 MemTableLimit: 426 QueryExecutionConsumption: 456 QueryExecutionLimit: 486");
}

Y_UNIT_TEST(ColumnShard_Single) {
Y_UNIT_TEST(Compaction_Single) {
TMemoryStatsAggregator aggregator;

TMemoryStats stats;
stats.SetColumnTablesReadExecutionConsumption(1);
stats.SetColumnTablesReadExecutionLimit(2);
stats.SetColumnTablesCompactionConsumption(3);
stats.SetColumnTablesCompactionLimit(4);
stats.SetColumnTablesCacheConsumption(5);
stats.SetColumnTablesCacheLimit(6);
stats.SetCompactionConsumption(3);
stats.SetCompactionLimit(4);

aggregator.Add(stats, "host");

Expand Down
28 changes: 12 additions & 16 deletions ydb/core/memory_controller/memory_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
LOG_INFO_S(ctx, NKikimrServices::MEMORY_CONTROLLER, "Consumer QueryExecution state:" << " Consumption: " << HumanReadableBytes(queryExecutionConsumption) << " Limit: " << HumanReadableBytes(config.QueueLimits[NLocalDb::KqpResourceManagerQueue]));
Counters->GetCounter("Consumer/QueryExecution/Consumption")->Set(queryExecutionConsumption);
Counters->GetCounter("Consumer/QueryExecution/Limit")->Set(config.QueueLimits[NLocalDb::KqpResourceManagerQueue]);
memoryStats.SetQueryExecutionConsumption(queryExecutionConsumption);
memoryStats.SetQueryExecutionConsumption(memoryStats.GetQueryExecutionConsumption() + queryExecutionConsumption);
memoryStats.SetQueryExecutionLimit(config.QueueLimits[NLocalDb::KqpResourceManagerQueue]);

// Note: for now ResourceBroker and its queues aren't MemoryController consumers and don't share limits with other caches
Expand Down Expand Up @@ -487,32 +487,28 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
break;
}
case EMemoryConsumerKind::SharedCache: {
Y_ASSERT(!stats.HasSharedCacheConsumption());
Y_ASSERT(!stats.HasSharedCacheLimit());
stats.SetSharedCacheConsumption(consumer.Consumption);
stats.SetSharedCacheConsumption(stats.GetSharedCacheConsumption() + consumer.Consumption);
stats.SetSharedCacheLimit(limitBytes);
break;
}
case EMemoryConsumerKind::ColumnTablesScanGroupedMemory:
case EMemoryConsumerKind::ColumnTablesDeduplicationGroupedMemory: {
stats.SetColumnTablesReadExecutionConsumption(stats.GetColumnTablesReadExecutionConsumption() + consumer.Consumption);
stats.SetColumnTablesReadExecutionLimit(stats.GetColumnTablesReadExecutionLimit() + limitBytes);
break;
}
case EMemoryConsumerKind::ColumnTablesCompGroupedMemory: {
// TODO: what about resource broker queues?
Y_ASSERT(!stats.HasColumnTablesCompactionConsumption());
Y_ASSERT(!stats.HasColumnTablesCompactionLimit());
stats.SetColumnTablesCompactionConsumption(consumer.Consumption);
stats.SetColumnTablesCompactionLimit(limitBytes);
Y_ASSERT(!stats.HasCompactionConsumption());
Y_ASSERT(!stats.HasCompactionLimit());
stats.SetCompactionConsumption(consumer.Consumption);
stats.SetCompactionLimit(limitBytes);
break;
}
case EMemoryConsumerKind::ColumnTablesPortionsMetaDataCache:
case EMemoryConsumerKind::ColumnTablesDataAccessorCache:
case EMemoryConsumerKind::ColumnTablesColumnDataCache:
case EMemoryConsumerKind::ColumnTablesBlobCache: {
stats.SetColumnTablesCacheConsumption(stats.GetColumnTablesCacheConsumption() + consumer.Consumption);
stats.SetColumnTablesCacheLimit(stats.GetColumnTablesCacheLimit() + limitBytes);
stats.SetSharedCacheConsumption(stats.GetSharedCacheConsumption() + consumer.Consumption);
break;
}
case EMemoryConsumerKind::ColumnTablesScanGroupedMemory:
case EMemoryConsumerKind::ColumnTablesDeduplicationGroupedMemory: {
stats.SetQueryExecutionConsumption(stats.GetQueryExecutionConsumption() + consumer.Consumption);
break;
}
}
Expand Down
32 changes: 16 additions & 16 deletions ydb/core/memory_controller/memory_controller_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ GET_MIN_LIMIT(SharedCache)
GET_MAX_LIMIT(SharedCache)

GET_LIMIT(QueryExecutionLimit)

GET_LIMIT(ColumnTablesReadExecutionLimit)
GET_LIMIT(ColumnTablesCompactionLimit)
GET_LIMIT(ColumnTablesCacheLimit)
GET_LIMIT(CompactionLimit)

// ColumnTablesReadExecution memory is split into:
// - ColumnTablesScanGroupedMemory
Expand All @@ -100,11 +97,11 @@ static_assert(ColumnTablesReadExecutionFraction + ColumnTablesDeduplicationGroup

inline ui64 GetColumnTablesScanGroupedMemoryLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesReadExecutionFraction,
GetColumnTablesReadExecutionLimitBytes(config, hardLimitBytes));
GetQueryExecutionLimitBytes(config, hardLimitBytes));
}
inline ui64 GetColumnTablesDeduplicationGroupedMemoryLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesDeduplicationGroupedMemoryFraction,
GetColumnTablesReadExecutionLimitBytes(config, hardLimitBytes));
GetQueryExecutionLimitBytes(config, hardLimitBytes));
}

// ColumnTablesCompaction memory is split into:
Expand All @@ -124,29 +121,30 @@ static_assert(ColumnTablesCompactionIndexationQueueFraction
+ ColumnTablesTtlQueueFraction
+ ColumnTablesGeneralQueueFraction
+ ColumnTablesNormalizerQueueFraction == 1);
static constexpr float ColumnTablesCompGroupedMemoryHardLimitMultiplier = 4.0f;

inline ui64 GetColumnTablesCompGroupedMemoryLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetColumnTablesCompactionLimitBytes(config, hardLimitBytes);
return GetCompactionLimitBytes(config, hardLimitBytes) * static_cast<double>(ColumnTablesCompGroupedMemoryHardLimitMultiplier);
}

inline ui64 GetColumnTablesCompactionIndexationQueueLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesCompactionIndexationQueueFraction,
GetColumnTablesCompactionLimitBytes(config, hardLimitBytes));
GetCompactionLimitBytes(config, hardLimitBytes));
}

inline ui64 GetColumnTablesTtlQueueLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesTtlQueueFraction,
GetColumnTablesCompactionLimitBytes(config, hardLimitBytes));
GetCompactionLimitBytes(config, hardLimitBytes));
}

inline ui64 GetColumnTablesGeneralQueueQueueLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesGeneralQueueFraction,
GetColumnTablesCompactionLimitBytes(config, hardLimitBytes));
GetCompactionLimitBytes(config, hardLimitBytes));
}

inline ui64 GetColumnTablesNormalizerQueueLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesNormalizerQueueFraction,
GetColumnTablesCompactionLimitBytes(config, hardLimitBytes));
GetCompactionLimitBytes(config, hardLimitBytes));
}

// ColumnTablesCache memory is split into:
Expand All @@ -164,23 +162,25 @@ static_assert(ColumnTablesBlobCacheFraction
+ ColumnTablesColumnDataCacheFraction
+ ColumnTablesPortionsMetaDataCacheFraction == 1);

static constexpr float ColumnTablesCachesPercentFromShared = 20.0f;

inline ui64 GetColumnTablesBlobCacheLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesBlobCacheFraction,
GetColumnTablesCacheLimitBytes(config, hardLimitBytes));
GetPercent(ColumnTablesCachesPercentFromShared, GetSharedCacheMaxBytes(config, hardLimitBytes)));
}

inline ui64 GetColumnTablesDataAccessorCacheLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesDeduplicationGroupedMemoryFraction,
GetColumnTablesCacheLimitBytes(config, hardLimitBytes));
return GetFraction(ColumnTablesColumnTablesDataAccessorCacheFraction,
GetPercent(ColumnTablesCachesPercentFromShared, GetSharedCacheMaxBytes(config, hardLimitBytes)));
}

inline ui64 GetColumnTablesColumnDataCacheLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesColumnDataCacheFraction,
GetColumnTablesCacheLimitBytes(config, hardLimitBytes));
GetPercent(ColumnTablesCachesPercentFromShared, GetSharedCacheMaxBytes(config, hardLimitBytes)));
}

inline ui64 GetPortionsMetaDataCacheLimitBytes(const NKikimrConfig::TMemoryControllerConfig& config, const ui64 hardLimitBytes) {
return GetFraction(ColumnTablesPortionsMetaDataCacheFraction,
GetColumnTablesCacheLimitBytes(config, hardLimitBytes));
GetPercent(ColumnTablesCachesPercentFromShared, GetSharedCacheMaxBytes(config, hardLimitBytes)));
}
}
73 changes: 68 additions & 5 deletions ydb/core/memory_controller/memory_controller_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/tablet_flat/shared_sausagecache.h>
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
#include <ydb/library/actors/testlib/test_runtime.h>

Expand Down Expand Up @@ -505,7 +506,7 @@ Y_UNIT_TEST(ResourceBroker_ConfigCS) {

const ui64 compactionMemoryLimitPercent = 36;
auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
memoryControllerConfig->SetColumnTablesCompactionLimitPercent(compactionMemoryLimitPercent);
memoryControllerConfig->SetCompactionLimitPercent(compactionMemoryLimitPercent);

auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
auto& runtime = *server->GetRuntime();
Expand Down Expand Up @@ -553,8 +554,8 @@ Y_UNIT_TEST(GroupedMemoryLimiter_ConfigCS) {
const ui64 compactionMemoryLimitPercent = 36;
const ui64 readExecutionMemoryLimitPercent = 20;
auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
memoryControllerConfig->SetColumnTablesCompactionLimitPercent(compactionMemoryLimitPercent);
memoryControllerConfig->SetColumnTablesReadExecutionLimitPercent(readExecutionMemoryLimitPercent);
memoryControllerConfig->SetCompactionLimitPercent(compactionMemoryLimitPercent);
memoryControllerConfig->SetQueryExecutionLimitPercent(readExecutionMemoryLimitPercent);

ui64 currentHardMemoryLimit = 1000_MB;
auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
Expand Down Expand Up @@ -583,12 +584,12 @@ Y_UNIT_TEST(GroupedMemoryLimiter_ConfigCS) {
1_KB);

UNIT_ASSERT_DOUBLES_EQUAL(
static_cast<double>(currentHardMemoryLimit * OlapLimits::GroupedMemoryLimiterSoftLimitCoefficient * compactionMemoryLimitPercent / 100),
static_cast<double>(currentHardMemoryLimit * OlapLimits::GroupedMemoryLimiterSoftLimitCoefficient * compactionMemoryLimitPercent / 100 * ColumnTablesCompGroupedMemoryHardLimitMultiplier),
static_cast<double>(compactionCounters->GetCounter("Value/Limit/Soft/Bytes")->Val()),
1_KB);

UNIT_ASSERT_DOUBLES_EQUAL(
static_cast<double>(currentHardMemoryLimit * compactionMemoryLimitPercent / 100.0),
static_cast<double>(currentHardMemoryLimit * compactionMemoryLimitPercent / 100.0 * ColumnTablesCompGroupedMemoryHardLimitMultiplier),
static_cast<double>(compactionCounters->GetCounter("Value/Limit/Hard/Bytes")->Val()),
1_KB);
};
Expand All @@ -608,6 +609,68 @@ Y_UNIT_TEST(GroupedMemoryLimiter_ConfigCS) {
runtime.SimulateSleep(TDuration::Seconds(2));
checkMemoryLimits();
}

Y_UNIT_TEST(ColumnShardCaches_Config) {
using namespace NResourceBroker;

TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root").SetUseRealThreads(false);

const ui64 sharedCacheMaxPercent = 50;
auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
memoryControllerConfig->SetSharedCacheMaxPercent(sharedCacheMaxPercent);

ui64 currentHardMemoryLimit = 1000_MB;
auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
server->ProcessMemoryInfo->CGroupLimit = currentHardMemoryLimit;
auto& runtime = *server->GetRuntime();
TAutoPtr<IEventHandle> handle;
auto sender = runtime.AllocateEdgeActor();

InitRoot(server, sender);
auto counters = runtime.GetAppData().Counters;
auto dataAccessorCache = counters->GetSubgroup("module_id", "general_cache")->GetSubgroup("cache_name", "portions_metadata")->GetSubgroup("signals_owner", "manager");
auto columnDataCache = counters->GetSubgroup("module_id", "general_cache")->GetSubgroup("cache_name", "column_data")->GetSubgroup("signals_owner", "manager");
auto blobCache = counters->GetSubgroup("type", "BLOB_CACHE");

auto checkMemoryLimits = [&]() {
UNIT_ASSERT_DOUBLES_EQUAL(
static_cast<double>(currentHardMemoryLimit * ColumnTablesPortionsMetaDataCacheFraction * sharedCacheMaxPercent / 100.0 * ColumnTablesCachesPercentFromShared / 100.0),
static_cast<double>(NKikimr::NOlap::NStorageOptimizer::IOptimizerPlanner::GetPortionsCacheLimit()),
1_KB);

UNIT_ASSERT_DOUBLES_EQUAL(
static_cast<double>(currentHardMemoryLimit * ColumnTablesColumnTablesDataAccessorCacheFraction * sharedCacheMaxPercent / 100.0 * ColumnTablesCachesPercentFromShared / 100.0),
static_cast<double>(dataAccessorCache->GetCounter("Value/Cache/SizeLimit/Bytes")->Val()),
1_KB);

UNIT_ASSERT_DOUBLES_EQUAL(
static_cast<double>(currentHardMemoryLimit * ColumnTablesColumnDataCacheFraction * sharedCacheMaxPercent / 100.0 * ColumnTablesCachesPercentFromShared / 100.0),
static_cast<double>(columnDataCache->GetCounter("Value/Cache/SizeLimit/Bytes")->Val()),
1_KB);

UNIT_ASSERT_DOUBLES_EQUAL(
static_cast<double>(currentHardMemoryLimit * ColumnTablesBlobCacheFraction * sharedCacheMaxPercent / 100.0 * ColumnTablesCachesPercentFromShared / 100.0),
static_cast<double>(blobCache->GetCounter("MaxSizeBytes")->Val()),
1_KB);
};

runtime.SimulateSleep(TDuration::Seconds(2));
checkMemoryLimits();

// Check memory decrease
currentHardMemoryLimit = 500_MB;
server->ProcessMemoryInfo->CGroupLimit = currentHardMemoryLimit;
runtime.SimulateSleep(TDuration::Seconds(2));
checkMemoryLimits();

// Check memory increase
currentHardMemoryLimit = 2000_MB;
server->ProcessMemoryInfo->CGroupLimit = currentHardMemoryLimit;
runtime.SimulateSleep(TDuration::Seconds(2));
checkMemoryLimits();
}
}

}
8 changes: 2 additions & 6 deletions ydb/core/protos/memory_controller_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ message TMemoryControllerConfig {
optional float QueryExecutionLimitPercent = 120 [default = 20];
optional uint64 QueryExecutionLimitBytes = 121;

optional float ColumnTablesReadExecutionLimitPercent = 200 [default = 20];
optional uint64 ColumnTablesReadExecutionLimitBytes = 201;
optional float ColumnTablesCompactionLimitPercent = 202 [default = 20];
optional uint64 ColumnTablesCompactionLimitBytes = 203;
optional float ColumnTablesCacheLimitPercent = 204 [default = 10];
optional uint64 ColumnTablesCacheLimitBytes = 205;
optional float CompactionLimitPercent = 130 [default = 10];
optional uint64 CompactionLimitBytes = 131;
}
9 changes: 2 additions & 7 deletions ydb/core/protos/memory_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ message TMemoryStats {
optional uint64 QueryExecutionConsumption = 18;
optional uint64 QueryExecutionLimit = 19;

optional uint64 ColumnTablesReadExecutionConsumption = 20;
optional uint64 ColumnTablesReadExecutionLimit = 21;
optional uint64 ColumnTablesCompactionConsumption = 22;
optional uint64 ColumnTablesCompactionLimit = 23;
optional uint64 ColumnTablesCacheConsumption = 24;
optional uint64 ColumnTablesCacheLimit = 25;

optional uint64 CompactionConsumption = 22;
optional uint64 CompactionLimit = 23;
}
2 changes: 1 addition & 1 deletion ydb/core/testlib/basics/services.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ namespace NPDisk {
{
runtime.AddLocalService(NBlobCache::MakeBlobCacheServiceId(),
TActorSetupCmd(
NBlobCache::CreateBlobCache(20<<20, runtime.GetDynamicCounters(nodeIndex)),
NBlobCache::CreateBlobCache(std::nullopt, runtime.GetDynamicCounters(nodeIndex)->GetSubgroup("type", "BLOB_CACHE")),
TMailboxType::ReadAsFilled,
0),
nodeIndex);
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/columnshard/blob_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class TBlobCache: public TActorBootstrapped<TBlobCache> {
const TCounterPtr SizeBlobsInFlight;
const TCounterPtr ReadRequests;
const TCounterPtr ReadsInQueue;
const TCounterPtr MaxSizeBytes;

TIntrusivePtr<NMemory::IMemoryConsumer> MemoryConsumer;

Expand Down Expand Up @@ -174,7 +175,8 @@ class TBlobCache: public TActorBootstrapped<TBlobCache> {
, SizeBlobsInFlight(counters->GetCounter("SizeBlobsInFlight"))
, ReadRequests(counters->GetCounter("ReadRequests", true))
, ReadsInQueue(counters->GetCounter("ReadsInQueue"))
{}
, MaxSizeBytes(counters->GetCounter("MaxSizeBytes")) {
}

void Bootstrap(const TActorContext& ctx) {
auto& icb = AppData(ctx)->Icb;
Expand All @@ -184,6 +186,8 @@ class TBlobCache: public TActorBootstrapped<TBlobCache> {
LOG_S_NOTICE("MaxCacheDataSize: " << (i64)MaxCacheDataSize
<< " InFlightDataSize: " << (i64)InFlightDataSize);

MaxSizeBytes->Set((i64)MaxCacheDataSize);

Send(NMemory::MakeMemoryControllerId(), new NMemory::TEvConsumerRegister(NMemory::EMemoryConsumerKind::ColumnTablesBlobCache));

Become(&TBlobCache::StateFunc);
Expand Down Expand Up @@ -362,6 +366,8 @@ class TBlobCache: public TActorBootstrapped<TBlobCache> {
LOG_S_DEBUG("Updating max cache data size: " << newMaxCacheDataSize);

MaxCacheDataSize = newMaxCacheDataSize;

MaxSizeBytes->Set((i64)MaxCacheDataSize);
}

void UpdateConsumption() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ class IOptimizerPlanner {
DynamicPortionsCountLimit.store(portionsCacheLimitBytes / NKikimr::NOlap::TGlobalLimits::AveragePortionSizeLimit);
}

static ui64 GetPortionsCacheLimit() {
return DynamicPortionsCountLimit.load() * NKikimr::NOlap::TGlobalLimits::AveragePortionSizeLimit;
}

virtual ui32 GetAppropriateLevel(const ui32 baseLevel, const TPortionInfoForCompaction& /*info*/) const {
return baseLevel;
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/tests/functional/tpc/large/test_tpch_spilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ class TestTpchSpillingS10(tpch.TestTpch10, FunctionalTestBase):
memory_controller_config = {
'activities_limit_percent': 60,
'query_execution_limit_percent': 50,
'hard_limit_bytes': 6 * 1073741824,
'column_tables_read_execution_limit_bytes': 10 * 1073741824,
'hard_limit_bytes': 6 * 1073741824
}

@classmethod
Expand Down
3 changes: 0 additions & 3 deletions ydb/tests/olap/oom/overlapping_portions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ def setup_class(cls):
"memory_limit": 1024 * 1024,
"hard_memory_limit": 1024 * 1024,
},
memory_controller_config={
"column_tables_read_execution_limit_bytes": 1024 * 1024
},
)
cls.cluster = KiKiMR(config)
cls.cluster.start()
Expand Down
Loading
Loading