Compare commits
37 Commits
44dcbe7fa9
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| b3ebe386dc | |||
| 15dcf09ec1 | |||
| ca16f1b098 | |||
| 4276939f30 | |||
| 15349c9390 | |||
| 064364caa6 | |||
| b83f53d140 | |||
| 32ee557f93 | |||
| 565464e266 | |||
| c3c5f9935c | |||
| c29e0b4e21 | |||
| c32fc25cee | |||
| e6bca1ce47 | |||
| 2addc85e40 | |||
| 5412c3ea09 | |||
| 6e16cebeea | |||
| d06bc05a2d | |||
| 495ba0b0b3 | |||
| 7d5d451d0c | |||
| 8b2a06551a | |||
| 89b9f8d6e6 | |||
| d6b58f0b51 | |||
| 89ba272003 | |||
| 5d7f2f51de | |||
| def594d5b7 | |||
| 73a63d0605 | |||
| cb0840f380 | |||
| f2e6f4907a | |||
| 80d4988bad | |||
| 9b2922c1ef | |||
| 27a5a13c47 | |||
| 6588313fa1 | |||
| 086618fe72 | |||
| 54361e6c18 | |||
| e4497791f3 | |||
| fd2fd61633 | |||
| 869d861165 |
9
.gitignore
vendored
9
.gitignore
vendored
@@ -1,6 +1,8 @@
|
||||
# Python
|
||||
.venv
|
||||
__pycache__/
|
||||
*.egg-info/
|
||||
dist/
|
||||
|
||||
# Ignore sample JSON files
|
||||
*.sample.json
|
||||
@@ -10,8 +12,15 @@ prototype_*.py
|
||||
|
||||
# Shell secrets
|
||||
*.secret
|
||||
*.client_secret
|
||||
|
||||
# Environment files
|
||||
*.env
|
||||
|
||||
# Certificate files
|
||||
*.pem
|
||||
*.key
|
||||
*.crt
|
||||
|
||||
# Harvester output
|
||||
reference
|
||||
7
.vscode/settings.json
vendored
Normal file
7
.vscode/settings.json
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"debug.autoExpandLazyVariables": "off",
|
||||
"debug.inlineValues": "off",
|
||||
"debugpy.debugJustMyCode": true,
|
||||
"debugpy.showPythonInlineValues": false,
|
||||
"python.terminal.useEnvFile": true
|
||||
}
|
||||
21
BUILD.md
Normal file
21
BUILD.md
Normal file
@@ -0,0 +1,21 @@
|
||||
# Build Instructions
|
||||
|
||||
Run the following command to build the project:
|
||||
|
||||
```shell
|
||||
python -m build
|
||||
```
|
||||
|
||||
This will create distribution files in the `dist/` directory.
|
||||
|
||||
Install the built package using pip:
|
||||
|
||||
```shell
|
||||
pip install --no-index dist/docs_harvester-0.1.0-py3-none-any.whl
|
||||
```
|
||||
|
||||
Install in editable mode for development:
|
||||
|
||||
```shell
|
||||
pip install -e .
|
||||
```
|
||||
9
DEVOPS.md
Normal file
9
DEVOPS.md
Normal file
@@ -0,0 +1,9 @@
|
||||
# DevOps Notes
|
||||
|
||||
## DevOps OAuth2 Flow
|
||||
|
||||
Type: **oauth2**
|
||||
Flow: **accessCode**
|
||||
Authorization URL: `https://app.vssps.visualstudio.com/oauth2/authorize&response_type=Assertion`
|
||||
Token URL: `https://app.vssps.visualstudio.com/oauth2/token?client_assertion_type=urn:ietf:params:oauth:client-assertion-type:jwt-bearer&grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer`
|
||||
Scopes: `vso.code`
|
||||
1
devops/__init__.py
Normal file
1
devops/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# devops package
|
||||
@@ -59,9 +59,9 @@ def get_token(
|
||||
|
||||
def secret_credentials_auth(
|
||||
scope: str = DEVOPS_SCOPE,
|
||||
tenant_id: str = os.environ.get("AZURE_TENANT_ID", ""),
|
||||
client_id: str = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||
client_secret: str = os.environ.get("AZURE_CLIENT_SECRET")
|
||||
tenant_id = os.environ.get("AZURE_TENANT_ID", ""),
|
||||
client_id = os.environ.get("AZURE_CLIENT_ID", ""),
|
||||
client_secret = os.environ.get("AZURE_CLIENT_SECRET")
|
||||
) -> str:
|
||||
"""
|
||||
Authenticate using client credentials. Pass credentials via environment variables,
|
||||
@@ -92,8 +92,8 @@ def certificate_credentials_auth(
|
||||
# Wczytaj klucz prywatny (RSA)
|
||||
with open(pem_path, "rb") as f:
|
||||
pem = f.read()
|
||||
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0)
|
||||
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0)
|
||||
key_pem = re.search(b"-----BEGIN (?:RSA )?PRIVATE KEY-----.*?END (?:RSA )?PRIVATE KEY-----", pem, re.S).group(0) # type: ignore
|
||||
cert_pem = re.search(b"-----BEGIN CERTIFICATE-----.*?END CERTIFICATE-----", pem, re.S).group(0) # type: ignore
|
||||
|
||||
private_key = serialization.load_pem_private_key(key_pem, password=None)
|
||||
cert = x509.load_pem_x509_certificate(cert_pem)
|
||||
@@ -115,7 +115,7 @@ def certificate_credentials_auth(
|
||||
|
||||
headers = {"x5t": x5t, "kid": x5t}
|
||||
|
||||
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers)
|
||||
assertion = jwt.encode(claims, private_key, algorithm="RS256", headers=headers) # type: ignore
|
||||
|
||||
data = {
|
||||
"grant_type": "client_credentials",
|
||||
327
devops/devops.py
Normal file
327
devops/devops.py
Normal file
@@ -0,0 +1,327 @@
|
||||
from __future__ import annotations
|
||||
import pathlib
|
||||
import requests
|
||||
import urllib.parse
|
||||
from uuid import UUID
|
||||
import logging
|
||||
|
||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||
DEVOPS_API_VERSION = "7.1"
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Define a class decorator
|
||||
def auto_properties(mapping: dict[str,str]):
|
||||
|
||||
def make_property(name: str):
|
||||
private_var = f"_{name}"
|
||||
|
||||
def getter(self):
|
||||
try:
|
||||
i = getattr(self, private_var)
|
||||
if i is not None:
|
||||
return i
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
# Fetch repository details from the API if it is set to None or not existing
|
||||
log.debug(f"Auto-fetching property '{name}' for {self.__class__.__name__}", extra={"property_name": name})
|
||||
self.get_auto_properties()
|
||||
return getattr(self, private_var)
|
||||
|
||||
def setter(self, value):
|
||||
setattr(self, private_var, value)
|
||||
|
||||
return property(fget=getter, fset=setter)
|
||||
|
||||
def from_args(self, **kwargs):
|
||||
for name in kwargs:
|
||||
if name in self.__class__.__auto_properties__:
|
||||
log.debug(f"Setting property '{name}' for {self.__class__.__name__} from args", extra={"property_name": name})
|
||||
setattr(self, f"_{name}", kwargs.get(name, None))
|
||||
|
||||
def from_json(self, json_data: dict):
|
||||
for json_name in self.__class__.__auto_properties_reversed__:
|
||||
setattr(self, f"_{self.__class__.__auto_properties_reversed__[json_name]}", json_data.get(json_name, None))
|
||||
|
||||
def decorator(cls):
|
||||
cls.__auto_properties__ = mapping # Make a copy of the mapping
|
||||
cls.__auto_properties_reversed__ = {v: k for k, v in mapping.items()} # Store reversed mapping for JSON parsing
|
||||
|
||||
# Create properties dynamically
|
||||
for name in mapping:
|
||||
setattr(cls, name, make_property(name))
|
||||
|
||||
setattr(cls, "from_args", from_args)
|
||||
setattr(cls, "from_json", from_json)
|
||||
|
||||
return cls
|
||||
return decorator
|
||||
|
||||
def get_url(URL: str, token: str, api_version: str, params: dict = {}) -> requests.Response:
|
||||
"""Helper method to make GET requests to DevOps REST API."""
|
||||
if not URL or not token or not api_version:
|
||||
raise ValueError("Organization URL, token, and API version must be set before making requests.")
|
||||
|
||||
request_parameters = {
|
||||
"api-version": api_version,
|
||||
**params
|
||||
}
|
||||
log.debug(f"Making GET request", extra={"url": URL, "params": request_parameters, "http_method": "get"})
|
||||
r = requests.get(url=URL, params=request_parameters, headers={
|
||||
"Authorization": f"Bearer {token}"
|
||||
})
|
||||
r.raise_for_status() # Ensure we raise an error for bad responses
|
||||
return r # Return response
|
||||
|
||||
class Organization():
|
||||
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
|
||||
self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
|
||||
self._token = token
|
||||
self._api_version = api_version
|
||||
|
||||
def get_path(self, path: str, params: dict = {}) -> requests.Response:
|
||||
return get_url(
|
||||
URL=urllib.parse.urljoin(self._org_url, path),
|
||||
token=self._token,
|
||||
api_version=self._api_version,
|
||||
params=params
|
||||
)
|
||||
|
||||
@property
|
||||
def projects(self):
|
||||
if not hasattr(self, "_projects"):
|
||||
# Create Project objects
|
||||
self._projects = []
|
||||
for proj in self.get_path("_apis/projects").json().get("value", []):
|
||||
p = Project(org=self, **proj)
|
||||
self._projects.append(p)
|
||||
return self._projects
|
||||
|
||||
def __getitem__(self, key: str) -> Project:
|
||||
for project in self.projects:
|
||||
if project.id == key or project.name == key: # type: ignore[attr-defined]
|
||||
return project
|
||||
raise KeyError(f"Project with ID or name '{key}' not found.")
|
||||
|
||||
def __str__(self):
|
||||
return f"Organization(url=\"{self._org_url}\")"
|
||||
|
||||
@auto_properties({
|
||||
"id": "id",
|
||||
"name": "name",
|
||||
"url": "url",
|
||||
"description": "description"
|
||||
})
|
||||
class Project():
|
||||
def __init__(self, org: Organization, **kwargs):
|
||||
self._org = org
|
||||
self.from_args(**kwargs) # type: ignore[attr-defined]
|
||||
|
||||
if not hasattr(self, "_id") or self._id is None:
|
||||
raise ValueError("Project ID must be provided.")
|
||||
|
||||
try:
|
||||
self._id = str(UUID(self._id))
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid project ID: {self._id}")
|
||||
|
||||
def get_auto_properties(self):
|
||||
r = get_url(
|
||||
URL=f"{self._org._org_url}_apis/projects/{self._id}",
|
||||
token=self._org._token,
|
||||
api_version=self._org._api_version
|
||||
)
|
||||
self.from_json(r.json()) # type: ignore[attr-defined]
|
||||
|
||||
def __str__(self):
|
||||
return f"Project(name=\"{self.name}\", id={self.id})" # type: ignore[attr-defined]
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def organization(self):
|
||||
return self._org
|
||||
|
||||
@property
|
||||
def repositories(self):
|
||||
if not hasattr(self, "_repositories"):
|
||||
self._repositories = []
|
||||
for repo in self._org.get_path(f"{self._id}/_apis/git/repositories").json().get("value", []):
|
||||
# Remove unnecessary nested project info
|
||||
if "project" in repo:
|
||||
del repo["project"]
|
||||
self._repositories.append(Repository(project=self, **repo))
|
||||
return self._repositories
|
||||
|
||||
def __getitem__(self, key: str) -> Repository:
|
||||
for repo in self.repositories:
|
||||
if repo.id == key or repo.name == key: # type: ignore[attr-defined]
|
||||
return repo
|
||||
raise KeyError(f"Repository with ID or name '{key}' not found.")
|
||||
|
||||
@auto_properties({
|
||||
"id": "id",
|
||||
"name": "name",
|
||||
"url": "url",
|
||||
"default_branch": "defaultBranch",
|
||||
"is_disabled": "isDisabled",
|
||||
"is_in_maintenance": "isInMaintenance",
|
||||
"remote_url": "remoteUrl",
|
||||
"ssh_url": "sshUrl",
|
||||
"web_url": "webUrl"
|
||||
})
|
||||
class Repository():
|
||||
def __init__(self, project: Project, **kwargs):
|
||||
self._project = project
|
||||
|
||||
if "id" not in kwargs and "name" not in kwargs:
|
||||
raise ValueError("Either repository ID or name must be provided.")
|
||||
|
||||
if "id" in kwargs:
|
||||
try:
|
||||
UUID(kwargs.get("id")) # Check if it's a valid UUID
|
||||
except ValueError:
|
||||
raise ValueError("Invalid repository ID, must be a valid UUID.")
|
||||
|
||||
# set other properties if provided
|
||||
self.from_args(**kwargs) # type: ignore[attr-defined]
|
||||
|
||||
def get_auto_properties(self):
|
||||
id = self._id if hasattr(self, "_id") else self._name # type: ignore[attr-defined]
|
||||
if id is None or id == "":
|
||||
raise ValueError("Repository ID or name must be set to fetch properties.")
|
||||
|
||||
r = self._project.organization.get_path(path=f"{self._project.id}/_apis/git/repositories/{id}")
|
||||
self.from_json(r.json()) # type: ignore[attr-defined]
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self._id # type: ignore[attr-defined]
|
||||
|
||||
@property
|
||||
def project(self):
|
||||
return self._project
|
||||
|
||||
def __str__(self):
|
||||
return f"Repository(name={self.name}, id={self._id})" # type: ignore[attr-defined]
|
||||
|
||||
@property
|
||||
def items(self):
|
||||
log.debug(f"Fetching items for repository '{self.name}'", extra={"repository_name": self.name}) # type: ignore[attr-defined]
|
||||
if not hasattr(self, "_items"):
|
||||
root = Item(repository=self, path="/", git_object_type="tree")
|
||||
self._items = root.get_child_items()
|
||||
|
||||
return self._items
|
||||
|
||||
def __getitem__(self, key: str) -> Item:
|
||||
for item in self.items:
|
||||
if item.path == key:
|
||||
return item
|
||||
raise KeyError(f"Item with path '{key}' not found.")
|
||||
|
||||
@auto_properties({
|
||||
"path": "path",
|
||||
"object_id": "objectId",
|
||||
"git_object_type": "gitObjectType",
|
||||
"commit_id": "commitId",
|
||||
"is_folder": "isFolder",
|
||||
"url": "url"
|
||||
})
|
||||
class Item():
|
||||
def __init__(self, repository: Repository, **kwargs):
|
||||
self._repository = repository
|
||||
self.from_args(**kwargs) # type: ignore[attr-defined]
|
||||
if "branch" in kwargs:
|
||||
self._branch = kwargs.get("branch")
|
||||
|
||||
@property
|
||||
def branch(self):
|
||||
if hasattr(self, "_branch"):
|
||||
return getattr(self, "_branch")
|
||||
return None
|
||||
|
||||
def get_auto_properties(self):
|
||||
r = self._repository._project.organization.get_path(
|
||||
path=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||
params={
|
||||
"path": self.path,
|
||||
"$format": "json",
|
||||
"recursionLevel": "none"
|
||||
}
|
||||
)
|
||||
self.from_json(r.json()) # type: ignore[attr-defined]
|
||||
|
||||
def get_content(self, branch: str | None = None, commit: str | None = None, tag: str | None = None) -> bytes:
|
||||
"""Get the content of the item with optional branch, commit, or tag."""
|
||||
if self.git_object_type != "blob": # type: ignore[attr-defined]
|
||||
raise ValueError("Content can only be fetched for blob items.")
|
||||
|
||||
params = { "path": self.path, "recursionLevel": "none" }
|
||||
if self.branch and branch is None:
|
||||
branch = self.branch
|
||||
|
||||
if branch:
|
||||
params["version"] = branch
|
||||
params["versionType"] = "branch"
|
||||
elif tag:
|
||||
params["version"] = tag
|
||||
params["versionType"] = "tag"
|
||||
elif commit:
|
||||
params["version"] = commit
|
||||
params["versionType"] = "commit"
|
||||
|
||||
r = self._repository._project.organization.get_path(
|
||||
path=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||
params=params
|
||||
)
|
||||
return r.content
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self._path # type: ignore[attr-defined]
|
||||
|
||||
def get_child_items(self, pattern: str | None = None, recurse: bool = False) -> list[Item]:
|
||||
"""Get child items if this item is a folder."""
|
||||
if self.git_object_type != "tree": # type: ignore[attr-defined]
|
||||
raise ValueError("Child items can only be fetched for folder items.")
|
||||
|
||||
# Fetch child objects
|
||||
objects = self._repository.project.organization.get_path(
|
||||
path=f"{self._repository.project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||
params={
|
||||
"scopePath": self.path,
|
||||
"recursionLevel": "oneLevel" if not recurse else "full"
|
||||
}
|
||||
).json().get("value", [])
|
||||
child_items = []
|
||||
for obj in objects:
|
||||
obj_path = obj.get("path")
|
||||
if pattern and not pathlib.PurePath(obj_path).match(pattern):
|
||||
continue
|
||||
i = Item(repository=self._repository, path=obj_path)
|
||||
i.from_json(obj) # type: ignore[attr-defined]
|
||||
child_items.append(i)
|
||||
return child_items
|
||||
|
||||
@property
|
||||
def children(self):
|
||||
if not hasattr(self, "_children"):
|
||||
self._children = self.get_child_items()
|
||||
return self._children
|
||||
|
||||
def __getitem__(self, key: str) -> Item:
|
||||
if self.git_object_type != "tree": # type: ignore[attr-defined]
|
||||
raise ValueError("Child items can only be accessed for folder items.")
|
||||
if not key.startswith("/"):
|
||||
key = pathlib.Path(self.path).joinpath(key).absolute().as_posix()
|
||||
for child in self.children:
|
||||
if child.path == key:
|
||||
return child
|
||||
raise KeyError(f"Child item with path '{key}' not found.")
|
||||
|
||||
def __str__(self):
|
||||
return f"Item(path=\"{self._path}\" type={self.git_object_type})" # type: ignore[attr-defined]
|
||||
53
get-token.py
53
get-token.py
@@ -1,11 +1,54 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Get an Azure DevOps token and print it in a format suitable for exporting as an environment variable.
|
||||
|
||||
from sk.azure import get_token
|
||||
Usage:
|
||||
eval $(python get-token.py)
|
||||
|
||||
or
|
||||
|
||||
python get-token.py > set-ado-token.sh
|
||||
source set-ado-token.sh
|
||||
|
||||
Now you can use the ADO_TOKEN environment variable, for example using curl:
|
||||
|
||||
curl -sH "Authorization: Bearer $ADO_TOKEN" "https://dev.azure.com/$ADO_ORGANIZATION_URL/_apis/projects?api-version=7.1"
|
||||
"""
|
||||
|
||||
from devops.azure import get_token
|
||||
from argparse import ArgumentParser
|
||||
import os
|
||||
|
||||
args = ArgumentParser(description="Get Azure DevOps token and print it for exporting as environment variable.")
|
||||
args.add_argument("--tenant-id", type=str, default=os.getenv("AZURE_TENANT_ID"), help="Azure AD Tenant ID")
|
||||
args.add_argument("--client-id", type=str, default=os.getenv("AZURE_CLIENT_ID"), help="Azure AD Client ID")
|
||||
args.add_argument("--pem-path", type=str, default=os.getenv("AZURE_CLIENT_CERTIFICATE_PATH"), help="Path to PEM file for authentication (optional)")
|
||||
args.add_argument("--client-secret", type=str, default=os.getenv("AZURE_CLIENT_SECRET"), help="Client Secret for authentication (optional)")
|
||||
args = args.parse_args()
|
||||
|
||||
if not args.tenant_id or not args.client_id:
|
||||
print("Tenant ID and Client ID are required.")
|
||||
exit(1)
|
||||
|
||||
if args.pem_path and os.path.isfile(args.pem_path):
|
||||
token = get_token(
|
||||
tenant_id="a7740229-47b6-45de-ad22-83721462b1bf",
|
||||
client_id="840671c4-5dc4-40e5-aab9-7c3a07bbd652",
|
||||
pem_path="./slawek-mba.pem"
|
||||
tenant_id=args.tenant_id,
|
||||
client_id=args.client_id,
|
||||
pem_path=args.pem_path
|
||||
)
|
||||
elif args.client_secret:
|
||||
if not args.client_secret:
|
||||
print("Client secret file is empty.")
|
||||
exit(1)
|
||||
token = get_token(
|
||||
tenant_id=args.tenant_id,
|
||||
client_id=args.client_id,
|
||||
client_secret=args.client_secret
|
||||
)
|
||||
else:
|
||||
token = get_token(
|
||||
tenant_id=args.tenant_id,
|
||||
client_id=args.client_id
|
||||
)
|
||||
|
||||
print(f"Obtained token: {token}")
|
||||
print(f"export ADO_TOKEN='{token}'")
|
||||
|
||||
10
harvester.py
10
harvester.py
@@ -1,8 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from sk.devops import Organization
|
||||
org = Organization("https://dev.azure.com/mcovsandbox")
|
||||
from harvester.harvester import harvest_readmes
|
||||
|
||||
# print(org.projects["bafe0cf1-6c97-4088-864a-ea6dc02b2727"].repositories["feac266f-84d2-41bc-839b-736925a85eaa"].items["/generate-pat.py"])
|
||||
print(org["ADO Sandbox"]["ado-auth-lab"]["/container"].url)
|
||||
print(org["ADO Sandbox"]["ado-auth-lab"]["/generate-pat.py"].url)
|
||||
if __name__ == "__main__":
|
||||
harvest_readmes(
|
||||
organization="mcovsandbox"
|
||||
)
|
||||
|
||||
1
harvester/__init__.py
Normal file
1
harvester/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Harvester Package
|
||||
75
harvester/harvester.py
Normal file
75
harvester/harvester.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import os
|
||||
import requests
|
||||
from devops.azure import get_token
|
||||
from devops.devops import Organization, Project, Repository, Item
|
||||
import logging
|
||||
|
||||
fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||
ch = logging.StreamHandler()
|
||||
ch.setFormatter(fmt)
|
||||
log = logging.getLogger(__name__)
|
||||
log.addHandler(ch)
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
def sanitize_name(name: str) -> str:
|
||||
"""Sanitize a name to be filesystem-friendly."""
|
||||
return name.lower().replace(" ", "-").replace("_", "-")
|
||||
|
||||
def harvest_readmes(organization: str | Organization, branch: list[str | None] = ["main", "dev", None], projects: list[str] = [], output_path: str = "reference") -> None:
|
||||
"""Harvest README files from repositories."""
|
||||
if isinstance(organization, str):
|
||||
org = Organization("https://dev.azure.com/" + organization, token=get_token())
|
||||
else:
|
||||
org = organization
|
||||
|
||||
if projects:
|
||||
# Target specific projects
|
||||
target_projects = [Project(org=org, name=project_name) for project_name in projects]
|
||||
else:
|
||||
# Target all projects
|
||||
target_projects = org.projects
|
||||
|
||||
for project in target_projects:
|
||||
repo_index = [] # Repository index for the project.
|
||||
log.info(f"Processing project: {project.name} with {len(project.repositories)} repositories.") # type: ignore
|
||||
for repo in project.repositories:
|
||||
log.info(f"...processing repository: {repo.name}")
|
||||
readme_found = False
|
||||
# Try each specified branch to find the README.md file
|
||||
for branch_name in branch:
|
||||
try:
|
||||
# Check if the README.md file exists
|
||||
readme = Item(repository=repo, path="/README.md", branch=branch_name)
|
||||
# Build output path and save the README content if found
|
||||
if readme:
|
||||
project_path = f"{output_path}/{sanitize_name(project.name)}" # type: ignore
|
||||
# Create project directory if it doesn't exist
|
||||
os.makedirs(project_path, exist_ok=True)
|
||||
# Save README content to index.md
|
||||
readme_content = readme.get_content(branch=branch_name)
|
||||
if readme_content is None or len(readme_content.strip()) == 0:
|
||||
continue
|
||||
with open(f"{project_path}/{sanitize_name(repo.name)}.md", "w") as f:
|
||||
f.write(readme_content.decode("utf-8"))
|
||||
readme_found = True
|
||||
break # Exit branch loop if README is found
|
||||
except requests.exceptions.HTTPError:
|
||||
# Repository does not have a README.md file in the specified branch
|
||||
continue
|
||||
# Register if README was not found in any branch
|
||||
repo_index.append((repo.name, readme_found)) # README not found
|
||||
# Log if the README was not found
|
||||
if not readme_found:
|
||||
log.warning(f"......README.md in repo {repo.name} is not found or empty.")
|
||||
# Save the repository index for the project
|
||||
with open(f"{output_path}/{sanitize_name(project.name)}/index.md", "w") as index_file: # type: ignore
|
||||
index_file.write(f"# Repository Index for Project: {project.name}\n\n") # type: ignore
|
||||
for repo_name, has_readme in repo_index:
|
||||
status = "" if has_readme else " - README.md not found"
|
||||
index_file.write(f"- [{repo_name}]({sanitize_name(repo_name)}.md){status}\n")
|
||||
|
||||
# Save the reference index for all projects
|
||||
with open(f"{output_path}/index.md", "w") as ref_index_file:
|
||||
ref_index_file.write("# Project Index\n\n")
|
||||
for project in target_projects:
|
||||
ref_index_file.write(f"- [{project.name}]({sanitize_name(project.name)}/index.md)\n") # type: ignore
|
||||
@@ -1,6 +1,8 @@
|
||||
#! /usr/bin/env bash
|
||||
|
||||
python3 -m venv .venv
|
||||
VERSION="${1:-3}"
|
||||
|
||||
python${VERSION} -m venv .venv
|
||||
./.venv/bin/pip install --upgrade pip
|
||||
./.venv/bin/pip install -r requirements.txt
|
||||
|
||||
|
||||
4
mkdocs.yml
Normal file
4
mkdocs.yml
Normal file
@@ -0,0 +1,4 @@
|
||||
site_name: Reference Documentation
|
||||
docs_dir: reference
|
||||
theme:
|
||||
name: material
|
||||
11
pyproject.toml
Normal file
11
pyproject.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[build-system]
|
||||
requires = ["setuptools", "wheel", "build"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "devops"
|
||||
version = "0.1.0"
|
||||
requires-python = ">=3.12"
|
||||
|
||||
[tool.setuptools]
|
||||
packages = ["devops"]
|
||||
@@ -1,3 +1,5 @@
|
||||
debugpy==1.8.17
|
||||
azure-identity==1.25.1
|
||||
requests==2.32.5
|
||||
loki-logger-handler==1.1.2
|
||||
mkdocs-material>=1.5.2
|
||||
|
||||
1
sk/__init__.py
Normal file
1
sk/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# My helper tools
|
||||
307
sk/devops.py
307
sk/devops.py
@@ -1,307 +0,0 @@
|
||||
from __future__ import annotations
|
||||
import requests
|
||||
import urllib.parse
|
||||
from uuid import UUID
|
||||
from string import Template
|
||||
|
||||
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
|
||||
DEVOPS_API_VERSION = "7.1"
|
||||
|
||||
# Define a class decorator
|
||||
def auto_properties(mapping: dict[str,str]):
|
||||
|
||||
def make_property(name: str):
|
||||
private_var = f"_{name}"
|
||||
|
||||
def getter(self):
|
||||
try:
|
||||
i = getattr(self, private_var)
|
||||
if i is not None:
|
||||
return i
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
# Fetch repository details from the API if it is set to None or not existing
|
||||
self._get()
|
||||
return getattr(self, private_var)
|
||||
|
||||
return property(fget=getter)
|
||||
|
||||
def set_auto_properties(self, **kwargs):
|
||||
allowed = set(self.__class__.__auto_properties__)
|
||||
unknown = [k for k in kwargs if k not in allowed]
|
||||
if unknown:
|
||||
raise ValueError(f"Unknown properties for {self.__class__.__name__}: {', '.join(unknown)}")
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, f"_{k}", v)
|
||||
return self
|
||||
|
||||
def from_json(self, json_data: dict):
|
||||
for name in self.__class__.__auto_properties__:
|
||||
setattr(self, f"_{name}", json_data.get(self.__class__.__auto_properties__[name], None))
|
||||
return self
|
||||
|
||||
def decorator(cls):
|
||||
cls.__auto_properties__ = mapping # Make a copy of the mapping
|
||||
|
||||
# Create properties dynamically
|
||||
for name in mapping:
|
||||
setattr(cls, name, make_property(name))
|
||||
|
||||
setattr(cls, "set_auto_properties", set_auto_properties)
|
||||
setattr(cls, "from_json", from_json)
|
||||
|
||||
return cls
|
||||
return decorator
|
||||
|
||||
def devops(key: str, get_url: str, list_url: str = None, params: dict = {}):
|
||||
def decorator(cls):
|
||||
cls.__entity_key__ = key
|
||||
cls.__entity_get_url__ = get_url # Use $key in the URL
|
||||
cls.__entity_list_url__ = list_url # Use $key in the URL
|
||||
cls.__entity_params__ = params
|
||||
return cls
|
||||
return decorator
|
||||
|
||||
class DevOps():
|
||||
"""Base class for DevOps entities."""
|
||||
|
||||
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
|
||||
self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
|
||||
self._token = token
|
||||
self._api_version = api_version
|
||||
|
||||
def _get_url_path(self, path: str, params: dict = {}) -> requests.Response:
|
||||
if not self._org_url or not self._token or not self._api_version:
|
||||
raise ValueError("Organization URL, token, and API version must be set before making requests.")
|
||||
|
||||
request_parameters = {
|
||||
"api-version": self._api_version,
|
||||
**params
|
||||
}
|
||||
encoded_path = urllib.parse.quote(path.lstrip("/")) # Ensure single slash between base and path
|
||||
url = self._org_url + encoded_path
|
||||
r = requests.get(url=url, params=request_parameters, headers={
|
||||
"Authorization": f"Bearer {self._token}"
|
||||
})
|
||||
r.raise_for_status() # Ensure we raise an error for bad responses
|
||||
return r
|
||||
|
||||
def _get(self, key: str):
|
||||
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
|
||||
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
|
||||
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
|
||||
# Build the URL
|
||||
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
|
||||
# Build parameters with key substituted
|
||||
params = {}
|
||||
if hasattr(self.__class__, "__entity_params__"):
|
||||
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
|
||||
|
||||
# Fetch the object data from the URL
|
||||
r = self._get_url_path(url, params=params)
|
||||
|
||||
# Populate attributes
|
||||
self.from_json(r.json())
|
||||
|
||||
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
|
||||
"""
|
||||
Each entity class can use this method to populate its attributes, by defining
|
||||
its own _get method that calls this one with the key name,
|
||||
and the URL.
|
||||
"""
|
||||
r = self._get_url_path(get_url, params=params) # Fetch the object data from the URL
|
||||
setattr(self, f"_{key_name}", r.json().get(key_name, None)) # Set the key attribute
|
||||
self.from_json(r.json()) # Populate other attributes from JSON
|
||||
|
||||
def _entity(self, entity_class: type, key_name: str, entity_data: dict) -> object:
|
||||
"""A generic method to create an entity from JSON data."""
|
||||
args = { key_name: entity_data.get(key_name) }
|
||||
e = entity_class(self, **args)
|
||||
e.from_json(entity_data)
|
||||
return e
|
||||
|
||||
def _entities(self, entity_class: type, key_name: str, list_url: str, params: dict = {}) -> list[object]:
|
||||
"""A generic method to retrieve a list of entities."""
|
||||
r = self._get_url_path(list_url, params=params)
|
||||
entities_data = r.json().get("value", [])
|
||||
|
||||
entities_list = []
|
||||
for entity in entities_data:
|
||||
entities_list.append(self._entity(entity_class, key_name, entity))
|
||||
|
||||
return entities_list
|
||||
|
||||
class Organization(DevOps):
|
||||
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
|
||||
super().__init__(org_url, token, api_version)
|
||||
|
||||
@property
|
||||
def projects(self):
|
||||
if not hasattr(self, "_projects"):
|
||||
self._projects = self._entities(
|
||||
entity_class=Project,
|
||||
key_name="id",
|
||||
list_url="_apis/projects")
|
||||
return self._projects
|
||||
|
||||
def __getitem__(self, key: str) -> Project:
|
||||
for project in self.projects:
|
||||
if project.id == key or project.name == key:
|
||||
return project
|
||||
raise KeyError(f"Project with ID or name '{key}' not found.")
|
||||
|
||||
@auto_properties({
|
||||
"name": "name",
|
||||
"url": "url",
|
||||
"description": "description"
|
||||
})
|
||||
@devops("id", "_apis/projects/$key", "_apis/projects")
|
||||
class Project(DevOps):
|
||||
def _get(self):
|
||||
self._get_entity(
|
||||
key_name="id",
|
||||
get_url=f"_apis/projects/{self._id}"
|
||||
)
|
||||
|
||||
def __init__(self, org: Organization, id: str, **kwargs):
|
||||
super().__init__(org._org_url, org._token, org._api_version)
|
||||
|
||||
try:
|
||||
self._id = str(UUID(id))
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid project ID: {id}")
|
||||
|
||||
self.set_auto_properties(**kwargs)
|
||||
|
||||
def __str__(self):
|
||||
return f"Project(name={self._name}, id={self._id})"
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def repositories(self):
|
||||
if not hasattr(self, "_repositories"):
|
||||
self._repositories = self._entities(
|
||||
entity_class=Repository,
|
||||
key_name="id",
|
||||
list_url=f"{self._id}/_apis/git/repositories")
|
||||
return self._repositories
|
||||
|
||||
def __getitem__(self, key: str) -> Repository:
|
||||
for repo in self.repositories:
|
||||
if repo.id == key or repo.name == key:
|
||||
return repo
|
||||
raise KeyError(f"Repository with ID or name '{key}' not found.")
|
||||
|
||||
@auto_properties({
|
||||
"name": "name",
|
||||
"url": "url",
|
||||
"default_branch": "defaultBranch",
|
||||
"is_disabled": "isDisabled",
|
||||
"is_in_maintenance": "isInMaintenance",
|
||||
"remote_url": "remoteUrl",
|
||||
"ssh_url": "sshUrl",
|
||||
"web_url": "webUrl"
|
||||
})
|
||||
class Repository(DevOps):
|
||||
def _get(self):
|
||||
self._get_entity(
|
||||
key_name="id",
|
||||
get_url=f"{self._project.id}/_apis/git/repositories/{self._id}"
|
||||
)
|
||||
|
||||
def __init__(self, project: Project, id: str, **kwargs):
|
||||
super().__init__(project._org_url, project._token, project._api_version)
|
||||
self._project = project
|
||||
self._id = id
|
||||
|
||||
try:
|
||||
UUID(id) # Check if it's a valid UUID
|
||||
except ValueError:
|
||||
# Called with a repository name, fetch by name
|
||||
self._get()
|
||||
if kwargs:
|
||||
raise ValueError("Automatic properties cannot be set when retrieving by name.")
|
||||
return
|
||||
|
||||
# set other properties if provided
|
||||
self.set_auto_properties(**kwargs)
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def project(self):
|
||||
return self._project
|
||||
|
||||
def __str__(self):
|
||||
return f"Repository(name={self.name}, id={self._id})"
|
||||
|
||||
@property
|
||||
def items(self):
|
||||
if not hasattr(self, "_items"):
|
||||
self._items = self._entities(
|
||||
entity_class=Item,
|
||||
key_name="path",
|
||||
list_url=f"{self._project.id}/_apis/git/repositories/{self._id}/items",
|
||||
params={
|
||||
"scopePath": "/",
|
||||
"recursionLevel": "oneLevel"
|
||||
}
|
||||
)
|
||||
return self._items
|
||||
|
||||
def __getitem__(self, key: str) -> Item:
|
||||
for item in self.items:
|
||||
if item.path == key:
|
||||
return item
|
||||
raise KeyError(f"Item with path '{key}' not found.")
|
||||
|
||||
@auto_properties({
|
||||
"object_id": "objectId",
|
||||
"git_object_type": "gitObjectType",
|
||||
"commit_id": "commitId",
|
||||
"is_folder": "isFolder",
|
||||
"url": "url"
|
||||
})
|
||||
class Item(DevOps):
|
||||
def __init__(self, repository: Repository, path: str, **kwargs):
|
||||
super().__init__(repository._org_url, repository._token, repository._api_version)
|
||||
self._repository = repository
|
||||
self._path = path
|
||||
self.set_auto_properties(**kwargs) # set properties defined in decorator
|
||||
|
||||
def _get(self):
|
||||
self._get_entity(
|
||||
key_name="path",
|
||||
get_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||
params={
|
||||
"path": self._path,
|
||||
"$format": "json",
|
||||
"recursionLevel": "none"
|
||||
}
|
||||
)
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self._path
|
||||
|
||||
@property
|
||||
def children(self):
|
||||
"""List items under this item if it is a folder."""
|
||||
if self.git_object_type == "tree":
|
||||
return self._entities(
|
||||
entity_class=Item,
|
||||
key_name="path",
|
||||
list_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
|
||||
params={
|
||||
"scopePath": self._path,
|
||||
"recursionLevel": "oneLevel"
|
||||
}
|
||||
)
|
||||
else:
|
||||
raise ValueError("Items can only be listed for folder items.")
|
||||
44
sk/logger.py
Normal file
44
sk/logger.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import logging
|
||||
from loki_logger_handler.loki_logger_handler import LokiLoggerHandler
|
||||
|
||||
LOG_CONSOLE = 1
|
||||
LOG_FILE = 2
|
||||
LOG_LOKI = 4
|
||||
|
||||
# Quick non-intrusive debug check for sk.devops logger
|
||||
def setup(name: str, handlers: int) -> logging.Logger:
|
||||
logger = logging.getLogger(name)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s [%(url)]')
|
||||
|
||||
if handlers & LOG_CONSOLE:
|
||||
console = logging.StreamHandler()
|
||||
console.setLevel(logging.INFO)
|
||||
console.setFormatter(fmt)
|
||||
logger.addHandler(console)
|
||||
|
||||
if handlers & LOG_FILE:
|
||||
file = logging.FileHandler('devops_debug.log')
|
||||
file.setLevel(logging.DEBUG)
|
||||
file.setFormatter(fmt)
|
||||
logger.addHandler(file)
|
||||
|
||||
if handlers & LOG_LOKI:
|
||||
# Create an instance of the custom handler
|
||||
custom_handler = LokiLoggerHandler(
|
||||
url="https://loki.koszewscy.waw.pl/loki/api/v1/push",
|
||||
labels={"application": "docs-harverster"},
|
||||
label_keys={"http_method": "http_method"},
|
||||
timeout=10,
|
||||
)
|
||||
logger.addHandler(custom_handler)
|
||||
|
||||
return logger
|
||||
|
||||
def log_entity_creation(logger: logging.Logger, entity_class: type, entity_key: str):
|
||||
logger.debug(f'Created new "{entity_class.__name__}" object with key: "{entity_key}"',
|
||||
extra={
|
||||
"entity_class": entity_class.__name__,
|
||||
"entity_key": entity_key
|
||||
})
|
||||
27
tests.py
27
tests.py
@@ -1,8 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import requests
|
||||
from sk.devops import Organization, Repository, Project, Item
|
||||
from sk.azure import get_token
|
||||
from devops.devops import Organization, Repository, Project, Item
|
||||
from devops.azure import get_token
|
||||
|
||||
# Get the token outside the test class to speed up tests.
|
||||
# Each Unit test instantinates the class, so doing it here avoids repeated authentication.
|
||||
@@ -19,17 +19,17 @@ class Test01(unittest.TestCase):
|
||||
"""Getting a specific project by ID (object instantiation)"""
|
||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
self.assertEqual(project.name, "ADO Sandbox")
|
||||
self.assertEqual(project.name, "ADO Sandbox") # type: ignore[attr-defined]
|
||||
def test_03(self):
|
||||
"""Getting a specific project by name using org indexing (object retrieval)"""
|
||||
project = self.org["ADO Sandbox"]
|
||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
self.assertEqual(project.name, "ADO Sandbox")
|
||||
self.assertEqual(project.name, "ADO Sandbox") # type: ignore[attr-defined]
|
||||
def test_04(self):
|
||||
"""Getting a specific project by ID using org indexing (object retrieval)"""
|
||||
project = self.org["bafe0cf1-6c97-4088-864a-ea6dc02b2727"]
|
||||
self.assertEqual(project.id, "bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
self.assertEqual(project.name, "ADO Sandbox")
|
||||
self.assertEqual(project.name, "ADO Sandbox") # type: ignore[attr-defined]
|
||||
|
||||
class Test02(unittest.TestCase):
|
||||
def setUp(self):
|
||||
@@ -45,21 +45,21 @@ class Test02(unittest.TestCase):
|
||||
"""Getting a specific repository by ID (object instantiation)"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
self.assertEqual(repo.name, "ado-auth-lab")
|
||||
self.assertEqual(repo.name, "ado-auth-lab") # type: ignore[attr-defined]
|
||||
|
||||
def test_03(self):
|
||||
"""Getting a specific repository by name using project indexing (object retrieval)"""
|
||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
repo = project["ado-auth-lab"]
|
||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
self.assertEqual(repo.name, "ado-auth-lab")
|
||||
self.assertEqual(repo.name, "ado-auth-lab") # type: ignore[attr-defined]
|
||||
|
||||
def test_04(self):
|
||||
"""Getting a specific repository by ID using project indexing (object retrieval)"""
|
||||
project = Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727")
|
||||
repo = project["feac266f-84d2-41bc-839b-736925a85eaa"]
|
||||
self.assertEqual(repo.id, "feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
self.assertEqual(repo.name, "ado-auth-lab")
|
||||
self.assertEqual(repo.name, "ado-auth-lab") # type: ignore[attr-defined]
|
||||
|
||||
class Test03(unittest.TestCase):
|
||||
def setUp(self):
|
||||
@@ -69,7 +69,7 @@ class Test03(unittest.TestCase):
|
||||
"""Getting details of a specific item in a repository"""
|
||||
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
|
||||
self.assertEqual(item.path, "/generate-pat.py")
|
||||
self.assertIsNotNone(item.commit_id)
|
||||
self.assertIsNotNone(item.commit_id) # type: ignore[attr-defined]
|
||||
|
||||
def test_02(self):
|
||||
"""Listing items in a folder within a repository"""
|
||||
@@ -83,14 +83,14 @@ class Test03(unittest.TestCase):
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
item = repo["/container"]
|
||||
self.assertEqual(item.path, "/container")
|
||||
self.assertTrue(item.is_folder)
|
||||
self.assertTrue(item.is_folder) # type: ignore[attr-defined]
|
||||
|
||||
def test_04(self):
|
||||
"""Getting a specific item from a repository using indexing"""
|
||||
repo = Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa")
|
||||
item = repo["/generate-pat.py"]
|
||||
self.assertEqual(item.path, "/generate-pat.py")
|
||||
self.assertFalse(item.is_folder)
|
||||
self.assertFalse(item.is_folder) # type: ignore[attr-defined]
|
||||
|
||||
def test_05(self):
|
||||
"""Attempting to get a non-existent item from a repository using indexing"""
|
||||
@@ -106,7 +106,7 @@ class Test04(unittest.TestCase):
|
||||
"""Getting details of a specific item in a repository"""
|
||||
item = Item(Repository(Project(self.org, id="bafe0cf1-6c97-4088-864a-ea6dc02b2727"), id="feac266f-84d2-41bc-839b-736925a85eaa"), path="/generate-pat.py")
|
||||
self.assertEqual(item.path, "/generate-pat.py")
|
||||
self.assertIsNotNone(item.commit_id)
|
||||
self.assertIsNotNone(item.commit_id) # type: ignore[attr-defined]
|
||||
|
||||
def test_02(self):
|
||||
"""Trying to instantiate Item for a item that does not exist"""
|
||||
@@ -114,7 +114,8 @@ class Test04(unittest.TestCase):
|
||||
with self.assertRaises(requests.exceptions.HTTPError):
|
||||
item = Item(repo, path="/non-existent-file.txt")
|
||||
self.assertEqual(item.path, "/non-existent-file.txt")
|
||||
commit_id = item.commit_id # This will raise HTTPError when trying to fetch details of a non-existent item
|
||||
# This will raise HTTPError when trying to fetch details of a non-existent item
|
||||
commit_id = item.commit_id # type: ignore[attr-defined]
|
||||
|
||||
def test_03(self):
|
||||
"""Listing items in a folder within a repository"""
|
||||
|
||||
Reference in New Issue
Block a user