Compare commits

...

11 Commits

Author SHA1 Message Date
d3344b8799 Add script to create Python virtual environment 2025-11-04 09:14:10 +01:00
05c9e5184c Implemented devops decorator and a get method that retrieves object properties by a key.
All checks were successful
/ unit-tests (push) Successful in 10s
2025-11-04 08:37:35 +01:00
9748230745 Added a new debug configuration for debugging
All checks were successful
/ unit-tests (push) Successful in 1m2s
the current file.
2025-11-04 07:55:34 +01:00
8c0a92a8b7 Remove unused import of DefaultAzureCredential from devops.py 2025-11-04 07:55:27 +01:00
31e1b88cd1 Updated authentication scenarios.
All checks were successful
/ unit-tests (push) Successful in 9s
2025-11-03 21:24:25 +01:00
678b6161cc Added PEM read/write functions.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 20:42:42 +01:00
b7608dcdf8 Moved devops package to sk module.
All checks were successful
/ unit-tests (push) Successful in 9s
2025-11-03 14:52:20 +01:00
f797cd098d Added a prototype authentication function using client certificate.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 14:49:52 +01:00
2f2cb1c337 Add minimal authentication package for Azure using client credentials
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 13:41:21 +01:00
ddab4df55f Add function to create self-signed certificates with PEM format 2025-11-03 13:37:34 +01:00
6e757dd0b8 Update .gitignore to include additional shell secrets and certificate files 2025-11-03 13:37:25 +01:00
8 changed files with 348 additions and 9 deletions

8
.gitignore vendored
View File

@@ -7,3 +7,11 @@ __pycache__/
# Ignore prototype scripts
prototype_*.py
# Shell secrets
*.secret
# Certificate files
*.pem
*.key
*.crt

7
.vscode/launch.json vendored
View File

@@ -10,6 +10,13 @@
"request": "launch",
"program": "${workspaceFolder}/harvester.py",
"console": "integratedTerminal"
},
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
from devops import Organization
from sk.devops import Organization
org = Organization("https://dev.azure.com/mcovsandbox")
# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])

8
make-venv.sh Executable file
View File

@@ -0,0 +1,8 @@
#! /usr/bin/env bash
python3 -m venv .venv
./.venv/bin/pip install --upgrade pip
./.venv/bin/pip install -r requirements.txt
echo "Add the following alias to your shell configuration file (e.g., .bashrc or .zshrc):"
echo "alias v_env='test -d $(pwd)/.venv && . $(pwd)/.venv/bin/activate'"

130
sk/azure.py Normal file
View File

@@ -0,0 +1,130 @@
"""
Minimal Authentication package for Azure.
Uses client credentials - a secret or a certificate.
"""
import os, requests
import re
import jwt, uuid, time
from cryptography.hazmat.primitives import serialization
from cryptography import x509
import hashlib
import base64
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
def get_token(
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
pem_path: str | None = None
) -> str:
"""
Obtain a token for DevOps using DefaultAzureCredential.
"""
try:
if tenant_id and client_id and client_secret:
from azure.identity import ClientSecretCredential
return ClientSecretCredential(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
).get_token(DEVOPS_SCOPE).token
elif tenant_id and client_id and pem_path:
from azure.identity import CertificateCredential
return CertificateCredential(
tenant_id=tenant_id,
client_id=client_id,
certificate_path=pem_path
).get_token(DEVOPS_SCOPE).token
else:
from azure.identity import DefaultAzureCredential
return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
except ImportError:
if tenant_id and client_id and client_secret:
return secret_credentials_auth(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
)
elif tenant_id and client_id and pem_path:
return certificate_credentials_auth(
tenant_id=tenant_id,
client_id=client_id,
pem_path=pem_path
)
else:
raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
def secret_credentials_auth(
scope: str = DEVOPS_SCOPE,
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
) -> str:
"""
Authenticate using client credentials. Pass credentials via environment variables,
or directly as function parameters.
"""
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
r = requests.get(token_url, data={
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": scope
})
r.raise_for_status()
return r.json().get("access_token", "")
def certificate_credentials_auth(
scope: str = DEVOPS_SCOPE,
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
) -> str:
"""
Authenticate using client credentials with a certificate.
Pass credentials via environment variables, or directly as function parameters.
"""
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
# Wczytaj klucz prywatny (RSA)
with open(pem_path, "rb") as f:
pem = f.read()
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
private_key = serialization.load_pem_private_key(key_pem, password=None)
cert = x509.load_pem_x509_certificate(cert_pem)
der = cert.public_bytes(serialization.Encoding.DER)
sha1 = hashlib.sha1(der).digest()
x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
# Stwórz client_assertion JWT
now = int(time.time())
claims = {
"iss": client_id,
"sub": client_id,
"aud": token_url,
"jti": str(uuid.uuid4()),
"iat": now,
"exp": now + 600,
}
headers = {"x5t": x5t, "kid": x5t}
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"scope": scope,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": assertion,
}
r = requests.post(token_url, data=data)
r.raise_for_status()
return r.json().get("access_token")

161
sk/certificates.py Normal file
View File

@@ -0,0 +1,161 @@
import datetime
from io import BufferedWriter
import re
import cryptography.x509 as x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
import pathlib
def read_private_key(pem_path: str) -> PrivateKeyTypes:
"""
Read a PEM file and extract the private key in bytes.
"""
with open(pem_path, "rb") as f:
pem = f.read()
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
key = serialization.load_pem_private_key(key_pem, password=None)
return key
def read_public_certificate(pem_path: str) -> any:
"""
Read a PEM file and extract the public certificate.
"""
with open(pem_path, "rb") as f:
pem = f.read()
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
cert = x509.load_pem_x509_certificate(cert_pem)
return cert
def _write_pem_key(
writer: BufferedWriter,
key: PrivateKeyTypes,
encoding: serialization.Encoding = serialization.Encoding.PEM,
format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
password: str | None = None
):
"""
Write a PEM encoded private key to the given writer.
Allows optional password protection.
"""
if password:
encryption_algorithm = serialization.BestAvailableEncryption(password.encode())
else:
encryption_algorithm = serialization.NoEncryption()
key_pem = key.private_bytes(
encoding=encoding,
format=format,
encryption_algorithm=encryption_algorithm,
)
writer.write(key_pem)
def _write_pem_cert(
writer: BufferedWriter,
cert: x509.Certificate,
encoding: serialization.Encoding = serialization.Encoding.PEM
):
cert_pem = cert.public_bytes(encoding=encoding)
writer.write(cert_pem)
def write_pem_file(
pem_file: pathlib.Path,
cert: x509.Certificate | None = None,
key: PrivateKeyTypes | None = None,
encoding: serialization.Encoding = serialization.Encoding.PEM,
key_serialization_format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
password: str | None = None
):
"""
Write the certificate and/or private key to a PEM file.
"""
with open(pem_file, "wb") as f:
if cert:
_write_pem_cert(
f,
cert,
encoding=encoding
)
if key:
_write_pem_key(
f,
key,
encoding=encoding,
password=password,
format=key_serialization_format
)
def create_self_signed_certificate(
file_path: str,
subject_name: str,
organization_name: str,
country_name: str,
valid_days: int = 365,
key_size: int = 2048
):
"""
Create a self-signed certificate. It saves the certificate and private key
in PEM format to the specified file path. Three files are created:
- <file_path>.crt : The public certificate
- <file_path>.key : The private key
- <file_path>.pem : The certificate and private key combined in one file
:param file_path: Base file path to save the certificate and key.
:param subject_name: Common Name (CN) for the certificate.
:param organization_name: Organization Name (O) for the certificate.
:param country_name: Country Name (C) for the certificate.
:param valid_days: Number of days the certificate is valid for.
:param key_size: Size of the RSA key.
Use the following command to replace any credentials already defined for the
App Registration with that certificate:
az ad app credential reset --id <CLIENT_ID> --cert @<file_path>.crt
Use --append to add the certificate without removing existing credentials.
> Note: Do not upload the private key file (.key) nor the combined PEM file (.pem).
"""
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
subject = issuer = x509.Name([
x509.oid.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, country_name),
x509.oid.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, organization_name),
x509.oid.NameAttribute(x509.oid.NameOID.COMMON_NAME, subject_name),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1))
.not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=valid_days))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.sign(key, hashes.SHA256())
)
crt_path = pathlib.Path(file_path).with_suffix('.crt')
key_path = pathlib.Path(file_path).with_suffix('.key')
pem_path = pathlib.Path(file_path).with_suffix('.pem')
public_key = cert.public_bytes(serialization.Encoding.PEM)
private_key = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
# Write the certificate
write_pem_file(crt_path, cert=cert)
# Write the private key
write_pem_file(key_path, key=key)
# Write both to a combined PEM file
write_pem_file(pem_path, cert=cert, key=key)
print(f"Certificate created and saved.")
print(f" - Certificate: {crt_path}")
print(f" - Private Key: {key_path}")
print(f" - PEM File: {pem_path}")

View File

@@ -2,13 +2,13 @@ from __future__ import annotations
import requests
import urllib.parse
from uuid import UUID
from azure.identity import DefaultAzureCredential
from string import Template
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
DEVOPS_API_VERSION = "7.1"
# Define a class decorator
def auto_properties(mapping: dict[str,str] | None = None):
def auto_properties(mapping: dict[str,str]):
def make_property(name: str):
private_var = f"_{name}"
@@ -54,6 +54,15 @@ def auto_properties(mapping: dict[str,str] | None = None):
return cls
return decorator
def devops(key: str, get_url: str, list_url: str = None, params: dict = {}):
def decorator(cls):
cls.__entity_key__ = key
cls.__entity_get_url__ = get_url # Use $key in the URL
cls.__entity_list_url__ = list_url # Use $key in the URL
cls.__entity_params__ = params
return cls
return decorator
class DevOps():
"""Base class for DevOps entities."""
@@ -78,7 +87,24 @@ class DevOps():
r.raise_for_status() # Ensure we raise an error for bad responses
return r
def _get_entity(self, key_name: str, get_url: str, params: dict = {}) -> object:
def _get(self, key: str):
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
# Build the URL
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
# Build parameters with key substituted
params = {}
if hasattr(self.__class__, "__entity_params__"):
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
# Fetch the object data from the URL
r = self._get_url_path(url, params=params)
# Populate attributes
self.from_json(r.json())
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
"""
Each entity class can use this method to populate its attributes, by defining
its own _get method that calls this one with the key name,
@@ -108,8 +134,6 @@ class DevOps():
class Organization(DevOps):
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
if token is None:
token = DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
super().__init__(org_url, token, api_version)
@property
@@ -132,6 +156,7 @@ class Organization(DevOps):
"url": "url",
"description": "description"
})
@devops("id", "_apis/projects/$key", "_apis/projects")
class Project(DevOps):
def _get(self):
self._get_entity(

View File

@@ -1,12 +1,12 @@
#!/usr/bin/env python3
import unittest
import requests
from azure.identity import DefaultAzureCredential
from devops import DEVOPS_SCOPE, Organization, Repository, Project, Item
from sk.devops import Organization, Repository, Project, Item
from sk.azure import get_token
# Get the token outside the test class to speed up tests.
# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
token = DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
token = get_token()
class Test01(unittest.TestCase):
def setUp(self):