Compare commits
	
		
			42 Commits
		
	
	
		
			850bc99aa2
			...
			main
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| d3344b8799 | |||
| 05c9e5184c | |||
| 9748230745 | |||
| 8c0a92a8b7 | |||
| 31e1b88cd1 | |||
| 678b6161cc | |||
| b7608dcdf8 | |||
| f797cd098d | |||
| 2f2cb1c337 | |||
| ddab4df55f | |||
| 6e757dd0b8 | |||
| 07c662b1e8 | |||
| 6c8bcc775f | |||
| 32ebcf74b0 | |||
| 1caf0f3069 | |||
| 9acb223ef1 | |||
| 9ef54860a5 | |||
| 77ec8354c8 | |||
| da11714629 | |||
| f640db21e3 | |||
| f7bd2136ee | |||
| 9f418332ba | |||
| 8535648c3d | |||
| f43564d019 | |||
| 6bc913d43e | |||
| b430721c05 | |||
| be2a6870c6 | |||
| 54709dd281 | |||
| 2f1e1a583f | |||
| 53e1ae186e | |||
| d8674462df | |||
| 17bd314a20 | |||
| b7bb29224f | |||
| 26b192c840 | |||
| 71d83b8d76 | |||
| 837543d44a | |||
| 91b806637c | |||
| 96ccd4d7c7 | |||
| deed075727 | |||
| ce38e275a9 | |||
| 4685103e60 | |||
| 3ce14912e4 | 
							
								
								
									
										41
									
								
								.gitea/workflows/unit-tests.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										41
									
								
								.gitea/workflows/unit-tests.yml
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,41 @@
 | 
				
			|||||||
 | 
					on:
 | 
				
			||||||
 | 
					  push:
 | 
				
			||||||
 | 
					    branches:
 | 
				
			||||||
 | 
					      - main
 | 
				
			||||||
 | 
					    paths:
 | 
				
			||||||
 | 
					      # Run tests only if Python files change or this workflow file changes
 | 
				
			||||||
 | 
					      - '.gitea/workflows/run-unit-tests.yml'
 | 
				
			||||||
 | 
					      - '**/*.py'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					jobs:
 | 
				
			||||||
 | 
					  unit-tests:
 | 
				
			||||||
 | 
					    runs-on: ubuntu-latest
 | 
				
			||||||
 | 
					    steps:
 | 
				
			||||||
 | 
					      - name: Checkout code
 | 
				
			||||||
 | 
					        uses: actions/checkout@v5
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      - name: Set up Python
 | 
				
			||||||
 | 
					        uses: actions/setup-python@v6
 | 
				
			||||||
 | 
					        with:
 | 
				
			||||||
 | 
					          python-version: '3.12'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      - name: Install dependencies
 | 
				
			||||||
 | 
					        run: |
 | 
				
			||||||
 | 
					          python -m pip install --upgrade pip
 | 
				
			||||||
 | 
					          pip install -r requirements.txt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      - name: Run tests
 | 
				
			||||||
 | 
					        run: |
 | 
				
			||||||
 | 
					          ./tests.py
 | 
				
			||||||
 | 
					        env:
 | 
				
			||||||
 | 
					          AZURE_CLIENT_ID: ${{ vars.AZURE_CLIENT_ID }}
 | 
				
			||||||
 | 
					          AZURE_TENANT_ID: ${{ vars.AZURE_TENANT_ID }}
 | 
				
			||||||
 | 
					          AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      # AI: generated... check validity
 | 
				
			||||||
 | 
					      # - name: Upload test results
 | 
				
			||||||
 | 
					      #   if: always()
 | 
				
			||||||
 | 
					      #   uses: actions/upload-artifact@v2
 | 
				
			||||||
 | 
					      #   with:
 | 
				
			||||||
 | 
					      #     name: test-results
 | 
				
			||||||
 | 
					      #     path: tests/test_results.xml
 | 
				
			||||||
							
								
								
									
										14
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										14
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@@ -1,3 +1,17 @@
 | 
				
			|||||||
# Python
 | 
					# Python
 | 
				
			||||||
.venv
 | 
					.venv
 | 
				
			||||||
__pycache__/
 | 
					__pycache__/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Ignore sample JSON files
 | 
				
			||||||
 | 
					*.sample.json
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Ignore prototype scripts
 | 
				
			||||||
 | 
					prototype_*.py
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Shell secrets
 | 
				
			||||||
 | 
					*.secret
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Certificate files
 | 
				
			||||||
 | 
					*.pem
 | 
				
			||||||
 | 
					*.key
 | 
				
			||||||
 | 
					*.crt
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										7
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										7
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
								
							@@ -10,6 +10,13 @@
 | 
				
			|||||||
            "request": "launch",
 | 
					            "request": "launch",
 | 
				
			||||||
            "program": "${workspaceFolder}/harvester.py",
 | 
					            "program": "${workspaceFolder}/harvester.py",
 | 
				
			||||||
            "console": "integratedTerminal"
 | 
					            "console": "integratedTerminal"
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            "name": "Python Debugger: Current File",
 | 
				
			||||||
 | 
					            "type": "debugpy",
 | 
				
			||||||
 | 
					            "request": "launch",
 | 
				
			||||||
 | 
					            "program": "${file}",
 | 
				
			||||||
 | 
					            "console": "integratedTerminal"
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    ]
 | 
					    ]
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										21
									
								
								LICENSE
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								LICENSE
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,21 @@
 | 
				
			|||||||
 | 
					MIT License
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Copyright (c) 2025 Slawomir Koszewski (slawek@koszewscy.waw.pl)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
				
			||||||
 | 
					of this software and associated documentation files (the "Software"), to deal
 | 
				
			||||||
 | 
					in the Software without restriction, including without limitation the rights
 | 
				
			||||||
 | 
					to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 | 
				
			||||||
 | 
					copies of the Software, and to permit persons to whom the Software is
 | 
				
			||||||
 | 
					furnished to do so, subject to the following conditions:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					The above copyright notice and this permission notice shall be included in all
 | 
				
			||||||
 | 
					copies or substantial portions of the Software.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 | 
				
			||||||
 | 
					IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 | 
				
			||||||
 | 
					FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 | 
				
			||||||
 | 
					AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 | 
				
			||||||
 | 
					LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 | 
				
			||||||
 | 
					OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 | 
				
			||||||
 | 
					SOFTWARE.
 | 
				
			||||||
@@ -1,3 +1,5 @@
 | 
				
			|||||||
# Markdown Docs Harvester
 | 
					# Markdown Docs Harvester
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[](https://gitea.koszewscy.waw.pl/slawek/docs-harvester/actions?workflow=unit-tests.yml)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
This project is designed to harvest and process Markdown documentation files from Git repositories.
 | 
					This project is designed to harvest and process Markdown documentation files from Git repositories.
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										74
									
								
								devops.py
									
									
									
									
									
								
							
							
						
						
									
										74
									
								
								devops.py
									
									
									
									
									
								
							@@ -1,74 +0,0 @@
 | 
				
			|||||||
from unicodedata import name
 | 
					 | 
				
			||||||
import requests
 | 
					 | 
				
			||||||
import urllib.request
 | 
					 | 
				
			||||||
import urllib.parse
 | 
					 | 
				
			||||||
import json
 | 
					 | 
				
			||||||
from uuid import UUID
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
 | 
					 | 
				
			||||||
DEVOPS_API_VERSION = "7.1"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class Organization:
 | 
					 | 
				
			||||||
    def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
 | 
					 | 
				
			||||||
        self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
 | 
					 | 
				
			||||||
        self._token = token
 | 
					 | 
				
			||||||
        self._api_version = api_version
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get(self, path: str, params: dict = {}):
 | 
					 | 
				
			||||||
        request_parameters = {
 | 
					 | 
				
			||||||
            "api-version": self._api_version,
 | 
					 | 
				
			||||||
            **params
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        encoded_path = urllib.parse.quote(path.lstrip("/")) # Ensure single slash between base and path
 | 
					 | 
				
			||||||
        url = self._org_url + encoded_path
 | 
					 | 
				
			||||||
        r = requests.get(url=url, params=request_parameters, headers={
 | 
					 | 
				
			||||||
            "Authorization": f"Bearer {self._token}"
 | 
					 | 
				
			||||||
        })
 | 
					 | 
				
			||||||
        return r
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @property
 | 
					 | 
				
			||||||
    def projects(self):
 | 
					 | 
				
			||||||
        r = self.get("_apis/projects")
 | 
					 | 
				
			||||||
        r.raise_for_status()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # Return a list of Project instances
 | 
					 | 
				
			||||||
        projects_data = r.json().get("value", [])
 | 
					 | 
				
			||||||
        return [
 | 
					 | 
				
			||||||
            Project(self,
 | 
					 | 
				
			||||||
                    id=proj.get("id"),
 | 
					 | 
				
			||||||
                    name=proj.get("name"),
 | 
					 | 
				
			||||||
                    url=proj.get("url"),
 | 
					 | 
				
			||||||
                    description=proj.get("description"),
 | 
					 | 
				
			||||||
                    ) for proj in projects_data
 | 
					 | 
				
			||||||
        ]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class Project:
 | 
					 | 
				
			||||||
    def __init__(self, org: Organization, id: str, name: str | None = None, url: str | None = None, description: str | None = None):
 | 
					 | 
				
			||||||
        self._org = org
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # Check, if the id is a valid UUID
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            self._id = str(UUID(id))
 | 
					 | 
				
			||||||
        except ValueError:
 | 
					 | 
				
			||||||
            raise ValueError(f"Invalid project ID: {self._id}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if name is not None and url is not None and name != "" and url != "":
 | 
					 | 
				
			||||||
            self._name = name
 | 
					 | 
				
			||||||
            self._url = url
 | 
					 | 
				
			||||||
            self._description = description if description is not None else "" # Ensure description is a string
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            # Fetch project details from the API
 | 
					 | 
				
			||||||
            r = org.get(f"_apis/projects/{urllib.parse.quote(self._id)}")
 | 
					 | 
				
			||||||
            r.raise_for_status()
 | 
					 | 
				
			||||||
            proj_data = r.json()
 | 
					 | 
				
			||||||
            self._id = proj_data.get("id", "")
 | 
					 | 
				
			||||||
            self._name = proj_data.get("name", "")
 | 
					 | 
				
			||||||
            self._url = proj_data.get("url", "")
 | 
					 | 
				
			||||||
            self._description = proj_data.get("description", "")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @property
 | 
					 | 
				
			||||||
    def name(self):
 | 
					 | 
				
			||||||
        return self._name
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def __str__(self):
 | 
					 | 
				
			||||||
        return f"Project(name={self._name}, id={self._id})"
 | 
					 | 
				
			||||||
							
								
								
									
										15
									
								
								harvester.py
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								harvester.py
									
									
									
									
									
								
							@@ -1,13 +1,8 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from devops import Organization, Project, DEVOPS_SCOPE
 | 
					from sk.devops import Organization
 | 
				
			||||||
from azure.identity import DefaultAzureCredential
 | 
					org = Organization("https://dev.azure.com/mcovsandbox")
 | 
				
			||||||
from json import dumps
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
org = Organization("https://dev.azure.com/mcovsandbox", DefaultAzureCredential().get_token(DEVOPS_SCOPE).token)
 | 
					# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])
 | 
				
			||||||
projects = org.projects
 | 
					print(org["ADO Sandbox"]["ado-auth-lab"]["/container"].url)
 | 
				
			||||||
print([str(p) for p in projects])
 | 
					print(org["ADO Sandbox"]["ado-auth-lab"]["/generate-pat.py"].url)
 | 
				
			||||||
 | 
					 | 
				
			||||||
ado_sandbox = Project(org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
print(ado_sandbox)
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										8
									
								
								make-venv.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										8
									
								
								make-venv.sh
									
									
									
									
									
										Executable file
									
								
							@@ -0,0 +1,8 @@
 | 
				
			|||||||
 | 
					#! /usr/bin/env bash
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					python3 -m venv .venv
 | 
				
			||||||
 | 
					./.venv/bin/pip install --upgrade pip
 | 
				
			||||||
 | 
					./.venv/bin/pip install -r requirements.txt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					echo "Add the following alias to your shell configuration file (e.g., .bashrc or .zshrc):"
 | 
				
			||||||
 | 
					echo "alias v_env='test -d $(pwd)/.venv && . $(pwd)/.venv/bin/activate'"
 | 
				
			||||||
							
								
								
									
										130
									
								
								sk/azure.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										130
									
								
								sk/azure.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,130 @@
 | 
				
			|||||||
 | 
					"""
 | 
				
			||||||
 | 
					Minimal Authentication package for Azure.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Uses client credentials - a secret or a certificate.
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import os, requests
 | 
				
			||||||
 | 
					import re
 | 
				
			||||||
 | 
					import jwt, uuid, time
 | 
				
			||||||
 | 
					from cryptography.hazmat.primitives import serialization
 | 
				
			||||||
 | 
					from cryptography import x509
 | 
				
			||||||
 | 
					import hashlib
 | 
				
			||||||
 | 
					import base64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_token(
 | 
				
			||||||
 | 
					        tenant_id: str | None = None,
 | 
				
			||||||
 | 
					        client_id: str | None = None,
 | 
				
			||||||
 | 
					        client_secret: str | None = None,
 | 
				
			||||||
 | 
					        pem_path: str | None = None
 | 
				
			||||||
 | 
					    ) -> str:
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Obtain a token for DevOps using DefaultAzureCredential.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        if tenant_id and client_id and client_secret:
 | 
				
			||||||
 | 
					            from azure.identity import ClientSecretCredential
 | 
				
			||||||
 | 
					            return ClientSecretCredential(
 | 
				
			||||||
 | 
					                tenant_id=tenant_id,
 | 
				
			||||||
 | 
					                client_id=client_id,
 | 
				
			||||||
 | 
					                client_secret=client_secret
 | 
				
			||||||
 | 
					            ).get_token(DEVOPS_SCOPE).token
 | 
				
			||||||
 | 
					        elif tenant_id and client_id and pem_path:
 | 
				
			||||||
 | 
					            from azure.identity import CertificateCredential
 | 
				
			||||||
 | 
					            return CertificateCredential(
 | 
				
			||||||
 | 
					                tenant_id=tenant_id,
 | 
				
			||||||
 | 
					                client_id=client_id,
 | 
				
			||||||
 | 
					                certificate_path=pem_path
 | 
				
			||||||
 | 
					            ).get_token(DEVOPS_SCOPE).token
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            from azure.identity import DefaultAzureCredential
 | 
				
			||||||
 | 
					            return DefaultAzureCredential().get_token(DEVOPS_SCOPE).token
 | 
				
			||||||
 | 
					    except ImportError:
 | 
				
			||||||
 | 
					        if tenant_id and client_id and client_secret:
 | 
				
			||||||
 | 
					            return secret_credentials_auth(
 | 
				
			||||||
 | 
					                tenant_id=tenant_id,
 | 
				
			||||||
 | 
					                client_id=client_id,
 | 
				
			||||||
 | 
					                client_secret=client_secret
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        elif tenant_id and client_id and pem_path:
 | 
				
			||||||
 | 
					            return certificate_credentials_auth(
 | 
				
			||||||
 | 
					                tenant_id=tenant_id,
 | 
				
			||||||
 | 
					                client_id=client_id,
 | 
				
			||||||
 | 
					                pem_path=pem_path
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            raise ValueError("Either client_secret or pem_path must be provided, if no azure-identity package is installed.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def secret_credentials_auth(
 | 
				
			||||||
 | 
					        scope: str = DEVOPS_SCOPE,
 | 
				
			||||||
 | 
					        tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
 | 
				
			||||||
 | 
					        client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
 | 
				
			||||||
 | 
					        client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
 | 
				
			||||||
 | 
					        ) -> str:
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Authenticate using client credentials. Pass credentials via environment variables,
 | 
				
			||||||
 | 
					    or directly as function parameters.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
 | 
				
			||||||
 | 
					    r = requests.get(token_url, data={
 | 
				
			||||||
 | 
					        "grant_type": "client_credentials",
 | 
				
			||||||
 | 
					        "client_id": client_id,
 | 
				
			||||||
 | 
					        "client_secret": client_secret,
 | 
				
			||||||
 | 
					        "scope": scope
 | 
				
			||||||
 | 
					    })
 | 
				
			||||||
 | 
					    r.raise_for_status()
 | 
				
			||||||
 | 
					    return r.json().get("access_token", "")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def certificate_credentials_auth(
 | 
				
			||||||
 | 
					        scope: str = DEVOPS_SCOPE,
 | 
				
			||||||
 | 
					        tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
 | 
				
			||||||
 | 
					        client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
 | 
				
			||||||
 | 
					        pem_path: str = os.environ.get("AZURE_CLIENT_CERTIFICATE_PATH", "")
 | 
				
			||||||
 | 
					        ) -> str:
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Authenticate using client credentials with a certificate.
 | 
				
			||||||
 | 
					    Pass credentials via environment variables, or directly as function parameters.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Wczytaj klucz prywatny (RSA)
 | 
				
			||||||
 | 
					    with open(pem_path, "rb") as f:
 | 
				
			||||||
 | 
					        pem = f.read()
 | 
				
			||||||
 | 
					        key_pem  = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
 | 
				
			||||||
 | 
					        cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        private_key = serialization.load_pem_private_key(key_pem, password=None)
 | 
				
			||||||
 | 
					        cert = x509.load_pem_x509_certificate(cert_pem)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        der = cert.public_bytes(serialization.Encoding.DER)
 | 
				
			||||||
 | 
					        sha1 = hashlib.sha1(der).digest()
 | 
				
			||||||
 | 
					        x5t  = base64.urlsafe_b64encode(sha1).rstrip(b"=").decode("ascii")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Stwórz client_assertion JWT
 | 
				
			||||||
 | 
					    now = int(time.time())
 | 
				
			||||||
 | 
					    claims = {
 | 
				
			||||||
 | 
					        "iss": client_id,
 | 
				
			||||||
 | 
					        "sub": client_id,
 | 
				
			||||||
 | 
					        "aud": token_url,
 | 
				
			||||||
 | 
					        "jti": str(uuid.uuid4()),
 | 
				
			||||||
 | 
					        "iat": now,
 | 
				
			||||||
 | 
					        "exp": now + 600,
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    headers = {"x5t": x5t, "kid": x5t}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    data = {
 | 
				
			||||||
 | 
					        "grant_type": "client_credentials",
 | 
				
			||||||
 | 
					        "client_id": client_id,
 | 
				
			||||||
 | 
					        "scope": scope,
 | 
				
			||||||
 | 
					        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
 | 
				
			||||||
 | 
					        "client_assertion": assertion,
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    r = requests.post(token_url, data=data)
 | 
				
			||||||
 | 
					    r.raise_for_status()
 | 
				
			||||||
 | 
					    return r.json().get("access_token")
 | 
				
			||||||
							
								
								
									
										161
									
								
								sk/certificates.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										161
									
								
								sk/certificates.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,161 @@
 | 
				
			|||||||
 | 
					import datetime
 | 
				
			||||||
 | 
					from io import BufferedWriter
 | 
				
			||||||
 | 
					import re
 | 
				
			||||||
 | 
					import cryptography.x509 as x509
 | 
				
			||||||
 | 
					from cryptography.hazmat.primitives import hashes, serialization
 | 
				
			||||||
 | 
					from cryptography.hazmat.primitives.asymmetric import rsa
 | 
				
			||||||
 | 
					from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
 | 
				
			||||||
 | 
					import pathlib
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def read_private_key(pem_path: str) -> PrivateKeyTypes:
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Read a PEM file and extract the private key in bytes.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    with open(pem_path, "rb") as f:
 | 
				
			||||||
 | 
					        pem = f.read()
 | 
				
			||||||
 | 
					        key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
 | 
				
			||||||
 | 
					        key = serialization.load_pem_private_key(key_pem, password=None)
 | 
				
			||||||
 | 
					        return key
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def read_public_certificate(pem_path: str) -> any:
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Read a PEM file and extract the public certificate.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    with open(pem_path, "rb") as f:
 | 
				
			||||||
 | 
					        pem = f.read()
 | 
				
			||||||
 | 
					        cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
 | 
				
			||||||
 | 
					        cert = x509.load_pem_x509_certificate(cert_pem)
 | 
				
			||||||
 | 
					        return cert
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def _write_pem_key(
 | 
				
			||||||
 | 
					        writer: BufferedWriter,
 | 
				
			||||||
 | 
					        key: PrivateKeyTypes,
 | 
				
			||||||
 | 
					        encoding: serialization.Encoding = serialization.Encoding.PEM,
 | 
				
			||||||
 | 
					        format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
 | 
				
			||||||
 | 
					        password: str | None = None
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Write a PEM encoded private key to the given writer.
 | 
				
			||||||
 | 
					    Allows optional password protection.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    if password:
 | 
				
			||||||
 | 
					        encryption_algorithm = serialization.BestAvailableEncryption(password.encode())
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        encryption_algorithm = serialization.NoEncryption()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    key_pem = key.private_bytes(
 | 
				
			||||||
 | 
					        encoding=encoding,
 | 
				
			||||||
 | 
					        format=format,
 | 
				
			||||||
 | 
					        encryption_algorithm=encryption_algorithm,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    writer.write(key_pem)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def _write_pem_cert(
 | 
				
			||||||
 | 
					        writer: BufferedWriter,
 | 
				
			||||||
 | 
					        cert: x509.Certificate,
 | 
				
			||||||
 | 
					        encoding: serialization.Encoding = serialization.Encoding.PEM
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					    cert_pem = cert.public_bytes(encoding=encoding)
 | 
				
			||||||
 | 
					    writer.write(cert_pem)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def write_pem_file(
 | 
				
			||||||
 | 
					        pem_file: pathlib.Path,
 | 
				
			||||||
 | 
					        cert: x509.Certificate | None = None,
 | 
				
			||||||
 | 
					        key: PrivateKeyTypes | None = None,
 | 
				
			||||||
 | 
					        encoding: serialization.Encoding = serialization.Encoding.PEM,
 | 
				
			||||||
 | 
					        key_serialization_format: serialization.PrivateFormat = serialization.PrivateFormat.PKCS8,
 | 
				
			||||||
 | 
					        password: str | None = None
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Write the certificate and/or private key to a PEM file.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    with open(pem_file, "wb") as f:
 | 
				
			||||||
 | 
					        if cert:
 | 
				
			||||||
 | 
					            _write_pem_cert(
 | 
				
			||||||
 | 
					                f,
 | 
				
			||||||
 | 
					                cert,
 | 
				
			||||||
 | 
					                encoding=encoding
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					        if key:
 | 
				
			||||||
 | 
					            _write_pem_key(
 | 
				
			||||||
 | 
					                f,
 | 
				
			||||||
 | 
					                key,
 | 
				
			||||||
 | 
					                encoding=encoding,
 | 
				
			||||||
 | 
					                password=password,
 | 
				
			||||||
 | 
					                format=key_serialization_format
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def create_self_signed_certificate(
 | 
				
			||||||
 | 
					        file_path: str,
 | 
				
			||||||
 | 
					        subject_name: str,
 | 
				
			||||||
 | 
					        organization_name: str,
 | 
				
			||||||
 | 
					        country_name: str,
 | 
				
			||||||
 | 
					        valid_days: int = 365,
 | 
				
			||||||
 | 
					        key_size: int = 2048
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Create a self-signed certificate. It saves the certificate and private key
 | 
				
			||||||
 | 
					    in PEM format to the specified file path. Three files are created:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    - <file_path>.crt : The public certificate
 | 
				
			||||||
 | 
					    - <file_path>.key : The private key
 | 
				
			||||||
 | 
					    - <file_path>.pem : The certificate and private key combined in one file
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    :param file_path: Base file path to save the certificate and key.
 | 
				
			||||||
 | 
					    :param subject_name: Common Name (CN) for the certificate.
 | 
				
			||||||
 | 
					    :param organization_name: Organization Name (O) for the certificate.
 | 
				
			||||||
 | 
					    :param country_name: Country Name (C) for the certificate.
 | 
				
			||||||
 | 
					    :param valid_days: Number of days the certificate is valid for.
 | 
				
			||||||
 | 
					    :param key_size: Size of the RSA key.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Use the following command to replace any credentials already defined for the 
 | 
				
			||||||
 | 
					    App Registration with that certificate:
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    az ad app credential reset --id <CLIENT_ID> --cert @<file_path>.crt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Use --append to add the certificate without removing existing credentials.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    > Note: Do not upload the private key file (.key) nor the combined PEM file (.pem).
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    subject = issuer = x509.Name([
 | 
				
			||||||
 | 
					        x509.oid.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, country_name),
 | 
				
			||||||
 | 
					        x509.oid.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, organization_name),
 | 
				
			||||||
 | 
					        x509.oid.NameAttribute(x509.oid.NameOID.COMMON_NAME, subject_name),
 | 
				
			||||||
 | 
					    ])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    cert = (
 | 
				
			||||||
 | 
					        x509.CertificateBuilder()
 | 
				
			||||||
 | 
					        .subject_name(subject)
 | 
				
			||||||
 | 
					        .issuer_name(issuer)
 | 
				
			||||||
 | 
					        .public_key(key.public_key())
 | 
				
			||||||
 | 
					        .serial_number(x509.random_serial_number())
 | 
				
			||||||
 | 
					        .not_valid_before(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1))
 | 
				
			||||||
 | 
					        .not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=valid_days))
 | 
				
			||||||
 | 
					        .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
 | 
				
			||||||
 | 
					        .sign(key, hashes.SHA256())
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    crt_path = pathlib.Path(file_path).with_suffix('.crt')
 | 
				
			||||||
 | 
					    key_path = pathlib.Path(file_path).with_suffix('.key')
 | 
				
			||||||
 | 
					    pem_path = pathlib.Path(file_path).with_suffix('.pem')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public_key = cert.public_bytes(serialization.Encoding.PEM)
 | 
				
			||||||
 | 
					    private_key = key.private_bytes(
 | 
				
			||||||
 | 
					        encoding=serialization.Encoding.PEM,
 | 
				
			||||||
 | 
					        format=serialization.PrivateFormat.TraditionalOpenSSL,
 | 
				
			||||||
 | 
					        encryption_algorithm=serialization.NoEncryption(),
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Write the certificate
 | 
				
			||||||
 | 
					    write_pem_file(crt_path, cert=cert)
 | 
				
			||||||
 | 
					    # Write the private key
 | 
				
			||||||
 | 
					    write_pem_file(key_path, key=key)
 | 
				
			||||||
 | 
					    # Write both to a combined PEM file
 | 
				
			||||||
 | 
					    write_pem_file(pem_path, cert=cert, key=key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    print(f"Certificate created and saved.")
 | 
				
			||||||
 | 
					    print(f" - Certificate: {crt_path}")
 | 
				
			||||||
 | 
					    print(f" - Private Key: {key_path}")
 | 
				
			||||||
 | 
					    print(f" - PEM File: {pem_path}")
 | 
				
			||||||
							
								
								
									
										307
									
								
								sk/devops.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										307
									
								
								sk/devops.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,307 @@
 | 
				
			|||||||
 | 
					from __future__ import annotations
 | 
				
			||||||
 | 
					import requests
 | 
				
			||||||
 | 
					import urllib.parse
 | 
				
			||||||
 | 
					from uuid import UUID
 | 
				
			||||||
 | 
					from string import Template
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
 | 
				
			||||||
 | 
					DEVOPS_API_VERSION = "7.1"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Define a class decorator
 | 
				
			||||||
 | 
					def auto_properties(mapping: dict[str,str]):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def make_property(name: str):
 | 
				
			||||||
 | 
					        private_var = f"_{name}"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        def getter(self):
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                i = getattr(self, private_var)
 | 
				
			||||||
 | 
					                if i is not None:
 | 
				
			||||||
 | 
					                    return i
 | 
				
			||||||
 | 
					            except AttributeError:
 | 
				
			||||||
 | 
					                pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # Fetch repository details from the API if it is set to None or not existing
 | 
				
			||||||
 | 
					            self._get()
 | 
				
			||||||
 | 
					            return getattr(self, private_var)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return property(fget=getter)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def set_auto_properties(self, **kwargs):
 | 
				
			||||||
 | 
					        allowed = set(self.__class__.__auto_properties__)
 | 
				
			||||||
 | 
					        unknown = [k for k in kwargs if k not in allowed]
 | 
				
			||||||
 | 
					        if unknown:
 | 
				
			||||||
 | 
					            raise ValueError(f"Unknown properties for {self.__class__.__name__}: {', '.join(unknown)}")
 | 
				
			||||||
 | 
					        for k, v in kwargs.items():
 | 
				
			||||||
 | 
					            setattr(self, f"_{k}", v)
 | 
				
			||||||
 | 
					        return self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def from_json(self, json_data: dict):
 | 
				
			||||||
 | 
					        for name in self.__class__.__auto_properties__:
 | 
				
			||||||
 | 
					            setattr(self, f"_{name}", json_data.get(self.__class__.__auto_properties__[name], None))
 | 
				
			||||||
 | 
					        return self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def decorator(cls):
 | 
				
			||||||
 | 
					        cls.__auto_properties__ = mapping # Make a copy of the mapping
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Create properties dynamically
 | 
				
			||||||
 | 
					        for name in mapping:
 | 
				
			||||||
 | 
					            setattr(cls, name, make_property(name))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        setattr(cls, "set_auto_properties", set_auto_properties)
 | 
				
			||||||
 | 
					        setattr(cls, "from_json", from_json)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return cls
 | 
				
			||||||
 | 
					    return decorator
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def devops(key: str, get_url: str, list_url: str = None, params: dict = {}):
 | 
				
			||||||
 | 
					    def decorator(cls):
 | 
				
			||||||
 | 
					        cls.__entity_key__ = key
 | 
				
			||||||
 | 
					        cls.__entity_get_url__ = get_url # Use $key in the URL
 | 
				
			||||||
 | 
					        cls.__entity_list_url__ = list_url # Use $key in the URL
 | 
				
			||||||
 | 
					        cls.__entity_params__ = params
 | 
				
			||||||
 | 
					        return cls
 | 
				
			||||||
 | 
					    return decorator
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class DevOps():
 | 
				
			||||||
 | 
					    """Base class for DevOps entities."""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
 | 
				
			||||||
 | 
					        self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
 | 
				
			||||||
 | 
					        self._token = token
 | 
				
			||||||
 | 
					        self._api_version = api_version
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _get_url_path(self, path: str, params: dict = {}) -> requests.Response:
 | 
				
			||||||
 | 
					        if not self._org_url or not self._token or not self._api_version:
 | 
				
			||||||
 | 
					            raise ValueError("Organization URL, token, and API version must be set before making requests.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        request_parameters = {
 | 
				
			||||||
 | 
					            "api-version": self._api_version,
 | 
				
			||||||
 | 
					            **params
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        encoded_path = urllib.parse.quote(path.lstrip("/")) # Ensure single slash between base and path
 | 
				
			||||||
 | 
					        url = self._org_url + encoded_path
 | 
				
			||||||
 | 
					        r = requests.get(url=url, params=request_parameters, headers={
 | 
				
			||||||
 | 
					            "Authorization": f"Bearer {self._token}"
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
 | 
					        r.raise_for_status() # Ensure we raise an error for bad responses
 | 
				
			||||||
 | 
					        return r
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _get(self, key: str):
 | 
				
			||||||
 | 
					        if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
 | 
				
			||||||
 | 
					            raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
 | 
				
			||||||
 | 
					        setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
 | 
				
			||||||
 | 
					        # Build the URL
 | 
				
			||||||
 | 
					        url = Template(self.__class__.__entity_get_url__).substitute(key=key)
 | 
				
			||||||
 | 
					        # Build parameters with key substituted
 | 
				
			||||||
 | 
					        params = {}
 | 
				
			||||||
 | 
					        if hasattr(self.__class__, "__entity_params__"):
 | 
				
			||||||
 | 
					            params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Fetch the object data from the URL
 | 
				
			||||||
 | 
					        r = self._get_url_path(url, params=params)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Populate attributes
 | 
				
			||||||
 | 
					        self.from_json(r.json())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        Each entity class can use this method to populate its attributes, by defining
 | 
				
			||||||
 | 
					        its own _get method that calls this one with the key name,
 | 
				
			||||||
 | 
					        and the URL.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        r = self._get_url_path(get_url, params=params) # Fetch the object data from the URL
 | 
				
			||||||
 | 
					        setattr(self, f"_{key_name}", r.json().get(key_name, None)) # Set the key attribute
 | 
				
			||||||
 | 
					        self.from_json(r.json()) # Populate other attributes from JSON
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _entity(self, entity_class: type, key_name: str, entity_data: dict) -> object:
 | 
				
			||||||
 | 
					        """A generic method to create an entity from JSON data."""
 | 
				
			||||||
 | 
					        args = { key_name: entity_data.get(key_name) }
 | 
				
			||||||
 | 
					        e = entity_class(self, **args)
 | 
				
			||||||
 | 
					        e.from_json(entity_data)
 | 
				
			||||||
 | 
					        return e
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _entities(self, entity_class: type, key_name: str, list_url: str, params: dict = {}) -> list[object]:
 | 
				
			||||||
 | 
					        """A generic method to retrieve a list of entities."""
 | 
				
			||||||
 | 
					        r = self._get_url_path(list_url, params=params)
 | 
				
			||||||
 | 
					        entities_data = r.json().get("value", [])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        entities_list = []
 | 
				
			||||||
 | 
					        for entity in entities_data:
 | 
				
			||||||
 | 
					            entities_list.append(self._entity(entity_class, key_name, entity))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return entities_list
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Organization(DevOps):
 | 
				
			||||||
 | 
					    def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
 | 
				
			||||||
 | 
					        super().__init__(org_url, token, api_version)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def projects(self):
 | 
				
			||||||
 | 
					        if not hasattr(self, "_projects"):
 | 
				
			||||||
 | 
					            self._projects = self._entities(
 | 
				
			||||||
 | 
					                entity_class=Project,
 | 
				
			||||||
 | 
					                key_name="id",
 | 
				
			||||||
 | 
					                list_url="_apis/projects")
 | 
				
			||||||
 | 
					        return self._projects
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __getitem__(self, key: str) -> Project:
 | 
				
			||||||
 | 
					        for project in self.projects:
 | 
				
			||||||
 | 
					            if project.id == key or project.name == key:
 | 
				
			||||||
 | 
					                return project
 | 
				
			||||||
 | 
					        raise KeyError(f"Project with ID or name '{key}' not found.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@auto_properties({
 | 
				
			||||||
 | 
					    "name":        "name",
 | 
				
			||||||
 | 
					    "url":         "url",
 | 
				
			||||||
 | 
					    "description": "description"
 | 
				
			||||||
 | 
					    })
 | 
				
			||||||
 | 
					@devops("id", "_apis/projects/$key", "_apis/projects")
 | 
				
			||||||
 | 
					class Project(DevOps):
 | 
				
			||||||
 | 
					    def _get(self):
 | 
				
			||||||
 | 
					        self._get_entity(
 | 
				
			||||||
 | 
					            key_name="id",
 | 
				
			||||||
 | 
					            get_url=f"_apis/projects/{self._id}"
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __init__(self, org: Organization, id: str, **kwargs):
 | 
				
			||||||
 | 
					        super().__init__(org._org_url, org._token, org._api_version)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            self._id = str(UUID(id))
 | 
				
			||||||
 | 
					        except ValueError:
 | 
				
			||||||
 | 
					            raise ValueError(f"Invalid project ID: {id}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.set_auto_properties(**kwargs)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __str__(self):
 | 
				
			||||||
 | 
					        return f"Project(name={self._name}, id={self._id})"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def id(self):
 | 
				
			||||||
 | 
					        return self._id
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def repositories(self):
 | 
				
			||||||
 | 
					        if not hasattr(self, "_repositories"):
 | 
				
			||||||
 | 
					            self._repositories = self._entities(
 | 
				
			||||||
 | 
					                entity_class=Repository,
 | 
				
			||||||
 | 
					                key_name="id",
 | 
				
			||||||
 | 
					                list_url=f"{self._id}/_apis/git/repositories")
 | 
				
			||||||
 | 
					        return self._repositories
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __getitem__(self, key: str) -> Repository:
 | 
				
			||||||
 | 
					        for repo in self.repositories:
 | 
				
			||||||
 | 
					            if repo.id == key or repo.name == key:
 | 
				
			||||||
 | 
					                return repo
 | 
				
			||||||
 | 
					        raise KeyError(f"Repository with ID or name '{key}' not found.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@auto_properties({
 | 
				
			||||||
 | 
					    "name": "name",
 | 
				
			||||||
 | 
					    "url": "url",
 | 
				
			||||||
 | 
					    "default_branch": "defaultBranch",
 | 
				
			||||||
 | 
					    "is_disabled": "isDisabled",
 | 
				
			||||||
 | 
					    "is_in_maintenance": "isInMaintenance",
 | 
				
			||||||
 | 
					    "remote_url": "remoteUrl",
 | 
				
			||||||
 | 
					    "ssh_url": "sshUrl",
 | 
				
			||||||
 | 
					    "web_url": "webUrl"
 | 
				
			||||||
 | 
					    })
 | 
				
			||||||
 | 
					class Repository(DevOps):
 | 
				
			||||||
 | 
					    def _get(self):
 | 
				
			||||||
 | 
					        self._get_entity(
 | 
				
			||||||
 | 
					            key_name="id",
 | 
				
			||||||
 | 
					            get_url=f"{self._project.id}/_apis/git/repositories/{self._id}"
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __init__(self, project: Project, id: str, **kwargs):
 | 
				
			||||||
 | 
					        super().__init__(project._org_url, project._token, project._api_version)
 | 
				
			||||||
 | 
					        self._project = project
 | 
				
			||||||
 | 
					        self._id = id
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            UUID(id) # Check if it's a valid UUID
 | 
				
			||||||
 | 
					        except ValueError:
 | 
				
			||||||
 | 
					            # Called with a repository name, fetch by name
 | 
				
			||||||
 | 
					            self._get()
 | 
				
			||||||
 | 
					            if kwargs:
 | 
				
			||||||
 | 
					                raise ValueError("Automatic properties cannot be set when retrieving by name.")
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # set other properties if provided
 | 
				
			||||||
 | 
					        self.set_auto_properties(**kwargs)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def id(self):
 | 
				
			||||||
 | 
					        return self._id
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def project(self):
 | 
				
			||||||
 | 
					        return self._project
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __str__(self):
 | 
				
			||||||
 | 
					        return f"Repository(name={self.name}, id={self._id})"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def items(self):
 | 
				
			||||||
 | 
					        if not hasattr(self, "_items"):
 | 
				
			||||||
 | 
					            self._items = self._entities(
 | 
				
			||||||
 | 
					                entity_class=Item,
 | 
				
			||||||
 | 
					                key_name="path",
 | 
				
			||||||
 | 
					                list_url=f"{self._project.id}/_apis/git/repositories/{self._id}/items",
 | 
				
			||||||
 | 
					                params={
 | 
				
			||||||
 | 
					                    "scopePath": "/",
 | 
				
			||||||
 | 
					                    "recursionLevel": "oneLevel"
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        return self._items
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __getitem__(self, key: str) -> Item:
 | 
				
			||||||
 | 
					        for item in self.items:
 | 
				
			||||||
 | 
					            if item.path == key:
 | 
				
			||||||
 | 
					                return item
 | 
				
			||||||
 | 
					        raise KeyError(f"Item with path '{key}' not found.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@auto_properties({
 | 
				
			||||||
 | 
					    "object_id":       "objectId",
 | 
				
			||||||
 | 
					    "git_object_type": "gitObjectType",
 | 
				
			||||||
 | 
					    "commit_id":       "commitId",
 | 
				
			||||||
 | 
					    "is_folder":       "isFolder",
 | 
				
			||||||
 | 
					    "url":             "url"
 | 
				
			||||||
 | 
					    })
 | 
				
			||||||
 | 
					class Item(DevOps):
 | 
				
			||||||
 | 
					    def __init__(self, repository: Repository, path: str, **kwargs):
 | 
				
			||||||
 | 
					        super().__init__(repository._org_url, repository._token, repository._api_version)
 | 
				
			||||||
 | 
					        self._repository = repository
 | 
				
			||||||
 | 
					        self._path = path
 | 
				
			||||||
 | 
					        self.set_auto_properties(**kwargs) # set properties defined in decorator
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _get(self):
 | 
				
			||||||
 | 
					        self._get_entity(
 | 
				
			||||||
 | 
					            key_name="path",
 | 
				
			||||||
 | 
					            get_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
 | 
				
			||||||
 | 
					            params={
 | 
				
			||||||
 | 
					                "path": self._path,
 | 
				
			||||||
 | 
					                "$format": "json",
 | 
				
			||||||
 | 
					                "recursionLevel": "none"
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def path(self):
 | 
				
			||||||
 | 
					        return self._path
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @property
 | 
				
			||||||
 | 
					    def children(self):
 | 
				
			||||||
 | 
					        """List items under this item if it is a folder."""
 | 
				
			||||||
 | 
					        if self.git_object_type == "tree":
 | 
				
			||||||
 | 
					            return self._entities(
 | 
				
			||||||
 | 
					                entity_class=Item,
 | 
				
			||||||
 | 
					                key_name="path",
 | 
				
			||||||
 | 
					                list_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
 | 
				
			||||||
 | 
					                params={
 | 
				
			||||||
 | 
					                    "scopePath": self._path,
 | 
				
			||||||
 | 
					                    "recursionLevel": "oneLevel"
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            raise ValueError("Items can only be listed for folder items.")
 | 
				
			||||||
							
								
								
									
										127
									
								
								tests.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										127
									
								
								tests.py
									
									
									
									
									
										Executable file
									
								
							@@ -0,0 +1,127 @@
 | 
				
			|||||||
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
 | 
					import unittest
 | 
				
			||||||
 | 
					import requests
 | 
				
			||||||
 | 
					from sk.devops import Organization, Repository, Project, Item
 | 
				
			||||||
 | 
					from sk.azure import get_token
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Get the token outside the test class to speed up tests.
 | 
				
			||||||
 | 
					# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
 | 
				
			||||||
 | 
					token = get_token()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Test01(unittest.TestCase):
 | 
				
			||||||
 | 
					    def setUp(self):
 | 
				
			||||||
 | 
					        self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_01(self):
 | 
				
			||||||
 | 
					        """Listing projects in the organization"""
 | 
				
			||||||
 | 
					        self.assertGreater(len(list(self.org.projects)), 0)
 | 
				
			||||||
 | 
					    def test_02(self):
 | 
				
			||||||
 | 
					        """Getting a specific project by ID (object instantiation)"""
 | 
				
			||||||
 | 
					        project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
 | 
				
			||||||
 | 
					        self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
 | 
				
			||||||
 | 
					        self.assertEqual(project.name, "ADO Sandbox")
 | 
				
			||||||
 | 
					    def test_03(self):
 | 
				
			||||||
 | 
					        """Getting a specific project by name using org indexing (object retrieval)"""
 | 
				
			||||||
 | 
					        project = self.org["ADO Sandbox"]
 | 
				
			||||||
 | 
					        self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
 | 
				
			||||||
 | 
					        self.assertEqual(project.name, "ADO Sandbox")
 | 
				
			||||||
 | 
					    def test_04(self):
 | 
				
			||||||
 | 
					        """Getting a specific project by ID using org indexing (object retrieval)"""
 | 
				
			||||||
 | 
					        project = self.org["bafe0cf1-6c97-4088-864a-ea6dc02b2727"]
 | 
				
			||||||
 | 
					        self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
 | 
				
			||||||
 | 
					        self.assertEqual(project.name, "ADO Sandbox")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Test02(unittest.TestCase):
 | 
				
			||||||
 | 
					    def setUp(self):
 | 
				
			||||||
 | 
					        self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_01(self):
 | 
				
			||||||
 | 
					        """Listing repositories in a project"""
 | 
				
			||||||
 | 
					        project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
 | 
				
			||||||
 | 
					        repos = list(project.repositories)
 | 
				
			||||||
 | 
					        self.assertGreater(len(repos), 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_02(self):
 | 
				
			||||||
 | 
					        """Getting a specific repository by ID (object instantiation)"""
 | 
				
			||||||
 | 
					        repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        self.assertEqual(repo.name, "ado-auth-lab")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_03(self):
 | 
				
			||||||
 | 
					        """Getting a specific repository by name using project indexing (object retrieval)"""
 | 
				
			||||||
 | 
					        project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
 | 
				
			||||||
 | 
					        repo = project["ado-auth-lab"]
 | 
				
			||||||
 | 
					        self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        self.assertEqual(repo.name, "ado-auth-lab")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_04(self):
 | 
				
			||||||
 | 
					        """Getting a specific repository by ID using project indexing (object retrieval)"""
 | 
				
			||||||
 | 
					        project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
 | 
				
			||||||
 | 
					        repo = project["feac266f-84d2-41bc-839b-736925a85eaa"]
 | 
				
			||||||
 | 
					        self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        self.assertEqual(repo.name, "ado-auth-lab")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Test03(unittest.TestCase):
 | 
				
			||||||
 | 
					    def setUp(self):
 | 
				
			||||||
 | 
					        self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_01(self):
 | 
				
			||||||
 | 
					        """Getting details of a specific item in a repository"""
 | 
				
			||||||
 | 
					        item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
 | 
				
			||||||
 | 
					        self.assertEqual(item.path, "/generate-pat.py")
 | 
				
			||||||
 | 
					        self.assertIsNotNone(item.commit_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_02(self):
 | 
				
			||||||
 | 
					        """Listing items in a folder within a repository"""
 | 
				
			||||||
 | 
					        repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        docs_item = Item(repo, path="/container")
 | 
				
			||||||
 | 
					        children = list(docs_item.children)
 | 
				
			||||||
 | 
					        self.assertGreater(len(children), 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_03(self):
 | 
				
			||||||
 | 
					        """Getting a specific item from a repository using indexing"""
 | 
				
			||||||
 | 
					        repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        item = repo["/container"]
 | 
				
			||||||
 | 
					        self.assertEqual(item.path, "/container")
 | 
				
			||||||
 | 
					        self.assertTrue(item.is_folder)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_04(self):
 | 
				
			||||||
 | 
					        """Getting a specific item from a repository using indexing"""
 | 
				
			||||||
 | 
					        repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        item = repo["/generate-pat.py"]
 | 
				
			||||||
 | 
					        self.assertEqual(item.path, "/generate-pat.py")
 | 
				
			||||||
 | 
					        self.assertFalse(item.is_folder)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_05(self):
 | 
				
			||||||
 | 
					        """Attempting to get a non-existent item from a repository using indexing"""
 | 
				
			||||||
 | 
					        repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        with self.assertRaises(KeyError):
 | 
				
			||||||
 | 
					            repo["/non-existent-file.txt"] # This file does not exist
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Test04(unittest.TestCase):
 | 
				
			||||||
 | 
					    def setUp(self):
 | 
				
			||||||
 | 
					        self.org = Organization("https://dev.azure.com/mcovsandbox", token=token)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_01(self):
 | 
				
			||||||
 | 
					        """Getting details of a specific item in a repository"""
 | 
				
			||||||
 | 
					        item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
 | 
				
			||||||
 | 
					        self.assertEqual(item.path, "/generate-pat.py")
 | 
				
			||||||
 | 
					        self.assertIsNotNone(item.commit_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_02(self):
 | 
				
			||||||
 | 
					        """Trying to instantiate Item for a item that does not exist"""
 | 
				
			||||||
 | 
					        repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        with self.assertRaises(requests.exceptions.HTTPError):
 | 
				
			||||||
 | 
					            item = Item(repo, path="/non-existent-file.txt")
 | 
				
			||||||
 | 
					            self.assertEqual(item.path, "/non-existent-file.txt")
 | 
				
			||||||
 | 
					            commit_id = item.commit_id # This will raise HTTPError when trying to fetch details of a non-existent item
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_03(self):
 | 
				
			||||||
 | 
					        """Listing items in a folder within a repository"""
 | 
				
			||||||
 | 
					        repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
 | 
				
			||||||
 | 
					        item = Item(repo, path="/container")
 | 
				
			||||||
 | 
					        children = list(item.children)
 | 
				
			||||||
 | 
					        self.assertGreater(len(children), 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == "__main__":
 | 
				
			||||||
 | 
					    unittest.main()
 | 
				
			||||||
		Reference in New Issue
	
	Block a user