Skip to content
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e140109
feat: add time based benchmarks
chandra-siri Feb 10, 2026
ed67fcc
take sizes in kib
chandra-siri Feb 11, 2026
0471f3b
add support for queue_depth (num_ranges)
chandra-siri Feb 11, 2026
eaefc31
fix throughput calculation logic
chandra-siri Feb 11, 2026
a1fa044
remove commented out code
chandra-siri Feb 11, 2026
cc6826c
support regional buckets for time based benchmarks
chandra-siri Feb 11, 2026
b358199
run both seq and random
chandra-siri Feb 11, 2026
18393fd
Merge branch 'main' into time_based_benchmarks
chandra-siri Feb 11, 2026
c3d6ede
fix: init mp pool & grpc client once, use os.sched_setaffinity
chandra-siri Feb 12, 2026
b733577
fix lint issues
chandra-siri Feb 13, 2026
f44ebba
Merge branch 'time_based_benchmarks' of github.com:googleapis/python-…
chandra-siri Feb 13, 2026
50b8105
remove unused import
chandra-siri Feb 13, 2026
b625507
remove nonlocal reference
chandra-siri Feb 13, 2026
6bf0860
Merge branch 'time_based_benchmarks' of github.com:googleapis/python-…
chandra-siri Feb 13, 2026
9657d56
remove print statement
chandra-siri Feb 13, 2026
b320315
dynamically fetch cpu cores where hardIRQ is bound
chandra-siri Feb 17, 2026
a0a015d
use os.cpu_count()
chandra-siri Feb 17, 2026
171e496
lint changes
chandra-siri Feb 17, 2026
198c1af
Merge branch 'time_based_benchmarks' of github.com:googleapis/python-…
chandra-siri Feb 17, 2026
fcea896
dynamically fetch cpu cores where hardIRQ is bound
chandra-siri Feb 17, 2026
6b08f54
Merge branch 'main' of github.com:googleapis/python-storage into writ…
chandra-siri Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions tests/perf/microbenchmarks/writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
from tests.perf.microbenchmarks._utils import (
publish_benchmark_extra_info,
RandomBytesIO,
get_irq_affinity,
)
from tests.perf.microbenchmarks.conftest import publish_resource_metrics
import tests.perf.microbenchmarks.writes.config as config
from google.cloud import storage

# Get write parameters
all_params = config.get_write_params()
Expand Down Expand Up @@ -318,10 +318,34 @@ def target_wrapper(*args, **kwargs):
)


# --- Global Variables for Worker Process ---
worker_loop = None
worker_client = None
worker_json_client = None


def _worker_init(bucket_type):
"""Initializes a persistent event loop and client for each worker process."""
cpu_affinity = get_irq_affinity()
if cpu_affinity:
os.sched_setaffinity(
0, {i for i in range(1, os.cpu_count()) if i not in cpu_affinity}
)
global worker_loop, worker_client, worker_json_client
if bucket_type == "zonal":
worker_loop = asyncio.new_event_loop()
asyncio.set_event_loop(worker_loop)
worker_client = worker_loop.run_until_complete(create_client())
else: # regional
from google.cloud import storage

worker_json_client = storage.Client()


def _upload_files_worker(files_to_upload, other_params, bucket_type):
"""A worker function for multi-processing uploads.

Initializes a client and calls the appropriate multi-coroutine upload function.
Calls the appropriate multi-coroutine upload function using the global client.
This function is intended to be called in a separate process.

Args:
Expand All @@ -333,41 +357,28 @@ def _upload_files_worker(files_to_upload, other_params, bucket_type):
float: The maximum latency from the uploads performed by this worker.
"""
if bucket_type == "zonal":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = loop.run_until_complete(create_client())
try:
result = upload_files_using_grpc_multi_coro(
loop, client, files_to_upload, other_params
)
finally:
# cleanup loop
tasks = asyncio.all_tasks(loop=loop)
for task in tasks:
task.cancel()
loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
loop.close()
return result
return upload_files_using_grpc_multi_coro(
worker_loop, worker_client, files_to_upload, other_params
)
else: # regional
json_client = storage.Client()
return upload_files_using_json_multi_threaded(
None, json_client, files_to_upload, other_params
None, worker_json_client, files_to_upload, other_params
)


def upload_files_mp_mc_wrapper(files_names, params):
def upload_files_mp_mc_wrapper(pool, files_names, params):
"""Wrapper for multi-process, multi-coroutine uploads.

Distributes files among a pool of processes and calls the worker function.

Args:
pool: The multiprocessing pool.
files_names (list): The full list of filenames to upload.
params: An object containing benchmark parameters (num_processes, num_coros).
params: An object containing benchmark parameters (num_coros).

Returns:
float: The maximum latency observed across all processes.
"""
num_processes = params.num_processes
num_coros = params.num_coros

filenames_per_process = [
Expand All @@ -383,9 +394,7 @@ def upload_files_mp_mc_wrapper(files_names, params):
for filenames in filenames_per_process
]

ctx = multiprocessing.get_context("spawn")
with ctx.Pool(processes=num_processes) as pool:
results = pool.starmap(_upload_files_worker, args)
results = pool.starmap(_upload_files_worker, args)

return max(results)

Expand Down Expand Up @@ -414,18 +423,27 @@ def target_wrapper(*args, **kwargs):
output_times.append(result)
return output_times

ctx = multiprocessing.get_context("spawn")
pool = ctx.Pool(
processes=params.num_processes,
initializer=_worker_init,
initargs=(params.bucket_type,),
)
try:
with monitor() as m:
output_times = benchmark.pedantic(
target=target_wrapper,
iterations=1,
rounds=params.rounds,
args=(
pool,
files_names,
params,
),
)
finally:
pool.close()
pool.join()
publish_benchmark_extra_info(
benchmark, params, benchmark_group="write", true_times=output_times
)
Expand Down