diff --git a/setup.py b/setup.py index d4f0a95..c1e6296 100644 --- a/setup.py +++ b/setup.py @@ -16,43 +16,48 @@ from setuptools import setup def read(*names, **kwargs): - return io.open(join(dirname(__file__), *names), encoding=kwargs.get('encoding', 'utf8')).read() + return io.open( + join(dirname(__file__), *names), encoding=kwargs.get("encoding", "utf8") + ).read() setup( - name='octokitpy', - version='0.13.0', - license='MIT license', - description='Python client for GitHub API', - long_description='%s\n%s' % ( - re.compile('^.. start-badges.*^.. end-badges', re.M | re.S).sub('', read('README.rst')), - re.sub(':[a-z]+:`~?(.*?)`', r'``\1``', read('CHANGELOG.rst')) + name="octokitpy", + version="0.13.0", + license="MIT license", + description="Python client for GitHub API", + long_description="%s\n%s" + % ( + re.compile("^.. start-badges.*^.. end-badges", re.M | re.S).sub( + "", read("README.rst") + ), + re.sub(":[a-z]+:`~?(.*?)`", r"``\1``", read("CHANGELOG.rst")), ), - author='Kyle Hornberg', - author_email='kyle.hornberg@gmail.com', - url='https://github.com/khornberg/octokit.py', - packages=find_packages('src'), - package_dir={'': 'src'}, - py_modules=[splitext(basename(path))[0] for path in glob('src/*.py')], + author="Kyle Hornberg", + author_email="kyle.hornberg@gmail.com", + url="https://github.com/khornberg/octokit.py", + packages=find_packages("src"), + package_dir={"": "src"}, + py_modules=[splitext(basename(path))[0] for path in glob("src/*.py")], include_package_data=True, zip_safe=False, classifiers=[ # complete classifier list: http://pypi.python.org/pypi?%3Aaction=list_classifiers - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Operating System :: Unix', - 'Operating System :: POSIX', - 'Operating System :: Microsoft :: Windows', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: Implementation :: CPython', - 'Topic :: Utilities', + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: Unix", + "Operating System :: POSIX", + "Operating System :: Microsoft :: Windows", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: Implementation :: CPython", + "Topic :: Utilities", ], - keywords=['github', 'octokit', 'api'], - install_requires=open('requirements.txt').readlines(), + keywords=["github", "octokit", "api"], + install_requires=open("requirements.txt").readlines(), extras_require={}, ) diff --git a/src/octokit/__init__.py b/src/octokit/__init__.py index 107cb72..b6bb5ae 100644 --- a/src/octokit/__init__.py +++ b/src/octokit/__init__.py @@ -11,10 +11,9 @@ page_regex = re.compile(r'[\?\&]page=(\d+)[_&=%+\w\d]*>; rel="(\w+)"') class Octokit(Base): - def __init__(self, *args, **kwargs): super().__init__() - self._create(specifications[kwargs.get('routes', 'api.github.com')]) + self._create(specifications[kwargs.get("routes", "api.github.com")]) self._setup_authentication(kwargs) def _create(self, definitions): @@ -26,26 +25,29 @@ class Octokit(Base): def _create_classes(self, definitions): class_attributes = defaultdict(dict) - for path, path_object in definitions['paths'].items(): + for path, path_object in definitions["paths"].items(): for method, method_object in path_object.items(): - _cls_name, _id_name = method_object.get('operationId').split('/') + _cls_name, _id_name = method_object.get("operationId").split("/") cls_name = utils.snake_case(str(_cls_name)) method_id_name = utils.snake_case(str(_id_name)) - method_name = utils.snake_case(str(method_object.get('summary'))) + method_name = utils.snake_case(str(method_object.get("summary"))) class_attributes[cls_name].update( { - method_id_name: self._create_method(method_id_name, method_object, method, path), - method_name: self._create_method(method_name, method_object, method, path), + method_id_name: self._create_method( + method_id_name, method_object, method, path + ), + method_name: self._create_method( + method_name, method_object, method, path + ), } ) return class_attributes def _create_method(self, name, definition, method, path): - def _api_call(*args, **kwargs): - method_headers = kwargs.pop('headers') if kwargs.get('headers') else {} + method_headers = kwargs.pop("headers") if kwargs.get("headers") else {} self.validate(kwargs, definition) - requests_kwargs = {'headers': self._get_headers(method_headers)} + requests_kwargs = {"headers": self._get_headers(method_headers)} parameter_map = self._get_parameters(definition, method) url, data_kwargs = self._form_url(kwargs, path, parameter_map) requests_kwargs.update(self._data(data_kwargs, parameter_map, method)) @@ -56,43 +58,57 @@ class Octokit(Base): except ValueError: attributes = _response.text new_self = copy.deepcopy(self) - setattr(new_self, '_response', _response) - setattr(new_self, 'json', attributes) - setattr(new_self, 'response', new_self._convert_to_object(attributes)) + setattr(new_self, "_response", _response) + setattr(new_self, "json", attributes) + setattr(new_self, "response", new_self._convert_to_object(attributes)) return new_self _api_call.__name__ = name - _api_call.__doc__ = definition['description'] + _api_call.__doc__ = definition["description"] return _api_call def _convert_to_object(self, item): if isinstance(item, dict): - return type('ResponseData', (object, ), {k: self._convert_to_object(v) for k, v in item.items()}) + return type( + "ResponseData", + (object,), + {k: self._convert_to_object(v) for k, v in item.items()}, + ) if isinstance(item, list): - return list((self._convert_to_object(value) for index, value in enumerate(item))) + return list( + (self._convert_to_object(value) for index, value in enumerate(item)) + ) else: return item def set_pages(self, obj, previous_page_requested=None): response_headers = obj._response.headers - links = response_headers.get('Link', None) + links = response_headers.get("Link", None) if links: matches = re.findall(page_regex, links) if matches: for page, kind in matches: - setattr(obj, '{}_page'.format(kind), int(page)) - setattr(obj, 'pages', getattr(obj, 'last_page', previous_page_requested)) - setattr(obj, 'has_pages', True) - setattr(obj, 'current_page', previous_page_requested or getattr(obj, 'next_page') - 1) - setattr(obj, 'is_last_page', obj.pages == obj.current_page) + setattr(obj, "{}_page".format(kind), int(page)) + setattr( + obj, "pages", getattr(obj, "last_page", previous_page_requested) + ) + setattr(obj, "has_pages", True) + setattr( + obj, + "current_page", + previous_page_requested or getattr(obj, "next_page") - 1, + ) + setattr(obj, "is_last_page", obj.pages == obj.current_page) else: - setattr(obj, 'has_pages', False) + setattr(obj, "has_pages", False) return obj def paginate(self, obj, page=1, **kwargs): response = self.set_pages(obj(page=page, **kwargs)) yield response.json - if hasattr(response, 'is_last_page'): + if hasattr(response, "is_last_page"): while not response.is_last_page: - response = self.set_pages(obj(page=response.next_page, **kwargs), response.next_page) + response = self.set_pages( + obj(page=response.next_page, **kwargs), response.next_page + ) yield response.json diff --git a/src/octokit/base.py b/src/octokit/base.py index 9ed8939..035750b 100644 --- a/src/octokit/base.py +++ b/src/octokit/base.py @@ -12,28 +12,35 @@ from octokit import utils class Base(object): - base_url = 'https://api.github.com' + base_url = "https://api.github.com" def __init__(self): - self.headers = {'accept': 'application/vnd.github.v3+json', 'Content-Type': 'application/json'} + self.headers = { + "accept": "application/vnd.github.v3+json", + "Content-Type": "application/json", + } self._attribute_cache = defaultdict(dict) def _get_headers(self, method_headers): return dict(ChainMap(method_headers, self.headers)) def _get_required_params(self, params, cached_kwargs): - all_required = [k for k, v in params.items() if v.get('required')] + all_required = [k for k, v in params.items() if v.get("required")] required_params = all_required return required_params def validate(self, parameters, definition): - self.validate_required_parameters(parameters, self.get_required_parameters(definition)) + self.validate_required_parameters( + parameters, self.get_required_parameters(definition) + ) schema = {} - if definition.get('requestBody'): - schema = definition['requestBody']['content']['application/json']['schema'] + if definition.get("requestBody"): + schema = definition["requestBody"]["content"]["application/json"]["schema"] self.validate_schema(parameters, schema) - properties = schema.get('properties', {}) - valid_parameters = [p['name'] for p in definition.get('parameters')] + list(properties.keys()) + properties = schema.get("properties", {}) + valid_parameters = [p["name"] for p in definition.get("parameters")] + list( + properties.keys() + ) self.validate_other_parameters(parameters, valid_parameters, properties) return True @@ -42,15 +49,17 @@ class Base(object): try: assert parameter in valid_parameters except AssertionError: - message = '{} is not a valid parameter'.format(parameter) + message = "{} is not a valid parameter".format(parameter) raise errors.OctokitParameterError(message) if properties.get(parameter): self.validate_enum(parameter, value, properties) def validate_enum(self, parameter, value, properties): - if properties.get(parameter).get('enum') and value not in properties.get(parameter).get('enum'): - message = '{} is not a valid option for {}; must be one of {}'.format( - value, parameter, properties.get(parameter).get('enum') + if properties.get(parameter).get("enum") and value not in properties.get( + parameter + ).get("enum"): + message = "{} is not a valid option for {}; must be one of {}".format( + value, parameter, properties.get(parameter).get("enum") ) raise errors.OctokitParameterError(message) @@ -59,36 +68,42 @@ class Base(object): self.validate_list(parameters, schema) if isinstance(parameters, dict): self.validate_dict(parameters, schema) - self.validate_required_parameters(parameters, self.get_required_schema_properties(schema)) - self.type_match(parameters, schema.get('type')) + self.validate_required_parameters( + parameters, self.get_required_schema_properties(schema) + ) + self.type_match(parameters, schema.get("type")) def validate_dict(self, parameters, schema): for parameter, value in parameters.items(): if isinstance(value, dict): - self.validate_schema(value, schema.get('properties').get(parameter)) + self.validate_schema(value, schema.get("properties").get(parameter)) if isinstance(value, list): - self.validate_schema(value, schema.get('properties').get(parameter)) + self.validate_schema(value, schema.get("properties").get(parameter)) def validate_list(self, parameters, schema): - if not parameters and schema.get('items').get('required'): - message = f'property is missing required items' + if not parameters and schema.get("items").get("required"): + message = f"property is missing required items" raise errors.OctokitParameterError(message) for parameter in parameters: - self.validate_schema(parameter, schema.get('items', schema)) + self.validate_schema(parameter, schema.get("items", schema)) def type_match(self, symbol, expected_type): - types = {'array': list, 'object': dict, 'string': str} + types = {"array": list, "object": dict, "string": str} if not isinstance(symbol, types.get(expected_type)): name = symbol.__class__.__name__ - message = f'{name} type does not match the schema type of {expected_type} for the data of {symbol}' + message = f"{name} type does not match the schema type of {expected_type} for the data of {symbol}" raise errors.OctokitParameterError(message) def get_required_schema_properties(self, schema): - if schema.get('type') == 'object': - return schema.get('required') + if schema.get("type") == "object": + return schema.get("required") def get_required_parameters(self, definition): - return [p.get('name') for p in definition.get('parameters') if p.get('required') and p.get('in') in ['path']] + return [ + p.get("name") + for p in definition.get("parameters") + if p.get("required") and p.get("in") in ["path"] + ] def validate_required_parameters(self, parameters, required_parameters): for required_parameter in required_parameters or []: @@ -100,149 +115,159 @@ class Base(object): self._raise_must_have_value(required_parameter) def _raise_required_parameter(self, param): - message = '{} is a required parameter'.format(param) + message = "{} is a required parameter".format(param) raise errors.OctokitParameterError(message) def _raise_must_have_value(self, required_parameter): - message = '{} must have a value'.format(required_parameter) + message = "{} must have a value".format(required_parameter) raise errors.OctokitParameterError(message) def _form_url(self, values, _url, params): - _values = dict(ChainMap(values, self._attribute_cache['url'])) + _values = dict(ChainMap(values, self._attribute_cache["url"])) filtered_kwargs = {k: v for k, v in _values.items() if params.get(k)} data_values = filtered_kwargs.copy() for name, value in filtered_kwargs.items(): - _url, subs = re.subn(fr'{{{name}}}', str(value), _url) + _url, subs = re.subn(fr"{{{name}}}", str(value), _url) if subs != 0: - self._attribute_cache['url'][name] = data_values.pop(name) - url = '{}{}'.format(self.base_url, _url) + self._attribute_cache["url"][name] = data_values.pop(name) + url = "{}{}".format(self.base_url, _url) return url, data_values def _get_data(self, kwargs, params): data = array_data = {} for parameter_name, parameter in params.items(): - if parameter.get('type') == 'array': + if parameter.get("type") == "array": array_data = self._get_array_data(parameter_name, parameter, kwargs) data.update(self._get_default_data(parameter_name, parameter, kwargs)) return dict(ChainMap(data, array_data, kwargs)) def _get_default_data(self, parameter_name, parameter, kwargs): data = {} - if parameter.get('in') in ['query', 'body'] and not kwargs.get(parameter_name): - if parameter.get('schema', parameter).get('default') is not None: - data[parameter_name] = self._get_parameter_for_type(parameter.get('schema', parameter)) + if parameter.get("in") in ["query", "body"] and not kwargs.get(parameter_name): + if parameter.get("schema", parameter).get("default") is not None: + data[parameter_name] = self._get_parameter_for_type( + parameter.get("schema", parameter) + ) return data def _get_array_data(self, parameter_name, parameter, kwargs): data = {} - properties = parameter.get('items', parameter).get('properties') + properties = parameter.get("items", parameter).get("properties") if properties: for name, property_data in properties.items(): - if property_data.get('default') and not kwargs.get(parameter_name): + if property_data.get("default") and not kwargs.get(parameter_name): data[parameter_name] = self._get_parameter_for_type(property_data) return data def _get_parameter_for_type(self, schema): - if schema.get('type') == 'boolean' and schema.get('default') == 'true': + if schema.get("type") == "boolean" and schema.get("default") == "true": return True - if schema.get('type') == 'boolean' and schema.get('default') == 'false': + if schema.get("type") == "boolean" and schema.get("default") == "false": return False - return schema.get('default') + return schema.get("default") def _data(self, data_kwargs, params, method): data = self._get_data(data_kwargs, params) - if method == 'get': - return {'params': data} - if method in ['post', 'patch', 'put', 'delete']: - return {'data': json.dumps(data, sort_keys=True)} + if method == "get": + return {"params": data} + if method in ["post", "patch", "put", "delete"]: + return {"data": json.dumps(data, sort_keys=True)} return {} def _setup_authentication(self, kwargs): authentication_schemes = { - 'basic': self._setup_basic_authentication, - 'token': self._setup_token_authentication, - 'installation': self._setup_installation_authentication, - 'app': self._setup_app_authentication, + "basic": self._setup_basic_authentication, + "token": self._setup_token_authentication, + "installation": self._setup_installation_authentication, + "app": self._setup_app_authentication, } - if kwargs.get('auth'): - authentication_schemes.get(kwargs.get('auth'))(kwargs) + if kwargs.get("auth"): + authentication_schemes.get(kwargs.get("auth"))(kwargs) def _setup_basic_authentication(self, kwargs): - assert kwargs['username'] - assert kwargs['password'] - self.username = kwargs['username'] - self.password = kwargs['password'] - self.auth = kwargs['auth'] + assert kwargs["username"] + assert kwargs["password"] + self.username = kwargs["username"] + self.password = kwargs["password"] + self.auth = kwargs["auth"] def _setup_token_authentication(self, kwargs): - assert kwargs['token'] - self.token = kwargs['token'] - self.auth = kwargs['auth'] + assert kwargs["token"] + self.token = kwargs["token"] + self.auth = kwargs["auth"] def _setup_installation_authentication(self, kwargs): - assert kwargs['app_id'] - assert kwargs['private_key'] - self.token, self.expires_at = self._app_auth_get_token(kwargs['app_id'], kwargs['private_key']) - self.auth = kwargs['auth'] - self.headers['accept'] = 'application/vnd.github.machine-man-preview+json' + assert kwargs["app_id"] + assert kwargs["private_key"] + self.token, self.expires_at = self._app_auth_get_token( + kwargs["app_id"], kwargs["private_key"] + ) + self.auth = kwargs["auth"] + self.headers["accept"] = "application/vnd.github.machine-man-preview+json" def _setup_app_authentication(self, kwargs): - assert kwargs['app_id'] - assert kwargs['private_key'] - self.jwt = self._app_auth_get_jwt(kwargs['app_id'], kwargs['private_key']) - self.auth = kwargs['auth'] - self.headers['accept'] = 'application/vnd.github.machine-man-preview+json' + assert kwargs["app_id"] + assert kwargs["private_key"] + self.jwt = self._app_auth_get_jwt(kwargs["app_id"], kwargs["private_key"]) + self.auth = kwargs["auth"] + self.headers["accept"] = "application/vnd.github.machine-man-preview+json" def _app_auth_get_token(self, app_id, key): headers = { - 'Authorization': 'Bearer {}'.format(self._app_auth_get_jwt(app_id, key)), - 'Accept': 'application/vnd.github.machine-man-preview+json', + "Authorization": "Bearer {}".format(self._app_auth_get_jwt(app_id, key)), + "Accept": "application/vnd.github.machine-man-preview+json", } - installation_url = '{}/app/installations'.format(self.base_url) + installation_url = "{}/app/installations".format(self.base_url) installations = requests.get(installation_url, headers=headers).json() - self.installation_id = [x.get('id') for x in installations if str(x.get('app_id')) == app_id].pop() - installation_token_url = '{}/installations/{}/access_tokens'.format(self.base_url, self.installation_id) + self.installation_id = [ + x.get("id") for x in installations if str(x.get("app_id")) == app_id + ].pop() + installation_token_url = "{}/installations/{}/access_tokens".format( + self.base_url, self.installation_id + ) response = requests.post(installation_token_url, headers=headers).json() - return response['token'], response['expires_at'] + return response["token"], response["expires_at"] def _app_auth_get_jwt(self, app_id, key): payload = { - 'iat': int(datetime.datetime.timestamp(datetime.datetime.now())), - 'exp': int(datetime.datetime.timestamp(datetime.datetime.now())) + (9 * 60), - 'iss': app_id, + "iat": int(datetime.datetime.timestamp(datetime.datetime.now())), + "exp": int(datetime.datetime.timestamp(datetime.datetime.now())) + (9 * 60), + "iss": app_id, } - return jwt.encode(payload, key, algorithm='RS256') + return jwt.encode(payload, key, algorithm="RS256") def _auth(self, requests_kwargs): - if getattr(self, 'auth', None) == 'basic': - return {'auth': (self.username, self.password)} - if getattr(self, 'auth', None) in ['app', 'token', 'installation']: + if getattr(self, "auth", None) == "basic": + return {"auth": (self.username, self.password)} + if getattr(self, "auth", None) in ["app", "token", "installation"]: _headers = { - 'app': { - 'Authorization': 'Bearer {}'.format(getattr(self, 'jwt', None)) + "app": { + "Authorization": "Bearer {}".format(getattr(self, "jwt", None)) }, - 'token': { - 'Authorization': 'token {}'.format(getattr(self, 'token', None)) + "token": { + "Authorization": "token {}".format(getattr(self, "token", None)) }, - 'installation': { - 'Authorization': 'token {}'.format(getattr(self, 'token', None)) + "installation": { + "Authorization": "token {}".format(getattr(self, "token", None)) }, } - headers = requests_kwargs['headers'] - headers.update(_headers.get(getattr(self, 'auth', None))) - return {'headers': headers} + headers = requests_kwargs["headers"] + headers.update(_headers.get(getattr(self, "auth", None))) + return {"headers": headers} return {} def _get_parameters(self, definition, method): p = {} - if definition.get('requestBody'): - schema = definition.get('requestBody')['content']['application/json']['schema'] - if schema['type'] == 'object': + if definition.get("requestBody"): + schema = definition.get("requestBody")["content"]["application/json"][ + "schema" + ] + if schema["type"] == "object": body_parameters = {} - for k, v in schema['properties'].items(): - v['in'] = 'body' - v['required'] = k in schema.get('required', {}) + for k, v in schema["properties"].items(): + v["in"] = "body" + v["required"] = k in schema.get("required", {}) body_parameters.update({k: v}) p.update(body_parameters) - p.update(utils.parameter_transform(definition.get('parameters'))) + p.update(utils.parameter_transform(definition.get("parameters"))) return p diff --git a/src/octokit/utils.py b/src/octokit/utils.py index f3742d0..dbedb3b 100644 --- a/src/octokit/utils.py +++ b/src/octokit/utils.py @@ -5,13 +5,19 @@ import re def snake_case(string): # From https://gist.github.com/jaytaylor/3660565#gistcomment-2271689 - return re.compile(r'(?!^)(?' - assert sut.response.user.login == 'octocat' + assert sut.response.state == "open" + assert str(sut.response.user) == "" + assert sut.response.user.login == "octocat" assert sut.response.user.site_admin is False assert sut.response.labels[0].id == 208045946 def test_returned_object_is_a_list(self, mocker): - patch = mocker.patch('requests.patch') - data = [{ - "id": 208045946, - }, { - "id": 208045947, - }] - Request = namedtuple('Request', ['json']) + patch = mocker.patch("requests.patch") + data = [{"id": 208045946}, {"id": 208045947}] + Request = namedtuple("Request", ["json"]) patch.return_value = Request(json=lambda: data) - sut = Octokit().issues.update(owner='testUser', repo='testRepo', issue_number=1) + sut = Octokit().issues.update(owner="testUser", repo="testRepo", issue_number=1) assert sut.response[0].id == 208045946 def test_an_exception_with_json_is_replaced_by_the_raw_text(self, mocker): - patch = mocker.patch('requests.patch') - Request = namedtuple('Request', ['json']) - patch.return_value = Request(json=lambda: 'test') - sut = Octokit().issues.update(owner='testUser', repo='testRepo', issue_number=1) - assert sut.json == 'test' + patch = mocker.patch("requests.patch") + Request = namedtuple("Request", ["json"]) + patch.return_value = Request(json=lambda: "test") + sut = Octokit().issues.update(owner="testUser", repo="testRepo", issue_number=1) + assert sut.json == "test" def test_can_pass_in_optional_headers(self, mocker): - mocker.patch('requests.get') - headers = {'accept': 'application/vnd.github.ant-man-preview+json', 'Content-Type': 'application/json'} + mocker.patch("requests.get") + headers = { + "accept": "application/vnd.github.ant-man-preview+json", + "Content-Type": "application/json", + } Octokit().oauth_authorizations.get_authorization( - authorization_id=100, headers={ - 'accept': 'application/vnd.github.ant-man-preview+json' - } + authorization_id=100, + headers={"accept": "application/vnd.github.ant-man-preview+json"}, + ) + requests.get.assert_called_once_with( + "https://api.github.com/authorizations/100", params={}, headers=headers ) - requests.get.assert_called_once_with('https://api.github.com/authorizations/100', params={}, headers=headers) def test_dictionary_keys_are_validated(self, mocker): - mocker.patch('requests.put') - headers = {'accept': 'application/vnd.github.v3+json', 'Content-Type': 'application/json'} - data = { - "required_status_checks": { - "strict": True, - "contexts": [], - }, - "required_pull_request_reviews": { - "dismiss_stale_reviews": True - }, - "enforce_admins": True, - "restrictions": { - "users": [], - "teams": [], - } + mocker.patch("requests.put") + headers = { + "accept": "application/vnd.github.v3+json", + "Content-Type": "application/json", } - Octokit().repos.update_branch_protection(owner='user', repo='repo', branch='branch', **data) + data = { + "required_status_checks": {"strict": True, "contexts": []}, + "required_pull_request_reviews": {"dismiss_stale_reviews": True}, + "enforce_admins": True, + "restrictions": {"users": [], "teams": []}, + } + Octokit().repos.update_branch_protection( + owner="user", repo="repo", branch="branch", **data + ) requests.put.assert_called_with( - 'https://api.github.com/repos/user/repo/branches/branch/protection', + "https://api.github.com/repos/user/repo/branches/branch/protection", data=json.dumps(data, sort_keys=True), - headers=headers + headers=headers, ) @property diff --git a/tests/test_octokit.py b/tests/test_octokit.py index d85e515..edcacb6 100644 --- a/tests/test_octokit.py +++ b/tests/test_octokit.py @@ -2,19 +2,17 @@ import pytest class MockHeaders(object): - def __init__(self, requested_page, **kwargs): link = 'Links ; rel="next", ; rel="last"'.format( # noqa E501 min(requested_page + 1, 4) ) - self.headers = {'Link': kwargs.get('link', link)} + self.headers = {"Link": kwargs.get("link", link)} class MockObject(object): - def __init__(self, page, kwargs): self._response = MockHeaders(page, **kwargs) - self.json = {'page': page, 'kwargs': kwargs} + self.json = {"page": page, "kwargs": kwargs} def MockResponse(page=None, **kwargs): @@ -22,50 +20,55 @@ def MockResponse(page=None, **kwargs): def MockResponseWithoutLinks(page=None, **kwargs): - kwargs['link'] = '' + kwargs["link"] = "" return MockObject(page, kwargs) class TestOctokit(object): - def test_can_instantiate_class(self): import octokit + assert isinstance(octokit.Octokit, object) octo = octokit.Octokit() assert isinstance(octo, object) def test_can_import_class(self): from octokit import Octokit + assert isinstance(Octokit, object) octokit = Octokit() assert isinstance(octokit, object) def test_clients_are_lower_case(self): from octokit import Octokit + assert all(client.islower() for client in Octokit.__dict__) def test_pagination(self): from octokit import Octokit + sut_obj = MockResponse - p = Octokit().paginate(sut_obj, param='value') - assert next(p) == {'page': 1, 'kwargs': {'param': 'value'}} - assert next(p) == {'page': 2, 'kwargs': {'param': 'value'}} - assert next(p) == {'page': 3, 'kwargs': {'param': 'value'}} - assert next(p) == {'page': 4, 'kwargs': {'param': 'value'}} + p = Octokit().paginate(sut_obj, param="value") + assert next(p) == {"page": 1, "kwargs": {"param": "value"}} + assert next(p) == {"page": 2, "kwargs": {"param": "value"}} + assert next(p) == {"page": 3, "kwargs": {"param": "value"}} + assert next(p) == {"page": 4, "kwargs": {"param": "value"}} def test_pagination_does_break_when_iterating_over_a_single_page(self): from octokit import Octokit + sut_obj = MockResponseWithoutLinks - p = Octokit().paginate(sut_obj, param='value') - assert next(p) == {'page': 1, 'kwargs': {'param': 'value', 'link': ''}} + p = Octokit().paginate(sut_obj, param="value") + assert next(p) == {"page": 1, "kwargs": {"param": "value", "link": ""}} with pytest.raises(StopIteration): assert next(p) def test_can_speficy_the_route_specifications_used(self): from octokit import Octokit - octokit = Octokit(routes='ghe-2.18') - assert '/enterprise/2.18' in octokit.issues.create.__doc__ + + octokit = Octokit(routes="ghe-2.18") + assert "/enterprise/2.18" in octokit.issues.create.__doc__ octokit = Octokit() - assert '/developer.github.com' in octokit.issues.create.__doc__ - octokit = Octokit(routes='api.github.com') - assert '/developer.github.com' in octokit.issues.create.__doc__ + assert "/developer.github.com" in octokit.issues.create.__doc__ + octokit = Octokit(routes="api.github.com") + assert "/developer.github.com" in octokit.issues.create.__doc__ diff --git a/tests/test_validation.py b/tests/test_validation.py index 5985d56..691335b 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -4,49 +4,57 @@ from octokit.base import Base class TestBase(object): - def test_validate_required_parameters(self): - attrs = {'owner': 'me', 'repo': 'my_repo'} + attrs = {"owner": "me", "repo": "my_repo"} assert Base().validate(attrs, self.parameter_only_definition) def test_raise_error_for_missing_required_parameter(self): - attrs = {'owner': 'me'} + attrs = {"owner": "me"} with pytest.raises(errors.OctokitParameterError) as e: Base().validate(attrs, self.definition) - assert 'repo is a required parameter' == str(e.value) + assert "repo is a required parameter" == str(e.value) def test_raise_error_for_missing_required_request_body_property(self): - attrs = {'owner': 'me', 'repo': 'my_repo'} + attrs = {"owner": "me", "repo": "my_repo"} with pytest.raises(errors.OctokitParameterError) as e: Base().validate(attrs, self.definition) - assert 'name is a required parameter' == str(e.value) - attrs = {'owner': 'me', 'repo': 'my_repo', 'name': 'blah'} + assert "name is a required parameter" == str(e.value) + attrs = {"owner": "me", "repo": "my_repo", "name": "blah"} with pytest.raises(errors.OctokitParameterError) as e: Base().validate(attrs, self.definition) - assert 'head_sha is a required parameter' == str(e.value) + assert "head_sha is a required parameter" == str(e.value) def test_validate_parameters_and_request_body_properties(self): - attrs = {'owner': 'me', 'repo': 'my_repo', 'name': 'blah', 'head_sha': 'master'} + attrs = {"owner": "me", "repo": "my_repo", "name": "blah", "head_sha": "master"} assert Base().validate(attrs, self.definition) def test_validate_nested_request_body_properties(self): - attrs = {'owner': 'me', 'repo': 'my_repo', 'name': 'blah', 'head_sha': 'master', 'output': {}} - with pytest.raises(errors.OctokitParameterError) as e: - Base().validate(attrs, self.definition) - assert 'title is a required parameter' == str(e.value) - attrs = {'owner': 'me', 'repo': 'my_repo', 'name': 'blah', 'head_sha': 'master', 'output': {'title': 'here'}} - with pytest.raises(errors.OctokitParameterError) as e: - Base().validate(attrs, self.definition) - assert 'summary is a required parameter' == str(e.value) attrs = { - 'owner': 'me', - 'repo': 'my_repo', - 'name': 'blah', - 'head_sha': 'master', - 'output': { - 'title': 'here', - 'summary': 'there' - } + "owner": "me", + "repo": "my_repo", + "name": "blah", + "head_sha": "master", + "output": {}, + } + with pytest.raises(errors.OctokitParameterError) as e: + Base().validate(attrs, self.definition) + assert "title is a required parameter" == str(e.value) + attrs = { + "owner": "me", + "repo": "my_repo", + "name": "blah", + "head_sha": "master", + "output": {"title": "here"}, + } + with pytest.raises(errors.OctokitParameterError) as e: + Base().validate(attrs, self.definition) + assert "summary is a required parameter" == str(e.value) + attrs = { + "owner": "me", + "repo": "my_repo", + "name": "blah", + "head_sha": "master", + "output": {"title": "here", "summary": "there"}, } assert Base().validate(attrs, self.definition) @@ -60,26 +68,24 @@ class TestBase(object): "in": "header", "schema": { "type": "string", - "default": "application/vnd.github.antiope-preview+json" + "default": "application/vnd.github.antiope-preview+json", }, - "required": True - }, { + "required": True, + }, + { "name": "owner", "description": "owner parameter", "in": "path", "required": True, - "schema": { - "type": "string" - } - }, { + "schema": {"type": "string"}, + }, + { "name": "repo", "description": "repo parameter", "in": "path", "required": True, - "schema": { - "type": "string" - } - } + "schema": {"type": "string"}, + }, ], "requestBody": { "content": { @@ -89,89 +95,80 @@ class TestBase(object): "properties": { "name": { "type": "string", - "description": "The name of the check. For example, \"code-coverage\"." + "description": 'The name of the check. For example, "code-coverage".', }, "head_sha": { "type": "string", - "description": "The SHA of the commit." - }, - "details_url": { - "type": "string", + "description": "The SHA of the commit.", }, + "details_url": {"type": "string"}, "external_id": { "type": "string", - "description": "A reference for the run on the integrator's system." + "description": "A reference for the run on the integrator's system.", }, "status": { "type": "string", "enum": ["queued", "in_progress", "completed"], - "default": "queued" - }, - "started_at": { - "type": "string", + "default": "queued", }, + "started_at": {"type": "string"}, "conclusion": { "type": "string", "enum": [ - "success", "failure", "neutral", "cancelled", "timed_out", "action_required" - ] - }, - "completed_at": { - "type": "string", + "success", + "failure", + "neutral", + "cancelled", + "timed_out", + "action_required", + ], }, + "completed_at": {"type": "string"}, "output": { "type": "object", "properties": { "title": { "type": "string", - "description": "The title of the check run." - }, - "summary": { - "type": "string", - }, - "text": { - "type": "string", + "description": "The title of the check run.", }, + "summary": {"type": "string"}, + "text": {"type": "string"}, "annotations": { "type": "array", "items": { "type": "object", "properties": { - "path": { - "type": "string", - }, + "path": {"type": "string"}, "start_line": { "type": "integer", - "description": "The start line of the annotation." + "description": "The start line of the annotation.", }, "end_line": { "type": "integer", - "description": "The end line of the annotation." - }, - "start_column": { - "type": "integer", - }, - "end_column": { - "type": "integer", + "description": "The end line of the annotation.", }, + "start_column": {"type": "integer"}, + "end_column": {"type": "integer"}, "annotation_level": { "type": "string", - "enum": ["notice", "warning", "failure"] + "enum": [ + "notice", + "warning", + "failure", + ], }, - "message": { - "type": "string", - }, - "title": { - "type": "string", - }, - "raw_details": { - "type": "string", - } + "message": {"type": "string"}, + "title": {"type": "string"}, + "raw_details": {"type": "string"}, }, "required": [ - "path", "start_line", "end_line", "annotation_level", "message" - ] - } + "path", + "start_line", + "end_line", + "annotation_level", + "message", + ], + }, }, "images": { "type": "array", @@ -180,47 +177,45 @@ class TestBase(object): "properties": { "alt": { "type": "string", - "description": "The alternative text for the image." + "description": "The alternative text for the image.", }, "image_url": { "type": "string", - "description": "The full URL of the image." + "description": "The full URL of the image.", }, "caption": { "type": "string", - "description": "A short image description." - } + "description": "A short image description.", + }, }, - "required": ["alt", "image_url"] - } - } + "required": ["alt", "image_url"], + }, + }, }, - "required": ["title", "summary"] + "required": ["title", "summary"], }, "actions": { "type": "array", "items": { "type": "object", "properties": { - "label": { - "type": "string", - }, - "description": { - "type": "string", - }, - "identifier": { - "type": "string", - } + "label": {"type": "string"}, + "description": {"type": "string"}, + "identifier": {"type": "string"}, }, - "required": ["label", "description", "identifier"] - } - } + "required": [ + "label", + "description", + "identifier", + ], + }, + }, }, - "required": ["name", "head_sha"] - }, + "required": ["name", "head_sha"], + } } } - } + }, } @property @@ -233,25 +228,23 @@ class TestBase(object): "in": "header", "schema": { "type": "string", - "default": "application/vnd.github.antiope-preview+json" + "default": "application/vnd.github.antiope-preview+json", }, - "required": True - }, { + "required": True, + }, + { "name": "owner", "description": "owner parameter", "in": "path", "required": True, - "schema": { - "type": "string" - } - }, { + "schema": {"type": "string"}, + }, + { "name": "repo", "description": "repo parameter", "in": "path", "required": True, - "schema": { - "type": "string" - } - } + "schema": {"type": "string"}, + }, ] } diff --git a/tests/test_webhook.py b/tests/test_webhook.py index 72a581e..5363a2c 100644 --- a/tests/test_webhook.py +++ b/tests/test_webhook.py @@ -4,124 +4,134 @@ from octokit import webhook class TestWebhook(object): - def test_can_verify_webhook(self): headers = { - 'X-Hub-Signature': 'sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8', - 'X-GitHub-Event': 'push', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958' + "X-Hub-Signature": "sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8", + "X-GitHub-Event": "push", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", } - payload = '' - secret = 'secret' - events = ['push'] + payload = "" + secret = "secret" + events = ["push"] assert webhook.verify(headers, payload, secret, events=events) def test_can_filter_webhook_events(self): headers = { - 'X-Hub-Signature': 'sha1=5d61605c3feea9799210ddcb71307d4ba264225f', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958' + "X-Hub-Signature": "sha1=5d61605c3feea9799210ddcb71307d4ba264225f", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", } - payload = '' - secret = 'secret' - events = ['push'] + payload = "" + secret = "secret" + events = ["push"] assert webhook.verify(headers, payload, secret, events=events) is False def test_must_specify_events_to_allow(self): headers = { - 'X-Hub-Signature': 'sha1=5d61605c3feea9799210ddcb71307d4ba264225f', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958' + "X-Hub-Signature": "sha1=5d61605c3feea9799210ddcb71307d4ba264225f", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", } - payload = '' - secret = 'secret' + payload = "" + secret = "secret" assert webhook.verify(headers, payload, secret) is False def test_can_specify_all_events(self): headers = { - 'X-Hub-Signature': 'sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8', - 'X-GitHub-Event': 'push', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958' + "X-Hub-Signature": "sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8", + "X-GitHub-Event": "push", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", } - payload = '' - secret = 'secret' - events = ['*'] + payload = "" + secret = "secret" + events = ["*"] assert webhook.verify(headers, payload, secret, events=events) def test_only_known_events_are_valid(self): headers = { - 'X-Hub-Signature': 'sha1=5d61605c3feea9799210ddcb71307d4ba264225f', - 'X-GitHub-Event': 'pushy', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958' + "X-Hub-Signature": "sha1=5d61605c3feea9799210ddcb71307d4ba264225f", + "X-GitHub-Event": "pushy", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", } - payload = '' - secret = 'secret' - events = ['pushy'] + payload = "" + secret = "secret" + events = ["pushy"] assert webhook.verify(headers, payload, secret, events=events) is False def test_delivery_guids_must_be_valid_guids(self): headers = { - 'X-Hub-Signature': 'sha1=5d61605c3feea9799210ddcb71307d4ba264225f', - 'X-GitHub-Event': 'push', - 'X-GitHub-Delivery': 'not-a-guid' + "X-Hub-Signature": "sha1=5d61605c3feea9799210ddcb71307d4ba264225f", + "X-GitHub-Event": "push", + "X-GitHub-Delivery": "not-a-guid", } - payload = '' - secret = 'secret' - events = ['push'] + payload = "" + secret = "secret" + events = ["push"] assert webhook.verify(headers, payload, secret, events=events) is False def test_can_verify_user_agent(self): headers = { - 'X-Hub-Signature': 'sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8', - 'X-GitHub-Event': 'push', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958', - 'User-Agent': 'GitHub-Hookshot/', + "X-Hub-Signature": "sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8", + "X-GitHub-Event": "push", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", + "User-Agent": "GitHub-Hookshot/", } - payload = '' - secret = 'secret' - events = ['push'] - assert webhook.verify(headers, payload, secret, events=events, verify_user_agent=True) + payload = "" + secret = "secret" + events = ["push"] + assert webhook.verify( + headers, payload, secret, events=events, verify_user_agent=True + ) def test_verifies_user_agent(self): headers = { - 'X-Hub-Signature': 'sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8', - 'X-GitHub-Event': 'push', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958', - 'User-Agent': 'GitHub-Hooks', + "X-Hub-Signature": "sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8", + "X-GitHub-Event": "push", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", + "User-Agent": "GitHub-Hooks", } - payload = '' - secret = 'secret' - events = ['push'] - assert webhook.verify(headers, payload, secret, events=events, verify_user_agent=True) is False + payload = "" + secret = "secret" + events = ["push"] + assert ( + webhook.verify( + headers, payload, secret, events=events, verify_user_agent=True + ) + is False + ) def test_verify_ping_event(self): headers = { - 'X-Hub-Signature': 'sha1=76b55589eeb1d5609a01922fc9a52475cf746a5b', - 'X-GitHub-Event': 'ping', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958', - 'User-Agent': 'GitHub-Hookshot/', + "X-Hub-Signature": "sha1=76b55589eeb1d5609a01922fc9a52475cf746a5b", + "X-GitHub-Event": "ping", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", + "User-Agent": "GitHub-Hookshot/", } payload = json.dumps( { - 'hook': { - 'type': 'App', - 'id': 11, - 'active': True, - 'events': ['pull_request'], - 'app_id': 42, + "hook": { + "type": "App", + "id": 11, + "active": True, + "events": ["pull_request"], + "app_id": 42, } } ) payload = '{"hook": {"events": ["pull_request"], "app_id": 42, "id": 11, "active": true, "type": "App"}}' - secret = 'secret' - app_id = webhook.verify(headers, payload, secret, events=['*'], return_app_id=True) + secret = "secret" + app_id = webhook.verify( + headers, payload, secret, events=["*"], return_app_id=True + ) assert app_id == 42 def test_can_request_app_id_be_returned_on_non_ping_events(self): headers = { - 'X-Hub-Signature': 'sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8', - 'X-GitHub-Event': 'push', - 'X-GitHub-Delivery': '72d3162f-cc78-11e3-81ab-4c9367dc0958', - 'User-Agent': 'GitHub-Hookshot/', + "X-Hub-Signature": "sha1=25af6174a0fcecc4d346680a72b7ce644b9a88e8", + "X-GitHub-Event": "push", + "X-GitHub-Delivery": "72d3162f-cc78-11e3-81ab-4c9367dc0958", + "User-Agent": "GitHub-Hookshot/", } - payload = '' - secret = 'secret' - assert webhook.verify(headers, payload, secret, events=['*'], return_app_id=True) + payload = "" + secret = "secret" + assert webhook.verify( + headers, payload, secret, events=["*"], return_app_id=True + ) diff --git a/tox.ini b/tox.ini index 1a42bde..e58e2d4 100644 --- a/tox.ini +++ b/tox.ini @@ -49,11 +49,11 @@ deps = readme-renderer pygments isort - yapf<0.20 + black==19.30b skip_install = true commands = python setup.py check --strict --metadata --restructuredtext check-manifest {toxinidir} flake8 src tests setup.py isort --verbose --check-only --diff --recursive src tests setup.py - yapf src tests setup.py -p -r -d + black -t py36 --check --verbose src tests setup.py