import time
# Максимальное количество попыток
RETRIES = 3
# Промежуток времени (60 сек), в течение которого мы будем ретраиться
TIMEOUT = 60
# Частота, c которой будем ретраиться
PERIOD = 5
# Функция-декоратор, которая модифицирует переданный ей метод
# В нашем случае модификация заключается в повторении вызовов переданного метода
def retry(max_retries, timeout, period):
def outer(func):
def inner(*args, **kwargs):
# Задаем время, когда необходимо остановить попытки
end_time = time.time() + timeout
# Создаем счетчик, с каждой попыткой будем уменьшать его значение на 1
retries = max_retries
# Бесконечно крутимся в цикле до тех пор, пока не случится одно из событий:
# 1) метод func() выполнится успешно
# 2) закончится отведенное на ретраи время
# 3) исчерпаются попытки
while True:
try:
return func(*args, **kwargs)
except Exception as e:
print(f'{e}')
if time.time() > end_time:
raise 'Timeout has expired!'
if retries == 1:
raise e
else:
retries -= 1
print(f"Attempts left: {retries}")
print(f"Sleeping {period} seconds ..." )
time.sleep(period)
return inner
return outer
# Протестируем наш декоратор на примере падающей функции
@retry(RETRIES, TIMEOUT, PERIOD)
def send_request():
raise Exception('Code block has failed. This is expected.')
# Тесты
send_request()
def modified_retry(max_retries, timeout, exception=None, error_msg=None, period=1):
def outer(func):
def inner(*args, **kwargs):
end_time = time.time() + timeout
retries = max_retries
while retries:
try:
return func(*args, **kwargs)
except Exception as e:
if (e and e == exception) or (error_msg and error_msg in str(e)):
log.error(f'ERROR: {e}')
if time.time() > end_time:
log.error(f'Failed to successfully execute cmd during {timeout} sec')
raise e
if retries == 1:
log.error(f'Failed to successfully execute cmd with {max_retries} attempts')
raise e
retries -= 1
time.sleep(period)
else:
raise e
return inner
return outer
# Будем делать повторные попытки, только если упали с TimeoutExpired
@retry(5, 60, exception=subprocess.TimeoutExpired)
def send_request():
raise TimeoutExpired('Operation timed out. This is expected.')
# Тест
send_request()
import time
# Главная функция
def wait_until(condition, description, timeout=300, period=0.25, *args, **kwargs):
final_time = time.time() + timeout
while time.time() < final_time:
# Вызываем переданную функцию
# Если ее возвращаемое значение эквивалентно True, то ожидание завершается
if condition(*args, **kwargs):
return True
# Если нет - ждем дальше
time.sleep(period)
raise TimeoutError(f'Timed out waiting for condition: [{description}]')
# Через замыкание создаем метод, который отсчитывает n секунд
def closure(n):
def sleep_several_seconds():
nonlocal n
time.sleep(1)
n = n - 1
print(f"{n} seconds left")
return n
return sleep_several_seconds
# Метод, который проверяет, что отведенное количество секунд закончилось
def five_seconds_passed():
return sleep_five_seconds() == 0
# Создаем метод, который будет отсчитывать 5 секунд
sleep_five_seconds = closure(5)
# 1. Позитивный кейс
# Ждем, пока будут отсчитаны 5 секунд с таймаутом 10 секунд - не выходим за пределы таймаута
wait_until(condition=five_seconds_passed, description='Five seconds are over', timeout=10)
# 2. Негативный кейс
# Ждем, пока будут отсчитаны 5 секунд с таймаутом 3 секунды - превышаем таймаут и падаем с TimeoutError
wait_until(condition=five_seconds_passed, description='Five seconds are over', timeout=3)
# Проверяем, пингуется ли машина
def host_is_pingable(ip):
return os.system(f"ping -c 1 {ip}")
def wait_until(condition, description, timeout=300, period=5, *args, **kwargs):
final_time = time.time() + timeout
while time.time() < final_time:
output = condition(*args, **kwargs)
if output:
return output
time.sleep(period)
raise TimeoutError(f'Timed out waiting for condition: [{description}]')
# Ждем, пока машина станет доступна в сети
def wait_until_host_is_pingable(ip):
wait_until(condition=host_is_pingable, ip=ip, timeout=60, period=5, description=f"Host {ip} is up")
# Тест
wait_until_host_is_pingable()
import uuid
class Fruit:
def __init__(self, name, id=uuid.uuid4(), color='Green'):
self.name = name
self.id = id
self.color = color
import uuid
from dataclasses import dataclass
@dataclass
class Fruit:
name: str
id: int = uuid.uuid4()
color: str = 'Green'
import uuid
from dataclasses import dataclass
from typing import Literal
@dataclass
class Fruit:
name: str
id: int = uuid.uuid4()
color: Literal['Green', 'Red', 'Orange', 'Blue', 'Yellow', 'Black', 'White'] = 'Green'
from dataclasses import dataclass
from typing import Literal
import uuid
# class Fruit:
#
# def __init__(self, name, id=uuid.uuid4(), color='Green'):
# self.name = name
# self.id = id
# self.color = color
@dataclass
class Fruit:
name: str
id: int = uuid.uuid4()
color: Literal['Green', 'Red', 'Orange', 'Blue', 'Yellow', 'Black', 'White'] = 'Green'
# Тест
fruits = [
Fruit(name='Peach', id=1, color='Orange'),
Fruit(name='Orange', id=2, color='Orange'),
Fruit(name='Banana', color='Yellow'),
Fruit(name='Apple')
]
for fruit in fruits:
print(fruit)
# Результат
# /Users/tati/Documents/Projects/inherit_composition/venv/bin/python
# /Users/tati/Documents/Projects/Pycharm/demo/dataclass.py
# Fruit(name='Peach', id=1, color='Orange')
# Fruit(name='Orange', id=2, color='Orange')
# Fruit(name='Banana', id=UUID('b4022f39-6a61-463b-a7cb-71a744f5b4df'), color='Yellow')
# Fruit(name='Apple', id=UUID('b4022f39-6a61-463b-a7cb-71a744f5b4df'), color='Green')
# Process finished with exit code 0
def my_func_args(*args):
# [1, 2, 3]
pass
def my_func_kwargs(**kwargs):
# {'a': 1, 'b': 2, 'c': 3}
pass
my_func_args(1, 2, 3)
my_func_kwargs(a=1, b=2, c=3)
# Counts arguments quantity
def count_items_in_bag(*args):
sum = 0
for item in args:
print(f'Counting {item}')
sum += 1
print(f'Total items: {sum}')
return
def count_named_items_in_fridge(**kwargs):
sum = 0
for name, quantity in kwargs.items():
print(f'Counting {name}={quantity}')
sum += quantity
print(f'Total items: {sum}')
return
count_items_in_bag('phone', 'credit card', 'pen', 'notebook', 'handkerchief')
count_named_items_in_fridge(milk=1, apple=5, eggs=10, butter=1)
def print_args(a, b=0, **kwargs):
print("Lets print args:")
print(f'a={a}')
print(f'b={b}')
for key, value in kwargs.items():
print(f'{key}={value}')
# The first 2 args could be unnamed since they are passed in the initial order
print_args(1, 2, named_arg=100, one_more_named_arg="I'm one more named arg!")
# We can change args order - but we need to specify their names in such case
print_args(named_arg=100, b=2, a=1, one_more_named_arg="I'm one more named arg!")
# We can skip 'b' param value since it already has default one (b=0)
print_args(named_arg=100, a=1, one_more_named_arg="I'm one more named arg!")
def print_func_name_and_call(times, func, *args):
for iteration in range(0, times):
print(f'Function name is "{func.__name__}" ({iteration})')
return func(*args)
def sum(*args):
sum = 0
for item in args:
sum += item
return sum
# Here we pass sum() function name and its arguments 10, 100, 1000
s = print_func_name_and_call(3, sum, 10, 100, 1000)
print(f'Sum is {s}')
# 1. Dump dict to json str or file
my_dict = {
'str_item': 'Im a string',
'int_item': 100,
'bool_item': True
}
# dict -> json str
my_str = json.dumps(my_dict)
my_str_with_indent = json.dumps(my_dict, indent=2)
print(my_str)
print(my_str_with_indent)
# dict -> json file
with open('my_json_dump.json', 'w') as f:
json.dump(my_dict, f, indent=2)
# 2. Load json from string/file to dict
dict_loaded_from_str = json.loads(my_str)
with open('my_json_dump.json', 'r') as f:
dict_loaded_from_file = json.load(f)
# 1. Dump dict to *.yaml file
my_dict = {
'str_item': 'Im a string',
'int_items': [1, 2, 3, 4, 5],
'bool_items': {'true_item': True, 'false_item': False}
}
with open('my_yaml_dump.yml', 'w') as f:
yaml.dump(my_dict, f)
bool_items:
false_item: false
true_item: true
int_items:
- 1
- 2
- 3
- 4
- 5
str_item: Im a string
# 2. Read *.yaml file content to dict
with open('my_yaml_dump.yml', 'r') as f:
loaded_from_file_dict = yaml.safe_load(f)
(venv) tati@tati useful_samples % pip3 install flake8
Collecting flake8
Successfully installed flake8-6.0.0 mccabe-0.7.0 pycodestyle-2.10.0 pyflakes-3.0.1
[flake8]
ignore=
# Line too long
E501,
#Line break after binary operator
W504
# Could also be ommited via commandline param: --exclude ./venv
exclude = ./venv
# Execution check before style fixes
(venv) tati@tati useful_samples % flake8 . --config .flake8 --count
./args_kwargs.py:75:21: W292 no newline at end of file
./oop_pizza.py:150:15: F541 f-string is missing placeholders
./oop_pizza.py:153:22: E225 missing whitespace around operator
./oop_pizza.py:209:22: W292 no newline at end of file
./retry.py:7:1: E302 expected 2 blank lines, found 1
./retry.py:24:63: E202 whitespace before ')'
./retry.py:43:1: W391 blank line at end of file
7
# Execution result after all errors/warnings were fixed
(venv) tati@tati useful_samples % flake8 . --config .flake8 --exclude ./venv --count
0
class ExtendedJSONEncoder(simplejson.JSONEncoder):
def default(self, obj):
if isinstance(obj, uuid.UUID) or isinstance(obj, datetime.datetime) or isinstance(obj, datetime.date):
return str(obj)
return simplejson.JSONEncoder.default(self, obj)
class Utils:
SECONDS_IN_HOUR = 3600
SECONDS_IN_DAY = 86400
NANOSECONDS_IN_SECOND = 1_000_000_000
NANOSECONDS_IN_HOUR = 3_600_000_000_000
Faker.seed(randrange(100))
faker = Faker()
decimal_ctx = Context()
decimal_ctx.prec = 20
# Выполнить блок кода с подавлением исключения/ошибки
@staticmethod
def execute_with_error_suppression(method, error_type, error_code: StatusCode = None, error_msg: str = None, *args, **kwargs):
try:
return method(*args, **kwargs)
except error_type as e:
if error_code is not None:
if e.args[0].code != error_code:
raise e
elif error_msg is not None:
if error_msg not in str(e):
raise e
log.warning(f"Suppressed error: {e}")
# Счетчик: при каждом вызове увеличивает возвращаемое значение на 1
@staticmethod
def counter(_count=count(1)):
return next(_count)
# Деление с заданным количеством знаков после запятой
@staticmethod
def divide_with_precision(num1, num2, precision=20):
return round(Decimal(num1) / Decimal(num2), precision)
# Удаление незначащих нулей после запятой
@staticmethod
def remove_trailing_zeros_from_number(string: str):
return str(float(string)).rstrip('0').rstrip('.')
# Математическое округление
# Note the rounding half to even. This is also called bankers rounding; instead of always rounding up or down
# (compounding rounding errors), by rounding to the nearest even number you average out rounding errors.
# If you need more control over the rounding behaviour, use the decimal module, which lets you specify exactly what
# rounding strategy should be used.
@staticmethod
def round_half_up(num: Decimal):
with localcontext() as ctx:
ctx.rounding = ROUND_HALF_UP
rounded_num = round(num, 2)
return rounded_num
# Конвертация числа в строку (включая числа с плавающей запятой)
@staticmethod
def number_to_str(number: float | Decimal):
if isinstance(number, float):
number = Utils.decimal_ctx.create_decimal(repr(number))
if isinstance(number, Decimal):
return '{:f}'.format(number.normalize())
return str(number)
# Генерация рандомного числа
@staticmethod
def random_number(range=10):
return randrange(range)
# Генерация рандомной строки
@staticmethod
def random_word():
return f"{Utils.faker.word().capitalize()}_{Utils.short_date()}"
# Обрезка данных (можно использовать, если нужно сократить размер данных для вывода в лог файлы)
@staticmethod
def truncate_str(data, max_length=100):
if type(data) is not str:
data = str(data)
if len(data) > 400:
return Utils.truncate_str_by_lines(data)
else:
return Utils.truncate_str_by_symbols(data, max_length)
# Обрезка строки по количеству символов
@staticmethod
def truncate_str_by_symbols(data, max_length):
if type(data) is not str:
data = str(data)
data = data.replace('\r', '').replace('\n', '').replace('--', '').replace('==', '').replace(' ', '')
return (data[:max_length] + '<..>') if len(data) > max_length else data
# Обрезка строки по количеству строк
@staticmethod
def truncate_str_by_lines(data, max_lines=1):
if type(data) is not str:
data = str(data)
lst = list(filter(None, data.replace('\r', '').split('\n')))
if len(lst) == 1:
return lst[0]
elif len(lst) > max_lines:
return '\n'.join(lst[-max_lines:])
else:
return '\n'.join(lst)
# Перевод строки из CamelCase в snake_case
@staticmethod
def camel_to_snake(camel_str):
return ''.join(['_' + w.lower() if w.isupper() else w for w in camel_str]).lstrip('_')
# Перевод строки из snakeCase в CamelCase
@staticmethod
def snake_to_camel(snake_str):
return "".join(w.capitalize() for w in snake_str.lower().split("_"))
# Перевод строки из множественного числа в единственное
@staticmethod
def singularize_str(string):
return inflect.engine().singular_noun(string)
# Генерация рандомной строки
@staticmethod
def random_str(length):
return ''.join(choice(ascii_uppercase) for i in range(length))
# Замена подстроки
@staticmethod
def replace_str_by_regex(init_str: str, sub_regex: str, sub_str: str):
if re.search(sub_regex, init_str):
return re.sub(sub_regex, sub_str, init_str)
else:
return f"{init_str}{sub_str}"
# Увеличение значения строки на 1
@staticmethod
def str_increment(init_str: str):
num = re.search(r"\d.*", init_str)
num = int(num[0]) + 1 if num else 1
return Utils.replace_str_by_regex(init_str, r"\d.*", str(num))
# Вывод в strerr
def print_to_stderr(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
# Вывод в консоль малиновым цветом (жирный)
@staticmethod
def log_info_magenta(msg):
log.info(f"{Fore.MAGENTA}{Style.BRIGHT}{msg}{Style.RESET_ALL}")
# Вывод в консоль синим цветом (жирный)
@staticmethod
def log_info_blue(msg):
log.info(f"{Fore.BLUE}{Style.BRIGHT}{msg}{Style.RESET_ALL}")
# Вывод в консоль бирюзовым цветом (жирный)
@staticmethod
def log_info_cyan(msg):
log.info(f"{Fore.CYAN}{Style.BRIGHT}{msg}{Style.RESET_ALL}")
# Вывод в консоль зеленым цветом (жирный)
@staticmethod
def log_info_green(msg):
log.info(f"{Fore.GREEN}{Style.BRIGHT}{msg}{Style.RESET_ALL}")
# Сброс форматирования при выводе в консоль
@staticmethod
def reset_formatting(formatted_msg: str):
for color in (vars(Fore) | vars(Style)).values():
if color in formatted_msg:
formatted_msg = formatted_msg.replace(color, '')
return formatted_msg
# Ожидание открытия TCP порта
@staticmethod
def wait_until_port_is_listening(address, port):
Utils.wait_until(condition=lambda: Utils.port_is_listening(address, port),
description=f"Port {address}:{port} is listening")
# Проверка открытия TCP порта
@staticmethod
def port_is_listening(address, port):
try:
listening = socket.socket().connect_ex((address, int(port))) == 0
if listening:
log.info(f"Port {address}:{port} is listening")
else:
log.warning(f"Port {address}:{port} is NOT listening")
return listening
except socket.gaierror:
return
# Кодирование числа в base64
@staticmethod
def number_to_base64_string(number: int):
number_bytes = Utils.int_to_big_endian_bytes(number)
return base64.b64encode(number_bytes).decode('unicode_escape')
# Декодирование числа в base64
@staticmethod
def decode_base64_string(string: str):
return base64.b64decode(string)
# Кодирование строки в sha256
@staticmethod
def string_to_sha256(string: str, key: str):
return hmac.new(str.encode(key), msg=str.encode(string), digestmod=hashlib.sha256).digest().hex().upper()
# Перевод числа int в big endian bytes
@staticmethod
def int_to_big_endian_bytes(number: int):
return number.to_bytes((number.bit_length() + 7) // 8, byteorder="big")
# Перевод числа из big endian bytes в int
@staticmethod
def big_endian_bytes_to_int(data: bytes):
return int.from_bytes(data, "big")
# Фильтрация словаря по ключу
@staticmethod
def filter_dict(d: dict, ignore_keys=None):
if not ignore_keys:
ignore_keys = []
return {k: (Utils.filter_dict(v, ignore_keys) if isinstance(v, dict) else v) for k, v in d.items() if
k not in ignore_keys}
# Проверка словарей на равенство
@staticmethod
def dicts_are_equal(d1: dict, d2: dict, ignore_keys=None):
return Utils.filter_dict(d1, ignore_keys) == Utils.filter_dict(d2, ignore_keys)
# Проверка словарей на равенство (с выбросом исключения)
@staticmethod
def assert_dicts_are_equal(d1: dict, d2: dict, ignore_keys=None):
return assert_that(Utils.filter_dict(d1, ignore_keys)).is_equal_to(Utils.filter_dict(d2, ignore_keys))
# Формирование плоского списка из вложенного
@staticmethod
def flatten_list(list_of_lists: list):
return [item for lst in list_of_lists for item in lst]
# Текущее время в формате unix time
@staticmethod
def unix_timestamp_now():
return int(time.time())
# Перевод времени из формата datetime в google date
@staticmethod
def datetime_to_google_date(dt: datetime.datetime):
return Date(year=dt.year, month=dt.month, day=dt.day)
# Текущее время в формате короткой строки
@staticmethod
def short_date(base_time: datetime = None):
if not base_time:
base_time = datetime.datetime.utcnow()
return base_time.strftime('%Y%m%d%H%M%S') # 20230322113549
# Текущее время в UTC
@staticmethod
def datetime_utc(delta_milli_seconds: int = 0, delta_seconds: int = 0, delta_minutes: int = 0, delta_hours: int = 0,
delta_days: int = 0):
return datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=delta_days,
hours=delta_hours,
minutes=delta_minutes,
seconds=delta_seconds,
milliseconds=delta_milli_seconds)
# Текущей время в таймзоне MSK
@staticmethod
def datetime_msk():
return datetime.datetime.now()
# Перевод строки в формат времени datetime
@staticmethod
def str_to_datetime(string: str):
return datetime.datetime.strptime(string, '%Y%m%d%H%M%S')
# Перевод datetime в строку
@staticmethod
def datetime_to_str(dt: datetime.datetime) -> str:
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
# Перевод unix time в секундах в миллисекунды
@staticmethod
def unix_time_in_ms(seconds_ago=0):
return time.time_ns() // 1000
# Перевод секунд в миллисекунды
@staticmethod
def seconds_to_milliseconds(seconds: float):
return math.floor(seconds * 1000)
# Перевод миллисекунд в секунды
@staticmethod
def milliseconds_to_seconds(milliseconds: float):
return math.floor(milliseconds / 1000)
# Количество дней в месяце
@staticmethod
def days_in_month(year: int, month: int):
return calendar.monthrange(year, month)[1]
# Генерация UUID4 строки
@staticmethod
def uuid4():
return str(uuid.uuid4())
# Текущее время в формате proto
@staticmethod
def protobuf_timestamp_now():
ts = Timestamp()
ts.GetCurrentTime()
return ts
@staticmethod
def datetime_to_protobuf_timestamp(base_time: datetime.datetime = None, delta_milli_seconds: int = 0,
delta_seconds: int = 0, delta_minutes: int = 0, delta_hours: int = 0,
delta_days: int = 0):
if not base_time:
base_time = datetime.datetime.utcnow()
time_with_delta = base_time + datetime.timedelta(days=delta_days, hours=delta_hours, minutes=delta_minutes,
seconds=delta_seconds, milliseconds=delta_milli_seconds)
protobuf_timestamp = Timestamp()
protobuf_timestamp.FromDatetime(time_with_delta)
return protobuf_timestamp
@staticmethod
def protobuf_timestamp_to_datetime(ts: Timestamp):
return datetime.datetime.utcfromtimestamp(ts.seconds + ts.nanos / 1e9)
@staticmethod
def protobuf_duration_now(seconds=5):
td = datetime.timedelta(seconds=seconds)
duration = Duration()
duration.FromTimedelta(td)
return duration
# Формирование proto message из байтов
@staticmethod
def proto_message_from_bytes(data: bytes, model: Any):
obj = model()
obj.ParseFromString(data)
return obj
# Формирование байтов из proto message
@staticmethod
def proto_message_to_bytes(model: Any):
return model.SerializeToString()
# Формирование proto message из словаря
@staticmethod
def proto_message_from_dict(data: dict, model: Any):
obj = model()
ParseDict(data, obj)
return obj
# Формирование json строки из proto message
@staticmethod
def proto_message_to_bytes(model: Any):
return model.SerializeToString()
@staticmethod
def proto_message_to_json(obj: Message, indent: int = 2):
if obj is None:
return "null"
return MessageToJson(obj, indent=indent)
# Формирование словаря из proto message
@staticmethod
def proto_message_to_dict(obj: Message):
return MessageToDict(obj, preserving_proto_field_name=True)
# Формирование json строки из смешанной структуры данных
@staticmethod
def mixed_message_to_json(obj: Any, indent=4):
obj_dct = Utils.mixed_message_to_dict(obj)
return simplejson.dumps(obj_dct, cls=ExtendedJSONEncoder, indent=indent)
# Формирование словаря из смешанной структуры данных
@staticmethod
def mixed_message_to_dict(obj: Any):
if isinstance(obj, Message):
return Utils.proto_message_to_dict(obj)
elif isinstance(obj, BaseModel):
return obj.model_dump()
elif isinstance(obj, BaseClass):
return {str(k): Utils.mixed_message_to_dict(v) for k, v in vars(obj).items()}
elif is_dataclass(obj):
return asdict(obj)
elif isinstance(obj, list):
return [Utils.mixed_message_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {str(k): Utils.mixed_message_to_dict(v) for k, v in obj.items()}
else:
return obj