Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#### Features Added
* Added merge support. See [PR 42924](https://github.com/Azure/azure-sdk-for-python/pull/42924).
* Added support for priority_level for sync and async clients. See [PR 43917](https://github.com/Azure/azure-sdk-for-python/pull/43917)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def __init__( # pylint: disable=too-many-statements
The connection policy for the client.
:param documents.ConsistencyLevel consistency_level:
The default consistency policy for client operations.
:keyword Literal["High", "Low"] priority_level: Priority based execution allows users to set a priority for the
client. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
"""
self.client_id = str(uuid.uuid4())
self.url_connection = url_connection
Expand Down Expand Up @@ -165,6 +168,10 @@ def __init__( # pylint: disable=too-many-statements
if throughput_bucket:
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket

priority_level = kwargs.pop('priority_level', None)
if priority_level:
self.default_headers[http_constants.HttpHeaders.PriorityLevel] = priority_level

# Keeps the latest response headers from the server.
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ def __init__( # pylint: disable=too-many-statements
The connection policy for the client.
:param documents.ConsistencyLevel consistency_level:
The default consistency policy for client operations.
:keyword Literal["High", "Low"] priority_level: Priority based execution allows users to set a priority for the
client. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
"""
self.client_id = str(uuid.uuid4())
self.url_connection = url_connection
Expand Down Expand Up @@ -167,6 +170,10 @@ def __init__( # pylint: disable=too-many-statements
if throughput_bucket:
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket

priority_level = kwargs.pop('priority_level', None)
if priority_level:
self.default_headers[http_constants.HttpHeaders.PriorityLevel] = priority_level

if consistency_level is not None:
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level

Expand Down
59 changes: 59 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,65 @@

# [END priority_level option]

# [START client_level_priority]
# Priority can also be set at the client level, which will apply to all requests made by that client.
# This is useful when you want all operations from a particular client to have the same priority.
# The client-level priority is set during client initialization with the `priority_level` parameter.

# Create a client with Low priority for all requests
low_priority_client = CosmosClient(url, key, priority_level="Low")
low_priority_database = low_priority_client.get_database_client(database_name)
low_priority_container = low_priority_database.get_container_client(container_name)

# Add some items to query
for i in range(1, 4):
low_priority_container.upsert_item(
dict(id="low_priority_item{}".format(i), productName="Widget", productModel="Model {}".format(i))
)

# All requests from this client will have Low priority by default
for queried_item in low_priority_container.query_items(
query='SELECT * FROM products p WHERE p.productName = "Widget"',
enable_cross_partition_query=True
):
print(json.dumps(queried_item, indent=True))

# [END client_level_priority]

# [START request_priority_precedence]
# Request-level priority takes precedence over client-level priority.
# This allows you to override the default priority for specific operations.

# Create a client with Low priority
client_with_default_priority = CosmosClient(url, key, priority_level="Low")
database_with_priority = client_with_default_priority.get_database_client(database_name)
container_with_priority = database_with_priority.get_container_client(container_name)

# Add items with different priority levels to the container
container_with_priority.upsert_item(
dict(id="urgent_item1", productName="Widget", priority="urgent", productModel="Urgent Model")
)
container_with_priority.upsert_item(
dict(id="normal_item1", productName="Widget", priority="normal", productModel="Normal Model")
)

# This query will use High priority, overriding the client's Low priority setting
for important_item in container_with_priority.query_items(
query='SELECT * FROM products p WHERE p.priority = "urgent"',
enable_cross_partition_query=True,
priority="High" # Request-level priority overrides client-level priority
):
print(json.dumps(important_item, indent=True))

# This query will use the client's default Low priority
for normal_item in container_with_priority.query_items(
query='SELECT * FROM products p WHERE p.priority = "normal"',
enable_cross_partition_query=True
):
print(json.dumps(normal_item, indent=True))

# [END request_priority_precedence]

# Delete items from the container.
# The Cosmos DB SQL API does not support 'DELETE' queries,
# so deletes must be done with the delete_item method
Expand Down
64 changes: 64 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/examples_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,76 @@ async def examples_async():
# then Azure Cosmos DB will throttle low priority requests to allow high priority requests to execute.
# Can be used for Read, Write, and Query operations. This is specified with the `priority` keyword.
# the value can either be low or high.

# Ensure we have items to query (this reuses items from earlier in the example)
# If running this section independently, uncomment the lines below:
# for i in range(1, 10):
# await container.upsert_item(
# dict(id="item{}".format(i), productName="Widget", productModel="Model {}".format(i))
# )

async for queried_item in container.query_items(
query='SELECT * FROM products p WHERE p.productModel <> "DISCONTINUED"', priority="High"
):
print(json.dumps(queried_item, indent=True))
# [END priority_level option]

# [START client_level_priority]
# Priority can also be set at the client level, which will apply to all requests made by that client.
# This is useful when you want all operations from a particular client to have the same priority.
# The client-level priority is set during client initialization with the `priority_level` parameter.

# Create a client with Low priority for all requests
async with CosmosClient(url, key, priority_level="Low") as low_priority_client:
low_priority_database = low_priority_client.get_database_client(database_name)
low_priority_container = low_priority_database.get_container_client(container_name)

# Add some items to query
for i in range(1, 4):
await low_priority_container.upsert_item(
dict(id="low_priority_item{}".format(i), productName="Widget", productModel="Model {}".format(i))
)

# All requests from this client will have Low priority by default
async for queried_item in low_priority_container.query_items(
query='SELECT * FROM products p WHERE p.productName = "Widget"'
):
print(json.dumps(queried_item, indent=True))

# [END client_level_priority]

# [START request_priority_precedence]
# Request-level priority takes precedence over client-level priority.
# This allows you to override the default priority for specific operations.

# Create a client with Low priority
async with CosmosClient(url, key, priority_level="Low") as client_with_default_priority:
database_with_priority = client_with_default_priority.get_database_client(database_name)
container_with_priority = database_with_priority.get_container_client(container_name)

# Add items with different priority levels to the container
await container_with_priority.upsert_item(
dict(id="urgent_item1", productName="Widget", priority="urgent", productModel="Urgent Model")
)
await container_with_priority.upsert_item(
dict(id="normal_item1", productName="Widget", priority="normal", productModel="Normal Model")
)

# This query will use High priority, overriding the client's Low priority setting
async for important_item in container_with_priority.query_items(
query='SELECT * FROM products p WHERE p.priority = "urgent"',
priority="High" # Request-level priority overrides client-level priority
):
print(json.dumps(important_item, indent=True))

# This query will use the client's default Low priority
async for normal_item in container_with_priority.query_items(
query='SELECT * FROM products p WHERE p.priority = "normal"'
):
print(json.dumps(normal_item, indent=True))

# [END request_priority_precedence]

# Delete items from the container.
# The Cosmos DB SQL API does not support 'DELETE' queries,
# so deletes must be done with the delete_item method
Expand Down
30 changes: 30 additions & 0 deletions sdk/cosmos/azure-cosmos/tests/test_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

client_throughput_bucket_number = 2
request_throughput_bucket_number = 3
client_priority_level = "Low"
request_priority_level = "High"

def client_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
== str(client_throughput_bucket_number))
Expand All @@ -23,6 +26,14 @@ def request_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
== str(request_throughput_bucket_number))

def client_priority_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
== client_priority_level)

def request_priority_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
== request_priority_level)

def partition_merge_support_response_hook(raw_response):
header = raw_response.http_request.headers
assert http_constants.HttpHeaders.SDKSupportedCapabilities in header
Expand Down Expand Up @@ -290,5 +301,24 @@ def test_partition_merge_support_header(self):
# base method to set the header(GetHeaders).
self.container.read(raw_response_hook=partition_merge_support_response_hook)

def test_client_level_priority_level(self):
# Test that priority level set at client level is used for all requests
cosmos_client.CosmosClient(self.host, self.masterKey,
priority_level=client_priority_level,
raw_response_hook=client_priority_raw_response_hook)

def test_request_precedence_priority_level(self):
# Test that request-level priority takes precedence over client-level priority
client = cosmos_client.CosmosClient(self.host, self.masterKey,
priority_level=client_priority_level)
created_db = client.get_database_client(self.configs.TEST_DATABASE_ID)
created_container = created_db.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID)

# Create an item with request-level priority that overrides client-level priority
created_container.create_item(
body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'},
priority=request_priority_level,
raw_response_hook=request_priority_raw_response_hook)

if __name__ == "__main__":
unittest.main()
30 changes: 30 additions & 0 deletions sdk/cosmos/azure-cosmos/tests/test_headers_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

client_throughput_bucket_number = 2
request_throughput_bucket_number = 3
client_priority_level = "Low"
request_priority_level = "High"

async def client_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
== str(client_throughput_bucket_number))
Expand All @@ -24,6 +27,14 @@ async def request_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
== str(request_throughput_bucket_number))

async def client_priority_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
== client_priority_level)

async def request_priority_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
== request_priority_level)


class ClientIDVerificationError(Exception):
"""Custom exception for client ID verification errors."""
Expand Down Expand Up @@ -236,5 +247,24 @@ async def test_partition_merge_support_header(self):
# base method to set the header(GetHeaders).
await self.container.read(raw_response_hook=partition_merge_support_response_hook)

async def test_client_level_priority_level_async(self):
# Test that priority level set at client level is used for all requests
CosmosClient(self.host, self.masterKey,
priority_level=client_priority_level,
raw_response_hook=client_priority_raw_response_hook)

async def test_request_precedence_priority_level_async(self):
# Test that request-level priority takes precedence over client-level priority
client = CosmosClient(self.host, self.masterKey,
priority_level=client_priority_level)
database = client.get_database_client(self.configs.TEST_DATABASE_ID)
created_container = database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID)

# Create an item with request-level priority that overrides client-level priority
await created_container.create_item(
body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'},
priority=request_priority_level,
raw_response_hook=request_priority_raw_response_hook)

if __name__ == "__main__":
unittest.main()
Loading