Moved devops related code to a separate package.
This commit is contained in:
130
devops/azure.py
Normal file
130
devops/azure.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""
|
||||
Minimal Authentication package for Azure.
|
||||
|
||||
Uses client credentials - a secret or a certificate.
|
||||
"""
|
||||
|
||||
import os, requests
|
||||
import re
|
||||
import jwt, uuid, time
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography import x509
|
||||
import hashlib
|
||||
import base64
|
||||
|
||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||
|
||||
def get_token(
|
||||
tenant_id: str | None = None,
|
||||
client_id: str | None = None,
|
||||
client_secret: str | None = None,
|
||||
pem_path: str | None = None
|
||||
) -> str:
|
||||
"""
|
||||
Obtain a token for DevOps using DefaultAzureCredential.
|
||||
"""
|
||||
try:
|
||||
if tenant_id and client_id and client_secret:
|
||||
from azure.identity import ClientSecretCredential
|
||||
return ClientSecretCredential(
|
||||
tenant_id=tenant_id,
|
||||
client_id=client_id,
|
||||
client_secret=client_secret
|
||||
).get_token(DEVOPS_SCOPE).token
|
||||
elif tenant_id and client_id and pem_path:
|
||||
from azure.identity import CertificateCredential
|
||||
return CertificateCredential(
|
||||
tenant_id=tenant_id,
|
||||
client_id=client_id,
|
||||
certificate_path=pem_path
|
||||
).get_token(DEVOPS_SCOPE).token
|
||||
else:
|
||||
from azure.identity import DefaultAzureCredential
|
||||
return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
|
||||
except ImportError:
|
||||
if tenant_id and client_id and client_secret:
|
||||
return secret_credentials_auth(
|
||||
tenant_id=tenant_id,
|
||||
client_id=client_id,
|
||||
client_secret=client_secret
|
||||
)
|
||||
elif tenant_id and client_id and pem_path:
|
||||
return certificate_credentials_auth(
|
||||
tenant_id=tenant_id,
|
||||
client_id=client_id,
|
||||
pem_path=pem_path
|
||||
)
|
||||
else:
|
||||
raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
|
||||
|
||||
def secret_credentials_auth(
|
||||
scope: str = DEVOPS_SCOPE,
|
||||
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
||||
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
|
||||
) -> str:
|
||||
"""
|
||||
Authenticate using client credentials. Pass credentials via environment variables,
|
||||
or directly as function parameters.
|
||||
"""
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
r = requests.get(token_url, data={
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": client_id,
|
||||
"client_secret": client_secret,
|
||||
"scope": scope
|
||||
})
|
||||
r.raise_for_status()
|
||||
return r.json().get("access_token", "")
|
||||
|
||||
def certificate_credentials_auth(
|
||||
scope: str = DEVOPS_SCOPE,
|
||||
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
||||
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||
pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
|
||||
) -> str:
|
||||
"""
|
||||
Authenticate using client credentials with a certificate.
|
||||
Pass credentials via environment variables, or directly as function parameters.
|
||||
"""
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
|
||||
# Wczytaj klucz prywatny (RSA)
|
||||
with open(pem_path, "rb") as f:
|
||||
pem = f.read()
|
||||
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
|
||||
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
|
||||
|
||||
private_key = serialization.load_pem_private_key(key_pem, password=None)
|
||||
cert = x509.load_pem_x509_certificate(cert_pem)
|
||||
|
||||
der = cert.public_bytes(serialization.Encoding.DER)
|
||||
sha1 = hashlib.sha1(der).digest()
|
||||
x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
|
||||
|
||||
# Stwórz client_assertion JWT
|
||||
now = int(time.time())
|
||||
claims = {
|
||||
"iss": client_id,
|
||||
"sub": client_id,
|
||||
"aud": token_url,
|
||||
"jti": str(uuid.uuid4()),
|
||||
"iat": now,
|
||||
"exp": now + 600,
|
||||
}
|
||||
|
||||
headers = {"x5t": x5t, "kid": x5t}
|
||||
|
||||
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
|
||||
|
||||
data = {
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": client_id,
|
||||
"scope": scope,
|
||||
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
||||
"client_assertion": assertion,
|
||||
}
|
||||
|
||||
r = requests.post(token_url, data=data)
|
||||
r.raise_for_status()
|
||||
return r.json().get("access_token")
|
||||
Reference in New Issue
Block a user