Compare commits
42 Commits
850bc99aa2
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| d3344b8799 | |||
| 05c9e5184c | |||
| 9748230745 | |||
| 8c0a92a8b7 | |||
| 31e1b88cd1 | |||
| 678b6161cc | |||
| b7608dcdf8 | |||
| f797cd098d | |||
| 2f2cb1c337 | |||
| ddab4df55f | |||
| 6e757dd0b8 | |||
| 07c662b1e8 | |||
| 6c8bcc775f | |||
| 32ebcf74b0 | |||
| 1caf0f3069 | |||
| 9acb223ef1 | |||
| 9ef54860a5 | |||
| 77ec8354c8 | |||
| da11714629 | |||
| f640db21e3 | |||
| f7bd2136ee | |||
| 9f418332ba | |||
| 8535648c3d | |||
| f43564d019 | |||
| 6bc913d43e | |||
| b430721c05 | |||
| be2a6870c6 | |||
| 54709dd281 | |||
| 2f1e1a583f | |||
| 53e1ae186e | |||
| d8674462df | |||
| 17bd314a20 | |||
| b7bb29224f | |||
| 26b192c840 | |||
| 71d83b8d76 | |||
| 837543d44a | |||
| 91b806637c | |||
| 96ccd4d7c7 | |||
| deed075727 | |||
| ce38e275a9 | |||
| 4685103e60 | |||
| 3ce14912e4 |
41
.gitea/workflows/unit-tests.yml
Normal file
41
.gitea/workflows/unit-tests.yml
Normal file
@@ -0,0 +1,41 @@
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
paths:
|
||||
# Run tests only if Python files change or this workflow file changes
|
||||
- '.gitea/workflows/run-unit-tests.yml'
|
||||
- '**/*.py'
|
||||
|
||||
jobs:
|
||||
unit-tests:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: '3.12'
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -r requirements.txt
|
||||
|
||||
- name: Run tests
|
||||
run: |
|
||||
./tests.py
|
||||
env:
|
||||
AZURE_CLIENT_ID: ${{ vars.AZURE_CLIENT_ID }}
|
||||
AZURE_TENANT_ID: ${{ vars.AZURE_TENANT_ID }}
|
||||
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
|
||||
|
||||
# AI: generated... check validity
|
||||
# - name: Upload test results
|
||||
# if: always()
|
||||
# uses: actions/upload-artifact@v2
|
||||
# with:
|
||||
# name: test-results
|
||||
# path: tests/test_results.xml
|
||||
14
.gitignore
vendored
14
.gitignore
vendored
@@ -1,3 +1,17 @@
|
||||
# Python
|
||||
.venv
|
||||
__pycache__/
|
||||
|
||||
# Ignore sample JSON files
|
||||
*.sample.json
|
||||
|
||||
# Ignore prototype scripts
|
||||
prototype_*.py
|
||||
|
||||
# Shell secrets
|
||||
*.secret
|
||||
|
||||
# Certificate files
|
||||
*.pem
|
||||
*.key
|
||||
*.crt
|
||||
|
||||
7
.vscode/launch.json
vendored
7
.vscode/launch.json
vendored
@@ -10,6 +10,13 @@
|
||||
"request": "launch",
|
||||
"program": "${workspaceFolder}/harvester.py",
|
||||
"console": "integratedTerminal"
|
||||
},
|
||||
{
|
||||
"name": "Python Debugger: Current File",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"program": "${file}",
|
||||
"console": "integratedTerminal"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
21
LICENSE
Normal file
21
LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Slawomir Koszewski (slawek@koszewscy.waw.pl)
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -1,3 +1,5 @@
|
||||
# Markdown Docs Harvester
|
||||
|
||||
[](https://gitea.koszewscy.waw.pl/slawek/docs-harvester/actions?workflow=unit-tests.yml)
|
||||
|
||||
This project is designed to harvest and process Markdown documentation files from Git repositories.
|
||||
|
||||
74
devops.py
74
devops.py
@@ -1,74 +0,0 @@
|
||||
from unicodedata import name
|
||||
import requests
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import json
|
||||
from uuid import UUID
|
||||
|
||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||
DEVOPS_API_VERSION = "7.1"
|
||||
|
||||
class Organization:
|
||||
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
|
||||
self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
|
||||
self._token = token
|
||||
self._api_version = api_version
|
||||
|
||||
def get(self, path: str, params: dict = {}):
|
||||
request_parameters = {
|
||||
"api-version": self._api_version,
|
||||
**params
|
||||
}
|
||||
encoded_path = urllib.parse.quote(path.lstrip("/")) # Ensure single slash between base and path
|
||||
url = self._org_url + encoded_path
|
||||
r = requests.get(url=url, params=request_parameters, headers={
|
||||
"Authorization": f"Bearer {self._token}"
|
||||
})
|
||||
return r
|
||||
|
||||
@property
|
||||
def projects(self):
|
||||
r = self.get("_apis/projects")
|
||||
r.raise_for_status()
|
||||
|
||||
# Return a list of Project instances
|
||||
projects_data = r.json().get("value", [])
|
||||
return [
|
||||
Project(self,
|
||||
id=proj.get("id"),
|
||||
name=proj.get("name"),
|
||||
url=proj.get("url"),
|
||||
description=proj.get("description"),
|
||||
) for proj in projects_data
|
||||
]
|
||||
|
||||
class Project:
|
||||
def __init__(self, org: Organization, id: str, name: str | None = None, url: str | None = None, description: str | None = None):
|
||||
self._org = org
|
||||
|
||||
# Check, if the id is a valid UUID
|
||||
try:
|
||||
self._id = str(UUID(id))
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid project ID: {self._id}")
|
||||
|
||||
if name is not None and url is not None and name != "" and url != "":
|
||||
self._name = name
|
||||
self._url = url
|
||||
self._description = description if description is not None else "" # Ensure description is a string
|
||||
else:
|
||||
# Fetch project details from the API
|
||||
r = org.get(f"_apis/projects/{urllib.parse.quote(self._id)}")
|
||||
r.raise_for_status()
|
||||
proj_data = r.json()
|
||||
self._id = proj_data.get("id", "")
|
||||
self._name = proj_data.get("name", "")
|
||||
self._url = proj_data.get("url", "")
|
||||
self._description = proj_data.get("description", "")
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
def __str__(self):
|
||||
return f"Project(name={self._name}, id={self._id})"
|
||||
15
harvester.py
15
harvester.py
@@ -1,13 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from devops import Organization, Project, DEVOPS_SCOPE
|
||||
from azure.identity import DefaultAzureCredential
|
||||
from json import dumps
|
||||
from sk.devops import Organization
|
||||
org = Organization("https://dev.azure.com/mcovsandbox")
|
||||
|
||||
org = Organization("https://dev.azure.com/mcovsandbox", DefaultAzureCredential().get_token(DEVOPS_SCOPE).token)
|
||||
projects = org.projects
|
||||
print([str(p) for p in projects])
|
||||
|
||||
ado_sandbox = Project(org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
|
||||
print(ado_sandbox)
|
||||
# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])
|
||||
print(org["ADO Sandbox"]["ado-auth-lab"]["/container"].url)
|
||||
print(org["ADO Sandbox"]["ado-auth-lab"]["/generate-pat.py"].url)
|
||||
|
||||
8
make-venv.sh
Executable file
8
make-venv.sh
Executable file
@@ -0,0 +1,8 @@
|
||||
#! /usr/bin/env bash
|
||||
|
||||
python3 -m venv .venv
|
||||
./.venv/bin/pip install --upgrade pip
|
||||
./.venv/bin/pip install -r requirements.txt
|
||||
|
||||
echo "Add the following alias to your shell configuration file (e.g., .bashrc or .zshrc):"
|
||||
echo "alias v_env='test -d $(pwd)/.venv && . $(pwd)/.venv/bin/activate'"
|
||||
130
sk/azure.py
Normal file
130
sk/azure.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""
|
||||
Minimal Authentication package for Azure.
|
||||
|
||||
Uses client credentials - a secret or a certificate.
|
||||
"""
|
||||
|
||||
import os, requests
|
||||
import re
|
||||
import jwt, uuid, time
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography import x509
|
||||
import hashlib
|
||||
import base64
|
||||
|
||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||
|
||||
def get_token(
|
||||
tenant_id: str | None = None,
|
||||
client_id: str | None = None,
|
||||
client_secret: str | None = None,
|
||||
pem_path: str | None = None
|
||||
) -> str:
|
||||
"""
|
||||
Obtain a token for DevOps using DefaultAzureCredential.
|
||||
"""
|
||||
try:
|
||||
if tenant_id and client_id and client_secret:
|
||||
from azure.identity import ClientSecretCredential
|
||||
return ClientSecretCredential(
|
||||
tenant_id=tenant_id,
|
||||
client_id=client_id,
|
||||
client_secret=client_secret
|
||||
).get_token(DEVOPS_SCOPE).token
|
||||
elif tenant_id and client_id and pem_path:
|
||||
from azure.identity import CertificateCredential
|
||||
return CertificateCredential(
|
||||
tenant_id=tenant_id,
|
||||
client_id=client_id,
|
||||
certificate_path=pem_path
|
||||
).get_token(DEVOPS_SCOPE).token
|
||||
else:
|
||||
from azure.identity import DefaultAzureCredential
|
||||
return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
|
||||
except ImportError:
|
||||
if tenant_id and client_id and client_secret:
|
||||
return secret_credentials_auth(
|
||||
tenant_id=tenant_id,
|
||||
client_id=client_id,
|
||||
client_secret=client_secret
|
||||
)
|
||||
elif tenant_id and client_id and pem_path:
|
||||
return certificate_credentials_auth(
|
||||
tenant_id=tenant_id,
|
||||
client_id=client_id,
|
||||
pem_path=pem_path
|
||||
)
|
||||
else:
|
||||
raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
|
||||
|
||||
def secret_credentials_auth(
|
||||
scope: str = DEVOPS_SCOPE,
|
||||
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
||||
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
|
||||
) -> str:
|
||||
"""
|
||||
Authenticate using client credentials. Pass credentials via environment variables,
|
||||
or directly as function parameters.
|
||||
"""
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
r = requests.get(token_url, data={
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": client_id,
|
||||
"client_secret": client_secret,
|
||||
"scope": scope
|
||||
})
|
||||
r.raise_for_status()
|
||||
return r.json().get("access_token", "")
|
||||
|
||||
def certificate_credentials_auth(
|
||||
scope: str = DEVOPS_SCOPE,
|
||||
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
||||
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||
pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
|
||||
) -> str:
|
||||
"""
|
||||
Authenticate using client credentials with a certificate.
|
||||
Pass credentials via environment variables, or directly as function parameters.
|
||||
"""
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
|
||||
# Wczytaj klucz prywatny (RSA)
|
||||
with open(pem_path, "rb") as f:
|
||||
pem = f.read()
|
||||
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
|
||||
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
|
||||
|
||||
private_key = serialization.load_pem_private_key(key_pem, password=None)
|
||||
cert = x509.load_pem_x509_certificate(cert_pem)
|
||||
|
||||
der = cert.public_bytes(serialization.Encoding.DER)
|
||||
sha1 = hashlib.sha1(der).digest()
|
||||
x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
|
||||
|
||||
# Stwórz client_assertion JWT
|
||||
now = int(time.time())
|
||||
claims = {
|
||||
"iss": client_id,
|
||||
"sub": client_id,
|
||||
"aud": token_url,
|
||||
"jti": str(uuid.uuid4()),
|
||||
"iat": now,
|
||||
"exp": now + 600,
|
||||
}
|
||||
|
||||
headers = {"x5t": x5t, "kid": x5t}
|
||||
|
||||
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
|
||||
|
||||
data = {
|
||||
"grant_type": "client_credentials",
|
||||
"client_id": client_id,
|
||||
"scope": scope,
|
||||
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
||||
"client_assertion": assertion,
|
||||
}
|
||||
|
||||
r = requests.post(token_url, data=data)
|
||||
r.raise_for_status()
|
||||
return r.json().get("access_token")
|
||||
161
sk/certificates.py
Normal file
161
sk/certificates.py
Normal file
@@ -0,0 +1,161 @@
|
||||
import datetime
|
||||
from io import BufferedWriter
|
||||
import re
|
||||
import cryptography.x509 as x509
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
|
||||
import pathlib
|
||||
|
||||
def read_private_key(pem_path: str) -> PrivateKeyTypes:
|
||||
"""
|
||||
Read a PEM file and extract the private key in bytes.
|
||||
"""
|
||||
with open(pem_path, "rb") as f:
|
||||
pem = f.read()
|
||||
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
|
||||
key = serialization.load_pem_private_key(key_pem, password=None)
|
||||
return key
|
||||
|
||||
def read_public_certificate(pem_path: str) -> any:
|
||||
"""
|
||||
Read a PEM file and extract the public certificate.
|
||||
"""
|
||||
with open(pem_path, "rb") as f:
|
||||
pem = f.read()
|
||||
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
|
||||
cert = x509.load_pem_x509_certificate(cert_pem)
|
||||
return cert
|
||||
|
||||
def _write_pem_key(
|
||||
writer: BufferedWriter,
|
||||
key: PrivateKeyTypes,
|
||||
encoding: serialization.Encoding = serialization.Encoding.PEM,
|
||||
format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
|
||||
password: str | None = None
|
||||
):
|
||||
"""
|
||||
Write a PEM encoded private key to the given writer.
|
||||
Allows optional password protection.
|
||||
"""
|
||||
if password:
|
||||
encryption_algorithm = serialization.BestAvailableEncryption(password.encode())
|
||||
else:
|
||||
encryption_algorithm = serialization.NoEncryption()
|
||||
|
||||
key_pem = key.private_bytes(
|
||||
encoding=encoding,
|
||||
format=format,
|
||||
encryption_algorithm=encryption_algorithm,
|
||||
)
|
||||
writer.write(key_pem)
|
||||
|
||||
def _write_pem_cert(
|
||||
writer: BufferedWriter,
|
||||
cert: x509.Certificate,
|
||||
encoding: serialization.Encoding = serialization.Encoding.PEM
|
||||
):
|
||||
cert_pem = cert.public_bytes(encoding=encoding)
|
||||
writer.write(cert_pem)
|
||||
|
||||
def write_pem_file(
|
||||
pem_file: pathlib.Path,
|
||||
cert: x509.Certificate | None = None,
|
||||
key: PrivateKeyTypes | None = None,
|
||||
encoding: serialization.Encoding = serialization.Encoding.PEM,
|
||||
key_serialization_format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
|
||||
password: str | None = None
|
||||
):
|
||||
"""
|
||||
Write the certificate and/or private key to a PEM file.
|
||||
"""
|
||||
with open(pem_file, "wb") as f:
|
||||
if cert:
|
||||
_write_pem_cert(
|
||||
f,
|
||||
cert,
|
||||
encoding=encoding
|
||||
)
|
||||
if key:
|
||||
_write_pem_key(
|
||||
f,
|
||||
key,
|
||||
encoding=encoding,
|
||||
password=password,
|
||||
format=key_serialization_format
|
||||
)
|
||||
|
||||
def create_self_signed_certificate(
|
||||
file_path: str,
|
||||
subject_name: str,
|
||||
organization_name: str,
|
||||
country_name: str,
|
||||
valid_days: int = 365,
|
||||
key_size: int = 2048
|
||||
):
|
||||
"""
|
||||
Create a self-signed certificate. It saves the certificate and private key
|
||||
in PEM format to the specified file path. Three files are created:
|
||||
|
||||
- <file_path>.crt : The public certificate
|
||||
- <file_path>.key : The private key
|
||||
- <file_path>.pem : The certificate and private key combined in one file
|
||||
|
||||
:param file_path: Base file path to save the certificate and key.
|
||||
:param subject_name: Common Name (CN) for the certificate.
|
||||
:param organization_name: Organization Name (O) for the certificate.
|
||||
:param country_name: Country Name (C) for the certificate.
|
||||
:param valid_days: Number of days the certificate is valid for.
|
||||
:param key_size: Size of the RSA key.
|
||||
|
||||
Use the following command to replace any credentials already defined for the
|
||||
App Registration with that certificate:
|
||||
|
||||
az ad app credential reset --id <CLIENT_ID> --cert @<file_path>.crt
|
||||
|
||||
Use --append to add the certificate without removing existing credentials.
|
||||
|
||||
> Note: Do not upload the private key file (.key) nor the combined PEM file (.pem).
|
||||
"""
|
||||
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
|
||||
|
||||
subject = issuer = x509.Name([
|
||||
x509.oid.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, country_name),
|
||||
x509.oid.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, organization_name),
|
||||
x509.oid.NameAttribute(x509.oid.NameOID.COMMON_NAME, subject_name),
|
||||
])
|
||||
|
||||
cert = (
|
||||
x509.CertificateBuilder()
|
||||
.subject_name(subject)
|
||||
.issuer_name(issuer)
|
||||
.public_key(key.public_key())
|
||||
.serial_number(x509.random_serial_number())
|
||||
.not_valid_before(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1))
|
||||
.not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=valid_days))
|
||||
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
|
||||
.sign(key, hashes.SHA256())
|
||||
)
|
||||
|
||||
crt_path = pathlib.Path(file_path).with_suffix('.crt')
|
||||
key_path = pathlib.Path(file_path).with_suffix('.key')
|
||||
pem_path = pathlib.Path(file_path).with_suffix('.pem')
|
||||
|
||||
public_key = cert.public_bytes(serialization.Encoding.PEM)
|
||||
private_key = key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
|
||||
# Write the certificate
|
||||
write_pem_file(crt_path, cert=cert)
|
||||
# Write the private key
|
||||
write_pem_file(key_path, key=key)
|
||||
# Write both to a combined PEM file
|
||||
write_pem_file(pem_path, cert=cert, key=key)
|
||||
|
||||
print(f"Certificate created and saved.")
|
||||
print(f" - Certificate: {crt_path}")
|
||||
print(f" - Private Key: {key_path}")
|
||||
print(f" - PEM File: {pem_path}")
|
||||
307
sk/devops.py
Normal file
307
sk/devops.py
Normal file
@@ -0,0 +1,307 @@
|
||||
from __future__ import annotations
|
||||
import requests
|
||||
import urllib.parse
|
||||
from uuid import UUID
|
||||
from string import Template
|
||||
|
||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||
DEVOPS_API_VERSION = "7.1"
|
||||
|
||||
# Define a class decorator
|
||||
def auto_properties(mapping: dict[str,str]):
|
||||
|
||||
def make_property(name: str):
|
||||
private_var = f"_{name}"
|
||||
|
||||
def getter(self):
|
||||
try:
|
||||
i = getattr(self, private_var)
|
||||
if i is not None:
|
||||
return i
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
# Fetch repository details from the API if it is set to None or not existing
|
||||
self._get()
|
||||
return getattr(self, private_var)
|
||||
|
||||
return property(fget=getter)
|
||||
|
||||
def set_auto_properties(self, **kwargs):
|
||||
allowed = set(self.__class__.__auto_properties__)
|
||||
unknown = [k for k in kwargs if k not in allowed]
|
||||
if unknown:
|
||||
raise ValueError(f"Unknown properties for {self.__class__.__name__}: {', '.join(unknown)}")
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, f"_{k}", v)
|
||||
return self
|
||||
|
||||
def from_json(self, json_data: dict):
|
||||
for name in self.__class__.__auto_properties__:
|
||||
setattr(self, f"_{name}", json_data.get(self.__class__.__auto_properties__[name], None))
|
||||
return self
|
||||
|
||||
def decorator(cls):
|
||||
cls.__auto_properties__ = mapping # Make a copy of the mapping
|
||||
|
||||
# Create properties dynamically
|
||||
for name in mapping:
|
||||
setattr(cls, name, make_property(name))
|
||||
|
||||
setattr(cls, "set_auto_properties", set_auto_properties)
|
||||
setattr(cls, "from_json", from_json)
|
||||
|
||||
return cls
|
||||
return decorator
|
||||
|
||||
def devops(key: str, get_url: str, list_url: str = None, params: dict = {}):
|
||||
def decorator(cls):
|
||||
cls.__entity_key__ = key
|
||||
cls.__entity_get_url__ = get_url # Use $key in the URL
|
||||
cls.__entity_list_url__ = list_url # Use $key in the URL
|
||||
cls.__entity_params__ = params
|
||||
return cls
|
||||
return decorator
|
||||
|
||||
class DevOps():
|
||||
"""Base class for DevOps entities."""
|
||||
|
||||
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
|
||||
self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
|
||||
self._token = token
|
||||
self._api_version = api_version
|
||||
|
||||
def _get_url_path(self, path: str, params: dict = {}) -> requests.Response:
|
||||
if not self._org_url or not self._token or not self._api_version:
|
||||
raise ValueError("Organization URL, token, and API version must be set before making requests.")
|
||||
|
||||
request_parameters = {
|
||||
"api-version": self._api_version,
|
||||
**params
|
||||
}
|
||||
encoded_path = urllib.parse.quote(path.lstrip("/")) # Ensure single slash between base and path
|
||||
url = self._org_url + encoded_path
|
||||
r = requests.get(url=url, params=request_parameters, headers={
|
||||
"Authorization": f"Bearer {self._token}"
|
||||
})
|
||||
r.raise_for_status() # Ensure we raise an error for bad responses
|
||||
return r
|
||||
|
||||
def _get(self, key: str):
|
||||
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
|
||||
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
|
||||
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
|
||||
# Build the URL
|
||||
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
|
||||
# Build parameters with key substituted
|
||||
params = {}
|
||||
if hasattr(self.__class__, "__entity_params__"):
|
||||
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
|
||||
|
||||
# Fetch the object data from the URL
|
||||
r = self._get_url_path(url, params=params)
|
||||
|
||||
# Populate attributes
|
||||
self.from_json(r.json())
|
||||
|
||||
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
|
||||
"""
|
||||
Each entity class can use this method to populate its attributes, by defining
|
||||
its own _get method that calls this one with the key name,
|
||||
and the URL.
|
||||
"""
|
||||
r = self._get_url_path(get_url, params=params) # Fetch the object data from the URL
|
||||
setattr(self, f"_{key_name}", r.json().get(key_name, None)) # Set the key attribute
|
||||
self.from_json(r.json()) # Populate other attributes from JSON
|
||||
|
||||
def _entity(self, entity_class: type, key_name: str, entity_data: dict) -> object:
|
||||
"""A generic method to create an entity from JSON data."""
|
||||
args = { key_name: entity_data.get(key_name) }
|
||||
e = entity_class(self, **args)
|
||||
e.from_json(entity_data)
|
||||
return e
|
||||
|
||||
def _entities(self, entity_class: type, key_name: str, list_url: str, params: dict = {}) -> list[object]:
|
||||
"""A generic method to retrieve a list of entities."""
|
||||
r = self._get_url_path(list_url, params=params)
|
||||
entities_data = r.json().get("value", [])
|
||||
|
||||
entities_list = []
|
||||
for entity in entities_data:
|
||||
entities_list.append(self._entity(entity_class, key_name, entity))
|
||||
|
||||
return entities_list
|
||||
|
||||
class Organization(DevOps):
|
||||
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
|
||||
super().__init__(org_url, token, api_version)
|
||||
|
||||
@property
|
||||
def projects(self):
|
||||
if not hasattr(self, "_projects"):
|
||||
self._projects = self._entities(
|
||||
entity_class=Project,
|
||||
key_name="id",
|
||||
list_url="_apis/projects")
|
||||
return self._projects
|
||||
|
||||
def __getitem__(self, key: str) -> Project:
|
||||
for project in self.projects:
|
||||
if project.id == key or project.name == key:
|
||||
return project
|
||||
raise KeyError(f"Project with ID or name '{key}' not found.")
|
||||
|
||||
@auto_properties({
|
||||
"name": "name",
|
||||
"url": "url",
|
||||
"description": "description"
|
||||
})
|
||||
@devops("id", "_apis/projects/$key", "_apis/projects")
|
||||
class Project(DevOps):
|
||||
def _get(self):
|
||||
self._get_entity(
|
||||
key_name="id",
|
||||
get_url=f"_apis/projects/{self._id}"
|
||||
)
|
||||
|
||||
def __init__(self, org: Organization, id: str, **kwargs):
|
||||
super().__init__(org._org_url, org._token, org._api_version)
|
||||
|
||||
try:
|
||||
self._id = str(UUID(id))
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid project ID: {id}")
|
||||
|
||||
self.set_auto_properties(**kwargs)
|
||||
|
||||
def __str__(self):
|
||||
return f"Project(name={self._name}, id={self._id})"
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def repositories(self):
|
||||
if not hasattr(self, "_repositories"):
|
||||
self._repositories = self._entities(
|
||||
entity_class=Repository,
|
||||
key_name="id",
|
||||
list_url=f"{self._id}/_apis/git/repositories")
|
||||
return self._repositories
|
||||
|
||||
def __getitem__(self, key: str) -> Repository:
|
||||
for repo in self.repositories:
|
||||
if repo.id == key or repo.name == key:
|
||||
return repo
|
||||
raise KeyError(f"Repository with ID or name '{key}' not found.")
|
||||
|
||||
@auto_properties({
|
||||
"name": "name",
|
||||
"url": "url",
|
||||
"default_branch": "defaultBranch",
|
||||
"is_disabled": "isDisabled",
|
||||
"is_in_maintenance": "isInMaintenance",
|
||||
"remote_url": "remoteUrl",
|
||||
"ssh_url": "sshUrl",
|
||||
"web_url": "webUrl"
|
||||
})
|
||||
class Repository(DevOps):
|
||||
def _get(self):
|
||||
self._get_entity(
|
||||
key_name="id",
|
||||
get_url=f"{self._project.id}/_apis/git/repositories/{self._id}"
|
||||
)
|
||||
|
||||
def __init__(self, project: Project, id: str, **kwargs):
|
||||
super().__init__(project._org_url, project._token, project._api_version)
|
||||
self._project = project
|
||||
self._id = id
|
||||
|
||||
try:
|
||||
UUID(id) # Check if it's a valid UUID
|
||||
except ValueError:
|
||||
# Called with a repository name, fetch by name
|
||||
self._get()
|
||||
if kwargs:
|
||||
raise ValueError("Automatic properties cannot be set when retrieving by name.")
|
||||
return
|
||||
|
||||
# set other properties if provided
|
||||
self.set_auto_properties(**kwargs)
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def project(self):
|
||||
return self._project
|
||||
|
||||
def __str__(self):
|
||||
return f"Repository(name={self.name}, id={self._id})"
|
||||
|
||||
@property
|
||||
def items(self):
|
||||
if not hasattr(self, "_items"):
|
||||
self._items = self._entities(
|
||||
entity_class=Item,
|
||||
key_name="path",
|
||||
list_url=f"{self._project.id}/_apis/git/repositories/{self._id}/items",
|
||||
params={
|
||||
"scopePath": "/",
|
||||
"recursionLevel": "oneLevel"
|
||||
}
|
||||
)
|
||||
return self._items
|
||||
|
||||
def __getitem__(self, key: str) -> Item:
|
||||
for item in self.items:
|
||||
if item.path == key:
|
||||
return item
|
||||
raise KeyError(f"Item with path '{key}' not found.")
|
||||
|
||||
@auto_properties({
|
||||
"object_id": "objectId",
|
||||
"git_object_type": "gitObjectType",
|
||||
"commit_id": "commitId",
|
||||
"is_folder": "isFolder",
|
||||
"url": "url"
|
||||
})
|
||||
class Item(DevOps):
|
||||
def __init__(self, repository: Repository, path: str, **kwargs):
|
||||
super().__init__(repository._org_url, repository._token, repository._api_version)
|
||||
self._repository = repository
|
||||
self._path = path
|
||||
self.set_auto_properties(**kwargs) # set properties defined in decorator
|
||||
|
||||
def _get(self):
|
||||
self._get_entity(
|
||||
key_name="path",
|
||||
get_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||
params={
|
||||
"path": self._path,
|
||||
"$format": "json",
|
||||
"recursionLevel": "none"
|
||||
}
|
||||
)
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self._path
|
||||
|
||||
@property
|
||||
def children(self):
|
||||
"""List items under this item if it is a folder."""
|
||||
if self.git_object_type == "tree":
|
||||
return self._entities(
|
||||
entity_class=Item,
|
||||
key_name="path",
|
||||
list_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||
params={
|
||||
"scopePath": self._path,
|
||||
"recursionLevel": "oneLevel"
|
||||
}
|
||||
)
|
||||
else:
|
||||
raise ValueError("Items can only be listed for folder items.")
|
||||
127
tests.py
Executable file
127
tests.py
Executable file
@@ -0,0 +1,127 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import requests
|
||||
from sk.devops import Organization, Repository, Project, Item
|
||||
from sk.azure import get_token
|
||||
|
||||
# Get the token outside the test class to speed up tests.
|
||||
# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
|
||||
token = get_token()
|
||||
|
||||
class Test01(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
|
||||
|
||||
def test_01(self):
|
||||
"""Listing projects in the organization"""
|
||||
self.assertGreater(len(list(self.org.projects)), 0)
|
||||
def test_02(self):
|
||||
"""Getting a specific project by ID (object instantiation)"""
|
||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
self.assertEqual(project.name, "ADO Sandbox")
|
||||
def test_03(self):
|
||||
"""Getting a specific project by name using org indexing (object retrieval)"""
|
||||
project = self.org["ADO Sandbox"]
|
||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
self.assertEqual(project.name, "ADO Sandbox")
|
||||
def test_04(self):
|
||||
"""Getting a specific project by ID using org indexing (object retrieval)"""
|
||||
project = self.org["bafe0cf1-6c97-4088-864a-ea6dc02b2727"]
|
||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
self.assertEqual(project.name, "ADO Sandbox")
|
||||
|
||||
class Test02(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
|
||||
|
||||
def test_01(self):
|
||||
"""Listing repositories in a project"""
|
||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
repos = list(project.repositories)
|
||||
self.assertGreater(len(repos), 0)
|
||||
|
||||
def test_02(self):
|
||||
"""Getting a specific repository by ID (object instantiation)"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
self.assertEqual(repo.name, "ado-auth-lab")
|
||||
|
||||
def test_03(self):
|
||||
"""Getting a specific repository by name using project indexing (object retrieval)"""
|
||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
repo = project["ado-auth-lab"]
|
||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
self.assertEqual(repo.name, "ado-auth-lab")
|
||||
|
||||
def test_04(self):
|
||||
"""Getting a specific repository by ID using project indexing (object retrieval)"""
|
||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
repo = project["feac266f-84d2-41bc-839b-736925a85eaa"]
|
||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
self.assertEqual(repo.name, "ado-auth-lab")
|
||||
|
||||
class Test03(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
|
||||
|
||||
def test_01(self):
|
||||
"""Getting details of a specific item in a repository"""
|
||||
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
|
||||
self.assertEqual(item.path, "/generate-pat.py")
|
||||
self.assertIsNotNone(item.commit_id)
|
||||
|
||||
def test_02(self):
|
||||
"""Listing items in a folder within a repository"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
docs_item = Item(repo, path="/container")
|
||||
children = list(docs_item.children)
|
||||
self.assertGreater(len(children), 0)
|
||||
|
||||
def test_03(self):
|
||||
"""Getting a specific item from a repository using indexing"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
item = repo["/container"]
|
||||
self.assertEqual(item.path, "/container")
|
||||
self.assertTrue(item.is_folder)
|
||||
|
||||
def test_04(self):
|
||||
"""Getting a specific item from a repository using indexing"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
item = repo["/generate-pat.py"]
|
||||
self.assertEqual(item.path, "/generate-pat.py")
|
||||
self.assertFalse(item.is_folder)
|
||||
|
||||
def test_05(self):
|
||||
"""Attempting to get a non-existent item from a repository using indexing"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
with self.assertRaises(KeyError):
|
||||
repo["/non-existent-file.txt"] # This file does not exist
|
||||
|
||||
class Test04(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
|
||||
|
||||
def test_01(self):
|
||||
"""Getting details of a specific item in a repository"""
|
||||
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
|
||||
self.assertEqual(item.path, "/generate-pat.py")
|
||||
self.assertIsNotNone(item.commit_id)
|
||||
|
||||
def test_02(self):
|
||||
"""Trying to instantiate Item for a item that does not exist"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
with self.assertRaises(requests.exceptions.HTTPError):
|
||||
item = Item(repo, path="/non-existent-file.txt")
|
||||
self.assertEqual(item.path, "/non-existent-file.txt")
|
||||
commit_id = item.commit_id # This will raise HTTPError when trying to fetch details of a non-existent item
|
||||
|
||||
def test_03(self):
|
||||
"""Listing items in a folder within a repository"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
item = Item(repo, path="/container")
|
||||
children = list(item.children)
|
||||
self.assertGreater(len(children), 0)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user