diff --git a/sk/azure.py b/sk/azure.py index 056db70..b57ee43 100644 --- a/sk/azure.py +++ b/sk/azure.py @@ -4,8 +4,13 @@ Minimal Authentication package for Azure. Uses client credentials - a secret or a certificate. """ -import os -import requests +import os, requests +import re +import jwt, uuid, time +from cryptography.hazmat.primitives import serialization +from cryptography import x509 +import hashlib +import base64 def secret_credentials_auth( scope: str = "https://app.vssps.visualstudio.com/.default", @@ -26,3 +31,55 @@ def secret_credentials_auth( }) r.raise_for_status() return r.json().get("access_token", "") + +def certificate_credentials_auth( + scope: str = "https://app.vssps.visualstudio.com/.default", + tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""), + client_id: str = os.environ.get("AZURE_CLIENT_ID", ""), + pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "") + ) -> str: + """ + Authenticate using client credentials with a certificate. + Pass credentials via environment variables, or directly as function parameters. + """ + token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" + + # Wczytaj klucz prywatny (RSA) + with open(pem_path, "rb") as f: + pem = f.read() + key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0) + cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0) + + private_key = serialization.load_pem_private_key(key_pem, password=None) + cert = x509.load_pem_x509_certificate(cert_pem) + + der = cert.public_bytes(serialization.Encoding.DER) + sha1 = hashlib.sha1(der).digest() + x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii") + + # Stwórz client_assertion JWT + now = int(time.time()) + claims = { + "iss": client_id, + "sub": client_id, + "aud": token_url, + "jti": str(uuid.uuid4()), + "iat": now, + "exp": now + 600, + } + + headers = {"x5t": x5t, "kid": x5t} + + assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers) + + data = { + "grant_type": "client_credentials", + "client_id": client_id, + "scope": scope, + "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", + "client_assertion": assertion, + } + + r = requests.post(token_url, data=data) + r.raise_for_status() + return r.json().get("access_token")