Actually perform upload in setup part of Upload Performance tests #22611

Merged · 9 commits · Aug 11, 2025
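The substance of the change: the upload hooks become classmethods, `do_setup_class` now performs the upload once per suite, validates it, and caches the result in the class attribute `upload_result`, and `test` merely reports that cached result via `process_query_result`. A minimal sketch of the pattern, assuming a pytest-style class-setup hook (method and attribute names follow the diff; the body is illustrative):

```python
# Minimal sketch of the setup/test split applied by this PR; `do_setup_class`,
# `upload_result` and the reporting step mirror the diff, the rest is illustrative.
import time


class UploadSuiteSketch:
    query_name = 'Upload'
    upload_result = None  # class attribute shared between setup and test

    @classmethod
    def import_data(cls):
        time.sleep(0.1)  # stand-in for the real CSV / TPC-H upload

    @classmethod
    def do_setup_class(cls):
        # Heavy one-time work runs here, once per class rather than per test.
        start = time.time()
        cls.import_data()
        cls.upload_result = {'time': time.time() - start}  # cached for test()

    def test(self):
        # The test method only reports what setup already measured.
        if self.upload_result is None:
            raise RuntimeError('do_setup_class() did not run or did not complete')
        print(f"{self.query_name}: {self.upload_result['time']:.2f}s")


# pytest would invoke the setup hook before the tests; shown inline here.
UploadSuiteSketch.do_setup_class()
UploadSuiteSketch().test()
```

The class attribute makes the expensive upload happen once, while the test method can still read the cached result through `self`.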
ydb/tests/olap/load/lib/import_csv.py (60 changes: 32 additions & 28 deletions)

@@ -17,29 +17,31 @@ class ImportFileCsvBase(UploadSuiteBase):
     cpu_time: float = 0.0
     send_format: str = ''  # Optional parameter for send format
 
-    def init(self):
+    @classmethod
+    def init(cls):
         # Create tables
-        yatest.common.execute(YdbCliHelper.get_cli_command() + ['workload', 'query', '-p', YdbCluster.get_tables_path(), 'init', '--suite-path', self.get_external_path(), '--clear'])
+        yatest.common.execute(YdbCliHelper.get_cli_command() + ['workload', 'query', '-p', YdbCluster.get_tables_path(), 'init', '--suite-path', cls.get_external_path(), '--clear'])
 
-        import_dir = os.path.join(self.get_external_path(), "import")
+        import_dir = os.path.join(cls.get_external_path(), "import")
         table_names = sorted([name for name in os.listdir(import_dir) if os.path.isdir(os.path.join(import_dir, name))])
         if not table_names:
             raise RuntimeError(f"Found no directories in {import_dir}")
-        self.table_name = table_names[0]  # importing just one table
-        logging.info(f'Importing table: {self.table_name}')
+        cls.table_name = table_names[0]  # importing just one table
+        logging.info(f'Importing table: {cls.table_name}')
 
-    def import_data(self):
-        self.table_path = YdbCluster.get_tables_path(self.table_name)
-        logging.info(f'Table path: {self.table_path}')
-        import_dir = os.path.join(self.get_external_path(), 'import', self.table_name)
+    @classmethod
+    def import_data(cls):
+        cls.table_path = YdbCluster.get_tables_path(cls.table_name)
+        logging.info(f'Table path: {cls.table_path}')
+        import_dir = os.path.join(cls.get_external_path(), 'import', cls.table_name)
         csv_files = [f for f in os.listdir(import_dir) if os.path.isfile(os.path.join(import_dir, f)) and f.endswith('.csv')]
         if not csv_files:
             raise RuntimeError(f'No .csv files found in {import_dir}')
         import_path = os.path.join(import_dir, csv_files[0])
 
-        cmd = ['/usr/bin/time'] + YdbCliHelper.get_cli_command() + ['import', 'file', 'csv', '-p', self.table_path, import_path, '--header']
-        if self.send_format:
-            cmd.extend(['--send-format', self.send_format])
+        cmd = ['/usr/bin/time'] + YdbCliHelper.get_cli_command() + ['import', 'file', 'csv', '-p', cls.table_path, import_path, '--header']
+        if cls.send_format:
+            cmd.extend(['--send-format', cls.send_format])
 
         result = yatest.common.execute(cmd)
 
@@ -53,41 +55,43 @@ def import_data(self):
                 parts = line.split()
                 user_time = float(parts[0].replace('user', ''))
                 system_time = float(parts[1].replace('system', ''))
-                self.cpu_time = user_time + system_time
+                cls.cpu_time = user_time + system_time
                 cpu_percent = float(line.split('%CPU')[0].strip().split()[-1])
-                self.cpu_cores = cpu_percent / 100.0
-                logging.info(f'CPU cores used: {self.cpu_cores}')
-                logging.info(f'Total CPU time (user + system): {self.cpu_time:.2f} seconds')
+                cls.cpu_cores = cpu_percent / 100.0
+                logging.info(f'CPU cores used: {cls.cpu_cores}')
+                logging.info(f'Total CPU time (user + system): {cls.cpu_time:.2f} seconds')
         except (ValueError, IndexError) as e:
             logging.warning(f'Failed to parse CPU usage information: {e}')
 
-    def validate(self, result: YdbCliHelper.WorkloadRunResult):
-        select_command = yatest.common.execute(YdbCliHelper.get_cli_command() + ['sql', '-s', f'SELECT COUNT (*) AS count FROM `{self.table_path}`', '--format', 'json-unicode'])
+    @classmethod
+    def validate(cls, result: YdbCliHelper.WorkloadRunResult):
+        select_command = yatest.common.execute(YdbCliHelper.get_cli_command() + ['sql', '-s', f'SELECT COUNT (*) AS count FROM `{cls.table_path}`', '--format', 'json-unicode'])
         select_command_result = select_command.stdout.decode('utf-8')
         count = json.loads(select_command_result)["count"]
-        assert count > 0, f'No rows imported into {self.table_path}'
-        logging.info(f'Rows in table {self.table_path} after import: {count}')
-        result.add_stat(self.query_name, 'rows_in_table', count)
+        assert count > 0, f'No rows imported into {cls.table_path}'
+        logging.info(f'Rows in table {cls.table_path} after import: {count}')
+        result.add_stat(cls.query_name, 'rows_in_table', count)
 
-    def save_result_additional_info(self, result: YdbCliHelper.WorkloadRunResult):
-        import_dir = os.path.join(self.get_external_path(), 'import', self.table_name)
+    @classmethod
+    def save_result_additional_info(cls, result: YdbCliHelper.WorkloadRunResult):
+        import_dir = os.path.join(cls.get_external_path(), 'import', cls.table_name)
         file_size = sum(
             os.path.getsize(os.path.join(import_dir, f))
             for f in os.listdir(import_dir)
             if os.path.isfile(os.path.join(import_dir, f)) and f.endswith('.csv')
         )
         logging.info(f'File size: {file_size} bytes')
-        result.add_stat(self.query_name, 'file_size', file_size)
+        result.add_stat(cls.query_name, 'file_size', file_size)
         import_time = result.iterations[0].time
         logging.info(f'Result import time: {import_time} s')
-        result.add_stat(self.query_name, 'import_time', import_time)
+        result.add_stat(cls.query_name, 'import_time', import_time)
         import_speed = 0
         if import_time > 0:
             import_speed = file_size / import_time / 1024 / 1024  # MB/s
         logging.info(f'Result import speed: {import_speed} MB/s')
-        result.add_stat(self.query_name, 'import_speed', import_speed)
-        result.add_stat(self.query_name, 'cpu_cores', self.cpu_cores)
-        result.add_stat(self.query_name, 'cpu_time', self.cpu_time)
+        result.add_stat(cls.query_name, 'import_speed', import_speed)
+        result.add_stat(cls.query_name, 'cpu_cores', cls.cpu_cores)
+        result.add_stat(cls.query_name, 'cpu_time', cls.cpu_time)
 
     @classmethod
     def teardown_class(cls) -> None:
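The CPU accounting above comes from the summary line `/usr/bin/time` prints on exit. A self-contained sketch of the same parse, using an illustrative sample line in the default `/usr/bin/time` output format (the numbers are made up, not from a real run):

```python
def parse_time_summary(line: str) -> tuple[float, float]:
    """Return (total CPU seconds, CPU cores used) from a /usr/bin/time summary line."""
    parts = line.split()
    user_time = float(parts[0].replace('user', ''))       # '1.84user'   -> 1.84
    system_time = float(parts[1].replace('system', ''))   # '0.52system' -> 0.52
    cpu_time = user_time + system_time
    # The token immediately before '%CPU' is the CPU percentage.
    cpu_percent = float(line.split('%CPU')[0].strip().split()[-1])
    return cpu_time, cpu_percent / 100.0


# Illustrative sample in the default /usr/bin/time output format:
sample = '1.84user 0.52system 0:03.10elapsed 76%CPU (0avgtext+0avgdata 10240maxresident)k'
cpu_time, cpu_cores = parse_time_summary(sample)
assert abs(cpu_time - 2.36) < 1e-6 and abs(cpu_cores - 0.76) < 1e-6
print(f'CPU time {cpu_time:.2f}s, cores {cpu_cores:.2f}')
```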
ydb/tests/olap/load/lib/upload.py (150 changes: 89 additions & 61 deletions)

@@ -16,32 +16,42 @@
 
 class UploadSuiteBase(LoadSuiteBase):
     query_name = 'Upload'
+    upload_result: YdbCliHelper.WorkloadRunResult = None
 
-    def init(self):
+    @classmethod
+    def init(cls):
         pass
 
-    def import_data(self):
+    @classmethod
+    def import_data(cls):
         pass
 
-    def before_import_data(self):
+    @classmethod
+    def before_import_data(cls):
         pass
 
-    def after_import_data(self):
+    @classmethod
+    def after_import_data(cls):
         pass
 
-    def wait_compaction(self):
+    @classmethod
+    def wait_compaction(cls):
         pass
 
-    def after_compaction(self):
+    @classmethod
+    def after_compaction(cls):
         pass
 
-    def validate(self, result: YdbCliHelper.WorkloadRunResult):
+    @classmethod
+    def validate(cls, result: YdbCliHelper.WorkloadRunResult):
         pass
 
-    def save_result_additional_info(self, result: YdbCliHelper.WorkloadRunResult):
+    @classmethod
+    def save_result_additional_info(cls, result: YdbCliHelper.WorkloadRunResult):
         pass
 
-    def test(self):
+    @classmethod
+    def do_setup_class(cls) -> None:
         start_time = time()
         result = YdbCliHelper.WorkloadRunResult()
         result.iterations[0] = YdbCliHelper.Iteration()
@@ -50,25 +60,30 @@ def test(self):
         first_node_start_time = min(nodes_start_time) if len(nodes_start_time) > 0 else 0
         result.start_time = max(start_time - 600, first_node_start_time)
         try:
-            self.save_nodes_state()
+            cls.save_nodes_state()
             with allure.step("init"):
-                self.init()
+                cls.init()
             start_time = time()
             with allure.step("import data"):
-                self.before_import_data()
-                self.import_data()
-                self.after_import_data()
-                self.wait_compaction()
-                self.after_compaction()
+                cls.before_import_data()
+                cls.import_data()
+                cls.after_import_data()
+                cls.wait_compaction()
+                cls.after_compaction()
         except BaseException as e:
             logging.error(f'Error: {e}')
             result.add_error(str(e))
             result.traceback = e.__traceback__
             raise e
         result.iterations[0].time = time() - start_time
-        self.validate(result)
-        self.save_result_additional_info(result)
-        self.process_query_result(result, self.query_name, True)
+        cls.validate(result)
+        cls.save_result_additional_info(result)
+        cls.upload_result = result
+
+    def test(self):
+        if self.upload_result is None:
+            raise RuntimeError("upload_result is None. Ensure do_setup_class() was called and completed successfully before running the test.")
+        self.process_query_result(self.upload_result, self.query_name, True)
 
 
 class UploadClusterBase(UploadSuiteBase):
@@ -88,7 +103,8 @@ def __get_metrics(cls) -> dict[str, dict[str, float]]:
             'compacted_bytes': {'Consumer': 'GENERAL_COMPACTION', 'component': 'Writer', 'sensor': 'Deriviative/Requests/Bytes'},
         })
 
-    def __compaction_complete_for_table(self, table_full_path: str) -> bool:
+    @classmethod
+    def __compaction_complete_for_table(cls, table_full_path: str) -> bool:
         sth = ScenarioTestHelper(None)
         result = sth.execute_scan_query(f'''
             SELECT COUNT(*)
@@ -97,14 +113,16 @@
         ''')
         return result.result_set.rows[0][0] == 0
 
-    def __compaction_complete(self) -> bool:
-        for table in YdbCluster.get_tables(self.get_path()):
-            if not self.__compaction_complete_for_table(table):
+    @classmethod
+    def __compaction_complete(cls) -> bool:
+        for table in YdbCluster.get_tables(cls.get_path()):
+            if not cls.__compaction_complete_for_table(table):
                 return False
         return True
 
+    @classmethod
     @allure.step
-    def __stats_ready_for_table(self, table_full_path: str) -> bool:
+    def __stats_ready_for_table(cls, table_full_path: str) -> bool:
         def __max_e_rows(node: dict):
             if node.get('Name') == 'TableFullScan' and node.get('Path') == table_full_path:
                 return int(node.get('E-Rows', 0))
@@ -115,65 +133,72 @@ def __max_e_rows(node: dict):
         plan = json.loads(driver.table_client.session().create().explain(f'SELECT COUNT(*) FROM `{table_full_path}`').query_plan)
         return __max_e_rows(plan.get('Plan', {})) > 0
 
-    def __stats_ready(self) -> bool:
-        for table in YdbCluster.get_tables(self.get_path()):
-            if not self.__stats_ready_for_table(table):
+    @classmethod
+    def __stats_ready(cls) -> bool:
+        for table in YdbCluster.get_tables(cls.get_path()):
+            if not cls.__stats_ready_for_table(table):
                 return False
         return True
 
-    def __get_tables_size_bytes(self) -> tuple[int, int]:
+    @classmethod
+    def __get_tables_size_bytes(cls) -> tuple[int, int]:
         sth = ScenarioTestHelper(None)
         raw_bytes = 0
         bytes = 0
-        for table in YdbCluster.get_tables(self.get_path()):
+        for table in YdbCluster.get_tables(cls.get_path()):
             table_raw_bytes, table_bytes = sth.get_volumes_columns(table, '')
             raw_bytes += table_raw_bytes
             bytes += table_bytes
         return raw_bytes, bytes
 
+    @classmethod
     @allure.step
-    def wait_compaction(self):
-        while not self.__compaction_complete():
+    def wait_compaction(cls):
+        while not cls.__compaction_complete():
             sleep(1)
 
-    def before_import_data(self):
-        self.__saved_metrics = self.__get_metrics()
-        self.__import_start_time = time()
+    @classmethod
+    def before_import_data(cls):
+        cls.__saved_metrics = cls.__get_metrics()
+        cls.__import_start_time = time()
 
-    def after_import_data(self):
-        self.__import_time = time() - self.__import_start_time
+    @classmethod
+    def after_import_data(cls):
+        cls.__import_time = time() - cls.__import_start_time
 
-    def after_compaction(self):
-        self.__gross_time = time() - self.__import_start_time
+    @classmethod
+    def after_compaction(cls):
+        cls.__gross_time = time() - cls.__import_start_time
         metrics = {}
-        for slot, values in self.__get_metrics().items():
+        for slot, values in cls.__get_metrics().items():
             for k, v in values.items():
                 metrics.setdefault(k, 0.)
-                metrics[k] += v - self.__saved_metrics.get(slot, {}).get(k, 0.)
-        self.__saved_metrics = metrics
+                metrics[k] += v - cls.__saved_metrics.get(slot, {}).get(k, 0.)
+        cls.__saved_metrics = metrics
 
-    def save_result_additional_info(self, result: YdbCliHelper.WorkloadRunResult):
-        result.add_stat(self.query_name, 'GrossTime', int(self.__gross_time * 1000))
-        result.add_stat(self.query_name, 'time_with_compaction', int(self.__gross_time * 1000))
-        result.add_stat(self.query_name, 'import_time', int(self.__import_time * 1000))
-        result.add_stat(self.query_name, 'Mean', int(self.__import_time * 1000))
-        written_bytes = self.__saved_metrics.get('written_bytes', 0.)
-        compacted_bytes = self.__saved_metrics.get('compacted_bytes', 0.)
-        result.add_stat(self.query_name, 'written_bytes', int(written_bytes))
-        result.add_stat(self.query_name, 'compacted_bytes', int(compacted_bytes))
+    @classmethod
+    def save_result_additional_info(cls, result: YdbCliHelper.WorkloadRunResult):
+        result.add_stat(cls.query_name, 'GrossTime', int(cls.__gross_time * 1000))
+        result.add_stat(cls.query_name, 'time_with_compaction', int(cls.__gross_time * 1000))
+        result.add_stat(cls.query_name, 'import_time', int(cls.__import_time * 1000))
+        result.add_stat(cls.query_name, 'Mean', int(cls.__import_time * 1000))
+        written_bytes = cls.__saved_metrics.get('written_bytes', 0.)
+        compacted_bytes = cls.__saved_metrics.get('compacted_bytes', 0.)
+        result.add_stat(cls.query_name, 'written_bytes', int(written_bytes))
+        result.add_stat(cls.query_name, 'compacted_bytes', int(compacted_bytes))
         if written_bytes > 0.:
-            result.add_stat(self.query_name, 'write_amplification', compacted_bytes / written_bytes)
-        raw_tables_size_bytes, tables_size_bytes = self.__get_tables_size_bytes()
-        result.add_stat(self.query_name, 'tables_size_bytes', tables_size_bytes)
-        result.add_stat(self.query_name, 'raw_tables_size_bytes', raw_tables_size_bytes)
+            result.add_stat(cls.query_name, 'write_amplification', compacted_bytes / written_bytes)
+        raw_tables_size_bytes, tables_size_bytes = cls.__get_tables_size_bytes()
+        result.add_stat(cls.query_name, 'tables_size_bytes', tables_size_bytes)
+        result.add_stat(cls.query_name, 'raw_tables_size_bytes', raw_tables_size_bytes)
         with allure.step("wait tables statistics"):
            MAX_TIMEOUT = 1200
            start_wait = time()
-            while not self.__stats_ready():
+            while not cls.__stats_ready():
                if time() - start_wait > MAX_TIMEOUT:
                    pytest.fail(f'Stats not ready before timeout {MAX_TIMEOUT}s')
                sleep(1)
-            result.add_stat(self.query_name, 'wait_stats_seconds', time() - start_wait)
+            result.add_stat(cls.query_name, 'wait_stats_seconds', time() - start_wait)
 
 
 class UploadTpchBase(UploadClusterBase):
Expand Down Expand Up @@ -226,12 +251,15 @@ def do_setup_class(cls) -> None:
done
''')
re.deploy_binary(script_path, node.host, cls.get_remote_tmpdir())
super().do_setup_class()

def init(self):
yatest.common.execute(YdbCliHelper.get_cli_command() + ['workload', 'tpch', '-p', YdbCluster.get_tables_path(self.get_path()), 'init', '--store=column', '--clear'])
@classmethod
def init(cls):
yatest.common.execute(YdbCliHelper.get_cli_command() + ['workload', 'tpch', '-p', YdbCluster.get_tables_path(cls.get_path()), 'init', '--store=column', '--clear'])

def import_data(self):
self.__execute_on_nodes(f'{self.get_remote_tmpdir()}/ydb_upload_tpch.sh')
@classmethod
def import_data(cls):
cls.__execute_on_nodes(f'{cls.get_remote_tmpdir()}/ydb_upload_tpch.sh')

@classmethod
def do_teardown_class(cls) -> None:
Expand Down
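Both waits in `UploadClusterBase` follow the same poll-until-predicate shape: `wait_compaction` polls `__compaction_complete()` once per second, and the statistics wait polls `__stats_ready()` and fails via `pytest.fail` once `MAX_TIMEOUT` (1200 s) elapses. A generic sketch of that shape, with a placeholder predicate standing in for the real checks and a plain `TimeoutError` in place of `pytest.fail`:

```python
# Generic poll-until-ready sketch; the predicate is a placeholder for
# __compaction_complete() / __stats_ready() from the diff.
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout_s: float = 1200.0, poll_s: float = 1.0) -> float:
    """Poll `predicate` every `poll_s` seconds until it holds; return seconds waited."""
    start = time.time()
    while not predicate():
        if time.time() - start > timeout_s:
            raise TimeoutError(f'Not ready before timeout {timeout_s}s')
        time.sleep(poll_s)
    return time.time() - start


# Example: the predicate becomes true after about 3 seconds.
deadline = time.time() + 3
print(f'waited {wait_until(lambda: time.time() >= deadline, timeout_s=10):.1f}s')
```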