Compare commits
37 Commits
44dcbe7fa9
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| b3ebe386dc | |||
| 15dcf09ec1 | |||
| ca16f1b098 | |||
| 4276939f30 | |||
| 15349c9390 | |||
| 064364caa6 | |||
| b83f53d140 | |||
| 32ee557f93 | |||
| 565464e266 | |||
| c3c5f9935c | |||
| c29e0b4e21 | |||
| c32fc25cee | |||
| e6bca1ce47 | |||
| 2addc85e40 | |||
| 5412c3ea09 | |||
| 6e16cebeea | |||
| d06bc05a2d | |||
| 495ba0b0b3 | |||
| 7d5d451d0c | |||
| 8b2a06551a | |||
| 89b9f8d6e6 | |||
| d6b58f0b51 | |||
| 89ba272003 | |||
| 5d7f2f51de | |||
| def594d5b7 | |||
| 73a63d0605 | |||
| cb0840f380 | |||
| f2e6f4907a | |||
| 80d4988bad | |||
| 9b2922c1ef | |||
| 27a5a13c47 | |||
| 6588313fa1 | |||
| 086618fe72 | |||
| 54361e6c18 | |||
| e4497791f3 | |||
| fd2fd61633 | |||
| 869d861165 |
9
.gitignore
vendored
9
.gitignore
vendored
@@ -1,6 +1,8 @@
|
|||||||
# Python
|
# Python
|
||||||
.venv
|
.venv
|
||||||
__pycache__/
|
__pycache__/
|
||||||
|
*.egg-info/
|
||||||
|
dist/
|
||||||
|
|
||||||
# Ignore sample JSON files
|
# Ignore sample JSON files
|
||||||
*.sample.json
|
*.sample.json
|
||||||
@@ -10,8 +12,15 @@ prototype_*.py
|
|||||||
|
|
||||||
# Shell secrets
|
# Shell secrets
|
||||||
*.secret
|
*.secret
|
||||||
|
*.client_secret
|
||||||
|
|
||||||
|
# Environment files
|
||||||
|
*.env
|
||||||
|
|
||||||
# Certificate files
|
# Certificate files
|
||||||
*.pem
|
*.pem
|
||||||
*.key
|
*.key
|
||||||
*.crt
|
*.crt
|
||||||
|
|
||||||
|
# Harvester output
|
||||||
|
reference
|
||||||
7
.vscode/settings.json
vendored
Normal file
7
.vscode/settings.json
vendored
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"debug.autoExpandLazyVariables": "off",
|
||||||
|
"debug.inlineValues": "off",
|
||||||
|
"debugpy.debugJustMyCode": true,
|
||||||
|
"debugpy.showPythonInlineValues": false,
|
||||||
|
"python.terminal.useEnvFile": true
|
||||||
|
}
|
||||||
21
BUILD.md
Normal file
21
BUILD.md
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
# Build Instructions
|
||||||
|
|
||||||
|
Run the following command to build the project:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
python -m build
|
||||||
|
```
|
||||||
|
|
||||||
|
This will create distribution files in the `dist/` directory.
|
||||||
|
|
||||||
|
Install the built package using pip:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
pip install --no-index dist/docs_harvester-0.1.0-py3-none-any.whl
|
||||||
|
```
|
||||||
|
|
||||||
|
Install in editable mode for development:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
pip install -e .
|
||||||
|
```
|
||||||
9
DEVOPS.md
Normal file
9
DEVOPS.md
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
# DevOps Notes
|
||||||
|
|
||||||
|
## DevOps OAuth2 Flow
|
||||||
|
|
||||||
|
Type: **oauth2**
|
||||||
|
Flow: **accessCode**
|
||||||
|
Authorization URL: `https://app.vssps.visualstudio.com/oauth2/authorize&response_type=Assertion`
|
||||||
|
Token URL: `https://app.vssps.visualstudio.com/oauth2/token?client_assertion_type=urn:ietf:params:oauth:client-assertion-type:jwt-bearer&grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer`
|
||||||
|
Scopes: `vso.code`
|
||||||
1
devops/__init__.py
Normal file
1
devops/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# devops package
|
||||||
@@ -59,9 +59,9 @@ def get_token(
|
|||||||
|
|
||||||
def secret_credentials_auth(
|
def secret_credentials_auth(
|
||||||
scope: str = DEVOPS_SCOPE,
|
scope: str = DEVOPS_SCOPE,
|
||||||
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
tenant_id = os.environ.get("AZURE_TENANT_ID", ""),
|
||||||
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
client_id = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||||
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
|
client_secret = os.environ.get("AZURE_CLIENT_SECRET")
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Authenticate using client credentials. Pass credentials via environment variables,
|
Authenticate using client credentials. Pass credentials via environment variables,
|
||||||
@@ -92,8 +92,8 @@ def certificate_credentials_auth(
|
|||||||
# Wczytaj klucz prywatny (RSA)
|
# Wczytaj klucz prywatny (RSA)
|
||||||
with open(pem_path, "rb") as f:
|
with open(pem_path, "rb") as f:
|
||||||
pem = f.read()
|
pem = f.read()
|
||||||
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
|
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0) # type: ignore
|
||||||
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
|
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0) # type: ignore
|
||||||
|
|
||||||
private_key = serialization.load_pem_private_key(key_pem, password=None)
|
private_key = serialization.load_pem_private_key(key_pem, password=None)
|
||||||
cert = x509.load_pem_x509_certificate(cert_pem)
|
cert = x509.load_pem_x509_certificate(cert_pem)
|
||||||
@@ -115,7 +115,7 @@ def certificate_credentials_auth(
|
|||||||
|
|
||||||
headers = {"x5t": x5t, "kid": x5t}
|
headers = {"x5t": x5t, "kid": x5t}
|
||||||
|
|
||||||
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
|
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers) # type: ignore
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"grant_type": "client_credentials",
|
"grant_type": "client_credentials",
|
||||||
327
devops/devops.py
Normal file
327
devops/devops.py
Normal file
@@ -0,0 +1,327 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
import pathlib
|
||||||
|
import requests
|
||||||
|
import urllib.parse
|
||||||
|
from uuid import UUID
|
||||||
|
import logging
|
||||||
|
|
||||||
|
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||||
|
DEVOPS_API_VERSION = "7.1"
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Define a class decorator
|
||||||
|
def auto_properties(mapping: dict[str,str]):
|
||||||
|
|
||||||
|
def make_property(name: str):
|
||||||
|
private_var = f"_{name}"
|
||||||
|
|
||||||
|
def getter(self):
|
||||||
|
try:
|
||||||
|
i = getattr(self, private_var)
|
||||||
|
if i is not None:
|
||||||
|
return i
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Fetch repository details from the API if it is set to None or not existing
|
||||||
|
log.debug(f"Auto-fetching property '{name}' for {self.__class__.__name__}", extra={"property_name": name})
|
||||||
|
self.get_auto_properties()
|
||||||
|
return getattr(self, private_var)
|
||||||
|
|
||||||
|
def setter(self, value):
|
||||||
|
setattr(self, private_var, value)
|
||||||
|
|
||||||
|
return property(fget=getter, fset=setter)
|
||||||
|
|
||||||
|
def from_args(self, **kwargs):
|
||||||
|
for name in kwargs:
|
||||||
|
if name in self.__class__.__auto_properties__:
|
||||||
|
log.debug(f"Setting property '{name}' for {self.__class__.__name__} from args", extra={"property_name": name})
|
||||||
|
setattr(self, f"_{name}", kwargs.get(name, None))
|
||||||
|
|
||||||
|
def from_json(self, json_data: dict):
|
||||||
|
for json_name in self.__class__.__auto_properties_reversed__:
|
||||||
|
setattr(self, f"_{self.__class__.__auto_properties_reversed__[json_name]}", json_data.get(json_name, None))
|
||||||
|
|
||||||
|
def decorator(cls):
|
||||||
|
cls.__auto_properties__ = mapping # Make a copy of the mapping
|
||||||
|
cls.__auto_properties_reversed__ = {v: k for k, v in mapping.items()} # Store reversed mapping for JSON parsing
|
||||||
|
|
||||||
|
# Create properties dynamically
|
||||||
|
for name in mapping:
|
||||||
|
setattr(cls, name, make_property(name))
|
||||||
|
|
||||||
|
setattr(cls, "from_args", from_args)
|
||||||
|
setattr(cls, "from_json", from_json)
|
||||||
|
|
||||||
|
return cls
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
def get_url(URL: str, token: str, api_version: str, params: dict = {}) -> requests.Response:
|
||||||
|
"""Helper method to make GET requests to DevOps REST API."""
|
||||||
|
if not URL or not token or not api_version:
|
||||||
|
raise ValueError("Organization URL, token, and API version must be set before making requests.")
|
||||||
|
|
||||||
|
request_parameters = {
|
||||||
|
"api-version": api_version,
|
||||||
|
**params
|
||||||
|
}
|
||||||
|
log.debug(f"Making GET request", extra={"url": URL, "params": request_parameters, "http_method": "get"})
|
||||||
|
r = requests.get(url=URL, params=request_parameters, headers={
|
||||||
|
"Authorization": f"Bearer {token}"
|
||||||
|
})
|
||||||
|
r.raise_for_status() # Ensure we raise an error for bad responses
|
||||||
|
return r # Return response
|
||||||
|
|
||||||
|
class Organization():
|
||||||
|
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
|
||||||
|
self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
|
||||||
|
self._token = token
|
||||||
|
self._api_version = api_version
|
||||||
|
|
||||||
|
def get_path(self, path: str, params: dict = {}) -> requests.Response:
|
||||||
|
return get_url(
|
||||||
|
URL=urllib.parse.urljoin(self._org_url, path),
|
||||||
|
token=self._token,
|
||||||
|
api_version=self._api_version,
|
||||||
|
params=params
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def projects(self):
|
||||||
|
if not hasattr(self, "_projects"):
|
||||||
|
# Create Project objects
|
||||||
|
self._projects = []
|
||||||
|
for proj in self.get_path("_apis/projects").json().get("value", []):
|
||||||
|
p = Project(org=self, **proj)
|
||||||
|
self._projects.append(p)
|
||||||
|
return self._projects
|
||||||
|
|
||||||
|
def __getitem__(self, key: str) -> Project:
|
||||||
|
for project in self.projects:
|
||||||
|
if project.id == key or project.name == key: # type: ignore[attr-defined]
|
||||||
|
return project
|
||||||
|
raise KeyError(f"Project with ID or name '{key}' not found.")
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"Organization(url=\"{self._org_url}\")"
|
||||||
|
|
||||||
|
@auto_properties({
|
||||||
|
"id": "id",
|
||||||
|
"name": "name",
|
||||||
|
"url": "url",
|
||||||
|
"description": "description"
|
||||||
|
})
|
||||||
|
class Project():
|
||||||
|
def __init__(self, org: Organization, **kwargs):
|
||||||
|
self._org = org
|
||||||
|
self.from_args(**kwargs) # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
if not hasattr(self, "_id") or self._id is None:
|
||||||
|
raise ValueError("Project ID must be provided.")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._id = str(UUID(self._id))
|
||||||
|
except ValueError:
|
||||||
|
raise ValueError(f"Invalid project ID: {self._id}")
|
||||||
|
|
||||||
|
def get_auto_properties(self):
|
||||||
|
r = get_url(
|
||||||
|
URL=f"{self._org._org_url}_apis/projects/{self._id}",
|
||||||
|
token=self._org._token,
|
||||||
|
api_version=self._org._api_version
|
||||||
|
)
|
||||||
|
self.from_json(r.json()) # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"Project(name=\"{self.name}\", id={self.id})" # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def id(self):
|
||||||
|
return self._id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def organization(self):
|
||||||
|
return self._org
|
||||||
|
|
||||||
|
@property
|
||||||
|
def repositories(self):
|
||||||
|
if not hasattr(self, "_repositories"):
|
||||||
|
self._repositories = []
|
||||||
|
for repo in self._org.get_path(f"{self._id}/_apis/git/repositories").json().get("value", []):
|
||||||
|
# Remove unnecessary nested project info
|
||||||
|
if "project" in repo:
|
||||||
|
del repo["project"]
|
||||||
|
self._repositories.append(Repository(project=self, **repo))
|
||||||
|
return self._repositories
|
||||||
|
|
||||||
|
def __getitem__(self, key: str) -> Repository:
|
||||||
|
for repo in self.repositories:
|
||||||
|
if repo.id == key or repo.name == key: # type: ignore[attr-defined]
|
||||||
|
return repo
|
||||||
|
raise KeyError(f"Repository with ID or name '{key}' not found.")
|
||||||
|
|
||||||
|
@auto_properties({
|
||||||
|
"id": "id",
|
||||||
|
"name": "name",
|
||||||
|
"url": "url",
|
||||||
|
"default_branch": "defaultBranch",
|
||||||
|
"is_disabled": "isDisabled",
|
||||||
|
"is_in_maintenance": "isInMaintenance",
|
||||||
|
"remote_url": "remoteUrl",
|
||||||
|
"ssh_url": "sshUrl",
|
||||||
|
"web_url": "webUrl"
|
||||||
|
})
|
||||||
|
class Repository():
|
||||||
|
def __init__(self, project: Project, **kwargs):
|
||||||
|
self._project = project
|
||||||
|
|
||||||
|
if "id" not in kwargs and "name" not in kwargs:
|
||||||
|
raise ValueError("Either repository ID or name must be provided.")
|
||||||
|
|
||||||
|
if "id" in kwargs:
|
||||||
|
try:
|
||||||
|
UUID(kwargs.get("id")) # Check if it's a valid UUID
|
||||||
|
except ValueError:
|
||||||
|
raise ValueError("Invalid repository ID, must be a valid UUID.")
|
||||||
|
|
||||||
|
# set other properties if provided
|
||||||
|
self.from_args(**kwargs) # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
def get_auto_properties(self):
|
||||||
|
id = self._id if hasattr(self, "_id") else self._name # type: ignore[attr-defined]
|
||||||
|
if id is None or id == "":
|
||||||
|
raise ValueError("Repository ID or name must be set to fetch properties.")
|
||||||
|
|
||||||
|
r = self._project.organization.get_path(path=f"{self._project.id}/_apis/git/repositories/{id}")
|
||||||
|
self.from_json(r.json()) # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def id(self):
|
||||||
|
return self._id # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def project(self):
|
||||||
|
return self._project
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"Repository(name={self.name}, id={self._id})" # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def items(self):
|
||||||
|
log.debug(f"Fetching items for repository '{self.name}'", extra={"repository_name": self.name}) # type: ignore[attr-defined]
|
||||||
|
if not hasattr(self, "_items"):
|
||||||
|
root = Item(repository=self, path="/", git_object_type="tree")
|
||||||
|
self._items = root.get_child_items()
|
||||||
|
|
||||||
|
return self._items
|
||||||
|
|
||||||
|
def __getitem__(self, key: str) -> Item:
|
||||||
|
for item in self.items:
|
||||||
|
if item.path == key:
|
||||||
|
return item
|
||||||
|
raise KeyError(f"Item with path '{key}' not found.")
|
||||||
|
|
||||||
|
@auto_properties({
|
||||||
|
"path": "path",
|
||||||
|
"object_id": "objectId",
|
||||||
|
"git_object_type": "gitObjectType",
|
||||||
|
"commit_id": "commitId",
|
||||||
|
"is_folder": "isFolder",
|
||||||
|
"url": "url"
|
||||||
|
})
|
||||||
|
class Item():
|
||||||
|
def __init__(self, repository: Repository, **kwargs):
|
||||||
|
self._repository = repository
|
||||||
|
self.from_args(**kwargs) # type: ignore[attr-defined]
|
||||||
|
if "branch" in kwargs:
|
||||||
|
self._branch = kwargs.get("branch")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def branch(self):
|
||||||
|
if hasattr(self, "_branch"):
|
||||||
|
return getattr(self, "_branch")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_auto_properties(self):
|
||||||
|
r = self._repository._project.organization.get_path(
|
||||||
|
path=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||||
|
params={
|
||||||
|
"path": self.path,
|
||||||
|
"$format": "json",
|
||||||
|
"recursionLevel": "none"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
self.from_json(r.json()) # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
def get_content(self, branch: str | None = None, commit: str | None = None, tag: str | None = None) -> bytes:
|
||||||
|
"""Get the content of the item with optional branch, commit, or tag."""
|
||||||
|
if self.git_object_type != "blob": # type: ignore[attr-defined]
|
||||||
|
raise ValueError("Content can only be fetched for blob items.")
|
||||||
|
|
||||||
|
params = { "path": self.path, "recursionLevel": "none" }
|
||||||
|
if self.branch and branch is None:
|
||||||
|
branch = self.branch
|
||||||
|
|
||||||
|
if branch:
|
||||||
|
params["version"] = branch
|
||||||
|
params["versionType"] = "branch"
|
||||||
|
elif tag:
|
||||||
|
params["version"] = tag
|
||||||
|
params["versionType"] = "tag"
|
||||||
|
elif commit:
|
||||||
|
params["version"] = commit
|
||||||
|
params["versionType"] = "commit"
|
||||||
|
|
||||||
|
r = self._repository._project.organization.get_path(
|
||||||
|
path=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||||
|
params=params
|
||||||
|
)
|
||||||
|
return r.content
|
||||||
|
|
||||||
|
@property
|
||||||
|
def path(self):
|
||||||
|
return self._path # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
def get_child_items(self, pattern: str | None = None, recurse: bool = False) -> list[Item]:
|
||||||
|
"""Get child items if this item is a folder."""
|
||||||
|
if self.git_object_type != "tree": # type: ignore[attr-defined]
|
||||||
|
raise ValueError("Child items can only be fetched for folder items.")
|
||||||
|
|
||||||
|
# Fetch child objects
|
||||||
|
objects = self._repository.project.organization.get_path(
|
||||||
|
path=f"{self._repository.project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||||
|
params={
|
||||||
|
"scopePath": self.path,
|
||||||
|
"recursionLevel": "oneLevel" if not recurse else "full"
|
||||||
|
}
|
||||||
|
).json().get("value", [])
|
||||||
|
child_items = []
|
||||||
|
for obj in objects:
|
||||||
|
obj_path = obj.get("path")
|
||||||
|
if pattern and not pathlib.PurePath(obj_path).match(pattern):
|
||||||
|
continue
|
||||||
|
i = Item(repository=self._repository, path=obj_path)
|
||||||
|
i.from_json(obj) # type: ignore[attr-defined]
|
||||||
|
child_items.append(i)
|
||||||
|
return child_items
|
||||||
|
|
||||||
|
@property
|
||||||
|
def children(self):
|
||||||
|
if not hasattr(self, "_children"):
|
||||||
|
self._children = self.get_child_items()
|
||||||
|
return self._children
|
||||||
|
|
||||||
|
def __getitem__(self, key: str) -> Item:
|
||||||
|
if self.git_object_type != "tree": # type: ignore[attr-defined]
|
||||||
|
raise ValueError("Child items can only be accessed for folder items.")
|
||||||
|
if not key.startswith("/"):
|
||||||
|
key = pathlib.Path(self.path).joinpath(key).absolute().as_posix()
|
||||||
|
for child in self.children:
|
||||||
|
if child.path == key:
|
||||||
|
return child
|
||||||
|
raise KeyError(f"Child item with path '{key}' not found.")
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"Item(path=\"{self._path}\" type={self.git_object_type})" # type: ignore[attr-defined]
|
||||||
57
get-token.py
57
get-token.py
@@ -1,11 +1,54 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Get an Azure DevOps token and print it in a format suitable for exporting as an environment variable.
|
||||||
|
|
||||||
from sk.azure import get_token
|
Usage:
|
||||||
|
eval $(python get-token.py)
|
||||||
|
|
||||||
token = get_token(
|
or
|
||||||
tenant_id="a7740229-47b6-45de-ad22-83721462b1bf",
|
|
||||||
client_id="840671c4-5dc4-40e5-aab9-7c3a07bbd652",
|
|
||||||
pem_path="./slawek-mba.pem"
|
|
||||||
)
|
|
||||||
|
|
||||||
print(f"Obtained token: {token}")
|
python get-token.py > set-ado-token.sh
|
||||||
|
source set-ado-token.sh
|
||||||
|
|
||||||
|
Now you can use the ADO_TOKEN environment variable, for example using curl:
|
||||||
|
|
||||||
|
curl -sH "Authorization: Bearer $ADO_TOKEN" "https://dev.azure.com/$ADO_ORGANIZATION_URL/_apis/projects?api-version=7.1"
|
||||||
|
"""
|
||||||
|
|
||||||
|
from devops.azure import get_token
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
import os
|
||||||
|
|
||||||
|
args = ArgumentParser(description="Get Azure DevOps token and print it for exporting as environment variable.")
|
||||||
|
args.add_argument("--tenant-id", type=str, default=os.getenv("AZURE_TENANT_ID"), help="Azure AD Tenant ID")
|
||||||
|
args.add_argument("--client-id", type=str, default=os.getenv("AZURE_CLIENT_ID"), help="Azure AD Client ID")
|
||||||
|
args.add_argument("--pem-path", type=str, default=os.getenv("AZURE_CLIENT_CERTIFICATE_PATH"), help="Path to PEM file for authentication (optional)")
|
||||||
|
args.add_argument("--client-secret", type=str, default=os.getenv("AZURE_CLIENT_SECRET"), help="Client Secret for authentication (optional)")
|
||||||
|
args = args.parse_args()
|
||||||
|
|
||||||
|
if not args.tenant_id or not args.client_id:
|
||||||
|
print("Tenant ID and Client ID are required.")
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
if args.pem_path and os.path.isfile(args.pem_path):
|
||||||
|
token = get_token(
|
||||||
|
tenant_id=args.tenant_id,
|
||||||
|
client_id=args.client_id,
|
||||||
|
pem_path=args.pem_path
|
||||||
|
)
|
||||||
|
elif args.client_secret:
|
||||||
|
if not args.client_secret:
|
||||||
|
print("Client secret file is empty.")
|
||||||
|
exit(1)
|
||||||
|
token = get_token(
|
||||||
|
tenant_id=args.tenant_id,
|
||||||
|
client_id=args.client_id,
|
||||||
|
client_secret=args.client_secret
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
token = get_token(
|
||||||
|
tenant_id=args.tenant_id,
|
||||||
|
client_id=args.client_id
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f"export ADO_TOKEN='{token}'")
|
||||||
|
|||||||
10
harvester.py
10
harvester.py
@@ -1,8 +1,8 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from sk.devops import Organization
|
from harvester.harvester import harvest_readmes
|
||||||
org = Organization("https://dev.azure.com/mcovsandbox")
|
|
||||||
|
|
||||||
# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])
|
if __name__ == "__main__":
|
||||||
print(org["ADO Sandbox"]["ado-auth-lab"]["/container"].url)
|
harvest_readmes(
|
||||||
print(org["ADO Sandbox"]["ado-auth-lab"]["/generate-pat.py"].url)
|
organization="mcovsandbox"
|
||||||
|
)
|
||||||
|
|||||||
1
harvester/__init__.py
Normal file
1
harvester/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# Harvester Package
|
||||||
75
harvester/harvester.py
Normal file
75
harvester/harvester.py
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
import os
|
||||||
|
import requests
|
||||||
|
from devops.azure import get_token
|
||||||
|
from devops.devops import Organization, Project, Repository, Item
|
||||||
|
import logging
|
||||||
|
|
||||||
|
fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
ch = logging.StreamHandler()
|
||||||
|
ch.setFormatter(fmt)
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
log.addHandler(ch)
|
||||||
|
log.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
def sanitize_name(name: str) -> str:
|
||||||
|
"""Sanitize a name to be filesystem-friendly."""
|
||||||
|
return name.lower().replace(" ", "-").replace("_", "-")
|
||||||
|
|
||||||
|
def harvest_readmes(organization: str | Organization, branch: list[str | None] = ["main", "dev", None], projects: list[str] = [], output_path: str = "reference") -> None:
|
||||||
|
"""Harvest README files from repositories."""
|
||||||
|
if isinstance(organization, str):
|
||||||
|
org = Organization("https://dev.azure.com/" + organization, token=get_token())
|
||||||
|
else:
|
||||||
|
org = organization
|
||||||
|
|
||||||
|
if projects:
|
||||||
|
# Target specific projects
|
||||||
|
target_projects = [Project(org=org, name=project_name) for project_name in projects]
|
||||||
|
else:
|
||||||
|
# Target all projects
|
||||||
|
target_projects = org.projects
|
||||||
|
|
||||||
|
for project in target_projects:
|
||||||
|
repo_index = [] # Repository index for the project.
|
||||||
|
log.info(f"Processing project: {project.name} with {len(project.repositories)} repositories.") # type: ignore
|
||||||
|
for repo in project.repositories:
|
||||||
|
log.info(f"...processing repository: {repo.name}")
|
||||||
|
readme_found = False
|
||||||
|
# Try each specified branch to find the README.md file
|
||||||
|
for branch_name in branch:
|
||||||
|
try:
|
||||||
|
# Check if the README.md file exists
|
||||||
|
readme = Item(repository=repo, path="/README.md", branch=branch_name)
|
||||||
|
# Build output path and save the README content if found
|
||||||
|
if readme:
|
||||||
|
project_path = f"{output_path}/{sanitize_name(project.name)}" # type: ignore
|
||||||
|
# Create project directory if it doesn't exist
|
||||||
|
os.makedirs(project_path, exist_ok=True)
|
||||||
|
# Save README content to index.md
|
||||||
|
readme_content = readme.get_content(branch=branch_name)
|
||||||
|
if readme_content is None or len(readme_content.strip()) == 0:
|
||||||
|
continue
|
||||||
|
with open(f"{project_path}/{sanitize_name(repo.name)}.md", "w") as f:
|
||||||
|
f.write(readme_content.decode("utf-8"))
|
||||||
|
readme_found = True
|
||||||
|
break # Exit branch loop if README is found
|
||||||
|
except requests.exceptions.HTTPError:
|
||||||
|
# Repository does not have a README.md file in the specified branch
|
||||||
|
continue
|
||||||
|
# Register if README was not found in any branch
|
||||||
|
repo_index.append((repo.name, readme_found)) # README not found
|
||||||
|
# Log if the README was not found
|
||||||
|
if not readme_found:
|
||||||
|
log.warning(f"......README.md in repo {repo.name} is not found or empty.")
|
||||||
|
# Save the repository index for the project
|
||||||
|
with open(f"{output_path}/{sanitize_name(project.name)}/index.md", "w") as index_file: # type: ignore
|
||||||
|
index_file.write(f"# Repository Index for Project: {project.name}\n\n") # type: ignore
|
||||||
|
for repo_name, has_readme in repo_index:
|
||||||
|
status = "" if has_readme else " - README.md not found"
|
||||||
|
index_file.write(f"- [{repo_name}]({sanitize_name(repo_name)}.md){status}\n")
|
||||||
|
|
||||||
|
# Save the reference index for all projects
|
||||||
|
with open(f"{output_path}/index.md", "w") as ref_index_file:
|
||||||
|
ref_index_file.write("# Project Index\n\n")
|
||||||
|
for project in target_projects:
|
||||||
|
ref_index_file.write(f"- [{project.name}]({sanitize_name(project.name)}/index.md)\n") # type: ignore
|
||||||
@@ -1,6 +1,8 @@
|
|||||||
#! /usr/bin/env bash
|
#! /usr/bin/env bash
|
||||||
|
|
||||||
python3 -m venv .venv
|
VERSION="${1:-3}"
|
||||||
|
|
||||||
|
python${VERSION} -m venv .venv
|
||||||
./.venv/bin/pip install --upgrade pip
|
./.venv/bin/pip install --upgrade pip
|
||||||
./.venv/bin/pip install -r requirements.txt
|
./.venv/bin/pip install -r requirements.txt
|
||||||
|
|
||||||
|
|||||||
4
mkdocs.yml
Normal file
4
mkdocs.yml
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
site_name: Reference Documentation
|
||||||
|
docs_dir: reference
|
||||||
|
theme:
|
||||||
|
name: material
|
||||||
11
pyproject.toml
Normal file
11
pyproject.toml
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["setuptools", "wheel", "build"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "devops"
|
||||||
|
version = "0.1.0"
|
||||||
|
requires-python = ">=3.12"
|
||||||
|
|
||||||
|
[tool.setuptools]
|
||||||
|
packages = ["devops"]
|
||||||
@@ -1,3 +1,5 @@
|
|||||||
debugpy==1.8.17
|
debugpy==1.8.17
|
||||||
azure-identity==1.25.1
|
azure-identity==1.25.1
|
||||||
requests==2.32.5
|
requests==2.32.5
|
||||||
|
loki-logger-handler==1.1.2
|
||||||
|
mkdocs-material>=1.5.2
|
||||||
|
|||||||
1
sk/__init__.py
Normal file
1
sk/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# My helper tools
|
||||||
307
sk/devops.py
307
sk/devops.py
@@ -1,307 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
import requests
|
|
||||||
import urllib.parse
|
|
||||||
from uuid import UUID
|
|
||||||
from string import Template
|
|
||||||
|
|
||||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
|
||||||
DEVOPS_API_VERSION = "7.1"
|
|
||||||
|
|
||||||
# Define a class decorator
|
|
||||||
def auto_properties(mapping: dict[str,str]):
|
|
||||||
|
|
||||||
def make_property(name: str):
|
|
||||||
private_var = f"_{name}"
|
|
||||||
|
|
||||||
def getter(self):
|
|
||||||
try:
|
|
||||||
i = getattr(self, private_var)
|
|
||||||
if i is not None:
|
|
||||||
return i
|
|
||||||
except AttributeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Fetch repository details from the API if it is set to None or not existing
|
|
||||||
self._get()
|
|
||||||
return getattr(self, private_var)
|
|
||||||
|
|
||||||
return property(fget=getter)
|
|
||||||
|
|
||||||
def set_auto_properties(self, **kwargs):
|
|
||||||
allowed = set(self.__class__.__auto_properties__)
|
|
||||||
unknown = [k for k in kwargs if k not in allowed]
|
|
||||||
if unknown:
|
|
||||||
raise ValueError(f"Unknown properties for {self.__class__.__name__}: {', '.join(unknown)}")
|
|
||||||
for k, v in kwargs.items():
|
|
||||||
setattr(self, f"_{k}", v)
|
|
||||||
return self
|
|
||||||
|
|
||||||
def from_json(self, json_data: dict):
|
|
||||||
for name in self.__class__.__auto_properties__:
|
|
||||||
setattr(self, f"_{name}", json_data.get(self.__class__.__auto_properties__[name], None))
|
|
||||||
return self
|
|
||||||
|
|
||||||
def decorator(cls):
|
|
||||||
cls.__auto_properties__ = mapping # Make a copy of the mapping
|
|
||||||
|
|
||||||
# Create properties dynamically
|
|
||||||
for name in mapping:
|
|
||||||
setattr(cls, name, make_property(name))
|
|
||||||
|
|
||||||
setattr(cls, "set_auto_properties", set_auto_properties)
|
|
||||||
setattr(cls, "from_json", from_json)
|
|
||||||
|
|
||||||
return cls
|
|
||||||
return decorator
|
|
||||||
|
|
||||||
def devops(key: str, get_url: str, list_url: str = None, params: dict = {}):
|
|
||||||
def decorator(cls):
|
|
||||||
cls.__entity_key__ = key
|
|
||||||
cls.__entity_get_url__ = get_url # Use $key in the URL
|
|
||||||
cls.__entity_list_url__ = list_url # Use $key in the URL
|
|
||||||
cls.__entity_params__ = params
|
|
||||||
return cls
|
|
||||||
return decorator
|
|
||||||
|
|
||||||
class DevOps():
|
|
||||||
"""Base class for DevOps entities."""
|
|
||||||
|
|
||||||
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
|
|
||||||
self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
|
|
||||||
self._token = token
|
|
||||||
self._api_version = api_version
|
|
||||||
|
|
||||||
def _get_url_path(self, path: str, params: dict = {}) -> requests.Response:
|
|
||||||
if not self._org_url or not self._token or not self._api_version:
|
|
||||||
raise ValueError("Organization URL, token, and API version must be set before making requests.")
|
|
||||||
|
|
||||||
request_parameters = {
|
|
||||||
"api-version": self._api_version,
|
|
||||||
**params
|
|
||||||
}
|
|
||||||
encoded_path = urllib.parse.quote(path.lstrip("/")) # Ensure single slash between base and path
|
|
||||||
url = self._org_url + encoded_path
|
|
||||||
r = requests.get(url=url, params=request_parameters, headers={
|
|
||||||
"Authorization": f"Bearer {self._token}"
|
|
||||||
})
|
|
||||||
r.raise_for_status() # Ensure we raise an error for bad responses
|
|
||||||
return r
|
|
||||||
|
|
||||||
def _get(self, key: str):
|
|
||||||
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
|
|
||||||
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
|
|
||||||
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
|
|
||||||
# Build the URL
|
|
||||||
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
|
|
||||||
# Build parameters with key substituted
|
|
||||||
params = {}
|
|
||||||
if hasattr(self.__class__, "__entity_params__"):
|
|
||||||
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
|
|
||||||
|
|
||||||
# Fetch the object data from the URL
|
|
||||||
r = self._get_url_path(url, params=params)
|
|
||||||
|
|
||||||
# Populate attributes
|
|
||||||
self.from_json(r.json())
|
|
||||||
|
|
||||||
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
|
|
||||||
"""
|
|
||||||
Each entity class can use this method to populate its attributes, by defining
|
|
||||||
its own _get method that calls this one with the key name,
|
|
||||||
and the URL.
|
|
||||||
"""
|
|
||||||
r = self._get_url_path(get_url, params=params) # Fetch the object data from the URL
|
|
||||||
setattr(self, f"_{key_name}", r.json().get(key_name, None)) # Set the key attribute
|
|
||||||
self.from_json(r.json()) # Populate other attributes from JSON
|
|
||||||
|
|
||||||
def _entity(self, entity_class: type, key_name: str, entity_data: dict) -> object:
|
|
||||||
"""A generic method to create an entity from JSON data."""
|
|
||||||
args = { key_name: entity_data.get(key_name) }
|
|
||||||
e = entity_class(self, **args)
|
|
||||||
e.from_json(entity_data)
|
|
||||||
return e
|
|
||||||
|
|
||||||
def _entities(self, entity_class: type, key_name: str, list_url: str, params: dict = {}) -> list[object]:
|
|
||||||
"""A generic method to retrieve a list of entities."""
|
|
||||||
r = self._get_url_path(list_url, params=params)
|
|
||||||
entities_data = r.json().get("value", [])
|
|
||||||
|
|
||||||
entities_list = []
|
|
||||||
for entity in entities_data:
|
|
||||||
entities_list.append(self._entity(entity_class, key_name, entity))
|
|
||||||
|
|
||||||
return entities_list
|
|
||||||
|
|
||||||
class Organization(DevOps):
|
|
||||||
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
|
|
||||||
super().__init__(org_url, token, api_version)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def projects(self):
|
|
||||||
if not hasattr(self, "_projects"):
|
|
||||||
self._projects = self._entities(
|
|
||||||
entity_class=Project,
|
|
||||||
key_name="id",
|
|
||||||
list_url="_apis/projects")
|
|
||||||
return self._projects
|
|
||||||
|
|
||||||
def __getitem__(self, key: str) -> Project:
|
|
||||||
for project in self.projects:
|
|
||||||
if project.id == key or project.name == key:
|
|
||||||
return project
|
|
||||||
raise KeyError(f"Project with ID or name '{key}' not found.")
|
|
||||||
|
|
||||||
@auto_properties({
|
|
||||||
"name": "name",
|
|
||||||
"url": "url",
|
|
||||||
"description": "description"
|
|
||||||
})
|
|
||||||
@devops("id", "_apis/projects/$key", "_apis/projects")
|
|
||||||
class Project(DevOps):
|
|
||||||
def _get(self):
|
|
||||||
self._get_entity(
|
|
||||||
key_name="id",
|
|
||||||
get_url=f"_apis/projects/{self._id}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def __init__(self, org: Organization, id: str, **kwargs):
|
|
||||||
super().__init__(org._org_url, org._token, org._api_version)
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._id = str(UUID(id))
|
|
||||||
except ValueError:
|
|
||||||
raise ValueError(f"Invalid project ID: {id}")
|
|
||||||
|
|
||||||
self.set_auto_properties(**kwargs)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return f"Project(name={self._name}, id={self._id})"
|
|
||||||
|
|
||||||
@property
|
|
||||||
def id(self):
|
|
||||||
return self._id
|
|
||||||
|
|
||||||
@property
|
|
||||||
def repositories(self):
|
|
||||||
if not hasattr(self, "_repositories"):
|
|
||||||
self._repositories = self._entities(
|
|
||||||
entity_class=Repository,
|
|
||||||
key_name="id",
|
|
||||||
list_url=f"{self._id}/_apis/git/repositories")
|
|
||||||
return self._repositories
|
|
||||||
|
|
||||||
def __getitem__(self, key: str) -> Repository:
|
|
||||||
for repo in self.repositories:
|
|
||||||
if repo.id == key or repo.name == key:
|
|
||||||
return repo
|
|
||||||
raise KeyError(f"Repository with ID or name '{key}' not found.")
|
|
||||||
|
|
||||||
@auto_properties({
|
|
||||||
"name": "name",
|
|
||||||
"url": "url",
|
|
||||||
"default_branch": "defaultBranch",
|
|
||||||
"is_disabled": "isDisabled",
|
|
||||||
"is_in_maintenance": "isInMaintenance",
|
|
||||||
"remote_url": "remoteUrl",
|
|
||||||
"ssh_url": "sshUrl",
|
|
||||||
"web_url": "webUrl"
|
|
||||||
})
|
|
||||||
class Repository(DevOps):
|
|
||||||
def _get(self):
|
|
||||||
self._get_entity(
|
|
||||||
key_name="id",
|
|
||||||
get_url=f"{self._project.id}/_apis/git/repositories/{self._id}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def __init__(self, project: Project, id: str, **kwargs):
|
|
||||||
super().__init__(project._org_url, project._token, project._api_version)
|
|
||||||
self._project = project
|
|
||||||
self._id = id
|
|
||||||
|
|
||||||
try:
|
|
||||||
UUID(id) # Check if it's a valid UUID
|
|
||||||
except ValueError:
|
|
||||||
# Called with a repository name, fetch by name
|
|
||||||
self._get()
|
|
||||||
if kwargs:
|
|
||||||
raise ValueError("Automatic properties cannot be set when retrieving by name.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# set other properties if provided
|
|
||||||
self.set_auto_properties(**kwargs)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def id(self):
|
|
||||||
return self._id
|
|
||||||
|
|
||||||
@property
|
|
||||||
def project(self):
|
|
||||||
return self._project
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return f"Repository(name={self.name}, id={self._id})"
|
|
||||||
|
|
||||||
@property
|
|
||||||
def items(self):
|
|
||||||
if not hasattr(self, "_items"):
|
|
||||||
self._items = self._entities(
|
|
||||||
entity_class=Item,
|
|
||||||
key_name="path",
|
|
||||||
list_url=f"{self._project.id}/_apis/git/repositories/{self._id}/items",
|
|
||||||
params={
|
|
||||||
"scopePath": "/",
|
|
||||||
"recursionLevel": "oneLevel"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
return self._items
|
|
||||||
|
|
||||||
def __getitem__(self, key: str) -> Item:
|
|
||||||
for item in self.items:
|
|
||||||
if item.path == key:
|
|
||||||
return item
|
|
||||||
raise KeyError(f"Item with path '{key}' not found.")
|
|
||||||
|
|
||||||
@auto_properties({
|
|
||||||
"object_id": "objectId",
|
|
||||||
"git_object_type": "gitObjectType",
|
|
||||||
"commit_id": "commitId",
|
|
||||||
"is_folder": "isFolder",
|
|
||||||
"url": "url"
|
|
||||||
})
|
|
||||||
class Item(DevOps):
|
|
||||||
def __init__(self, repository: Repository, path: str, **kwargs):
|
|
||||||
super().__init__(repository._org_url, repository._token, repository._api_version)
|
|
||||||
self._repository = repository
|
|
||||||
self._path = path
|
|
||||||
self.set_auto_properties(**kwargs) # set properties defined in decorator
|
|
||||||
|
|
||||||
def _get(self):
|
|
||||||
self._get_entity(
|
|
||||||
key_name="path",
|
|
||||||
get_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
|
||||||
params={
|
|
||||||
"path": self._path,
|
|
||||||
"$format": "json",
|
|
||||||
"recursionLevel": "none"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def path(self):
|
|
||||||
return self._path
|
|
||||||
|
|
||||||
@property
|
|
||||||
def children(self):
|
|
||||||
"""List items under this item if it is a folder."""
|
|
||||||
if self.git_object_type == "tree":
|
|
||||||
return self._entities(
|
|
||||||
entity_class=Item,
|
|
||||||
key_name="path",
|
|
||||||
list_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
|
||||||
params={
|
|
||||||
"scopePath": self._path,
|
|
||||||
"recursionLevel": "oneLevel"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise ValueError("Items can only be listed for folder items.")
|
|
||||||
44
sk/logger.py
Normal file
44
sk/logger.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import logging
|
||||||
|
from loki_logger_handler.loki_logger_handler import LokiLoggerHandler
|
||||||
|
|
||||||
|
LOG_CONSOLE = 1
|
||||||
|
LOG_FILE = 2
|
||||||
|
LOG_LOKI = 4
|
||||||
|
|
||||||
|
# Quick non-intrusive debug check for sk.devops logger
|
||||||
|
def setup(name: str, handlers: int) -> logging.Logger:
|
||||||
|
logger = logging.getLogger(name)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s [%(url)]')
|
||||||
|
|
||||||
|
if handlers & LOG_CONSOLE:
|
||||||
|
console = logging.StreamHandler()
|
||||||
|
console.setLevel(logging.INFO)
|
||||||
|
console.setFormatter(fmt)
|
||||||
|
logger.addHandler(console)
|
||||||
|
|
||||||
|
if handlers & LOG_FILE:
|
||||||
|
file = logging.FileHandler('devops_debug.log')
|
||||||
|
file.setLevel(logging.DEBUG)
|
||||||
|
file.setFormatter(fmt)
|
||||||
|
logger.addHandler(file)
|
||||||
|
|
||||||
|
if handlers & LOG_LOKI:
|
||||||
|
# Create an instance of the custom handler
|
||||||
|
custom_handler = LokiLoggerHandler(
|
||||||
|
url="https://loki.koszewscy.waw.pl/loki/api/v1/push",
|
||||||
|
labels={"application": "docs-harverster"},
|
||||||
|
label_keys={"http_method": "http_method"},
|
||||||
|
timeout=10,
|
||||||
|
)
|
||||||
|
logger.addHandler(custom_handler)
|
||||||
|
|
||||||
|
return logger
|
||||||
|
|
||||||
|
def log_entity_creation(logger: logging.Logger, entity_class: type, entity_key: str):
|
||||||
|
logger.debug(f'Created new "{entity_class.__name__}" object with key: "{entity_key}"',
|
||||||
|
extra={
|
||||||
|
"entity_class": entity_class.__name__,
|
||||||
|
"entity_key": entity_key
|
||||||
|
})
|
||||||
27
tests.py
27
tests.py
@@ -1,8 +1,8 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import unittest
|
import unittest
|
||||||
import requests
|
import requests
|
||||||
from sk.devops import Organization, Repository, Project, Item
|
from devops.devops import Organization, Repository, Project, Item
|
||||||
from sk.azure import get_token
|
from devops.azure import get_token
|
||||||
|
|
||||||
# Get the token outside the test class to speed up tests.
|
# Get the token outside the test class to speed up tests.
|
||||||
# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
|
# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
|
||||||
@@ -19,17 +19,17 @@ class Test01(unittest.TestCase):
|
|||||||
"""Getting a specific project by ID (object instantiation)"""
|
"""Getting a specific project by ID (object instantiation)"""
|
||||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||||
self.assertEqual(project.name, "ADO Sandbox")
|
self.assertEqual(project.name, "ADO Sandbox") # type: ignore[attr-defined]
|
||||||
def test_03(self):
|
def test_03(self):
|
||||||
"""Getting a specific project by name using org indexing (object retrieval)"""
|
"""Getting a specific project by name using org indexing (object retrieval)"""
|
||||||
project = self.org["ADO Sandbox"]
|
project = self.org["ADO Sandbox"]
|
||||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||||
self.assertEqual(project.name, "ADO Sandbox")
|
self.assertEqual(project.name, "ADO Sandbox") # type: ignore[attr-defined]
|
||||||
def test_04(self):
|
def test_04(self):
|
||||||
"""Getting a specific project by ID using org indexing (object retrieval)"""
|
"""Getting a specific project by ID using org indexing (object retrieval)"""
|
||||||
project = self.org["bafe0cf1-6c97-4088-864a-ea6dc02b2727"]
|
project = self.org["bafe0cf1-6c97-4088-864a-ea6dc02b2727"]
|
||||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||||
self.assertEqual(project.name, "ADO Sandbox")
|
self.assertEqual(project.name, "ADO Sandbox") # type: ignore[attr-defined]
|
||||||
|
|
||||||
class Test02(unittest.TestCase):
|
class Test02(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@@ -45,21 +45,21 @@ class Test02(unittest.TestCase):
|
|||||||
"""Getting a specific repository by ID (object instantiation)"""
|
"""Getting a specific repository by ID (object instantiation)"""
|
||||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||||
self.assertEqual(repo.name, "ado-auth-lab")
|
self.assertEqual(repo.name, "ado-auth-lab") # type: ignore[attr-defined]
|
||||||
|
|
||||||
def test_03(self):
|
def test_03(self):
|
||||||
"""Getting a specific repository by name using project indexing (object retrieval)"""
|
"""Getting a specific repository by name using project indexing (object retrieval)"""
|
||||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||||
repo = project["ado-auth-lab"]
|
repo = project["ado-auth-lab"]
|
||||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||||
self.assertEqual(repo.name, "ado-auth-lab")
|
self.assertEqual(repo.name, "ado-auth-lab") # type: ignore[attr-defined]
|
||||||
|
|
||||||
def test_04(self):
|
def test_04(self):
|
||||||
"""Getting a specific repository by ID using project indexing (object retrieval)"""
|
"""Getting a specific repository by ID using project indexing (object retrieval)"""
|
||||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||||
repo = project["feac266f-84d2-41bc-839b-736925a85eaa"]
|
repo = project["feac266f-84d2-41bc-839b-736925a85eaa"]
|
||||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||||
self.assertEqual(repo.name, "ado-auth-lab")
|
self.assertEqual(repo.name, "ado-auth-lab") # type: ignore[attr-defined]
|
||||||
|
|
||||||
class Test03(unittest.TestCase):
|
class Test03(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@@ -69,7 +69,7 @@ class Test03(unittest.TestCase):
|
|||||||
"""Getting details of a specific item in a repository"""
|
"""Getting details of a specific item in a repository"""
|
||||||
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
|
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
|
||||||
self.assertEqual(item.path, "/generate-pat.py")
|
self.assertEqual(item.path, "/generate-pat.py")
|
||||||
self.assertIsNotNone(item.commit_id)
|
self.assertIsNotNone(item.commit_id) # type: ignore[attr-defined]
|
||||||
|
|
||||||
def test_02(self):
|
def test_02(self):
|
||||||
"""Listing items in a folder within a repository"""
|
"""Listing items in a folder within a repository"""
|
||||||
@@ -83,14 +83,14 @@ class Test03(unittest.TestCase):
|
|||||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||||
item = repo["/container"]
|
item = repo["/container"]
|
||||||
self.assertEqual(item.path, "/container")
|
self.assertEqual(item.path, "/container")
|
||||||
self.assertTrue(item.is_folder)
|
self.assertTrue(item.is_folder) # type: ignore[attr-defined]
|
||||||
|
|
||||||
def test_04(self):
|
def test_04(self):
|
||||||
"""Getting a specific item from a repository using indexing"""
|
"""Getting a specific item from a repository using indexing"""
|
||||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||||
item = repo["/generate-pat.py"]
|
item = repo["/generate-pat.py"]
|
||||||
self.assertEqual(item.path, "/generate-pat.py")
|
self.assertEqual(item.path, "/generate-pat.py")
|
||||||
self.assertFalse(item.is_folder)
|
self.assertFalse(item.is_folder) # type: ignore[attr-defined]
|
||||||
|
|
||||||
def test_05(self):
|
def test_05(self):
|
||||||
"""Attempting to get a non-existent item from a repository using indexing"""
|
"""Attempting to get a non-existent item from a repository using indexing"""
|
||||||
@@ -106,7 +106,7 @@ class Test04(unittest.TestCase):
|
|||||||
"""Getting details of a specific item in a repository"""
|
"""Getting details of a specific item in a repository"""
|
||||||
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
|
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
|
||||||
self.assertEqual(item.path, "/generate-pat.py")
|
self.assertEqual(item.path, "/generate-pat.py")
|
||||||
self.assertIsNotNone(item.commit_id)
|
self.assertIsNotNone(item.commit_id) # type: ignore[attr-defined]
|
||||||
|
|
||||||
def test_02(self):
|
def test_02(self):
|
||||||
"""Trying to instantiate Item for a item that does not exist"""
|
"""Trying to instantiate Item for a item that does not exist"""
|
||||||
@@ -114,7 +114,8 @@ class Test04(unittest.TestCase):
|
|||||||
with self.assertRaises(requests.exceptions.HTTPError):
|
with self.assertRaises(requests.exceptions.HTTPError):
|
||||||
item = Item(repo, path="/non-existent-file.txt")
|
item = Item(repo, path="/non-existent-file.txt")
|
||||||
self.assertEqual(item.path, "/non-existent-file.txt")
|
self.assertEqual(item.path, "/non-existent-file.txt")
|
||||||
commit_id = item.commit_id # This will raise HTTPError when trying to fetch details of a non-existent item
|
# This will raise HTTPError when trying to fetch details of a non-existent item
|
||||||
|
commit_id = item.commit_id # type: ignore[attr-defined]
|
||||||
|
|
||||||
def test_03(self):
|
def test_03(self):
|
||||||
"""Listing items in a folder within a repository"""
|
"""Listing items in a folder within a repository"""
|
||||||
|
|||||||
Reference in New Issue
Block a user