I am mocking a function that reads a Kubernetes secret in order to fetch a secret token, but running the unit test raises the following error:
AttributeError: <module 'kubernetes.client' from '/usr/lib/python3.6/site-packages/kubernetes/client/__init__.py'> does not have the attribute 'read_namespaced_secret()'
I have gone through "How do you mock Python Kubernetes client CoreV1Api", but it did not help in my case either. Can anyone point out what I am doing wrong here?
My script - read_sec.py
import base64
from kubernetes import client, config
from logger import logger
class kubernetesServices():
    """Helper for reading values out of Kubernetes secrets."""

    def __init__(self):
        """Hold no state; the kube config is loaded on every lookup."""
        return None

    def get_secret_vault_token(self):
        """Return the decoded ``token`` entry of secret ``random-sec`` in ``random-ns``.

        Loads the local kube config, reads the secret through ``CoreV1Api``,
        base64-decodes the ``token`` field and returns it as a UTF-8 string.
        Any exception raised along the way is logged and ``None`` is returned
        instead.
        """
        try:
            config.load_kube_config()
            core_api = client.CoreV1Api()
            secret = core_api.read_namespaced_secret("random-sec", "random-ns")
            encoded_token = secret.data['token']
            decoded_bytes = base64.b64decode(encoded_token)
            return decoded_bytes.decode("utf-8")
        except Exception as err:
            message = "got error at get_secret_vault_token: {}".format(str(err))
            logger.error(message)
        # Reached only after a logged failure.
        return None
Unittest - test_read_sec.py
import unittest
from unittest.mock import patch
from read_sec import *
class MockKubernetes:
    """Empty placeholder object handed to ``patch`` as a ``side_effect``."""

    def __init__(self):
        """Take no arguments and store no state."""
        return None
def mocker_read_namespaced_secret(*args, **kwargs):
    """Return a fake secret object for use as a ``read_namespaced_secret`` side effect.

    The code under test reads the ``.data`` attribute of the returned object
    and base64-decodes ``data['token']``, so the fake exposes ``.data`` and
    stores the token base64-encoded. ``json()`` is kept so that existing
    callers of the fake keep working.

    Args:
        *args: Ignored; accepted so the fake can replace the real call.
        **kwargs: Ignored; accepted so the fake can replace the real call.

    Returns:
        An object whose ``data`` attribute is ``{"token": <base64 of "abc123">}``
        and whose ``json()`` method returns ``{"data": data}``.
    """
    import base64  # local import: keeps this helper self-contained

    class MockReadns:
        """Minimal stand-in for the object returned by the secret read call."""

        def __init__(self, json_data):
            self.json_data = json_data
            # Expose the payload the way get_secret_vault_token accesses it.
            self.data = json_data["data"]

        def json(self):
            return self.json_data

    # A plain "abc123" is not valid base64 input for the decode step, so
    # encode the value the test expects to get back.
    encoded_token = base64.b64encode(b"abc123").decode("ascii")
    return MockReadns({"data": {"token": encoded_token}})
# Test case from the question; running it produces the AttributeError reported for this test.
class TestkubernetesServices(unittest.TestCase):
# NOTE(review): patch decorators are applied bottom-up, so the mock created by
# the decorator nearest the function is passed first. The parameter names below
# appear to be in the opposite order to the mocks they receive; confirm.
@patch("kubernetes.client",side_effect=MockKubernetes)
@patch("kubernetes.config",side_effect=MockKubernetes)
# This target names "read_namespaced_secret()" (with parentheses) as an attribute
# of the kubernetes.client module; the reported error states that the module has
# no such attribute.
@patch("kubernetes.client.read_namespaced_secret()",side_effect=mocker_read_namespaced_secret)
def test_get_secret_vault_token(self,mock_client,mock_config,mock_read):
k8s = kubernetesServices()
# The token is fetched but never compared against an expected value.
token = k8s.get_secret_vault_token()
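You need to mock `kubernetes.client.CoreV1Api` instead of `kubernetes.client`: `read_namespaced_secret` is a method of a `CoreV1Api` instance, not an attribute of the `kubernetes.client` module, which is why `patch` cannot find it there. Here is an example: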
import base64
import unittest
from unittest.mock import patch, Mock
import requests
from kubernetes import client, config
class kubernetesServices():
    """Helper for reading values out of Kubernetes secrets."""

    def get_secret_vault_token(self):
        """Return the decoded ``token`` entry of secret ``random-sec`` in ``random-ns``.

        Loads the local kube config, reads the secret through ``CoreV1Api``
        and returns the base64-decoded ``token`` field as a UTF-8 string.
        Exceptions are not caught here and propagate to the caller.
        """
        config.load_kube_config()
        secret_payload = client.CoreV1Api().read_namespaced_secret('random-sec', 'random-ns').data
        raw_token = base64.b64decode(secret_payload['token'])
        return raw_token.decode('utf-8')
class TestkubernetesServices(unittest.TestCase):
    """Unit test for ``kubernetesServices.get_secret_vault_token`` with the Kubernetes client mocked."""

    # The fake secret carries the base64 encoding of 'abc123', because the
    # method under test base64-decodes ``data['token']``.
    @patch(
        'kubernetes.client.CoreV1Api',
        return_value=Mock(
            read_namespaced_secret=Mock(
                return_value=Mock(data={'token': b'YWJjMTIz'}),
            ),
        ),
    )
    @patch('kubernetes.config.load_kube_config')
    def test_get_secret_vault_token(self, mock_load_kube_config, mock_core_v1_api):
        """The token from the mocked secret is decoded and returned.

        ``patch`` decorators are applied bottom-up, so the first mock argument
        belongs to ``load_kube_config`` and the second to ``CoreV1Api``.
        """
        k8s = kubernetesServices()
        token = k8s.get_secret_vault_token()

        self.assertEqual(token, 'abc123')
        # Also verify that the mocks were actually exercised as expected.
        mock_load_kube_config.assert_called_once_with()
        mock_core_v1_api.return_value.read_namespaced_secret.assert_called_once_with(
            'random-sec', 'random-ns'
        )
Result:
---------------------------------------------------------------------
Ran 1 tests in 0.071s
PASSED (successes=1)
JFYI: `side_effect` is better suited to cases where successive calls need to return different results. Example:
class TestRequest(unittest.TestCase):
    """Demonstrates a list-valued ``side_effect``: each call returns the next item."""

    def test_side_effect(self):
        """Successive ``requests.get`` calls return 1, 2 and 3, in that order."""
        with patch('requests.get', side_effect=[1, 2, 3]):
            # One call per URL; the mock hands out the list items in sequence.
            results = [requests.get(url) for url in ('url1', 'url2', 'url3')]
        self.assertEqual(results, [1, 2, 3])