class MyClass:
my_class_field = 'my_class_field'
def __init__(self):
self.my_instance_field = 'my_instance_field'
def my_method(self, my_arg):
return 'init_return'
class Mock(CallableMixin, NonCallableMock):
def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
wraps=None, name=None, spec_set=None, parent=None,
_spec_state=None, _new_name='', _new_parent=None, **kwargs):
...
class MagicMock(MagicMixin, Mock):
...
# Параметр spec задает спецификацию (описание/источник атрибутов) объекта, который мы хотим создать.
# При попытке использовать атрибут мока, который не существует в спецификации, будет выброшена AttributeError.
# Значение параметра spec может быть представлено списком строк, классом или экземпляром класса.
my_class_attribs = dir(MyClass) # [... , 'my_class_field', 'my_method']
my_mock = MagicMock(spec=MyClass)
ret = my_mock.my_not_existing_attrib # Получим AttributeError, так как имеем в спецификации MyClass, а у него нет такого атрибута
# Мок объект получает атрибуты класса MyClass ('my_class_field' и 'my_method'), но также имеет и свои атрибуты ('assert_any_call' и т д)
my_mock_attribs = dir(my_mock)
# [..., 'assert_any_call', 'assert_called', 'assert_called_once', 'assert_called_once_with', 'assert_called_with', 'assert_has_calls',
# 'assert_not_called', 'attach_mock', 'call_args', 'call_args_list', 'call_count', 'called', 'configure_mock', 'method_calls',
# 'mock_add_spec', 'mock_calls', 'my_class_field', 'my_method', 'reset_mock', 'return_value', 'side_effect']
is_instance = isinstance(my_mock, MyClass) # True
# Несмотря на то, что у мока есть атрибут my_method, по сути его нет - он вернет нам объект самого мока
returned_my_method = my_mock.my_method() # <MagicMock name='mock.my_method()' id='4510823424'>
# Вывод: мок имеет атрибуты своей спецификации, но по сути эти атрибуты - пустышки, которые нужно заполнять.
# Если мы не указываем их значения, то вместо значения атрибута возвращается еще один объект мока.
my_mock = MagicMock() # При дальнейшем вызове мока будет возвращаться сам MagicMock object, так как не указан return_value
my_mock = MagicMock(return_value=5) # При вызове мока далее будет возвращаться return_value (равное 5)
ret = my_mock('my_arg1') # 5
ret = my_mock('my_arg2') # 5
ret = my_mock('my_arg3') # 5
calls_count = my_mock.call_count # 3
calls_list = my_mock.mock_calls # [call('my_arg1'), call('my_arg2'), call('my_arg3'), call.__len__()]
my_mock.my_attrib = 'my_attrib_value' # Задаем значение атрибута
my_attrib = my_mock.my_attrib # 'my_attrib_value'
my_mock.side_effect = KeyError('My error!') # Сразу после вызова my_mock будет выброшено исключение
with pytest.raises(KeyError): # pytest перехватит ожидаемое исключение и тест пройдет успешно
my_mock('my_arg3')
# side_effect также может быть функцией.
# Она будет вызываться с теми же аргументами, что и мок.
# Ее возвращаемое значение становится возвращаемым значением мока
def side_effect(x):
return x * 2
my_mock = MagicMock(side_effect=side_effect, return_value='default_return_value')
two = my_mock(1) # 2 (не 'default_return_value' !!!)
hundred = my_mock(50) # 100
# Если мы все же хотим возвращать при вызове мока return_value, то надо вернуть его из side_affect()
my_mock = MagicMock(return_value='default_return_value')
def side_effect_with_return_value(*args, **kwargs):
# return DEFAULT # вариант 1
return my_mock.return_value # вариант 1
my_mock.side_effect = side_effect_with_return_value
two = my_mock(1) # 'default_return_value'
hundred = my_mock(50) # 'default_return_value'
# side_affect еще может быть Iterable объектом - с каждым последующим вызовом мока будет возвращаться значение из списка
my_mock = MagicMock(side_effect=[100, KeyError, 'str'])
hundred = my_mock() # 100
with pytest.raises(KeyError):
my_mock() # Исключение KeyError
string = my_mock() # 'str'
# Убираем side_affect
my_mock.side_effect = None
# Моки создают атрибуты на лету. Это позволяет им притворяться объектами любого типа.
# Но иногда возникает необходимость удалить атрибут.
my_mock = MagicMock()
ret = hasattr(my_mock, 'my_attrib') # True
del my_mock.my_attrib
ret = hasattr(my_mock, 'my_attrib') # False
# Проверяем, что метод my_method() был вызван хотя бы 1 раз
my_mock = MagicMock()
my_mock.my_method()
my_mock.my_method.assert_called_once()
# Проверяем, что метод my_method() был вызван с указанным именованным параметром
my_mock.my_method(my_arg_name='my_arg_value')
my_mock.my_method.assert_called_with(my_arg_name='my_arg_value')
# Аналогичные проверки:
# assert_called()
# assert_called_once()
# assert_called_with(*args, **kwargs)
# assert_called_once_with(*args, **kwargs)
# assert_any_call(*args, **kwargs)
# assert_has_calls(calls, any_order=False)
# assert_not_called()
class _patch(object):
def patch(
target, new=DEFAULT, spec=None, create=False,
spec_set=None, autospec=None, new_callable=None, *, unsafe=False, **kwargs
):
# 1) Используем patch() как декоратор - создаем мок и передаем его в функцию как параметр
# 2) Параметр autospec=True передаст атрибуты класса MyClass создаваемому моку
# 3) Рабочий экземпляр пропатченного класса MyClass создается во внутренней функции inner_function()
# Мы патчим класс снаружи, НЕ меняя содержимое inner_function() функции.
def inner_function():
my_instance = MyClass()
ret1 = MyClass.my_class_field # 'patched_class_field'
ret2 = my_instance.my_class_field # 'patched_class_field'
ret3 = my_instance.my_instance_field # 'patched_ins_field'
# Если при создании патча указываем autospec=True и вызовем my_method() без аргумента, то получим TypeError: missing a required argument: 'my_arg'
# Если при создании патча указываем spec=True и вызовем my_method() без аргумента, то получим 'patched_return' (ошибки не будет)
# Если при создании патча НЕ указываем spec/autospec и вызовем my_method() аргумента, то получим 'patched_return' (ошибки не будет)
# ret4 = my_instance.my_method()
ret4 = my_instance.my_method(my_arg='my_arg') # 'patched_return'
@patch('basics.MyClass', autospec=True)
def my_function(_unused_arg, mock_class):
mock_class.my_class_field = 'patched_class_field'
mock_class().my_class_field = 'patched_class_field'
mock_class().my_instance_field = 'patched_ins_field'
mock_class().my_method.return_value = 'patched_return'
inner_function()
my_function('my_arg')
patch.object(target, attribute, new=DEFAULT, spec=None, create=False, spec_set=None,
autospec=None, new_callable=None, **kwargs):
# Создает мок на атрибут (attribute) объекта (target)
# Обратите внимание, что параметр target в данном случае это объект, а не строка для импорта как в методе patch()
# Также обратите внимание на второй обязательный параметр - attribute
with patch.object(target=MyClass, attribute='my_method') as my_mock_method:
MyClass().my_method(100)
my_mock_method.assert_called_with(100) # Error (100,) != (3,)
import logging
import json
import simplejson
import urllib3
import requests
from requests import Timeout, HTTPError, TooManyRedirects, ConnectionError
from requests.auth import HTTPBasicAuth
from utilities import truncate_str_by_symbols, retry
log = logging.getLogger(__name__)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class HTTPHelper:
GET = 'GET'
POST = 'POST'
PUT = 'PUT'
DELETE = 'DELETE'
HTTP_PORT = 80
HTTPS_PORT = 443
HTTP = 'http'
HTTPS = 'https'
HEADERS = {'Accept': 'application/json;charset=UTF-8'}
def __init__(self, host, port=HTTPS_PORT, protocol=HTTPS, headers=None, username=None, password=None, s_cert=False,
c_cert=None, c_key=None):
self.host = host
self.base_url = f"{protocol}://{host}:{port}"
if not port:
self.base_url = f"{protocol}://{host}"
self.headers = headers or self.HEADERS
if username and password:
self.auth = HTTPBasicAuth(username, password)
else:
self.auth = None
self.verify = s_cert
if c_cert and c_key:
self.cert = (c_cert, c_key)
else:
self.cert = None
@retry(max_retries=2, timeout=300, period=60, exceptions=[HTTPError])
def requester(self, method, rel_url, headers=None, data=None, params=None, json=None, files=None, expected_error=None):
url = f"{self.base_url}{rel_url}"
log.debug('-------------------- HTTP_REQUEST_BEGIN_SESSION --------------------')
log.debug(f'URL: {url}')
log.debug(f'HEADERS: {headers or self.headers}')
log.debug(f'METHOD: {method}')
if params:
log.debug(f'PARAMS: {params}')
if data:
log.debug(f'DATA: {data}')
if json:
log.debug(f'JSON: {json}')
response = requests.request(method, url, headers=headers or self.headers, data=data, json=json, params=params,
cert=self.cert, verify=self.verify, auth=self.auth, files=files)
log.debug(f'RESPONSE CODE: {response.status_code}')
try:
response.raise_for_status()
except (Timeout, ConnectionError, TooManyRedirects, HTTPError) as e:
if expected_error and expected_error in str(e):
log.warning(f"Expected error: {e}")
return
else:
raise e
return self._parse(response)
def get(self, url: str, headers: dict = None, params: dict = None, expected_error: str = None):
return self.requester(self.GET, url, headers=headers, params=params, expected_error=expected_error)
def post(self, url: str, headers: dict = None, data: dict = None, params: dict = None, json: dict = None,
files: dict = None, expected_error: str = None):
return self.requester(self.POST, url, headers=headers, data=data, params=params, json=json, files=files,
expected_error=expected_error)
def put(self, url: str, headers: dict = None, data: dict = None, params: dict = None, json: dict = None,
expected_error: str = None):
return self.requester(self.PUT, url, headers=headers, data=data, params=params, json=json, expected_error=expected_error)
def delete(self, url: str, headers: dict = None, expected_error: str = None):
return self.requester(self.DELETE, url, headers=headers, expected_error=expected_error)
@staticmethod
def _parse(response):
content = response.content
if response.headers.get('Content-Type') in ['application/json', 'application/json;charset=UTF-8', 'text/plain; charset=utf-8'] \
or 'application/json' in str(response.request.headers):
try:
content = response.json()
log.debug(f'RESPONSE DATA: {truncate_str_by_symbols(data=content, max_length=20000)}')
except (json.JSONDecodeError, simplejson.JSONDecodeError):
# We don't log non-json response since it could be not readable and large (if it's a file content for example)
pass
log.debug('-------------------- HTTP_REQUEST_END_SESSION --------------------')
return content
from helpers.HttpHelper import HTTPHelper
from requests import Response, PreparedRequest
from unittest.mock import MagicMock, patch
import pytest
from json import JSONDecodeError
from simplejson import JSONDecodeError as SimpleJsonJSONDecodeError
@pytest.fixture(scope='module')
@patch('test_http_helper.Response', autospec=True)
def m_response(MockedResponse):
m_response = MockedResponse()
return m_response
content_types = ['application/json', 'application/json;charset=UTF-8', 'text/plain; charset=utf-8', None]
@pytest.mark.parametrize("content_type", content_types)
def test_parse_method(m_response, content_type):
if content_type:
m_response.headers = {'Content-Type': content_type}
else:
m_response.headers = {}
m_response.request = MagicMock(target=PreparedRequest, attribute='headers')
m_response.request.headers = 'application/json'
m_response.json.return_value = {'RequestId': 'E126ABAD-A242-46D0-BEEC-C67F2454BD33', 'Error': ''}
HTTPHelper._parse(response=m_response)
m_response.json.assert_called()
@pytest.mark.parametrize("error", [JSONDecodeError, SimpleJsonJSONDecodeError])
def test_parse_method_catch_exception(m_response, error):
m_response.headers = {'Content-Type': 'application/json'}
m_response.json.side_effect = error('JSON error was suppressed', 'doc', 0)
HTTPHelper._parse(response=m_response)
m_response.json.assert_called()
import logging
import time
from requests.exceptions import HTTPError
from clones import CLONES
from helpers.ProxmoxVM import ProxmoxVM
from helpers.api.ProxmoxAPI import ProxmoxAPI
from utilities import wait_until
from constants import PROXMOX
log = logging.getLogger(__name__)
class VMManager:
DO_NOT_DELETE_PREFIX = 'do-not-delete'
MAX_VMS_NUMBER = 20
def __init__(self, token, node=PROXMOX.NODE):
self._api = ProxmoxAPI(token=token)
self.node = node
self.token = token
@property
def api(self):
return self._api
@api.setter
def api(self, api):
self._api = api
def clone_vm_and_power_on(self, template_vm_id, template_vm_name, node=None, additional_reset=False,
network_configuration=False, new_vm_name=None):
node = node or self.node
def cloning_is_successful():
free_vm_id = self._get_free_id(template_vm_id, node)
try:
self.api.vm_clone(node=node, vmid=template_vm_id,
newid=free_vm_id, new_name=new_vm_name)
return free_vm_id
except (HTTPError, TimeoutError):
return False
cloned_vm_id = wait_until(condition=cloning_is_successful, description="Cloning with free id is successful",
timeout=30 * 60, period=30)
time.sleep(20)
if network_configuration and not self.api.get_config(node, cloned_vm_id).get('net0'):
self.api.vm_configure_network(node, cloned_vm_id)
time.sleep(20)
self.api.vm_power_on(node, cloned_vm_id)
time.sleep(10)
if additional_reset:
self.api.vm_power_off(node, cloned_vm_id)
time.sleep(10)
self.api.vm_power_on(node, cloned_vm_id)
time.sleep(30)
return ProxmoxVM(id=cloned_vm_id, name=template_vm_name, node=node, token=self.token)
def delete_cloned_vms(self, nodes: list, vm_ids: list = None):
log.info('Powering off and removing vms...')
for node in nodes:
proxmox_vms = self._api.get_all_vms_from_node(node=node)
proxmox_vm_ids = [int(vm['vmid']) for vm in proxmox_vms if not vm['name'].startswith(self.DO_NOT_DELETE_PREFIX)]
self._log_info(f"{node} node vm ids: {proxmox_vm_ids}")
# Delete only specified vm ids if they are present on current node
if vm_ids:
for vm_id in vm_ids:
if int(vm_id) in proxmox_vm_ids:
self._log_info(f'vm with id={vm_id} is present on {node} node - deleting...')
ProxmoxVM(vm_id, '', node, token=self.token).destroy()
self._log_info(f'vm with id={vm_id} was deleted')
# Delete all vm ids on current node basing on CLONES
else:
for vm_name, vm_info in CLONES.items():
log.info(f'Deleting...: {vm_name}')
if vm_info.node == node:
vmid = vm_info.id
for _ in range(self.MAX_VMS_NUMBER):
vmid += 1000
if vmid in proxmox_vm_ids:
ProxmoxVM(vmid, vm_name, node, token=self.token).destroy()
log.info('All vms were successfully removed')
def _get_free_id(self, start_id, node, max_vms_number=MAX_VMS_NUMBER):
self._log_info('attempting to find free vm id...')
id = start_id
for index in range(max_vms_number):
id += 1000
if not self.api.get_config(node, id, expected_error='does not exist'):
self._log_info(f'found free id: {id}')
return id
raise RuntimeError(f"You exceeded VMs number limit: {max_vms_number}. Please delete unnecessary VMs and try again.")
@staticmethod
def compose_new_vm_name(base_name, ip, prefix):
new_name = base_name.replace('_', '-')
if ip:
new_name = f"{new_name}-{ip}"
if prefix:
new_name = f"{VMManager.DO_NOT_DELETE_PREFIX}-{new_name}"
return new_name
def _log_info(self, msg):
log.info(f'{self.__class__.__name__}: {msg}')
from unittest.mock import MagicMock, patch
import pytest
from assertpy import assert_that
import time
from helpers.VMManager import VMManager
from helpers.api.ProxmoxAPI import ProxmoxAPI
@pytest.fixture
def t_proxmox_manager():
t_proxmox_manager = VMManager('token')
t_proxmox_manager.api = MagicMock(ProxmoxAPI)
return t_proxmox_manager
# region 1. Test self._get_free_id()
vm_config = {
'scsihw': 'virtio-scsi-pci',
'ostype': 'other',
'smbios1': 'uuid=63cdab3a-cbfe-4f77-942c-dacdcf0420e2',
'sockets': 4,
'net0': 'e1000=F2:4A:26:72:CB:C3,bridge=vmbr0',
'memory': 8192,
'digest': 'afec73da6f2e171ff88d5f792ca07f38c3a55cb6',
'boot': 'order=sata0;ide2;net0',
'ide2': 'none,media=cdrom',
'name': 'test-winserver22-x64-192.168.23.250',
'vmgenid': '0d93f94b-d30f-4ac4-bbe1-b706d669f477',
'numa': 0,
'sata0': 'nvme:vm-2213-disk-0,size=70G',
'cores': 4}
@pytest.mark.parametrize("start_id,get_config_value,expected_result",
[(1200, [None], 2200), # id is found at first attempt
(1200, [vm_config, None], 3200)]) # id is found at second attempt
def test_get_free_id_success(t_proxmox_manager, start_id, get_config_value, expected_result):
t_proxmox_manager.api.get_config.side_effect = get_config_value
free_id = t_proxmox_manager._get_free_id(start_id, t_proxmox_manager.node)
assert_that(expected_result).is_equal_to(free_id)
def test_get_free_id_fail(t_proxmox_manager):
t_proxmox_manager.api.get_config.return_value = vm_config
with pytest.raises(RuntimeError):
t_proxmox_manager._get_free_id(1200, t_proxmox_manager.node)
# endregion
# region 2. Test self.clone_vm_and_power_on()
@patch.object(target=time, attribute='sleep')
def test_clone_vm_and_power_on_basic(p_sleep, t_proxmox_manager):
expected_vm_id = 2200
expected_vm_name = 'test_win10-x64'
t_proxmox_manager._get_free_id = MagicMock()
t_proxmox_manager._get_free_id.return_value = expected_vm_id
proxmox_vm = t_proxmox_manager.clone_vm_and_power_on(template_vm_id=1200,
template_vm_name=expected_vm_name)
t_proxmox_manager.api.vm_power_on.assert_called_once()
t_proxmox_manager.api.vm_power_off.assert_not_called()
t_proxmox_manager.api.vm_configure_network.assert_not_called()
assert_that(proxmox_vm.name).is_equal_to(expected_vm_name)
assert_that(proxmox_vm.id).is_equal_to(expected_vm_id)
assert_that(proxmox_vm.node).is_equal_to(t_proxmox_manager.node)
@patch.object(target=time, attribute='sleep')
def test_clone_vm_and_power_on_with_network_configuration(p_sleep, t_proxmox_manager):
t_proxmox_manager._get_free_id = MagicMock()
t_proxmox_manager._get_free_id.return_value = 2200
t_proxmox_manager.api.get_config.return_value = {}
t_proxmox_manager.clone_vm_and_power_on(template_vm_id=1200,
template_vm_name='test_win10-x64',
network_configuration=True)
t_proxmox_manager.api.vm_configure_network.assert_called_once()
t_proxmox_manager.api.vm_power_off.assert_not_called()
@patch.object(target=time, attribute='sleep')
def test_clone_vm_and_power_on_with_additional_reset(p_sleep, t_proxmox_manager):
t_proxmox_manager._get_free_id = MagicMock()
t_proxmox_manager._get_free_id.return_value = 2200
t_proxmox_manager.clone_vm_and_power_on(template_vm_id=1200,
template_vm_name='test_win10-x64',
additional_reset=True)
t_proxmox_manager.api.vm_power_off.assert_called_once()
t_proxmox_manager.api.vm_configure_network.assert_not_called()
# endregion