Compare commits

...

31 Commits

Author SHA1 Message Date
d3344b8799 Add script to create Python virtual environment 2025-11-04 09:14:10 +01:00
05c9e5184c Implemented devops decorator and a get method that retrieves object properties by a key.
All checks were successful
/ unit-tests (push) Successful in 10s
2025-11-04 08:37:35 +01:00
9748230745 Added a new debug configuration for debugging
All checks were successful
/ unit-tests (push) Successful in 1m2s
the current file.
2025-11-04 07:55:34 +01:00
8c0a92a8b7 Remove unused import of DefaultAzureCredential from devops.py 2025-11-04 07:55:27 +01:00
31e1b88cd1 Updated authentication scenarios.
All checks were successful
/ unit-tests (push) Successful in 9s
2025-11-03 21:24:25 +01:00
678b6161cc Added PEM read/write functions.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 20:42:42 +01:00
b7608dcdf8 Moved devops package to sk module.
All checks were successful
/ unit-tests (push) Successful in 9s
2025-11-03 14:52:20 +01:00
f797cd098d Added a prototype authentication function using client certificate.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 14:49:52 +01:00
2f2cb1c337 Add minimal authentication package for Azure using client credentials
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 13:41:21 +01:00
ddab4df55f Add function to create self-signed certificates with PEM format 2025-11-03 13:37:34 +01:00
6e757dd0b8 Update .gitignore to include additional shell secrets and certificate files 2025-11-03 13:37:25 +01:00
07c662b1e8 Fix badge link in README.md to point to the correct unit tests workflow 2025-11-03 10:36:34 +01:00
6c8bcc775f Renamed unit-tests workflow.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 10:35:28 +01:00
32ebcf74b0 Wrapped the badge around a link to the workflow. 2025-11-03 10:33:54 +01:00
1caf0f3069 Fix formatting in README.md for project description 2025-11-03 09:31:11 +01:00
9acb223ef1 Added a badge. 2025-11-03 09:29:05 +01:00
9ef54860a5 Updated test workflow. Narrowed trigger conditions.
All checks were successful
/ unit-tests (push) Successful in 12s
2025-11-03 08:50:26 +01:00
77ec8354c8 Added MIT license.
All checks were successful
/ unit-tests (push) Successful in 10s
2025-11-03 08:48:29 +01:00
da11714629 Added Azure authentication environment variables.
All checks were successful
/ unit-tests (push) Successful in 10s
2025-11-03 08:15:03 +01:00
f640db21e3 Fix: wrong workflow directory name.
Some checks failed
/ unit-tests (push) Failing after 55s
2025-11-03 07:42:45 +01:00
f7bd2136ee Added automatic unit testing. 2025-11-03 07:40:10 +01:00
9f418332ba Added missing item and children listing tests. 2025-11-03 07:39:56 +01:00
8535648c3d Implemented item indexer on Repository class. 2025-11-03 02:37:40 +01:00
f43564d019 Added finding projects by id and name using an indexer. 2025-11-03 01:55:43 +01:00
6bc913d43e Add prototype script pattern to .gitignore 2025-11-03 01:31:01 +01:00
b430721c05 Renamed tests and added human readable descriptions. 2025-11-03 01:29:59 +01:00
be2a6870c6 Reorganized test setup to avoid repeated authentication during tests. 2025-11-03 01:07:26 +01:00
54709dd281 Implemented unittests. 2025-11-03 00:57:37 +01:00
2f1e1a583f Added listing descendant items. 2025-11-03 00:09:05 +01:00
53e1ae186e Renamed _get_attributes to _get and implemented for Item. 2025-11-03 00:00:38 +01:00
d8674462df Enhance output formatting for project and repository listings 2025-11-02 23:41:22 +01:00
11 changed files with 625 additions and 57 deletions

View File

@@ -0,0 +1,41 @@
on:
push:
branches:
- main
paths:
# Run tests only if Python files change or this workflow file changes
- '.gitea/workflows/run-unit-tests.yml'
- '**/*.py'
jobs:
unit-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run tests
run: |
./tests.py
env:
AZURE_CLIENT_ID: ${{ vars.AZURE_CLIENT_ID }}
AZURE_TENANT_ID: ${{ vars.AZURE_TENANT_ID }}
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
# AI: generated... check validity
# - name: Upload test results
# if: always()
# uses: actions/upload-artifact@v2
# with:
# name: test-results
# path: tests/test_results.xml

11
.gitignore vendored
View File

@@ -4,3 +4,14 @@ __pycache__/
# Ignore sample JSON files # Ignore sample JSON files
*.sample.json *.sample.json
# Ignore prototype scripts
prototype_*.py
# Shell secrets
*.secret
# Certificate files
*.pem
*.key
*.crt

7
.vscode/launch.json vendored
View File

@@ -10,6 +10,13 @@
"request": "launch", "request": "launch",
"program": "${workspaceFolder}/harvester.py", "program": "${workspaceFolder}/harvester.py",
"console": "integratedTerminal" "console": "integratedTerminal"
},
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
} }
] ]
} }

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Slawomir Koszewski (slawek@koszewscy.waw.pl)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,3 +1,5 @@
# Markdown Docs Harvester # Markdown Docs Harvester
[![Unit Tests](https://gitea.koszewscy.waw.pl/slawek/docs-harvester/actions/workflows/unit-tests.yml/badge.svg)](https://gitea.koszewscy.waw.pl/slawek/docs-harvester/actions?workflow=unit-tests.yml)
This project is designed to harvest and process Markdown documentation files from Git repositories. This project is designed to harvest and process Markdown documentation files from Git repositories.

View File

@@ -1,31 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from devops import Organization, Project, Repository, DEVOPS_SCOPE from sk.devops import Organization
from azure.identity import DefaultAzureCredential org = Organization("https://dev.azure.com/mcovsandbox")
from json import dumps
org = Organization("https://dev.azure.com/mcovsandbox", DefaultAzureCredential().get_token(DEVOPS_SCOPE).token) # print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])
print(org["ADO Sandbox"]["ado-auth-lab"]["/container"].url)
# Listing projects in the organization print(org["ADO Sandbox"]["ado-auth-lab"]["/generate-pat.py"].url)
print("Projects in the organization:")
for project in org.projects:
print(f"- {project.name} (ID: {project.id})")
ado_sandbox = Project(org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
print(f"Listing repositories in project: {ado_sandbox.name}")
for repo in ado_sandbox.repositories:
print(f"- {repo.name} (ID: {repo.id})")
repo = Repository(ado_sandbox, id="ado-auth-lab")
print(str(repo))
print(f"Repository name: {repo.name} and URL: {repo.web_url}")
repo = Repository(ado_sandbox, id="feac266f-84d2-41bc-839b-736925a85eaa")
print(str(repo))
print(f"Repository name: {repo.name} and URL: {repo.web_url}")
print("Listing items in the repository:")
for item in repo.items:
print(f"{item.path} ({item.git_object_type}): {item.commit_id}")

8
make-venv.sh Executable file
View File

@@ -0,0 +1,8 @@
#! /usr/bin/env bash
python3 -m venv .venv
./.venv/bin/pip install --upgrade pip
./.venv/bin/pip install -r requirements.txt
echo "Add the following alias to your shell configuration file (e.g., .bashrc or .zshrc):"
echo "alias v_env='test -d $(pwd)/.venv && . $(pwd)/.venv/bin/activate'"

130
sk/azure.py Normal file
View File

@@ -0,0 +1,130 @@
"""
Minimal Authentication package for Azure.
Uses client credentials - a secret or a certificate.
"""
import os, requests
import re
import jwt, uuid, time
from cryptography.hazmat.primitives import serialization
from cryptography import x509
import hashlib
import base64
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
def get_token(
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
pem_path: str | None = None
) -> str:
"""
Obtain a token for DevOps using DefaultAzureCredential.
"""
try:
if tenant_id and client_id and client_secret:
from azure.identity import ClientSecretCredential
return ClientSecretCredential(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
).get_token(DEVOPS_SCOPE).token
elif tenant_id and client_id and pem_path:
from azure.identity import CertificateCredential
return CertificateCredential(
tenant_id=tenant_id,
client_id=client_id,
certificate_path=pem_path
).get_token(DEVOPS_SCOPE).token
else:
from azure.identity import DefaultAzureCredential
return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
except ImportError:
if tenant_id and client_id and client_secret:
return secret_credentials_auth(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
)
elif tenant_id and client_id and pem_path:
return certificate_credentials_auth(
tenant_id=tenant_id,
client_id=client_id,
pem_path=pem_path
)
else:
raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
def secret_credentials_auth(
scope: str = DEVOPS_SCOPE,
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
) -> str:
"""
Authenticate using client credentials. Pass credentials via environment variables,
or directly as function parameters.
"""
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
r = requests.get(token_url, data={
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": scope
})
r.raise_for_status()
return r.json().get("access_token", "")
def certificate_credentials_auth(
scope: str = DEVOPS_SCOPE,
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
) -> str:
"""
Authenticate using client credentials with a certificate.
Pass credentials via environment variables, or directly as function parameters.
"""
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
# Wczytaj klucz prywatny (RSA)
with open(pem_path, "rb") as f:
pem = f.read()
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
private_key = serialization.load_pem_private_key(key_pem, password=None)
cert = x509.load_pem_x509_certificate(cert_pem)
der = cert.public_bytes(serialization.Encoding.DER)
sha1 = hashlib.sha1(der).digest()
x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
# Stwórz client_assertion JWT
now = int(time.time())
claims = {
"iss": client_id,
"sub": client_id,
"aud": token_url,
"jti": str(uuid.uuid4()),
"iat": now,
"exp": now + 600,
}
headers = {"x5t": x5t, "kid": x5t}
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"scope": scope,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": assertion,
}
r = requests.post(token_url, data=data)
r.raise_for_status()
return r.json().get("access_token")

161
sk/certificates.py Normal file
View File

@@ -0,0 +1,161 @@
import datetime
from io import BufferedWriter
import re
import cryptography.x509 as x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
import pathlib
def read_private_key(pem_path: str) -> PrivateKeyTypes:
"""
Read a PEM file and extract the private key in bytes.
"""
with open(pem_path, "rb") as f:
pem = f.read()
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
key = serialization.load_pem_private_key(key_pem, password=None)
return key
def read_public_certificate(pem_path: str) -> any:
"""
Read a PEM file and extract the public certificate.
"""
with open(pem_path, "rb") as f:
pem = f.read()
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
cert = x509.load_pem_x509_certificate(cert_pem)
return cert
def _write_pem_key(
writer: BufferedWriter,
key: PrivateKeyTypes,
encoding: serialization.Encoding = serialization.Encoding.PEM,
format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
password: str | None = None
):
"""
Write a PEM encoded private key to the given writer.
Allows optional password protection.
"""
if password:
encryption_algorithm = serialization.BestAvailableEncryption(password.encode())
else:
encryption_algorithm = serialization.NoEncryption()
key_pem = key.private_bytes(
encoding=encoding,
format=format,
encryption_algorithm=encryption_algorithm,
)
writer.write(key_pem)
def _write_pem_cert(
writer: BufferedWriter,
cert: x509.Certificate,
encoding: serialization.Encoding = serialization.Encoding.PEM
):
cert_pem = cert.public_bytes(encoding=encoding)
writer.write(cert_pem)
def write_pem_file(
pem_file: pathlib.Path,
cert: x509.Certificate | None = None,
key: PrivateKeyTypes | None = None,
encoding: serialization.Encoding = serialization.Encoding.PEM,
key_serialization_format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
password: str | None = None
):
"""
Write the certificate and/or private key to a PEM file.
"""
with open(pem_file, "wb") as f:
if cert:
_write_pem_cert(
f,
cert,
encoding=encoding
)
if key:
_write_pem_key(
f,
key,
encoding=encoding,
password=password,
format=key_serialization_format
)
def create_self_signed_certificate(
file_path: str,
subject_name: str,
organization_name: str,
country_name: str,
valid_days: int = 365,
key_size: int = 2048
):
"""
Create a self-signed certificate. It saves the certificate and private key
in PEM format to the specified file path. Three files are created:
- <file_path>.crt : The public certificate
- <file_path>.key : The private key
- <file_path>.pem : The certificate and private key combined in one file
:param file_path: Base file path to save the certificate and key.
:param subject_name: Common Name (CN) for the certificate.
:param organization_name: Organization Name (O) for the certificate.
:param country_name: Country Name (C) for the certificate.
:param valid_days: Number of days the certificate is valid for.
:param key_size: Size of the RSA key.
Use the following command to replace any credentials already defined for the
App Registration with that certificate:
az ad app credential reset --id <CLIENT_ID> --cert @<file_path>.crt
Use --append to add the certificate without removing existing credentials.
> Note: Do not upload the private key file (.key) nor the combined PEM file (.pem).
"""
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
subject = issuer = x509.Name([
x509.oid.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, country_name),
x509.oid.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, organization_name),
x509.oid.NameAttribute(x509.oid.NameOID.COMMON_NAME, subject_name),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1))
.not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=valid_days))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.sign(key, hashes.SHA256())
)
crt_path = pathlib.Path(file_path).with_suffix('.crt')
key_path = pathlib.Path(file_path).with_suffix('.key')
pem_path = pathlib.Path(file_path).with_suffix('.pem')
public_key = cert.public_bytes(serialization.Encoding.PEM)
private_key = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
# Write the certificate
write_pem_file(crt_path, cert=cert)
# Write the private key
write_pem_file(key_path, key=key)
# Write both to a combined PEM file
write_pem_file(pem_path, cert=cert, key=key)
print(f"Certificate created and saved.")
print(f" - Certificate: {crt_path}")
print(f" - Private Key: {key_path}")
print(f" - PEM File: {pem_path}")

View File

@@ -1,12 +1,14 @@
from __future__ import annotations
import requests import requests
import urllib.parse import urllib.parse
from uuid import UUID from uuid import UUID
from string import Template
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default" DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
DEVOPS_API_VERSION = "7.1" DEVOPS_API_VERSION = "7.1"
# Define a class decorator # Define a class decorator
def auto_properties(mapping: dict[str,str] | None = None): def auto_properties(mapping: dict[str,str]):
def make_property(name: str): def make_property(name: str):
private_var = f"_{name}" private_var = f"_{name}"
@@ -20,7 +22,7 @@ def auto_properties(mapping: dict[str,str] | None = None):
pass pass
# Fetch repository details from the API if it is set to None or not existing # Fetch repository details from the API if it is set to None or not existing
self._get_attributes() self._get()
return getattr(self, private_var) return getattr(self, private_var)
return property(fget=getter) return property(fget=getter)
@@ -52,6 +54,15 @@ def auto_properties(mapping: dict[str,str] | None = None):
return cls return cls
return decorator return decorator
def devops(key: str, get_url: str, list_url: str = None, params: dict = {}):
def decorator(cls):
cls.__entity_key__ = key
cls.__entity_get_url__ = get_url # Use $key in the URL
cls.__entity_list_url__ = list_url # Use $key in the URL
cls.__entity_params__ = params
return cls
return decorator
class DevOps(): class DevOps():
"""Base class for DevOps entities.""" """Base class for DevOps entities."""
@@ -76,10 +87,27 @@ class DevOps():
r.raise_for_status() # Ensure we raise an error for bad responses r.raise_for_status() # Ensure we raise an error for bad responses
return r return r
def _get_entity_attributes(self, key_name: str, get_url: str, params: dict = {}) -> object: def _get(self, key: str):
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
# Build the URL
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
# Build parameters with key substituted
params = {}
if hasattr(self.__class__, "__entity_params__"):
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
# Fetch the object data from the URL
r = self._get_url_path(url, params=params)
# Populate attributes
self.from_json(r.json())
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
""" """
Each entity class can use this method to populate its attributes, by defining Each entity class can use this method to populate its attributes, by defining
its own _get_attributes method that calls this one with the key name, its own _get method that calls this one with the key name,
and the URL. and the URL.
""" """
r = self._get_url_path(get_url, params=params) # Fetch the object data from the URL r = self._get_url_path(get_url, params=params) # Fetch the object data from the URL
@@ -105,24 +133,33 @@ class DevOps():
return entities_list return entities_list
class Organization(DevOps): class Organization(DevOps):
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION): def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
super().__init__(org_url, token, api_version) super().__init__(org_url, token, api_version)
@property @property
def projects(self): def projects(self):
return self._entities( if not hasattr(self, "_projects"):
self._projects = self._entities(
entity_class=Project, entity_class=Project,
key_name="id", key_name="id",
list_url="_apis/projects") list_url="_apis/projects")
return self._projects
def __getitem__(self, key: str) -> Project:
for project in self.projects:
if project.id == key or project.name == key:
return project
raise KeyError(f"Project with ID or name '{key}' not found.")
@auto_properties({ @auto_properties({
"name": "name", "name": "name",
"url": "url", "url": "url",
"description": "description" "description": "description"
}) })
@devops("id", "_apis/projects/$key", "_apis/projects")
class Project(DevOps): class Project(DevOps):
def _get_attributes(self): def _get(self):
self._get_entity_attributes( self._get_entity(
key_name="id", key_name="id",
get_url=f"_apis/projects/{self._id}" get_url=f"_apis/projects/{self._id}"
) )
@@ -146,10 +183,18 @@ class Project(DevOps):
@property @property
def repositories(self): def repositories(self):
return self._entities( if not hasattr(self, "_repositories"):
self._repositories = self._entities(
entity_class=Repository, entity_class=Repository,
key_name="id", key_name="id",
list_url=f"{self._id}/_apis/git/repositories") list_url=f"{self._id}/_apis/git/repositories")
return self._repositories
def __getitem__(self, key: str) -> Repository:
for repo in self.repositories:
if repo.id == key or repo.name == key:
return repo
raise KeyError(f"Repository with ID or name '{key}' not found.")
@auto_properties({ @auto_properties({
"name": "name", "name": "name",
@@ -162,8 +207,8 @@ class Project(DevOps):
"web_url": "webUrl" "web_url": "webUrl"
}) })
class Repository(DevOps): class Repository(DevOps):
def _get_attributes(self): def _get(self):
self._get_entity_attributes( self._get_entity(
key_name="id", key_name="id",
get_url=f"{self._project.id}/_apis/git/repositories/{self._id}" get_url=f"{self._project.id}/_apis/git/repositories/{self._id}"
) )
@@ -177,7 +222,7 @@ class Repository(DevOps):
UUID(id) # Check if it's a valid UUID UUID(id) # Check if it's a valid UUID
except ValueError: except ValueError:
# Called with a repository name, fetch by name # Called with a repository name, fetch by name
self._get_attributes() self._get()
if kwargs: if kwargs:
raise ValueError("Automatic properties cannot be set when retrieving by name.") raise ValueError("Automatic properties cannot be set when retrieving by name.")
return return
@@ -189,13 +234,17 @@ class Repository(DevOps):
def id(self): def id(self):
return self._id return self._id
@property
def project(self):
return self._project
def __str__(self): def __str__(self):
return f"Repository(name={self.name}, id={self._id})" return f"Repository(name={self.name}, id={self._id})"
@property @property
def items(self): def items(self):
# GET https://dev.azure.com/{organization}/{project}/_apis/git/repositories/{repositoryId}/items?api-version=7.1 if not hasattr(self, "_items"):
return self._entities( self._items = self._entities(
entity_class=Item, entity_class=Item,
key_name="path", key_name="path",
list_url=f"{self._project.id}/_apis/git/repositories/{self._id}/items", list_url=f"{self._project.id}/_apis/git/repositories/{self._id}/items",
@@ -204,6 +253,13 @@ class Repository(DevOps):
"recursionLevel": "oneLevel" "recursionLevel": "oneLevel"
} }
) )
return self._items
def __getitem__(self, key: str) -> Item:
for item in self.items:
if item.path == key:
return item
raise KeyError(f"Item with path '{key}' not found.")
@auto_properties({ @auto_properties({
"object_id": "objectId", "object_id": "objectId",
@@ -219,6 +275,33 @@ class Item(DevOps):
self._path = path self._path = path
self.set_auto_properties(**kwargs) # set properties defined in decorator self.set_auto_properties(**kwargs) # set properties defined in decorator
def _get(self):
self._get_entity(
key_name="path",
get_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
params={
"path": self._path,
"$format": "json",
"recursionLevel": "none"
}
)
@property @property
def path(self): def path(self):
return self._path return self._path
@property
def children(self):
"""List items under this item if it is a folder."""
if self.git_object_type == "tree":
return self._entities(
entity_class=Item,
key_name="path",
list_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
params={
"scopePath": self._path,
"recursionLevel": "oneLevel"
}
)
else:
raise ValueError("Items can only be listed for folder items.")

127
tests.py Executable file
View File

@@ -0,0 +1,127 @@
#!/usr/bin/env python3
import unittest
import requests
from sk.devops import Organization, Repository, Project, Item
from sk.azure import get_token
# Get the token outside the test class to speed up tests.
# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
token = get_token()
class Test01(unittest.TestCase):
def setUp(self):
self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
def test_01(self):
"""Listing projects in the organization"""
self.assertGreater(len(list(self.org.projects)), 0)
def test_02(self):
"""Getting a specific project by ID (object instantiation)"""
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
self.assertEqual(project.name, "ADO Sandbox")
def test_03(self):
"""Getting a specific project by name using org indexing (object retrieval)"""
project = self.org["ADO Sandbox"]
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
self.assertEqual(project.name, "ADO Sandbox")
def test_04(self):
"""Getting a specific project by ID using org indexing (object retrieval)"""
project = self.org["bafe0cf1-6c97-4088-864a-ea6dc02b2727"]
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
self.assertEqual(project.name, "ADO Sandbox")
class Test02(unittest.TestCase):
def setUp(self):
self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
def test_01(self):
"""Listing repositories in a project"""
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
repos = list(project.repositories)
self.assertGreater(len(repos), 0)
def test_02(self):
"""Getting a specific repository by ID (object instantiation)"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
self.assertEqual(repo.name, "ado-auth-lab")
def test_03(self):
"""Getting a specific repository by name using project indexing (object retrieval)"""
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
repo = project["ado-auth-lab"]
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
self.assertEqual(repo.name, "ado-auth-lab")
def test_04(self):
"""Getting a specific repository by ID using project indexing (object retrieval)"""
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
repo = project["feac266f-84d2-41bc-839b-736925a85eaa"]
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
self.assertEqual(repo.name, "ado-auth-lab")
class Test03(unittest.TestCase):
def setUp(self):
self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
def test_01(self):
"""Getting details of a specific item in a repository"""
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
self.assertEqual(item.path, "/generate-pat.py")
self.assertIsNotNone(item.commit_id)
def test_02(self):
"""Listing items in a folder within a repository"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
docs_item = Item(repo, path="/container")
children = list(docs_item.children)
self.assertGreater(len(children), 0)
def test_03(self):
"""Getting a specific item from a repository using indexing"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
item = repo["/container"]
self.assertEqual(item.path, "/container")
self.assertTrue(item.is_folder)
def test_04(self):
"""Getting a specific item from a repository using indexing"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
item = repo["/generate-pat.py"]
self.assertEqual(item.path, "/generate-pat.py")
self.assertFalse(item.is_folder)
def test_05(self):
"""Attempting to get a non-existent item from a repository using indexing"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
with self.assertRaises(KeyError):
repo["/non-existent-file.txt"] # This file does not exist
class Test04(unittest.TestCase):
def setUp(self):
self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
def test_01(self):
"""Getting details of a specific item in a repository"""
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
self.assertEqual(item.path, "/generate-pat.py")
self.assertIsNotNone(item.commit_id)
def test_02(self):
"""Trying to instantiate Item for a item that does not exist"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
with self.assertRaises(requests.exceptions.HTTPError):
item = Item(repo, path="/non-existent-file.txt")
self.assertEqual(item.path, "/non-existent-file.txt")
commit_id = item.commit_id # This will raise HTTPError when trying to fetch details of a non-existent item
def test_03(self):
"""Listing items in a folder within a repository"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
item = Item(repo, path="/container")
children = list(item.children)
self.assertGreater(len(children), 0)
if __name__ == "__main__":
unittest.main()