test(components): fix k8s_client 401 unauthorized error (#9749)

* Instantiate a new k8s client on every call to _get_resource

* Remove the k8s_client parameter from functions that use _get_resource

* Instantiate a new k8s client on every call to _delete_resource
This commit is contained in:
rd-pong 2023-07-18 11:37:22 -07:00 committed by GitHub
parent 81e989a8c7
commit fdb25f6e6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 30 additions and 43 deletions

View File

@ -28,7 +28,6 @@ def test_create_v2_endpoint(kfp_client, experiment_id, boto3_session, test_file_
shallow_canary=True,
)
)
k8s_client = ack_utils.k8s_client()
input_model_name = utils.generate_random_string(10) + "-v2-model"
input_endpoint_config_name = (
utils.generate_random_string(10) + "-v2-endpoint-config"
@ -63,7 +62,7 @@ def test_create_v2_endpoint(kfp_client, experiment_id, boto3_session, test_file_
)
endpoint_describe = ack_utils._get_resource(
k8s_client, input_endpoint_name, "endpoints"
input_endpoint_name, "endpoints"
)
outputs = {
@ -142,11 +141,11 @@ def test_create_v2_endpoint(kfp_client, experiment_id, boto3_session, test_file_
)
utils.remove_dir(download_dir)
finally:
ack_utils._delete_resource(k8s_client, input_endpoint_name, "endpoints")
ack_utils._delete_resource(input_endpoint_name, "endpoints")
ack_utils._delete_resource(
k8s_client, input_endpoint_config_name, "endpointconfigs"
input_endpoint_config_name, "endpointconfigs"
)
ack_utils._delete_resource(k8s_client, input_model_name, "models")
ack_utils._delete_resource(input_model_name, "models")
@pytest.mark.v2
@ -159,7 +158,6 @@ def test_terminate_v2_endpoint(kfp_client, experiment_id):
os.path.join(download_dir, "config.yaml"),
)
)
k8s_client = ack_utils.k8s_client()
input_model_name = utils.generate_random_string(10) + "-v2-model"
input_endpoint_config_name = (
utils.generate_random_string(10) + "-v2-endpoint-config"
@ -181,7 +179,6 @@ def test_terminate_v2_endpoint(kfp_client, experiment_id):
"running",
)
assert ack_utils.wait_for_condition(
k8s_client,
input_endpoint_name,
ack_utils.does_endpoint_exist,
wait_periods=12,
@ -189,7 +186,6 @@ def test_terminate_v2_endpoint(kfp_client, experiment_id):
)
kfp_client_utils.terminate_run(kfp_client, run_id)
assert ack_utils.wait_for_condition(
k8s_client,
input_endpoint_name,
ack_utils.is_endpoint_deleted,
wait_periods=20,
@ -197,6 +193,6 @@ def test_terminate_v2_endpoint(kfp_client, experiment_id):
)
finally:
ack_utils._delete_resource(
k8s_client, input_endpoint_config_name, "endpointconfigs"
input_endpoint_config_name, "endpointconfigs"
)
ack_utils._delete_resource(k8s_client, input_model_name, "models")
ack_utils._delete_resource(input_model_name, "models")

View File

@ -33,7 +33,6 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi
os.path.join(download_dir, "config.yaml"),
)
)
k8s_client = ack_utils.k8s_client()
job_definition_name = (
utils.generate_random_string(10) + "-v2-" + test_params["TestName"]
)
@ -56,7 +55,7 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi
# Verify if the job definition CR is created
job_definition_describe = ack_utils._get_resource(
k8s_client, job_definition_name, test_params["Plural"]
job_definition_name, test_params["Plural"]
)
assert (
job_definition_name
@ -88,5 +87,5 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi
finally:
ack_utils._delete_resource(
k8s_client, job_definition_name, test_params["Plural"]
job_definition_name, test_params["Plural"]
)

View File

@ -59,7 +59,6 @@ def test_create_v2_monitoring_schedule(
os.path.join(download_dir, "config.yaml"),
)
)
k8s_client = ack_utils.k8s_client()
# parameters for model bias job definition
job_definition_name = (
@ -92,7 +91,7 @@ def test_create_v2_monitoring_schedule(
# Verify if the job definition CR is created properly
job_definition_describe = ack_utils._get_resource(
k8s_client, job_definition_name, "modelbiasjobdefinitions"
job_definition_name, "modelbiasjobdefinitions"
)
assert (
job_definition_name
@ -107,7 +106,7 @@ def test_create_v2_monitoring_schedule(
# Verify if monitoring schedule CR is created properly
monitoring_schedule_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_name
@ -124,14 +123,12 @@ def test_create_v2_monitoring_schedule(
finally:
ack_utils._delete_resource(
k8s_client,
job_definition_name,
"modelbiasjobdefinitions",
wait_periods=10,
period_length=30,
)
ack_utils._delete_resource(
k8s_client,
monitoring_schedule_name,
"monitoringschedules",
wait_periods=10,
@ -164,7 +161,6 @@ def test_update_v2_monitoring_schedule(
os.path.join(download_dir, "config.yaml"),
)
)
k8s_client = ack_utils.k8s_client()
# parameters for job definition
test_params["Arguments"][test_params["JobInputName"]]["endpointInput"][
@ -202,7 +198,7 @@ def test_update_v2_monitoring_schedule(
# Verify if monitoring schedule CR is created properly
monitoring_schedule_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_name
@ -221,7 +217,7 @@ def test_update_v2_monitoring_schedule(
# Verify if the job definition CR is created properly
job_definition_1_describe = ack_utils._get_resource(
k8s_client, job_definition_name_1, "dataqualityjobdefinitions"
job_definition_name_1, "dataqualityjobdefinitions"
)
assert (
job_definition_name_1
@ -262,7 +258,7 @@ def test_update_v2_monitoring_schedule(
# Verify if monitoring schedule is updated with correct job definition
monitoring_schedule_updated_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_updated_describe["status"]["monitoringScheduleStatus"]
@ -277,7 +273,7 @@ def test_update_v2_monitoring_schedule(
# Verify if the new job definition CR is created properly
job_definition_2_describe = ack_utils._get_resource(
k8s_client, job_definition_name_2, "dataqualityjobdefinitions"
job_definition_name_2, "dataqualityjobdefinitions"
)
assert (
job_definition_name_2
@ -296,21 +292,18 @@ def test_update_v2_monitoring_schedule(
finally:
ack_utils._delete_resource(
k8s_client,
job_definition_name_1,
test_params["Plural"],
wait_periods=10,
period_length=30,
)
ack_utils._delete_resource(
k8s_client,
job_definition_name_2,
test_params["Plural"],
wait_periods=10,
period_length=30,
)
ack_utils._delete_resource(
k8s_client,
monitoring_schedule_name,
"monitoringschedules",
wait_periods=10,

View File

@ -18,7 +18,6 @@ import json
],
)
def test_trainingjobV2(kfp_client, experiment_id, test_file_dir):
k8s_client = ack_utils.k8s_client()
test_file_dir = "resources/config/ack-training-job"
download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated"))
test_params = utils.load_params(
@ -68,7 +67,7 @@ def test_trainingjobV2(kfp_client, experiment_id, test_file_dir):
# Verify Training job was successful on SageMaker
print(f"training job name: {input_job_name}")
train_response = ack_utils._get_resource(k8s_client, input_job_name, "trainingjobs")
train_response = ack_utils._get_resource(input_job_name, "trainingjobs")
assert (
train_response["status"]["trainingJobStatus"]
== output_training_job_status
@ -87,7 +86,6 @@ def test_trainingjobV2(kfp_client, experiment_id, test_file_dir):
@pytest.mark.v2
def test_terminate_trainingjob(kfp_client, experiment_id):
k8s_client = ack_utils.k8s_client()
test_file_dir = "resources/config/ack-training-job"
download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated_terminate"))
@ -114,7 +112,7 @@ def test_terminate_trainingjob(kfp_client, experiment_id):
kfp_client_utils.terminate_run(kfp_client, run_id)
desiredStatuses = ["Stopping", "Stopped"]
training_status_reached = ack_utils.wait_for_trainingjob_status(
k8s_client, input_job_name, desiredStatuses, 10, 6
input_job_name, desiredStatuses, 10, 6
)
assert training_status_reached

View File

@ -7,13 +7,14 @@ def k8s_client():
return config.new_client_from_config()
def _get_resource(k8s_client, job_name, plural):
def _get_resource(job_name, plural):
"""Get the custom resource detail similar to: kubectl describe <resource> JOB_NAME -n NAMESPACE.
Returns:
None or object: None if the resource doesn't exist in server or there is an error, otherwise the
custom object.
"""
_api = client.CustomObjectsApi(k8s_client)
# Instantiate a new client every time to avoid connection issues.
_api = client.CustomObjectsApi(k8s_client())
namespace = os.environ.get("NAMESPACE")
try:
job_description = _api.get_namespaced_custom_object(
@ -29,12 +30,12 @@ def _get_resource(k8s_client, job_name, plural):
return job_description
def _delete_resource(k8s_client, job_name, plural, wait_periods=10, period_length=20):
def _delete_resource(job_name, plural, wait_periods=10, period_length=20):
"""Delete the custom resource
Returns:
True or False: True if the resource is deleted, False if the resource deletion times out
"""
_api = client.CustomObjectsApi(k8s_client)
_api = client.CustomObjectsApi(k8s_client())
namespace = os.environ.get("NAMESPACE")
try:
@ -50,7 +51,7 @@ def _delete_resource(k8s_client, job_name, plural, wait_periods=10, period_lengt
for _ in range(wait_periods):
sleep(period_length)
if _get_resource(k8s_client, job_name, plural) is None:
if _get_resource(job_name, plural) is None:
print(f"Resource {job_name} deleted successfully.")
return True
@ -60,10 +61,10 @@ def _delete_resource(k8s_client, job_name, plural, wait_periods=10, period_lengt
# TODO: Make this a generalized function for non-job resources.
def wait_for_trainingjob_status(
k8s_client, training_job_name, desiredStatuses, wait_periods, period_length
training_job_name, desiredStatuses, wait_periods, period_length
):
for _ in range(wait_periods):
response = _get_resource(k8s_client, training_job_name, "trainingjobs")
response = _get_resource(training_job_name, "trainingjobs")
if response["status"]["trainingJobStatus"] in desiredStatuses:
return True
sleep(period_length)
@ -71,19 +72,19 @@ def wait_for_trainingjob_status(
def wait_for_condition(
k8s_client, resource_name, validator_function, wait_periods=10, period_length=8
resource_name, validator_function, wait_periods=10, period_length=8
):
for _ in range(wait_periods):
if not validator_function(k8s_client, resource_name):
if not validator_function(resource_name):
sleep(period_length)
else:
return True
return False
def does_endpoint_exist(k8s_client, endpoint_name):
def does_endpoint_exist(endpoint_name):
try:
response = _get_resource(k8s_client, endpoint_name, "endpoints")
response = _get_resource(endpoint_name, "endpoints")
if response:
return True
if response is None: # kubernetes module error
@ -92,8 +93,8 @@ def does_endpoint_exist(k8s_client, endpoint_name):
return False
def is_endpoint_deleted(k8s_client, endpoint_name):
response = _get_resource(k8s_client, endpoint_name, "endpoints")
def is_endpoint_deleted(endpoint_name):
response = _get_resource(endpoint_name, "endpoints")
if response:
return False
if response is None: