import http import os from time import sleep from typing import Optional from urllib.parse import urljoin import requests from django.conf import settings from django.core.files.base import ContentFile from store.utils import is_migration_running os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'poizonstore.settings') class CDEKClient: AUTH_ENDPOINT = 'oauth/token' ORDER_INFO_ENDPOINT = 'orders' CALCULATOR_TARIFF_ENDPOINT = 'calculator/tariff' BARCODE_ENDPOINT = 'print/barcodes' MAX_RETRIES = 2 def __init__(self, client_id, client_secret, grant_type='client_credentials'): self.api_url = 'https://api.cdek.ru/v2/' self.client_id = client_id self.client_secret = client_secret self.grant_type = grant_type self.session = requests.Session() def request(self, method, url, *args, **kwargs): joined_url = urljoin(self.api_url, url) request = requests.Request(method, joined_url, *args, **kwargs) retries = 0 while retries < self.MAX_RETRIES: retries += 1 prepared = self.session.prepare_request(request) r = self.session.send(prepared) # TODO: handle/log errors if r.status_code == http.HTTPStatus.UNAUTHORIZED: self.authorize() continue return r def authorize(self): params = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': self.grant_type } r = self.request('POST', self.AUTH_ENDPOINT, params=params) if r: data = r.json() token = data['access_token'] self.session.headers.update({'Authorization': f'Bearer {token}'}) def get_order_info(self, im_number): params = { 'im_number': str(im_number) } return self.request('GET', self.ORDER_INFO_ENDPOINT, params=params) def create_order(self, order_data): return self.request('POST', self.ORDER_INFO_ENDPOINT, json=order_data) def edit_order(self, order_data): return self.request('PATCH', self.ORDER_INFO_ENDPOINT, json=order_data) def calculate_tariff(self, data): return self.request('POST', self.CALCULATOR_TARIFF_ENDPOINT, json=data) def generate_barcode(self, cdek_number, format="A6") -> Optional[str]: request_data = { "orders": [{"cdek_number": cdek_number}], "copy_count": 1, "format": format } r = self.request('POST', self.BARCODE_ENDPOINT, json=request_data) if not r: return None resp_data = r.json() if 'entity' not in resp_data: return None barcode_uuid = resp_data['entity']['uuid'] return barcode_uuid def get_barcode_url(self, uuid) -> Optional[str]: if not uuid: return None r = self.request('GET', f'{self.BARCODE_ENDPOINT}/{uuid}') if not r: return None resp_data = r.json() if 'entity' not in resp_data: return None url = resp_data['entity'].get('url') return url def get_barcode_file(self, cdek_number): uuid = self.generate_barcode(cdek_number) sleep(2) # Sometimes url are not yet created, so be prepared for this url = self.get_barcode_url(uuid) if not url: return None r = self.request('GET', url) return ContentFile(r.content) if r and r.content else None client = CDEKClient(settings.CDEK_CLIENT_ID, settings.CDEK_CLIENT_SECRET) if not is_migration_running(): client.authorize()