""" Minimal Authentication package for Azure. Uses client credentials - a secret or a certificate. """ import os, requests import re import jwt, uuid, time from cryptography.hazmat.primitives import serialization from cryptography import x509 import hashlib import base64 DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default" def get_token( tenant_id: str | None = None, client_id: str | None = None, client_secret: str | None = None, pem_path: str | None = None ) -> str: """ Obtain a token for DevOps using DefaultAzureCredential. """ try: if tenant_id and client_id and client_secret: from azure.identity import ClientSecretCredential return ClientSecretCredential( tenant_id=tenant_id, client_id=client_id, client_secret=client_secret ).get_token(DEVOPS_SCOPE).token elif tenant_id and client_id and pem_path: from azure.identity import CertificateCredential return CertificateCredential( tenant_id=tenant_id, client_id=client_id, certificate_path=pem_path ).get_token(DEVOPS_SCOPE).token else: from azure.identity import DefaultAzureCredential return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token except ImportError: if tenant_id and client_id and client_secret: return secret_credentials_auth( tenant_id=tenant_id, client_id=client_id, client_secret=client_secret ) elif tenant_id and client_id and pem_path: return certificate_credentials_auth( tenant_id=tenant_id, client_id=client_id, pem_path=pem_path ) else: raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.") def secret_credentials_auth( scope: str = DEVOPS_SCOPE, tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""), client_id: str = os.environ.get("AZURE_CLIENT_ID", ""), client_secret: str = os.environ.get("AZURE_CLIENT_SECRET") ) -> str: """ Authenticate using client credentials. Pass credentials via environment variables, or directly as function parameters. """ token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" r = requests.get(token_url, data={ "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, "scope": scope }) r.raise_for_status() return r.json().get("access_token", "") def certificate_credentials_auth( scope: str = DEVOPS_SCOPE, tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""), client_id: str = os.environ.get("AZURE_CLIENT_ID", ""), pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "") ) -> str: """ Authenticate using client credentials with a certificate. Pass credentials via environment variables, or directly as function parameters. """ token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" # Wczytaj klucz prywatny (RSA) with open(pem_path, "rb") as f: pem = f.read() key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0) cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0) private_key = serialization.load_pem_private_key(key_pem, password=None) cert = x509.load_pem_x509_certificate(cert_pem) der = cert.public_bytes(serialization.Encoding.DER) sha1 = hashlib.sha1(der).digest() x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii") # Stwórz client_assertion JWT now = int(time.time()) claims = { "iss": client_id, "sub": client_id, "aud": token_url, "jti": str(uuid.uuid4()), "iat": now, "exp": now + 600, } headers = {"x5t": x5t, "kid": x5t} assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers) data = { "grant_type": "client_credentials", "client_id": client_id, "scope": scope, "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", "client_assertion": assertion, } r = requests.post(token_url, data=data) r.raise_for_status() return r.json().get("access_token")