Compare commits
8 Commits
2f2cb1c337
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| d3344b8799 | |||
| 05c9e5184c | |||
| 9748230745 | |||
| 8c0a92a8b7 | |||
| 31e1b88cd1 | |||
| 678b6161cc | |||
| b7608dcdf8 | |||
| f797cd098d |
7
.vscode/launch.json
vendored
7
.vscode/launch.json
vendored
@@ -10,6 +10,13 @@
|
|||||||
"request": "launch",
|
"request": "launch",
|
||||||
"program": "${workspaceFolder}/harvester.py",
|
"program": "${workspaceFolder}/harvester.py",
|
||||||
"console": "integratedTerminal"
|
"console": "integratedTerminal"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Python Debugger: Current File",
|
||||||
|
"type": "debugpy",
|
||||||
|
"request": "launch",
|
||||||
|
"program": "${file}",
|
||||||
|
"console": "integratedTerminal"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from devops import Organization
|
from sk.devops import Organization
|
||||||
org = Organization("https://dev.azure.com/mcovsandbox")
|
org = Organization("https://dev.azure.com/mcovsandbox")
|
||||||
|
|
||||||
# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])
|
# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])
|
||||||
|
|||||||
8
make-venv.sh
Executable file
8
make-venv.sh
Executable file
@@ -0,0 +1,8 @@
|
|||||||
|
#! /usr/bin/env bash
|
||||||
|
|
||||||
|
python3 -m venv .venv
|
||||||
|
./.venv/bin/pip install --upgrade pip
|
||||||
|
./.venv/bin/pip install -r requirements.txt
|
||||||
|
|
||||||
|
echo "Add the following alias to your shell configuration file (e.g., .bashrc or .zshrc):"
|
||||||
|
echo "alias v_env='test -d $(pwd)/.venv && . $(pwd)/.venv/bin/activate'"
|
||||||
108
sk/azure.py
108
sk/azure.py
@@ -4,11 +4,61 @@ Minimal Authentication package for Azure.
|
|||||||
Uses client credentials - a secret or a certificate.
|
Uses client credentials - a secret or a certificate.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os
|
import os, requests
|
||||||
import requests
|
import re
|
||||||
|
import jwt, uuid, time
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography import x509
|
||||||
|
import hashlib
|
||||||
|
import base64
|
||||||
|
|
||||||
|
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||||
|
|
||||||
|
def get_token(
|
||||||
|
tenant_id: str | None = None,
|
||||||
|
client_id: str | None = None,
|
||||||
|
client_secret: str | None = None,
|
||||||
|
pem_path: str | None = None
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Obtain a token for DevOps using DefaultAzureCredential.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if tenant_id and client_id and client_secret:
|
||||||
|
from azure.identity import ClientSecretCredential
|
||||||
|
return ClientSecretCredential(
|
||||||
|
tenant_id=tenant_id,
|
||||||
|
client_id=client_id,
|
||||||
|
client_secret=client_secret
|
||||||
|
).get_token(DEVOPS_SCOPE).token
|
||||||
|
elif tenant_id and client_id and pem_path:
|
||||||
|
from azure.identity import CertificateCredential
|
||||||
|
return CertificateCredential(
|
||||||
|
tenant_id=tenant_id,
|
||||||
|
client_id=client_id,
|
||||||
|
certificate_path=pem_path
|
||||||
|
).get_token(DEVOPS_SCOPE).token
|
||||||
|
else:
|
||||||
|
from azure.identity import DefaultAzureCredential
|
||||||
|
return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
|
||||||
|
except ImportError:
|
||||||
|
if tenant_id and client_id and client_secret:
|
||||||
|
return secret_credentials_auth(
|
||||||
|
tenant_id=tenant_id,
|
||||||
|
client_id=client_id,
|
||||||
|
client_secret=client_secret
|
||||||
|
)
|
||||||
|
elif tenant_id and client_id and pem_path:
|
||||||
|
return certificate_credentials_auth(
|
||||||
|
tenant_id=tenant_id,
|
||||||
|
client_id=client_id,
|
||||||
|
pem_path=pem_path
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
|
||||||
|
|
||||||
def secret_credentials_auth(
|
def secret_credentials_auth(
|
||||||
scope: str = "https://app.vssps.visualstudio.com/.default",
|
scope: str = DEVOPS_SCOPE,
|
||||||
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
||||||
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||||
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
|
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
|
||||||
@@ -26,3 +76,55 @@ def secret_credentials_auth(
|
|||||||
})
|
})
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
return r.json().get("access_token", "")
|
return r.json().get("access_token", "")
|
||||||
|
|
||||||
|
def certificate_credentials_auth(
|
||||||
|
scope: str = DEVOPS_SCOPE,
|
||||||
|
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
||||||
|
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||||
|
pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Authenticate using client credentials with a certificate.
|
||||||
|
Pass credentials via environment variables, or directly as function parameters.
|
||||||
|
"""
|
||||||
|
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||||
|
|
||||||
|
# Wczytaj klucz prywatny (RSA)
|
||||||
|
with open(pem_path, "rb") as f:
|
||||||
|
pem = f.read()
|
||||||
|
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
|
||||||
|
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
|
||||||
|
|
||||||
|
private_key = serialization.load_pem_private_key(key_pem, password=None)
|
||||||
|
cert = x509.load_pem_x509_certificate(cert_pem)
|
||||||
|
|
||||||
|
der = cert.public_bytes(serialization.Encoding.DER)
|
||||||
|
sha1 = hashlib.sha1(der).digest()
|
||||||
|
x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
|
||||||
|
|
||||||
|
# Stwórz client_assertion JWT
|
||||||
|
now = int(time.time())
|
||||||
|
claims = {
|
||||||
|
"iss": client_id,
|
||||||
|
"sub": client_id,
|
||||||
|
"aud": token_url,
|
||||||
|
"jti": str(uuid.uuid4()),
|
||||||
|
"iat": now,
|
||||||
|
"exp": now + 600,
|
||||||
|
}
|
||||||
|
|
||||||
|
headers = {"x5t": x5t, "kid": x5t}
|
||||||
|
|
||||||
|
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"grant_type": "client_credentials",
|
||||||
|
"client_id": client_id,
|
||||||
|
"scope": scope,
|
||||||
|
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
||||||
|
"client_assertion": assertion,
|
||||||
|
}
|
||||||
|
|
||||||
|
r = requests.post(token_url, data=data)
|
||||||
|
r.raise_for_status()
|
||||||
|
return r.json().get("access_token")
|
||||||
|
|||||||
@@ -1,10 +1,90 @@
|
|||||||
import datetime
|
import datetime
|
||||||
from cryptography.x509 import Name, NameAttribute, CertificateBuilder, BasicConstraints, random_serial_number
|
from io import BufferedWriter
|
||||||
from cryptography.x509.oid import NameOID
|
import re
|
||||||
|
import cryptography.x509 as x509
|
||||||
from cryptography.hazmat.primitives import hashes, serialization
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
|
||||||
import pathlib
|
import pathlib
|
||||||
|
|
||||||
|
def read_private_key(pem_path: str) -> PrivateKeyTypes:
|
||||||
|
"""
|
||||||
|
Read a PEM file and extract the private key in bytes.
|
||||||
|
"""
|
||||||
|
with open(pem_path, "rb") as f:
|
||||||
|
pem = f.read()
|
||||||
|
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
|
||||||
|
key = serialization.load_pem_private_key(key_pem, password=None)
|
||||||
|
return key
|
||||||
|
|
||||||
|
def read_public_certificate(pem_path: str) -> any:
|
||||||
|
"""
|
||||||
|
Read a PEM file and extract the public certificate.
|
||||||
|
"""
|
||||||
|
with open(pem_path, "rb") as f:
|
||||||
|
pem = f.read()
|
||||||
|
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
|
||||||
|
cert = x509.load_pem_x509_certificate(cert_pem)
|
||||||
|
return cert
|
||||||
|
|
||||||
|
def _write_pem_key(
|
||||||
|
writer: BufferedWriter,
|
||||||
|
key: PrivateKeyTypes,
|
||||||
|
encoding: serialization.Encoding = serialization.Encoding.PEM,
|
||||||
|
format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
|
||||||
|
password: str | None = None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Write a PEM encoded private key to the given writer.
|
||||||
|
Allows optional password protection.
|
||||||
|
"""
|
||||||
|
if password:
|
||||||
|
encryption_algorithm = serialization.BestAvailableEncryption(password.encode())
|
||||||
|
else:
|
||||||
|
encryption_algorithm = serialization.NoEncryption()
|
||||||
|
|
||||||
|
key_pem = key.private_bytes(
|
||||||
|
encoding=encoding,
|
||||||
|
format=format,
|
||||||
|
encryption_algorithm=encryption_algorithm,
|
||||||
|
)
|
||||||
|
writer.write(key_pem)
|
||||||
|
|
||||||
|
def _write_pem_cert(
|
||||||
|
writer: BufferedWriter,
|
||||||
|
cert: x509.Certificate,
|
||||||
|
encoding: serialization.Encoding = serialization.Encoding.PEM
|
||||||
|
):
|
||||||
|
cert_pem = cert.public_bytes(encoding=encoding)
|
||||||
|
writer.write(cert_pem)
|
||||||
|
|
||||||
|
def write_pem_file(
|
||||||
|
pem_file: pathlib.Path,
|
||||||
|
cert: x509.Certificate | None = None,
|
||||||
|
key: PrivateKeyTypes | None = None,
|
||||||
|
encoding: serialization.Encoding = serialization.Encoding.PEM,
|
||||||
|
key_serialization_format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
|
||||||
|
password: str | None = None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Write the certificate and/or private key to a PEM file.
|
||||||
|
"""
|
||||||
|
with open(pem_file, "wb") as f:
|
||||||
|
if cert:
|
||||||
|
_write_pem_cert(
|
||||||
|
f,
|
||||||
|
cert,
|
||||||
|
encoding=encoding
|
||||||
|
)
|
||||||
|
if key:
|
||||||
|
_write_pem_key(
|
||||||
|
f,
|
||||||
|
key,
|
||||||
|
encoding=encoding,
|
||||||
|
password=password,
|
||||||
|
format=key_serialization_format
|
||||||
|
)
|
||||||
|
|
||||||
def create_self_signed_certificate(
|
def create_self_signed_certificate(
|
||||||
file_path: str,
|
file_path: str,
|
||||||
subject_name: str,
|
subject_name: str,
|
||||||
@@ -39,46 +119,41 @@ def create_self_signed_certificate(
|
|||||||
"""
|
"""
|
||||||
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
|
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
|
||||||
|
|
||||||
subject = issuer = Name([
|
subject = issuer = x509.Name([
|
||||||
NameAttribute(NameOID.COUNTRY_NAME, country_name),
|
x509.oid.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, country_name),
|
||||||
NameAttribute(NameOID.ORGANIZATION_NAME, organization_name),
|
x509.oid.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, organization_name),
|
||||||
NameAttribute(NameOID.COMMON_NAME, subject_name),
|
x509.oid.NameAttribute(x509.oid.NameOID.COMMON_NAME, subject_name),
|
||||||
])
|
])
|
||||||
|
|
||||||
cert = (
|
cert = (
|
||||||
CertificateBuilder()
|
x509.CertificateBuilder()
|
||||||
.subject_name(subject)
|
.subject_name(subject)
|
||||||
.issuer_name(issuer)
|
.issuer_name(issuer)
|
||||||
.public_key(key.public_key())
|
.public_key(key.public_key())
|
||||||
.serial_number(random_serial_number())
|
.serial_number(x509.random_serial_number())
|
||||||
.not_valid_before(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1))
|
.not_valid_before(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1))
|
||||||
.not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=valid_days))
|
.not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=valid_days))
|
||||||
.add_extension(BasicConstraints(ca=True, path_length=None), critical=True)
|
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
|
||||||
.sign(key, hashes.SHA256())
|
.sign(key, hashes.SHA256())
|
||||||
)
|
)
|
||||||
|
|
||||||
crt_path = pathlib.Path(file_path).with_suffix('.crt')
|
crt_path = pathlib.Path(file_path).with_suffix('.crt')
|
||||||
key_path = pathlib.Path(file_path).with_suffix('.key')
|
key_path = pathlib.Path(file_path).with_suffix('.key')
|
||||||
pem_path = pathlib.Path(file_path).with_suffix('.pem')
|
pem_path = pathlib.Path(file_path).with_suffix('.pem')
|
||||||
|
|
||||||
public_key = cert.public_bytes(serialization.Encoding.PEM)
|
public_key = cert.public_bytes(serialization.Encoding.PEM)
|
||||||
private_key = key.private_bytes(
|
private_key = key.private_bytes(
|
||||||
encoding=serialization.Encoding.PEM,
|
encoding=serialization.Encoding.PEM,
|
||||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||||
encryption_algorithm=serialization.NoEncryption(),
|
encryption_algorithm=serialization.NoEncryption(),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Write the certificate
|
|
||||||
with open(crt_path, "wb") as f:
|
|
||||||
f.write(public_key)
|
|
||||||
|
|
||||||
|
# Write the certificate
|
||||||
|
write_pem_file(crt_path, cert=cert)
|
||||||
# Write the private key
|
# Write the private key
|
||||||
with open(key_path, "wb") as f:
|
write_pem_file(key_path, key=key)
|
||||||
f.write(private_key)
|
# Write both to a combined PEM file
|
||||||
|
write_pem_file(pem_path, cert=cert, key=key)
|
||||||
with open(pem_path, "wb") as f:
|
|
||||||
f.write(public_key)
|
|
||||||
f.write(private_key)
|
|
||||||
|
|
||||||
print(f"Certificate created and saved.")
|
print(f"Certificate created and saved.")
|
||||||
print(f" - Certificate: {crt_path}")
|
print(f" - Certificate: {crt_path}")
|
||||||
|
|||||||
@@ -2,13 +2,13 @@ from __future__ import annotations
|
|||||||
import requests
|
import requests
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
from azure.identity import DefaultAzureCredential
|
from string import Template
|
||||||
|
|
||||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||||
DEVOPS_API_VERSION = "7.1"
|
DEVOPS_API_VERSION = "7.1"
|
||||||
|
|
||||||
# Define a class decorator
|
# Define a class decorator
|
||||||
def auto_properties(mapping: dict[str,str] | None = None):
|
def auto_properties(mapping: dict[str,str]):
|
||||||
|
|
||||||
def make_property(name: str):
|
def make_property(name: str):
|
||||||
private_var = f"_{name}"
|
private_var = f"_{name}"
|
||||||
@@ -54,6 +54,15 @@ def auto_properties(mapping: dict[str,str] | None = None):
|
|||||||
return cls
|
return cls
|
||||||
return decorator
|
return decorator
|
||||||
|
|
||||||
|
def devops(key: str, get_url: str, list_url: str = None, params: dict = {}):
|
||||||
|
def decorator(cls):
|
||||||
|
cls.__entity_key__ = key
|
||||||
|
cls.__entity_get_url__ = get_url # Use $key in the URL
|
||||||
|
cls.__entity_list_url__ = list_url # Use $key in the URL
|
||||||
|
cls.__entity_params__ = params
|
||||||
|
return cls
|
||||||
|
return decorator
|
||||||
|
|
||||||
class DevOps():
|
class DevOps():
|
||||||
"""Base class for DevOps entities."""
|
"""Base class for DevOps entities."""
|
||||||
|
|
||||||
@@ -78,7 +87,24 @@ class DevOps():
|
|||||||
r.raise_for_status() # Ensure we raise an error for bad responses
|
r.raise_for_status() # Ensure we raise an error for bad responses
|
||||||
return r
|
return r
|
||||||
|
|
||||||
def _get_entity(self, key_name: str, get_url: str, params: dict = {}) -> object:
|
def _get(self, key: str):
|
||||||
|
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
|
||||||
|
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
|
||||||
|
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
|
||||||
|
# Build the URL
|
||||||
|
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
|
||||||
|
# Build parameters with key substituted
|
||||||
|
params = {}
|
||||||
|
if hasattr(self.__class__, "__entity_params__"):
|
||||||
|
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
|
||||||
|
|
||||||
|
# Fetch the object data from the URL
|
||||||
|
r = self._get_url_path(url, params=params)
|
||||||
|
|
||||||
|
# Populate attributes
|
||||||
|
self.from_json(r.json())
|
||||||
|
|
||||||
|
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
|
||||||
"""
|
"""
|
||||||
Each entity class can use this method to populate its attributes, by defining
|
Each entity class can use this method to populate its attributes, by defining
|
||||||
its own _get method that calls this one with the key name,
|
its own _get method that calls this one with the key name,
|
||||||
@@ -108,8 +134,6 @@ class DevOps():
|
|||||||
|
|
||||||
class Organization(DevOps):
|
class Organization(DevOps):
|
||||||
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
|
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
|
||||||
if token is None:
|
|
||||||
token = DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
|
|
||||||
super().__init__(org_url, token, api_version)
|
super().__init__(org_url, token, api_version)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -132,6 +156,7 @@ class Organization(DevOps):
|
|||||||
"url": "url",
|
"url": "url",
|
||||||
"description": "description"
|
"description": "description"
|
||||||
})
|
})
|
||||||
|
@devops("id", "_apis/projects/$key", "_apis/projects")
|
||||||
class Project(DevOps):
|
class Project(DevOps):
|
||||||
def _get(self):
|
def _get(self):
|
||||||
self._get_entity(
|
self._get_entity(
|
||||||
6
tests.py
6
tests.py
@@ -1,12 +1,12 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import unittest
|
import unittest
|
||||||
import requests
|
import requests
|
||||||
from azure.identity import DefaultAzureCredential
|
from sk.devops import Organization, Repository, Project, Item
|
||||||
from devops import DEVOPS_SCOPE, Organization, Repository, Project, Item
|
from sk.azure import get_token
|
||||||
|
|
||||||
# Get the token outside the test class to speed up tests.
|
# Get the token outside the test class to speed up tests.
|
||||||
# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
|
# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
|
||||||
token = DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
|
token = get_token()
|
||||||
|
|
||||||
class Test01(unittest.TestCase):
|
class Test01(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user