Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#### Features Added
* Added merge support. See [PR 42924](https://github.com/Azure/azure-sdk-for-python/pull/42924).
* Added support for priority-based throttling at the client level for sync and async clients. See [PR 43917](https://github.com/Azure/azure-sdk-for-python/pull/43917)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def __init__( # pylint: disable=too-many-statements
The connection policy for the client.
:param documents.ConsistencyLevel consistency_level:
The default consistency policy for client operations.
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for the
client. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
"""
self.client_id = str(uuid.uuid4())
self.url_connection = url_connection
Expand Down Expand Up @@ -165,6 +168,10 @@ def __init__( # pylint: disable=too-many-statements
if throughput_bucket:
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket

priority = kwargs.pop('priority', None)
if priority:
self.default_headers[http_constants.HttpHeaders.PriorityLevel] = priority

# Keeps the latest response headers from the server.
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ def __init__( # pylint: disable=too-many-statements
The connection policy for the client.
:param documents.ConsistencyLevel consistency_level:
The default consistency policy for client operations.
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for the
client. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
"""
self.client_id = str(uuid.uuid4())
self.url_connection = url_connection
Expand Down Expand Up @@ -167,6 +170,10 @@ def __init__( # pylint: disable=too-many-statements
if throughput_bucket:
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket

priority = kwargs.pop('priority', None)
if priority:
self.default_headers[http_constants.HttpHeaders.PriorityLevel] = priority

if consistency_level is not None:
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level

Expand Down
59 changes: 59 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,65 @@

# [END priority_level option]

# [START client_level_priority]
# Priority can also be set at the client level, which will apply to all requests made by that client.
# This is useful when you want all operations from a particular client to have the same priority.
# The client-level priority is set during client initialization with the `priority` parameter.

# Create a client with Low priority for all requests
low_priority_client = CosmosClient(url, key, priority="Low")
low_priority_database = low_priority_client.get_database_client(database_name)
low_priority_container = low_priority_database.get_container_client(container_name)

# Add some items to query
for i in range(1, 4):
low_priority_container.upsert_item(
dict(id="low_priority_item{}".format(i), productName="Widget", productModel="Model {}".format(i))
)

# All requests from this client will have Low priority by default
for queried_item in low_priority_container.query_items(
query='SELECT * FROM products p WHERE p.productName = "Widget"',
enable_cross_partition_query=True
):
print(json.dumps(queried_item, indent=True))

# [END client_level_priority]

# [START request_priority_precedence]
# Request-level priority takes precedence over client-level priority.
# This allows you to override the default priority for specific operations.

# Create a client with Low priority
client_with_default_priority = CosmosClient(url, key, priority="Low")
database_with_priority = client_with_default_priority.get_database_client(database_name)
container_with_priority = database_with_priority.get_container_client(container_name)

# Add items with different priority levels to the container
container_with_priority.upsert_item(
dict(id="urgent_item1", productName="Widget", priority="High", productModel="High Priority Model")
)
container_with_priority.upsert_item(
dict(id="normal_item1", productName="Widget", priority="Low", productModel="Low Priority Model")
)

# This query will use High priority, overriding the client's Low priority setting
for important_item in container_with_priority.query_items(
query='SELECT * FROM products p WHERE p.priority = "High"',
enable_cross_partition_query=True,
priority="High" # Request-level priority overrides client-level priority
):
print(json.dumps(important_item, indent=True))

# This query will use the client's default Low priority
for normal_item in container_with_priority.query_items(
query='SELECT * FROM products p WHERE p.priority = "Low"',
enable_cross_partition_query=True
):
print(json.dumps(normal_item, indent=True))

# [END request_priority_precedence]

# Delete items from the container.
# The Cosmos DB SQL API does not support 'DELETE' queries,
# so deletes must be done with the delete_item method
Expand Down
57 changes: 57 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/examples_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,69 @@ async def examples_async():
# then Azure Cosmos DB will throttle low priority requests to allow high priority requests to execute.
# Can be used for Read, Write, and Query operations. This is specified with the `priority` keyword.
# the value can either be low or high.

async for queried_item in container.query_items(
query='SELECT * FROM products p WHERE p.productModel <> "DISCONTINUED"', priority="High"
):
print(json.dumps(queried_item, indent=True))
# [END priority_level option]

# [START client_level_priority]
# Priority can also be set at the client level, which will apply to all requests made by that client.
# This is useful when you want all operations from a particular client to have the same priority.
# The client-level priority is set during client initialization with the `priority` parameter.

# Create a client with Low priority for all requests
async with CosmosClient(url, key, priority="Low") as low_priority_client:
low_priority_database = low_priority_client.get_database_client(database_name)
low_priority_container = low_priority_database.get_container_client(container_name)

# Add some items to query
for i in range(1, 4):
await low_priority_container.upsert_item(
dict(id="low_priority_item{}".format(i), productName="Widget", productModel="Model {}".format(i))
)

# All requests from this client will have Low priority by default
async for queried_item in low_priority_container.query_items(
query='SELECT * FROM products p WHERE p.productName = "Widget"'
):
print(json.dumps(queried_item, indent=True))

# [END client_level_priority]

# [START request_priority_precedence]
# Request-level priority takes precedence over client-level priority.
# This allows you to override the default priority for specific operations.

# Create a client with Low priority
async with CosmosClient(url, key, priority="Low") as client_with_default_priority:
database_with_priority = client_with_default_priority.get_database_client(database_name)
container_with_priority = database_with_priority.get_container_client(container_name)

# Add items with different priority levels to the container
await container_with_priority.upsert_item(
dict(id="urgent_item1", productName="Widget", priority="Low", productModel="Low Priority Model")
)
await container_with_priority.upsert_item(
dict(id="normal_item1", productName="Widget", priority="High", productModel="High Priority Model")
)

# This query will use High priority, overriding the client's Low priority setting
async for important_item in container_with_priority.query_items(
query='SELECT * FROM products p WHERE p.priority = "High"',
priority="High" # Request-level priority overrides client-level priority
):
print(json.dumps(important_item, indent=True))

# This query will use the client's default Low priority
async for normal_item in container_with_priority.query_items(
query='SELECT * FROM products p WHERE p.priority = "Low"'
):
print(json.dumps(normal_item, indent=True))

# [END request_priority_precedence]

# Delete items from the container.
# The Cosmos DB SQL API does not support 'DELETE' queries,
# so deletes must be done with the delete_item method
Expand Down
30 changes: 30 additions & 0 deletions sdk/cosmos/azure-cosmos/tests/test_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

client_throughput_bucket_number = 2
request_throughput_bucket_number = 3
client_priority = "Low"
request_priority = "High"

def client_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
== str(client_throughput_bucket_number))
Expand All @@ -23,6 +26,14 @@ def request_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
== str(request_throughput_bucket_number))

def client_priority_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
== client_priority)

def request_priority_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
== request_priority)

def partition_merge_support_response_hook(raw_response):
header = raw_response.http_request.headers
assert http_constants.HttpHeaders.SDKSupportedCapabilities in header
Expand Down Expand Up @@ -290,5 +301,24 @@ def test_partition_merge_support_header(self):
# base method to set the header(GetHeaders).
self.container.read(raw_response_hook=partition_merge_support_response_hook)

def test_client_level_priority(self):
# Test that priority level set at client level is used for all requests
cosmos_client.CosmosClient(self.host, self.masterKey,
priority=client_priority,
raw_response_hook=client_priority_raw_response_hook)

def test_request_precedence_priority(self):
# Test that request-level priority takes precedence over client-level priority
client = cosmos_client.CosmosClient(self.host, self.masterKey,
priority=client_priority)
created_db = client.get_database_client(self.configs.TEST_DATABASE_ID)
created_container = created_db.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID)

# Create an item with request-level priority that overrides client-level priority
created_container.create_item(
body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'},
priority=request_priority,
raw_response_hook=request_priority_raw_response_hook)

if __name__ == "__main__":
unittest.main()
34 changes: 34 additions & 0 deletions sdk/cosmos/azure-cosmos/tests/test_headers_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

client_throughput_bucket_number = 2
request_throughput_bucket_number = 3
client_priority = "Low"
request_priority = "High"

async def client_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
== str(client_throughput_bucket_number))
Expand All @@ -24,6 +27,14 @@ async def request_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket]
== str(request_throughput_bucket_number))

async def client_priority_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
== client_priority)

async def request_priority_raw_response_hook(response):
assert (response.http_request.headers[http_constants.HttpHeaders.PriorityLevel]
== request_priority)


class ClientIDVerificationError(Exception):
"""Custom exception for client ID verification errors."""
Expand Down Expand Up @@ -236,5 +247,28 @@ async def test_partition_merge_support_header(self):
# base method to set the header(GetHeaders).
await self.container.read(raw_response_hook=partition_merge_support_response_hook)

async def test_client_level_priority_async(self):
# Test that priority level set at client level is used for all requests
async with CosmosClient(self.host, self.masterKey,
priority=client_priority,
raw_response_hook=client_priority_raw_response_hook) as client:
# Make a request to trigger the hook
database = client.get_database_client(self.configs.TEST_DATABASE_ID)
container = database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID)
created_item = await container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'})

async def test_request_precedence_priority_async(self):
# Test that request-level priority takes precedence over client-level priority
async with CosmosClient(self.host, self.masterKey,
priority=client_priority) as client:
database = client.get_database_client(self.configs.TEST_DATABASE_ID)
created_container = database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID)

# Create an item with request-level priority that overrides client-level priority
await created_container.create_item(
body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'},
priority=request_priority,
raw_response_hook=request_priority_raw_response_hook)

if __name__ == "__main__":
unittest.main()
21 changes: 6 additions & 15 deletions sdk/cosmos/azure-cosmos/tests/test_vector_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ def test_create_vector_embedding_container(self):
indexing_policy = {
"vectorIndexes": [
{"path": "/vector1", "type": "flat"},
{"path": "/vector2", "type": "quantizedFlat", "quantizationByteSize": 8},
{"path": "/vector3", "type": "diskANN", "quantizationByteSize": 8, "vectorIndexShardKey": ["/city"], "indexingSearchListSize": 50}
{"path": "/vector2", "type": "quantizedFlat", "quantizerType": "product", "quantizationByteSize": 8},
{"path": "/vector3", "type": "diskANN", "quantizerType": "product", "quantizationByteSize": 8, "vectorIndexShardKey": ["/city"], "indexingSearchListSize": 50}
]
}
vector_embedding_policy = {
Expand Down Expand Up @@ -101,7 +101,7 @@ def test_create_vector_embedding_container(self):
# Pass a vector indexing policy with hierarchical vectorIndexShardKey value
indexing_policy = {
"vectorIndexes": [
{"path": "/vector2", "type": "diskANN", 'quantizationByteSize': 64, 'indexingSearchListSize': 100, "vectorIndexShardKey": ["/country/city"]}]
{"path": "/vector2", "type": "diskANN", "quantizerType": "product", 'quantizationByteSize': 64, 'indexingSearchListSize': 100, "vectorIndexShardKey": ["/country/city"]}]
}
container_id = "vector_container" + str(uuid.uuid4())
created_container = self.test_db.create_container(
Expand Down Expand Up @@ -149,6 +149,7 @@ def test_replace_vector_indexing_policy(self):
{
"path": "/vector1",
"type": "diskANN",
"quantizerType": "product",
"quantizationByteSize": 128,
"indexingSearchListSize": 100
}
Expand Down Expand Up @@ -179,6 +180,7 @@ def test_replace_vector_indexing_policy(self):
{
"path": "/vector1",
"type": "diskANN",
"quantizerType": "product",
"quantizationByteSize": 128,
"indexingSearchListSize": 100
}]
Expand Down Expand Up @@ -407,17 +409,6 @@ def test_fail_replace_vector_indexing_policy(self):
assert e.status_code == 400
assert ("The Vector Indexing Policy's path::/vector1 not matching in Embedding's path."
in e.http_error_message)
# don't provide vector indexing policy
try:
self.test_db.replace_container(
created_container,
PartitionKey(path="/id"),
vector_embedding_policy=vector_embedding_policy)
pytest.fail("Container replace should have failed for missing indexing policy.")
except exceptions.CosmosHttpResponseError as e:
assert e.status_code == 400
assert ("The Vector Indexing Policy cannot be changed in Collection Replace."
in e.http_error_message)
# using a new indexing policy
new_indexing_policy = {
"vectorIndexes": [
Expand Down Expand Up @@ -451,7 +442,7 @@ def test_fail_replace_vector_indexing_policy(self):
pytest.fail("Container replace should have failed for new embedding policy.")
except exceptions.CosmosHttpResponseError as e:
assert e.status_code == 400
assert ("The Vector Embedding Policy cannot be changed in Collection Replace"
assert ("Paths in existing embedding policy cannot be modified in Collection Replace"
in e.http_error_message)
self.test_db.delete_container(container_id)

Expand Down
Loading