Files
docs-harvester/sk/azure.py
Slawomir Koszewski f797cd098d
All checks were successful
/ unit-tests (push) Successful in 11s
Added a prototype authentication function using client certificate.
2025-11-03 14:49:52 +01:00

86 lines
2.9 KiB
Python

"""
Minimal Authentication package for Azure.
Uses client credentials - a secret or a certificate.
"""
import os, requests
import re
import jwt, uuid, time
from cryptography.hazmat.primitives import serialization
from cryptography import x509
import hashlib
import base64
def secret_credentials_auth(
scope: str = "https://app.vssps.visualstudio.com/.default",
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
) -> str:
"""
Authenticate using client credentials. Pass credentials via environment variables,
or directly as function parameters.
"""
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
r = requests.get(token_url, data={
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": scope
})
r.raise_for_status()
return r.json().get("access_token", "")
def certificate_credentials_auth(
scope: str = "https://app.vssps.visualstudio.com/.default",
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
) -> str:
"""
Authenticate using client credentials with a certificate.
Pass credentials via environment variables, or directly as function parameters.
"""
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
# Wczytaj klucz prywatny (RSA)
with open(pem_path, "rb") as f:
pem = f.read()
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
private_key = serialization.load_pem_private_key(key_pem, password=None)
cert = x509.load_pem_x509_certificate(cert_pem)
der = cert.public_bytes(serialization.Encoding.DER)
sha1 = hashlib.sha1(der).digest()
x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
# Stwórz client_assertion JWT
now = int(time.time())
claims = {
"iss": client_id,
"sub": client_id,
"aud": token_url,
"jti": str(uuid.uuid4()),
"iat": now,
"exp": now + 600,
}
headers = {"x5t": x5t, "kid": x5t}
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"scope": scope,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": assertion,
}
r = requests.post(token_url, data=data)
r.raise_for_status()
return r.json().get("access_token")