-
Notifications
You must be signed in to change notification settings - Fork 67
on prem changes to disable cloud solutions #700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
16bea80
d72f6c7
4442dd8
3febc44
a1fe268
61a67ae
10f9e4f
0dcd54a
de8cb1a
324fe4d
b138a2e
1ee453a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,12 +114,17 @@ def assume_role(role_arn: str, role_session_name: Optional[str] = None) -> AwsCr | |
) | ||
|
||
|
||
def session(role: Optional[str], session_type: SessionT = Session) -> SessionT: | ||
def session(role: Optional[str], session_type: SessionT = Session) -> Optional[SessionT]: | ||
|
||
"""Obtain an AWS session using an arbitrary caller-specified role. | ||
|
||
:param:`session_type` defines the type of session to return. Most users will use | ||
the default boto3 type. Some users require a special type (e.g. an aioboto3 session). | ||
""" | ||
# Check if AWS is disabled | ||
if os.environ.get('DISABLE_AWS') == 'true': | ||
logger.warning(f"AWS disabled - skipping role assumption (ignoring: {role})") | ||
return None | ||
|
||
# Do not assume roles in CIRCLECI | ||
if os.getenv("CIRCLECI"): | ||
logger.warning(f"In circleci, not assuming role (ignoring: {role})") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,9 +80,24 @@ def make_standard_logger(name: str, log_level: int = logging.INFO) -> logging.Lo | |
raise ValueError("Name must be a non-empty string.") | ||
logger = logging.getLogger(name) | ||
logger.setLevel(log_level) | ||
logging.basicConfig( | ||
format=LOG_FORMAT, | ||
) | ||
|
||
# Thread-safe logging configuration - only configure if not already configured | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interesting, was this manifesting in a particular error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, there was a specific recursive logging error causing worker crashes: RuntimeError: reentrant call inside <_io.BufferedWriter name=''> This occurred during Gunicorn worker startup when multiple processes tried to initialize logging simultaneously, causing thread-unsafe logging configuration and race conditions. The error led to worker crashes, which then triggered the WORKER TIMEOUT errors we were seeing. The issue was that multiple Gunicorn workers starting at the same time would compete to write to stderr during logging setup, causing a reentrant call error that crashed the worker processes. |
||
if not logger.handlers: | ||
# Use a lock to prevent race conditions in multi-threaded environments | ||
import threading | ||
with threading.Lock(): | ||
if not logger.handlers: # Double-check after acquiring lock | ||
# Configure basic logging only if not already configured | ||
if not logging.getLogger().handlers: | ||
logging.basicConfig( | ||
format=LOG_FORMAT, | ||
) | ||
# Add handler to this specific logger if needed | ||
if not logger.handlers: | ||
handler = logging.StreamHandler() | ||
handler.setFormatter(logging.Formatter(LOG_FORMAT)) | ||
logger.addHandler(handler) | ||
|
||
return logger | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
""" | ||
|
||
import argparse | ||
import os | ||
import subprocess | ||
from typing import List | ||
|
||
|
@@ -14,18 +15,25 @@ def start_gunicorn_server(port: int, num_workers: int, debug: bool) -> None: | |
additional_args: List[str] = [] | ||
if debug: | ||
additional_args.extend(["--reload", "--timeout", "0"]) | ||
|
||
# Use environment variables for configuration with fallbacks | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
timeout = int(os.environ.get('WORKER_TIMEOUT', os.environ.get('GUNICORN_TIMEOUT', 60))) | ||
graceful_timeout = int(os.environ.get('GUNICORN_GRACEFUL_TIMEOUT', timeout)) | ||
keep_alive = int(os.environ.get('GUNICORN_KEEP_ALIVE', 2)) | ||
worker_class = os.environ.get('GUNICORN_WORKER_CLASS', 'model_engine_server.api.worker.LaunchWorker') | ||
|
||
command = [ | ||
"gunicorn", | ||
"--bind", | ||
f"[::]:{port}", | ||
"--timeout", | ||
"60", | ||
str(timeout), | ||
"--graceful-timeout", | ||
"60", | ||
str(graceful_timeout), | ||
"--keep-alive", | ||
"2", | ||
str(keep_alive), | ||
"--worker-class", | ||
"model_engine_server.api.worker.LaunchWorker", | ||
worker_class, | ||
"--workers", | ||
f"{num_workers}", | ||
*additional_args, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
import json | ||
import os | ||
from typing import Any, Dict, List, Optional | ||
|
||
import botocore | ||
|
@@ -18,28 +19,61 @@ | |
logger = make_logger(logger_name()) | ||
backend_protocol = "abs" if infra_config().cloud_provider == "azure" else "s3" | ||
|
||
celery_redis = celery_app( | ||
None, | ||
s3_bucket=infra_config().s3_bucket, | ||
broker_type=str(BrokerType.REDIS.value), | ||
backend_protocol=backend_protocol, | ||
) | ||
celery_redis_24h = celery_app( | ||
None, | ||
s3_bucket=infra_config().s3_bucket, | ||
broker_type=str(BrokerType.REDIS.value), | ||
task_visibility=TaskVisibility.VISIBILITY_24H, | ||
backend_protocol=backend_protocol, | ||
) | ||
celery_sqs = celery_app( | ||
None, | ||
s3_bucket=infra_config().s3_bucket, | ||
broker_type=str(BrokerType.SQS.value), | ||
backend_protocol=backend_protocol, | ||
) | ||
celery_servicebus = celery_app( | ||
None, broker_type=str(BrokerType.SERVICEBUS.value), backend_protocol=backend_protocol | ||
) | ||
# Initialize celery apps lazily to avoid import-time AWS session creation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. curious why we're not running into similar issues for our other non-AWS environments There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On-prem — this is likely because no credentials are present there, so the import fails |
||
celery_redis = None | ||
celery_redis_24h = None | ||
celery_sqs = None | ||
celery_servicebus = None | ||
|
||
def _get_celery_redis(): | ||
global celery_redis | ||
if celery_redis is None: | ||
celery_redis = celery_app( | ||
None, | ||
s3_bucket=infra_config().s3_bucket, | ||
broker_type=str(BrokerType.REDIS.value), | ||
backend_protocol=backend_protocol, | ||
) | ||
return celery_redis | ||
|
||
def _get_celery_redis_24h(): | ||
global celery_redis_24h | ||
if celery_redis_24h is None: | ||
celery_redis_24h = celery_app( | ||
None, | ||
s3_bucket=infra_config().s3_bucket, | ||
broker_type=str(BrokerType.REDIS.value), | ||
task_visibility=TaskVisibility.VISIBILITY_24H, | ||
backend_protocol=backend_protocol, | ||
) | ||
return celery_redis_24h | ||
|
||
def _get_celery_sqs(): | ||
global celery_sqs | ||
if celery_sqs is None: | ||
# Check if SQS broker is disabled or if we're forcing Redis | ||
if os.environ.get('DISABLE_SQS_BROKER') == 'true' or os.environ.get('FORCE_CELERY_REDIS') == 'true': | ||
logger.warning("SQS broker disabled - using Redis instead") | ||
return _get_celery_redis() | ||
celery_sqs = celery_app( | ||
None, | ||
s3_bucket=infra_config().s3_bucket, | ||
broker_type=str(BrokerType.SQS.value), | ||
backend_protocol=backend_protocol, | ||
) | ||
return celery_sqs | ||
|
||
def _get_celery_servicebus(): | ||
global celery_servicebus | ||
if celery_servicebus is None: | ||
# Check if ServiceBus broker is disabled or if we're forcing Redis | ||
if os.environ.get('DISABLE_SERVICEBUS_BROKER') == 'true' or os.environ.get('FORCE_CELERY_REDIS') == 'true': | ||
logger.warning("ServiceBus broker disabled - using Redis instead") | ||
return _get_celery_redis() | ||
celery_servicebus = celery_app( | ||
None, broker_type=str(BrokerType.SERVICEBUS.value), backend_protocol=backend_protocol | ||
) | ||
return celery_servicebus | ||
|
||
|
||
class CeleryTaskQueueGateway(TaskQueueGateway): | ||
|
@@ -55,13 +89,13 @@ def __init__(self, broker_type: BrokerType, tracing_gateway: TracingGateway): | |
|
||
def _get_celery_dest(self): | ||
if self.broker_type == BrokerType.SQS: | ||
return celery_sqs | ||
return _get_celery_sqs() | ||
elif self.broker_type == BrokerType.REDIS_24H: | ||
return celery_redis_24h | ||
return _get_celery_redis_24h() | ||
elif self.broker_type == BrokerType.REDIS: | ||
return celery_redis | ||
return _get_celery_redis() | ||
else: | ||
return celery_servicebus | ||
return _get_celery_servicebus() | ||
|
||
def send_task( | ||
self, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we not pass this in via config in the same way as the other redis configs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated