import http import os from time import sleep from typing import Optional from urllib.parse import urljoin import requests from django.conf import settings from django.core.files.base import ContentFile from requests import Request os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'poizonstore.settings') class CDEKClient: AUTH_ENDPOINT = 'oauth/token' ORDER_INFO_ENDPOINT = 'orders' CALCULATOR_TARIFF_ENDPOINT = 'calculator/tariff' BARCODE_ENDPOINT = 'print/barcodes' MAX_RETRIES = 2 def __init__(self, client_id, client_secret, grant_type='client_credentials'): self.api_url = 'https://api.cdek.ru/v2/' self.client_id = client_id self.client_secret = client_secret self.grant_type = grant_type self.session = requests.Session() def request(self, method, url, *args, **kwargs): joined_url = urljoin(self.api_url, url) request = Request(method, joined_url, *args, **kwargs) retries = 0 while retries < self.MAX_RETRIES: prepared = self.session.prepare_request(request) r = self.session.send(prepared) # TODO: handle/log errors if r.status_code == http.HTTPStatus.UNAUTHORIZED: self.authorize() continue return r def authorize(self): params = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': self.grant_type } r = self.request('POST', self.AUTH_ENDPOINT, params=params) if r: data = r.json() token = data['access_token'] self.session.headers.update({'Authorization': f'Bearer {token}'}) def get_order_info(self, im_number): params = { 'im_number': str(im_number) } return self.request('GET', self.ORDER_INFO_ENDPOINT, params=params) def create_order(self, order_data): return self.request('POST', self.ORDER_INFO_ENDPOINT, json=order_data) def edit_order(self, order_data): return self.request('PATCH', self.ORDER_INFO_ENDPOINT, json=order_data) def calculate_tariff(self, data): return self.request('POST', self.CALCULATOR_TARIFF_ENDPOINT, json=data) def generate_barcode(self, cdek_number, format="A6") -> Optional[str]: request_data = { "orders": [{"cdek_number": cdek_number}], "copy_count": 1, "format": format } r = self.request('POST', self.BARCODE_ENDPOINT, json=request_data) if not r: return None resp_data = r.json() if 'entity' not in resp_data: return None barcode_uuid = resp_data['entity']['uuid'] return barcode_uuid def get_barcode_url(self, uuid) -> Optional[str]: if not uuid: return None r = self.request('GET', f'{self.BARCODE_ENDPOINT}/{uuid}') if not r: return None resp_data = r.json() if 'entity' not in resp_data: return None url = resp_data['entity'].get('url') return url def get_barcode_file(self, cdek_number): uuid = self.generate_barcode(cdek_number) sleep(2) # Sometimes url are not yet created, so be prepared for this url = self.get_barcode_url(uuid) if not url: return None r = self.request('GET', url) return ContentFile(r.content) if r and r.content else None client = CDEKClient(settings.CDEK_CLIENT_ID, settings.CDEK_CLIENT_SECRET) client.authorize()