Compare commits

..

22 Commits

Author SHA1 Message Date
d3344b8799 Add script to create Python virtual environment 2025-11-04 09:14:10 +01:00
05c9e5184c Implemented devops decorator and a get method that retrieves object properties by a key.
All checks were successful
/ unit-tests (push) Successful in 10s
2025-11-04 08:37:35 +01:00
9748230745 Added a new debug configuration for debugging
All checks were successful
/ unit-tests (push) Successful in 1m2s
the current file.
2025-11-04 07:55:34 +01:00
8c0a92a8b7 Remove unused import of DefaultAzureCredential from devops.py 2025-11-04 07:55:27 +01:00
31e1b88cd1 Updated authentication scenarios.
All checks were successful
/ unit-tests (push) Successful in 9s
2025-11-03 21:24:25 +01:00
678b6161cc Added PEM read/write functions.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 20:42:42 +01:00
b7608dcdf8 Moved devops package to sk module.
All checks were successful
/ unit-tests (push) Successful in 9s
2025-11-03 14:52:20 +01:00
f797cd098d Added a prototype authentication function using client certificate.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 14:49:52 +01:00
2f2cb1c337 Add minimal authentication package for Azure using client credentials
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 13:41:21 +01:00
ddab4df55f Add function to create self-signed certificates with PEM format 2025-11-03 13:37:34 +01:00
6e757dd0b8 Update .gitignore to include additional shell secrets and certificate files 2025-11-03 13:37:25 +01:00
07c662b1e8 Fix badge link in README.md to point to the correct unit tests workflow 2025-11-03 10:36:34 +01:00
6c8bcc775f Renamed unit-tests workflow.
All checks were successful
/ unit-tests (push) Successful in 11s
2025-11-03 10:35:28 +01:00
32ebcf74b0 Wrapped the badge around a link to the workflow. 2025-11-03 10:33:54 +01:00
1caf0f3069 Fix formatting in README.md for project description 2025-11-03 09:31:11 +01:00
9acb223ef1 Added a badge. 2025-11-03 09:29:05 +01:00
9ef54860a5 Updated test workflow. Narrowed trigger conditions.
All checks were successful
/ unit-tests (push) Successful in 12s
2025-11-03 08:50:26 +01:00
77ec8354c8 Added MIT license.
All checks were successful
/ unit-tests (push) Successful in 10s
2025-11-03 08:48:29 +01:00
da11714629 Added Azure authentication environment variables.
All checks were successful
/ unit-tests (push) Successful in 10s
2025-11-03 08:15:03 +01:00
f640db21e3 Fix: wrong workflow directory name.
Some checks failed
/ unit-tests (push) Failing after 55s
2025-11-03 07:42:45 +01:00
f7bd2136ee Added automatic unit testing. 2025-11-03 07:40:10 +01:00
9f418332ba Added missing item and children listing tests. 2025-11-03 07:39:56 +01:00
11 changed files with 423 additions and 11 deletions

View File

@@ -0,0 +1,41 @@
on:
push:
branches:
- main
paths:
# Run tests only if Python files change or this workflow file changes
- '.gitea/workflows/run-unit-tests.yml'
- '**/*.py'
jobs:
unit-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run tests
run: |
./tests.py
env:
AZURE_CLIENT_ID: ${{ vars.AZURE_CLIENT_ID }}
AZURE_TENANT_ID: ${{ vars.AZURE_TENANT_ID }}
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
# NOTE: the step below was AI-generated and has not been verified; check its validity before enabling it.
# - name: Upload test results
# if: always()
# uses: actions/upload-artifact@v2
# with:
# name: test-results
# path: tests/test_results.xml

8
.gitignore vendored
View File

@@ -7,3 +7,11 @@ __pycache__/
# Ignore prototype scripts # Ignore prototype scripts
prototype_*.py prototype_*.py
# Shell secrets
*.secret
# Certificate files
*.pem
*.key
*.crt

7
.vscode/launch.json vendored
View File

@@ -10,6 +10,13 @@
"request": "launch", "request": "launch",
"program": "${workspaceFolder}/harvester.py", "program": "${workspaceFolder}/harvester.py",
"console": "integratedTerminal" "console": "integratedTerminal"
},
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
} }
] ]
} }

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Slawomir Koszewski (slawek@koszewscy.waw.pl)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,3 +1,5 @@
# Markdown Docs Harvester # Markdown Docs Harvester
[![Unit Tests](https://gitea.koszewscy.waw.pl/slawek/docs-harvester/actions/workflows/unit-tests.yml/badge.svg)](https://gitea.koszewscy.waw.pl/slawek/docs-harvester/actions?workflow=unit-tests.yml)
This project is designed to harvest and process Markdown documentation files from Git repositories. This project is designed to harvest and process Markdown documentation files from Git repositories.

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from devops import Organization from sk.devops import Organization
org = Organization("https://dev.azure.com/mcovsandbox") org = Organization("https://dev.azure.com/mcovsandbox")
# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"]) # print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])

8
make-venv.sh Executable file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
#
# Create a Python virtual environment in ./.venv and install the packages
# listed in requirements.txt into it.

# Stop at the first failing command (and on unset variables) so that a broken
# venv or a failed pip install is not followed by the "success" hint below.
set -euo pipefail

python3 -m venv .venv
./.venv/bin/pip install --upgrade pip
./.venv/bin/pip install -r requirements.txt

echo "Add the following alias to your shell configuration file (e.g., .bashrc or .zshrc):"
# $(pwd) is expanded while echoing, so the printed alias contains the absolute
# path of the directory this script was run from.
echo "alias v_env='test -d $(pwd)/.venv && . $(pwd)/.venv/bin/activate'"

130
sk/azure.py Normal file
View File

@@ -0,0 +1,130 @@
"""
Minimal Authentication package for Azure.
Uses client credentials - a secret or a certificate.
"""
import os, requests
import re
import jwt, uuid, time
from cryptography.hazmat.primitives import serialization
from cryptography import x509
import hashlib
import base64
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
def get_token(
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
pem_path: str | None = None
) -> str:
"""
Obtain a token for DevOps using DefaultAzureCredential.
"""
try:
if tenant_id and client_id and client_secret:
from azure.identity import ClientSecretCredential
return ClientSecretCredential(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
).get_token(DEVOPS_SCOPE).token
elif tenant_id and client_id and pem_path:
from azure.identity import CertificateCredential
return CertificateCredential(
tenant_id=tenant_id,
client_id=client_id,
certificate_path=pem_path
).get_token(DEVOPS_SCOPE).token
else:
from azure.identity import DefaultAzureCredential
return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
except ImportError:
if tenant_id and client_id and client_secret:
return secret_credentials_auth(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
)
elif tenant_id and client_id and pem_path:
return certificate_credentials_auth(
tenant_id=tenant_id,
client_id=client_id,
pem_path=pem_path
)
else:
raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
def secret_credentials_auth(
    scope: str = DEVOPS_SCOPE,
    tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
    client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
    client_secret: str | None = os.environ.get("AZURE_CLIENT_SECRET")
) -> str:
    """
    Authenticate with the OAuth 2.0 client credentials grant using a client secret.

    Credentials can be passed as arguments or supplied through the
    AZURE_TENANT_ID, AZURE_CLIENT_ID and AZURE_CLIENT_SECRET environment
    variables. The environment is read once, when this function is defined
    (i.e. at module import), not on every call.

    :param scope: Scope to request the token for.
    :param tenant_id: Entra ID tenant identifier; used to build the token URL.
    :param client_id: Application (client) identifier.
    :param client_secret: Client secret of the application.
    :return: The access token, or an empty string if the response carries none.
    :raises requests.HTTPError: The token endpoint answered with an error status.
    """
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    # The token endpoint expects the grant as a form-encoded POST body;
    # sending it with GET does not constitute a valid token request.
    r = requests.post(token_url, data={
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope
    })
    r.raise_for_status()
    return r.json().get("access_token", "")
def certificate_credentials_auth(
    scope: str = DEVOPS_SCOPE,
    tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
    client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
    pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
) -> str:
    """
    Authenticate with the OAuth 2.0 client credentials grant using a certificate.

    The PEM file must contain both the private key and the certificate. A JWT
    client assertion is signed with the key and posted to the token endpoint.

    Credentials can be passed as arguments or supplied through the
    AZURE_TENANT_ID, AZURE_CLIENT_ID and AZURE_CLIENT_CERTIFICATE_PATH
    environment variables. The environment is read once, when this function
    is defined (i.e. at module import), not on every call.

    :param scope: Scope to request the token for.
    :param tenant_id: Entra ID tenant identifier; used to build the token URL.
    :param client_id: Application (client) identifier.
    :param pem_path: Path to the PEM file with the private key and certificate.
    :return: The value of "access_token" from the response (None if absent).
    :raises ValueError: The PEM file lacks a private key or certificate section.
    :raises requests.HTTPError: The token endpoint answered with an error status.
    """
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    # Load the PEM file and pick out the private key and certificate sections.
    with open(pem_path, "rb") as f:
        pem = f.read()

    key_match = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S)
    if key_match is None:
        raise ValueError(f"No private key section found in PEM file: {pem_path}")
    cert_match = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S)
    if cert_match is None:
        raise ValueError(f"No certificate section found in PEM file: {pem_path}")

    private_key = serialization.load_pem_private_key(key_match.group(0), password=None)
    cert = x509.load_pem_x509_certificate(cert_match.group(0))

    # x5t: unpadded base64url encoding of the SHA-1 digest of the DER certificate.
    der = cert.public_bytes(serialization.Encoding.DER)
    sha1 = hashlib.sha1(der).digest()
    x5t = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")

    # Build the client_assertion JWT; it expires 600 seconds after issuance.
    now = int(time.time())
    claims = {
        "iss": client_id,
        "sub": client_id,
        "aud": token_url,
        "jti": str(uuid.uuid4()),
        "iat": now,
        "exp": now + 600,
    }

    headers = {"x5t": x5t, "kid": x5t}

    assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)

    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "scope": scope,
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        "client_assertion": assertion,
    }

    r = requests.post(token_url, data=data)
    r.raise_for_status()
    return r.json().get("access_token")

161
sk/certificates.py Normal file
View File

@@ -0,0 +1,161 @@
import datetime
from io import BufferedWriter
import re
import cryptography.x509 as x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
import pathlib
def read_private_key(pem_path: str) -> PrivateKeyTypes:
    """
    Read a PEM file and return the private key it contains as a key object.

    Only sections delimited by "PRIVATE KEY" or "RSA PRIVATE KEY" markers are
    recognised, and the key is loaded without a password.

    :param pem_path: Path to the PEM file.
    :return: The loaded private key.
    :raises ValueError: The file contains no private key section.
    """
    with open(pem_path, "rb") as f:
        pem = f.read()

    key_match = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S)
    # Fail with a clear message instead of an AttributeError on a None match.
    if key_match is None:
        raise ValueError(f"No private key section found in PEM file: {pem_path}")

    return serialization.load_pem_private_key(key_match.group(0), password=None)
def read_public_certificate(pem_path: str) -> x509.Certificate:
    """
    Read a PEM file and return the first certificate it contains.

    :param pem_path: Path to the PEM file.
    :return: The loaded X.509 certificate.
    :raises ValueError: The file contains no certificate section.
    """
    with open(pem_path, "rb") as f:
        pem = f.read()

    cert_match = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S)
    # Fail with a clear message instead of an AttributeError on a None match.
    if cert_match is None:
        raise ValueError(f"No certificate section found in PEM file: {pem_path}")

    return x509.load_pem_x509_certificate(cert_match.group(0))
def _write_pem_key(
    writer: BufferedWriter,
    key: PrivateKeyTypes,
    encoding: serialization.Encoding = serialization.Encoding.PEM,
    format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
    password: str | None = None
):
    """
    Serialize a private key and write it to an open binary writer.

    :param writer: Binary writer that receives the serialized key.
    :param key: Private key to serialize.
    :param encoding: Output encoding (PEM by default).
    :param format: Private key serialization format (PKCS8 by default).
    :param password: When non-empty, the key is encrypted with this password;
        otherwise it is written unencrypted.
    """
    encryption_algorithm = (
        serialization.BestAvailableEncryption(password.encode())
        if password
        else serialization.NoEncryption()
    )

    writer.write(
        key.private_bytes(
            encoding=encoding,
            format=format,
            encryption_algorithm=encryption_algorithm,
        )
    )
def _write_pem_cert(
    writer: BufferedWriter,
    cert: x509.Certificate,
    encoding: serialization.Encoding = serialization.Encoding.PEM
):
    """
    Serialize a certificate and write it to an open binary writer.

    :param writer: Binary writer that receives the serialized certificate.
    :param cert: Certificate to serialize.
    :param encoding: Output encoding (PEM by default).
    """
    writer.write(cert.public_bytes(encoding=encoding))
def write_pem_file(
    pem_file: pathlib.Path,
    cert: x509.Certificate | None = None,
    key: PrivateKeyTypes | None = None,
    encoding: serialization.Encoding = serialization.Encoding.PEM,
    key_serialization_format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
    password: str | None = None
):
    """
    Write a certificate, a private key, or both to a single file.

    The file is opened for binary writing (truncating any existing content)
    before the arguments are inspected, so it is created even when neither a
    certificate nor a key is given. When both are given the certificate is
    written first, followed by the key.

    :param pem_file: Destination file path.
    :param cert: Certificate to write, if any.
    :param key: Private key to write, if any.
    :param encoding: Encoding used for both the certificate and the key.
    :param key_serialization_format: Serialization format for the private key.
    :param password: Optional password used to encrypt the private key.
    """
    with open(pem_file, "wb") as out:
        if cert:
            _write_pem_cert(out, cert, encoding=encoding)

        if key:
            _write_pem_key(
                out,
                key,
                encoding=encoding,
                format=key_serialization_format,
                password=password,
            )
def create_self_signed_certificate(
    file_path: str,
    subject_name: str,
    organization_name: str,
    country_name: str,
    valid_days: int = 365,
    key_size: int = 2048
):
    """
    Create a self-signed certificate. It saves the certificate and private key
    in PEM format to the specified file path. Three files are created:

    - <file_path>.crt : The public certificate
    - <file_path>.key : The private key
    - <file_path>.pem : The certificate and private key combined in one file

    The file names are derived with pathlib's with_suffix(), so a suffix
    already present in file_path is replaced rather than appended to.
    The private key is written without password protection.

    :param file_path: Base file path to save the certificate and key.
    :param subject_name: Common Name (CN) for the certificate.
    :param organization_name: Organization Name (O) for the certificate.
    :param country_name: Country Name (C) for the certificate.
    :param valid_days: Number of days the certificate is valid for.
    :param key_size: Size of the RSA key.

    Use the following command to replace any credentials already defined for the
    App Registration with that certificate:

    az ad app credential reset --id <CLIENT_ID> --cert @<file_path>.crt

    Use --append to add the certificate without removing existing credentials.

    > Note: Do not upload the private key file (.key) nor the combined PEM file (.pem).
    """
    key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)

    # Self-signed: subject and issuer are the same name.
    # x509.NameAttribute is the documented constructor for name attributes.
    subject = issuer = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, country_name),
        x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, organization_name),
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, subject_name),
    ])

    # Take a single timestamp so both validity bounds share the same reference.
    now = datetime.datetime.now(datetime.timezone.utc)

    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        # Validity starts one day in the past.
        .not_valid_before(now - datetime.timedelta(days=1))
        .not_valid_after(now + datetime.timedelta(days=valid_days))
        .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
        .sign(key, hashes.SHA256())
    )

    base_path = pathlib.Path(file_path)
    crt_path = base_path.with_suffix('.crt')
    key_path = base_path.with_suffix('.key')
    pem_path = base_path.with_suffix('.pem')

    # Write the certificate
    write_pem_file(crt_path, cert=cert)
    # Write the private key
    write_pem_file(key_path, key=key)
    # Write both to a combined PEM file
    write_pem_file(pem_path, cert=cert, key=key)

    print("Certificate created and saved.")
    print(f" - Certificate: {crt_path}")
    print(f" - Private Key: {key_path}")
    print(f" - PEM File: {pem_path}")

View File

@@ -2,13 +2,13 @@ from __future__ import annotations
import requests import requests
import urllib.parse import urllib.parse
from uuid import UUID from uuid import UUID
from azure.identity import DefaultAzureCredential from string import Template
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default" DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
DEVOPS_API_VERSION = "7.1" DEVOPS_API_VERSION = "7.1"
# Define a class decorator # Define a class decorator
def auto_properties(mapping: dict[str,str] | None = None): def auto_properties(mapping: dict[str,str]):
def make_property(name: str): def make_property(name: str):
private_var = f"_{name}" private_var = f"_{name}"
@@ -54,6 +54,15 @@ def auto_properties(mapping: dict[str,str] | None = None):
return cls return cls
return decorator return decorator
def devops(key: str, get_url: str, list_url: str | None = None, params: dict | None = None):
    """
    Class decorator that attaches DevOps entity metadata to a class.

    The decorated class receives four class attributes:

    - __entity_key__      : name of the attribute that holds the entity key
    - __entity_get_url__  : URL template for fetching one entity (use $key)
    - __entity_list_url__ : URL template for listing entities, or None
    - __entity_params__   : query parameter templates (values may use $key)

    :param key: Name of the entity key attribute.
    :param get_url: URL template for a single entity; $key is the placeholder.
    :param list_url: Optional URL template for listing entities.
    :param params: Optional mapping of query parameter names to templates.
    :return: A decorator that annotates the class and returns it unchanged otherwise.
    """
    def decorator(cls):
        cls.__entity_key__ = key
        cls.__entity_get_url__ = get_url  # Use $key in the URL
        cls.__entity_list_url__ = list_url  # Use $key in the URL
        # Give every class its own dict: a shared (default or caller-owned)
        # mapping would let a change made through one class leak into others.
        cls.__entity_params__ = dict(params) if params else {}
        return cls
    return decorator
class DevOps(): class DevOps():
"""Base class for DevOps entities.""" """Base class for DevOps entities."""
@@ -78,7 +87,24 @@ class DevOps():
r.raise_for_status() # Ensure we raise an error for bad responses r.raise_for_status() # Ensure we raise an error for bad responses
return r return r
def _get_entity(self, key_name: str, get_url: str, params: dict = {}) -> object: def _get(self, key: str):
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
# Build the URL
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
# Build parameters with key substituted
params = {}
if hasattr(self.__class__, "__entity_params__"):
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
# Fetch the object data from the URL
r = self._get_url_path(url, params=params)
# Populate attributes
self.from_json(r.json())
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
""" """
Each entity class can use this method to populate its attributes, by defining Each entity class can use this method to populate its attributes, by defining
its own _get method that calls this one with the key name, its own _get method that calls this one with the key name,
@@ -108,8 +134,6 @@ class DevOps():
class Organization(DevOps): class Organization(DevOps):
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION): def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
if token is None:
token = DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
super().__init__(org_url, token, api_version) super().__init__(org_url, token, api_version)
@property @property
@@ -132,6 +156,7 @@ class Organization(DevOps):
"url": "url", "url": "url",
"description": "description" "description": "description"
}) })
@devops("id", "_apis/projects/$key", "_apis/projects")
class Project(DevOps): class Project(DevOps):
def _get(self): def _get(self):
self._get_entity( self._get_entity(

View File

@@ -1,11 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import unittest import unittest
from azure.identity import DefaultAzureCredential import requests
from devops import DEVOPS_SCOPE, Organization, Repository, Project, Item from sk.devops import Organization, Repository, Project, Item
from sk.azure import get_token
# Get the token outside the test class to speed up tests. # Get the token outside the test class to speed up tests.
# Each Unit test instantiates the class, so doing it here avoids repeated authentication. # Each Unit test instantiates the class, so doing it here avoids repeated authentication.
token = DefaultAzureCredential().get_token(DEVOPS_SCOPE).token token = get_token()
class Test01(unittest.TestCase): class Test01(unittest.TestCase):
def setUp(self): def setUp(self):
@@ -108,10 +109,18 @@ class Test04(unittest.TestCase):
self.assertIsNotNone(item.commit_id) self.assertIsNotNone(item.commit_id)
def test_02(self): def test_02(self):
"""Trying to instantiate Item for an item that does not exist"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
with self.assertRaises(requests.exceptions.HTTPError):
item = Item(repo, path="/non-existent-file.txt")
self.assertEqual(item.path, "/non-existent-file.txt")
commit_id = item.commit_id # This will raise HTTPError when trying to fetch details of a non-existent item
def test_03(self):
"""Listing items in a folder within a repository""" """Listing items in a folder within a repository"""
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa") repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
docs_item = Item(repo, path="/container") item = Item(repo, path="/container")
children = list(docs_item.children) children = list(item.children)
self.assertGreater(len(children), 0) self.assertGreater(len(children), 0)
if __name__ == "__main__": if __name__ == "__main__":