Compare commits

...

8 Commits

Author SHA1 Message Date
d3344b8799 Add script to create Python virtual environment 2025-11-04 09:14:10 +01:00
05c9e5184c Implemented devops decorator and a get method that retrieves object properties by a key.
All checks were successful
/ unit-tests (push) Successful in 10s
2025-11-04 08:37:35 +01:00
9748230745 Added a new debug configuration for debugging
All checks were successful
/ unit-tests (push) Successful in 1m2s
the current file.
2025-11-04 07:55:34 +01:00
8c0a92a8b7 Remove unused import of DefaultAzureCredential from devops.py 2025-11-04 07:55:27 +01:00
31e1b88cd1 Updated authentication scenarios.
All checks were successful
/ unit-tests (push) Successful in 9s
2025-11-03 21:24:25 +01:00
678b6161cc Added PEM read/write functions.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 20:42:42 +01:00
b7608dcdf8 Moved devops package to sk module.
All checks were successful
/ unit-tests (push) Successful in 9s
2025-11-03 14:52:20 +01:00
f797cd098d Added a prototype authentication function using client certificate.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 14:49:52 +01:00
7 changed files with 249 additions and 32 deletions

7
.vscode/launch.json vendored
View File

@@ -10,6 +10,13 @@
"request": "launch", "request": "launch",
"program": "${workspaceFolder}/harvester.py", "program": "${workspaceFolder}/harvester.py",
"console": "integratedTerminal" "console": "integratedTerminal"
},
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
} }
] ]
} }

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from devops import Organization from sk.devops import Organization
org = Organization("https://dev.azure.com/mcovsandbox") org = Organization("https://dev.azure.com/mcovsandbox")
# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"]) # print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])

8
make-venv.sh Executable file
View File

@@ -0,0 +1,8 @@
#! /usr/bin/env bash
python3 -m venv .venv
./.venv/bin/pip install --upgrade pip
./.venv/bin/pip install -r requirements.txt
echo "Add the following alias to your shell configuration file (e.g., .bashrc or .zshrc):"
echo "alias v_env='test -d $(pwd)/.venv && . $(pwd)/.venv/bin/activate'"

View File

@@ -4,11 +4,61 @@ Minimal Authentication package for Azure.
Uses client credentials - a secret or a certificate. Uses client credentials - a secret or a certificate.
""" """
import os import os, requests
import requests import re
import jwt, uuid, time
from cryptography.hazmat.primitives import serialization
from cryptography import x509
import hashlib
import base64
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
def get_token(
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
pem_path: str | None = None
) -> str:
"""
Obtain a token for DevOps using DefaultAzureCredential.
"""
try:
if tenant_id and client_id and client_secret:
from azure.identity import ClientSecretCredential
return ClientSecretCredential(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
).get_token(DEVOPS_SCOPE).token
elif tenant_id and client_id and pem_path:
from azure.identity import CertificateCredential
return CertificateCredential(
tenant_id=tenant_id,
client_id=client_id,
certificate_path=pem_path
).get_token(DEVOPS_SCOPE).token
else:
from azure.identity import DefaultAzureCredential
return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
except ImportError:
if tenant_id and client_id and client_secret:
return secret_credentials_auth(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
)
elif tenant_id and client_id and pem_path:
return certificate_credentials_auth(
tenant_id=tenant_id,
client_id=client_id,
pem_path=pem_path
)
else:
raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
def secret_credentials_auth( def secret_credentials_auth(
scope: str = "https://app.vssps.visualstudio.com/.default", scope: str = DEVOPS_SCOPE,
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""), tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""), client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET") client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
@@ -26,3 +76,55 @@ def secret_credentials_auth(
}) })
r.raise_for_status() r.raise_for_status()
return r.json().get("access_token", "") return r.json().get("access_token", "")
def certificate_credentials_auth(
scope: str = DEVOPS_SCOPE,
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
) -> str:
"""
Authenticate using client credentials with a certificate.
Pass credentials via environment variables, or directly as function parameters.
"""
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
# Wczytaj klucz prywatny (RSA)
with open(pem_path, "rb") as f:
pem = f.read()
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
private_key = serialization.load_pem_private_key(key_pem, password=None)
cert = x509.load_pem_x509_certificate(cert_pem)
der = cert.public_bytes(serialization.Encoding.DER)
sha1 = hashlib.sha1(der).digest()
x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
# Stwórz client_assertion JWT
now = int(time.time())
claims = {
"iss": client_id,
"sub": client_id,
"aud": token_url,
"jti": str(uuid.uuid4()),
"iat": now,
"exp": now + 600,
}
headers = {"x5t": x5t, "kid": x5t}
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"scope": scope,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": assertion,
}
r = requests.post(token_url, data=data)
r.raise_for_status()
return r.json().get("access_token")

View File

@@ -1,10 +1,90 @@
import datetime import datetime
from cryptography.x509 import Name, NameAttribute, CertificateBuilder, BasicConstraints, random_serial_number from io import BufferedWriter
from cryptography.x509.oid import NameOID import re
import cryptography.x509 as x509
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
import pathlib import pathlib
def read_private_key(pem_path: str) -> PrivateKeyTypes:
"""
Read a PEM file and extract the private key in bytes.
"""
with open(pem_path, "rb") as f:
pem = f.read()
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
key = serialization.load_pem_private_key(key_pem, password=None)
return key
def read_public_certificate(pem_path: str) -> any:
"""
Read a PEM file and extract the public certificate.
"""
with open(pem_path, "rb") as f:
pem = f.read()
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
cert = x509.load_pem_x509_certificate(cert_pem)
return cert
def _write_pem_key(
writer: BufferedWriter,
key: PrivateKeyTypes,
encoding: serialization.Encoding = serialization.Encoding.PEM,
format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
password: str | None = None
):
"""
Write a PEM encoded private key to the given writer.
Allows optional password protection.
"""
if password:
encryption_algorithm = serialization.BestAvailableEncryption(password.encode())
else:
encryption_algorithm = serialization.NoEncryption()
key_pem = key.private_bytes(
encoding=encoding,
format=format,
encryption_algorithm=encryption_algorithm,
)
writer.write(key_pem)
def _write_pem_cert(
writer: BufferedWriter,
cert: x509.Certificate,
encoding: serialization.Encoding = serialization.Encoding.PEM
):
cert_pem = cert.public_bytes(encoding=encoding)
writer.write(cert_pem)
def write_pem_file(
pem_file: pathlib.Path,
cert: x509.Certificate | None = None,
key: PrivateKeyTypes | None = None,
encoding: serialization.Encoding = serialization.Encoding.PEM,
key_serialization_format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
password: str | None = None
):
"""
Write the certificate and/or private key to a PEM file.
"""
with open(pem_file, "wb") as f:
if cert:
_write_pem_cert(
f,
cert,
encoding=encoding
)
if key:
_write_pem_key(
f,
key,
encoding=encoding,
password=password,
format=key_serialization_format
)
def create_self_signed_certificate( def create_self_signed_certificate(
file_path: str, file_path: str,
subject_name: str, subject_name: str,
@@ -39,21 +119,21 @@ def create_self_signed_certificate(
""" """
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size) key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
subject = issuer = Name([ subject = issuer = x509.Name([
NameAttribute(NameOID.COUNTRY_NAME, country_name), x509.oid.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, country_name),
NameAttribute(NameOID.ORGANIZATION_NAME, organization_name), x509.oid.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, organization_name),
NameAttribute(NameOID.COMMON_NAME, subject_name), x509.oid.NameAttribute(x509.oid.NameOID.COMMON_NAME, subject_name),
]) ])
cert = ( cert = (
CertificateBuilder() x509.CertificateBuilder()
.subject_name(subject) .subject_name(subject)
.issuer_name(issuer) .issuer_name(issuer)
.public_key(key.public_key()) .public_key(key.public_key())
.serial_number(random_serial_number()) .serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1)) .not_valid_before(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1))
.not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=valid_days)) .not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=valid_days))
.add_extension(BasicConstraints(ca=True, path_length=None), critical=True) .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.sign(key, hashes.SHA256()) .sign(key, hashes.SHA256())
) )
@@ -69,16 +149,11 @@ def create_self_signed_certificate(
) )
# Write the certificate # Write the certificate
with open(crt_path, "wb") as f: write_pem_file(crt_path, cert=cert)
f.write(public_key)
# Write the private key # Write the private key
with open(key_path, "wb") as f: write_pem_file(key_path, key=key)
f.write(private_key) # Write both to a combined PEM file
write_pem_file(pem_path, cert=cert, key=key)
with open(pem_path, "wb") as f:
f.write(public_key)
f.write(private_key)
print(f"Certificate created and saved.") print(f"Certificate created and saved.")
print(f" - Certificate: {crt_path}") print(f" - Certificate: {crt_path}")

View File

@@ -2,13 +2,13 @@ from __future__ import annotations
import requests import requests
import urllib.parse import urllib.parse
from uuid import UUID from uuid import UUID
from azure.identity import DefaultAzureCredential from string import Template
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default" DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
DEVOPS_API_VERSION = "7.1" DEVOPS_API_VERSION = "7.1"
# Define a class decorator # Define a class decorator
def auto_properties(mapping: dict[str,str] | None = None): def auto_properties(mapping: dict[str,str]):
def make_property(name: str): def make_property(name: str):
private_var = f"_{name}" private_var = f"_{name}"
@@ -54,6 +54,15 @@ def auto_properties(mapping: dict[str,str] | None = None):
return cls return cls
return decorator return decorator
def devops(key: str, get_url: str, list_url: str = None, params: dict = {}):
def decorator(cls):
cls.__entity_key__ = key
cls.__entity_get_url__ = get_url # Use $key in the URL
cls.__entity_list_url__ = list_url # Use $key in the URL
cls.__entity_params__ = params
return cls
return decorator
class DevOps(): class DevOps():
"""Base class for DevOps entities.""" """Base class for DevOps entities."""
@@ -78,7 +87,24 @@ class DevOps():
r.raise_for_status() # Ensure we raise an error for bad responses r.raise_for_status() # Ensure we raise an error for bad responses
return r return r
def _get_entity(self, key_name: str, get_url: str, params: dict = {}) -> object: def _get(self, key: str):
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
# Build the URL
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
# Build parameters with key substituted
params = {}
if hasattr(self.__class__, "__entity_params__"):
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
# Fetch the object data from the URL
r = self._get_url_path(url, params=params)
# Populate attributes
self.from_json(r.json())
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
""" """
Each entity class can use this method to populate its attributes, by defining Each entity class can use this method to populate its attributes, by defining
its own _get method that calls this one with the key name, its own _get method that calls this one with the key name,
@@ -108,8 +134,6 @@ class DevOps():
class Organization(DevOps): class Organization(DevOps):
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION): def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
if token is None:
token = DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
super().__init__(org_url, token, api_version) super().__init__(org_url, token, api_version)
@property @property
@@ -132,6 +156,7 @@ class Organization(DevOps):
"url": "url", "url": "url",
"description": "description" "description": "description"
}) })
@devops("id", "_apis/projects/$key", "_apis/projects")
class Project(DevOps): class Project(DevOps):
def _get(self): def _get(self):
self._get_entity( self._get_entity(

View File

@@ -1,12 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import unittest import unittest
import requests import requests
from azure.identity import DefaultAzureCredential from sk.devops import Organization, Repository, Project, Item
from devops import DEVOPS_SCOPE, Organization, Repository, Project, Item from sk.azure import get_token
# Get the token outside the test class to speed up tests. # Get the token outside the test class to speed up tests.
# Each Unit test instantinates the class, so doing it here avoids repeated authentication. # Each Unit test instantinates the class, so doing it here avoids repeated authentication.
token = DefaultAzureCredential().get_token(DEVOPS_SCOPE).token token = get_token()
class Test01(unittest.TestCase): class Test01(unittest.TestCase):
def setUp(self): def setUp(self):