131 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			131 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
Minimal Authentication package for Azure.
 | 
						|
 | 
						|
Uses client credentials - a secret or a certificate.
 | 
						|
"""
 | 
						|
 | 
						|
import os, requests
 | 
						|
import re
 | 
						|
import jwt, uuid, time
 | 
						|
from cryptography.hazmat.primitives import serialization
 | 
						|
from cryptography import x509
 | 
						|
import hashlib
 | 
						|
import base64
 | 
						|
 | 
						|
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
 | 
						|
 | 
						|
def get_token(
 | 
						|
        tenant_id: str | None = None,
 | 
						|
        client_id: str | None = None,
 | 
						|
        client_secret: str | None = None,
 | 
						|
        pem_path: str | None = None
 | 
						|
    ) -> str:
 | 
						|
    """
 | 
						|
    Obtain a token for DevOps using DefaultAzureCredential.
 | 
						|
    """
 | 
						|
    try:
 | 
						|
        if tenant_id and client_id and client_secret:
 | 
						|
            from azure.identity import ClientSecretCredential
 | 
						|
            return ClientSecretCredential(
 | 
						|
                tenant_id=tenant_id,
 | 
						|
                client_id=client_id,
 | 
						|
                client_secret=client_secret
 | 
						|
            ).get_token(DEVOPS_SCOPE).token
 | 
						|
        elif tenant_id and client_id and pem_path:
 | 
						|
            from azure.identity import CertificateCredential
 | 
						|
            return CertificateCredential(
 | 
						|
                tenant_id=tenant_id,
 | 
						|
                client_id=client_id,
 | 
						|
                certificate_path=pem_path
 | 
						|
            ).get_token(DEVOPS_SCOPE).token
 | 
						|
        else:
 | 
						|
            from azure.identity import DefaultAzureCredential
 | 
						|
            return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
 | 
						|
    except ImportError:
 | 
						|
        if tenant_id and client_id and client_secret:
 | 
						|
            return secret_credentials_auth(
 | 
						|
                tenant_id=tenant_id,
 | 
						|
                client_id=client_id,
 | 
						|
                client_secret=client_secret
 | 
						|
            )
 | 
						|
        elif tenant_id and client_id and pem_path:
 | 
						|
            return certificate_credentials_auth(
 | 
						|
                tenant_id=tenant_id,
 | 
						|
                client_id=client_id,
 | 
						|
                pem_path=pem_path
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
 | 
						|
 | 
						|
def secret_credentials_auth(
 | 
						|
        scope: str = DEVOPS_SCOPE,
 | 
						|
        tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
 | 
						|
        client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
 | 
						|
        client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
 | 
						|
        ) -> str:
 | 
						|
    """
 | 
						|
    Authenticate using client credentials. Pass credentials via environment variables,
 | 
						|
    or directly as function parameters.
 | 
						|
    """
 | 
						|
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
 | 
						|
    r = requests.get(token_url, data={
 | 
						|
        "grant_type": "client_credentials",
 | 
						|
        "client_id": client_id,
 | 
						|
        "client_secret": client_secret,
 | 
						|
        "scope": scope
 | 
						|
    })
 | 
						|
    r.raise_for_status()
 | 
						|
    return r.json().get("access_token", "")
 | 
						|
 | 
						|
def certificate_credentials_auth(
 | 
						|
        scope: str = DEVOPS_SCOPE,
 | 
						|
        tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
 | 
						|
        client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
 | 
						|
        pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
 | 
						|
        ) -> str:
 | 
						|
    """
 | 
						|
    Authenticate using client credentials with a certificate.
 | 
						|
    Pass credentials via environment variables, or directly as function parameters.
 | 
						|
    """
 | 
						|
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
 | 
						|
 | 
						|
    # Wczytaj klucz prywatny (RSA)
 | 
						|
    with open(pem_path, "rb") as f:
 | 
						|
        pem = f.read()
 | 
						|
        key_pem  = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
 | 
						|
        cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
 | 
						|
 | 
						|
        private_key = serialization.load_pem_private_key(key_pem, password=None)
 | 
						|
        cert = x509.load_pem_x509_certificate(cert_pem)
 | 
						|
 | 
						|
        der = cert.public_bytes(serialization.Encoding.DER)
 | 
						|
        sha1 = hashlib.sha1(der).digest()
 | 
						|
        x5t  = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
 | 
						|
 | 
						|
    # Stwórz client_assertion JWT
 | 
						|
    now = int(time.time())
 | 
						|
    claims = {
 | 
						|
        "iss": client_id,
 | 
						|
        "sub": client_id,
 | 
						|
        "aud": token_url,
 | 
						|
        "jti": str(uuid.uuid4()),
 | 
						|
        "iat": now,
 | 
						|
        "exp": now + 600,
 | 
						|
    }
 | 
						|
 | 
						|
    headers = {"x5t": x5t, "kid": x5t}
 | 
						|
 | 
						|
    assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
 | 
						|
 | 
						|
    data = {
 | 
						|
        "grant_type": "client_credentials",
 | 
						|
        "client_id": client_id,
 | 
						|
        "scope": scope,
 | 
						|
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
 | 
						|
        "client_assertion": assertion,
 | 
						|
    }
 | 
						|
 | 
						|
    r = requests.post(token_url, data=data)
 | 
						|
    r.raise_for_status()
 | 
						|
    return r.json().get("access_token")
 |