Compare commits

...

8 Commits

5 changed files with 218 additions and 167 deletions

1
.gitignore vendored
View File

@@ -10,6 +10,7 @@ prototype_*.py
# Shell secrets # Shell secrets
*.secret *.secret
*.client_secret
# Certificate files # Certificate files
*.pem *.pem

View File

@@ -1,4 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
Get an Azure DevOps token and print it in a format suitable for exporting as an environment variable.
Usage:
eval $(python get-token.py)
or
python get-token.py > set-ado-token.sh
source set-ado-token.sh
Now you can use the ADO_TOKEN environment variable, for example using curl:
curl -H "Authorization: Bearer $ADO_TOKEN" https://dev.azure.com/your_organization/_apis/projects?api-version=7.1
"""
from sk.azure import get_token from sk.azure import get_token
@@ -8,4 +23,6 @@ token = get_token(
pem_path="./slawek-mba.pem" pem_path="./slawek-mba.pem"
) )
print(f"Obtained token: {token}") # print(f"Obtained token: {token}")
print(f"export ADO_TOKEN='{token}'")

View File

@@ -1,3 +1,4 @@
debugpy==1.8.17 debugpy==1.8.17
azure-identity==1.25.1 azure-identity==1.25.1
requests==2.32.5 requests==2.32.5
loki-logger-handler==1.1.2

View File

@@ -2,11 +2,17 @@ from __future__ import annotations
import requests import requests
import urllib.parse import urllib.parse
from uuid import UUID from uuid import UUID
from string import Template import logging
from sk.logger import log_entity_creation
DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default" DEVOPS_SCOPE = "https://app.vssps.visualstudio.com/.default"
DEVOPS_API_VERSION = "7.1" DEVOPS_API_VERSION = "7.1"
# Get logger. It should be configured by the main application.
log = logging.getLogger(__name__)
# log.setLevel(logging.DEBUG)
# log.propagate = False
# Define a class decorator # Define a class decorator
def auto_properties(mapping: dict[str,str]): def auto_properties(mapping: dict[str,str]):
@@ -22,181 +28,140 @@ def auto_properties(mapping: dict[str,str]):
pass pass
# Fetch repository details from the API if it is set to None or not existing # Fetch repository details from the API if it is set to None or not existing
self._get() log.debug(f"Auto-fetching property '{name}' for {self.__class__.__name__}", extra={"property_name": name})
self.get_auto_properties()
return getattr(self, private_var) return getattr(self, private_var)
return property(fget=getter) def setter(self, value):
setattr(self, private_var, value)
def set_auto_properties(self, **kwargs): return property(fget=getter, fset=setter)
allowed = set(self.__class__.__auto_properties__)
unknown = [k for k in kwargs if k not in allowed] def from_args(self, **kwargs):
if unknown: for name in kwargs:
raise ValueError(f"Unknown properties for {self.__class__.__name__}: {', '.join(unknown)}") if name in self.__class__.__auto_properties__:
for k, v in kwargs.items(): log.debug(f"Setting property '{name}' for {self.__class__.__name__} from args", extra={"property_name": name})
setattr(self, f"_{k}", v) setattr(self, f"_{name}", kwargs.get(name, None))
return self
def from_json(self, json_data: dict): def from_json(self, json_data: dict):
for name in self.__class__.__auto_properties__: for json_name in self.__class__.__auto_properties_reversed__:
setattr(self, f"_{name}", json_data.get(self.__class__.__auto_properties__[name], None)) setattr(self, f"_{self.__class__.__auto_properties_reversed__[json_name]}", json_data.get(json_name, None))
return self
def decorator(cls): def decorator(cls):
cls.__auto_properties__ = mapping # Make a copy of the mapping cls.__auto_properties__ = mapping # Make a copy of the mapping
cls.__auto_properties_reversed__ = {v: k for k, v in mapping.items()} # Store reversed mapping for JSON parsing
# Create properties dynamically # Create properties dynamically
for name in mapping: for name in mapping:
setattr(cls, name, make_property(name)) setattr(cls, name, make_property(name))
setattr(cls, "set_auto_properties", set_auto_properties) setattr(cls, "from_args", from_args)
setattr(cls, "from_json", from_json) setattr(cls, "from_json", from_json)
return cls return cls
return decorator return decorator
def devops(key: str, get_url: str, list_url: str = None, params: dict = {}): def get_url(URL: str, token: str, api_version: str, params: dict = {}) -> requests.Response:
def decorator(cls): """Helper method to make GET requests to DevOps REST API."""
cls.__entity_key__ = key if not URL or not token or not api_version:
cls.__entity_get_url__ = get_url # Use $key in the URL raise ValueError("Organization URL, token, and API version must be set before making requests.")
cls.__entity_list_url__ = list_url # Use $key in the URL
cls.__entity_params__ = params
return cls
return decorator
class DevOps(): request_parameters = {
"""Base class for DevOps entities.""" "api-version": api_version,
**params
}
log.debug(f"Making GET request", extra={"url": URL, "params": request_parameters, "http_method": "get"})
r = requests.get(url=URL, params=request_parameters, headers={
"Authorization": f"Bearer {token}"
})
r.raise_for_status() # Ensure we raise an error for bad responses
return r # Return response
class Organization():
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION): def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
self._token = token self._token = token
self._api_version = api_version self._api_version = api_version
log_entity_creation(log, Organization, self._org_url)
def _get_url_path(self, path: str, params: dict = {}) -> requests.Response: def get_path(self, path: str, params: dict = {}) -> requests.Response:
if not self._org_url or not self._token or not self._api_version: return get_url(
raise ValueError("Organization URL, token, and API version must be set before making requests.") URL=urllib.parse.urljoin(self._org_url, path),
token=self._token,
request_parameters = { api_version=self._api_version,
"api-version": self._api_version, params=params
**params )
}
encoded_path = urllib.parse.quote(path.lstrip("/")) # Ensure single slash between base and path
url = self._org_url + encoded_path
r = requests.get(url=url, params=request_parameters, headers={
"Authorization": f"Bearer {self._token}"
})
r.raise_for_status() # Ensure we raise an error for bad responses
return r
def _get(self, key: str):
if not hasattr(self.__class__, "__entity_key__") or not hasattr(self.__class__, "__entity_get_url__"):
raise NotImplementedError("Called _get on a class that has not been decorated with @devops.")
setattr(self, f"_{self.__class__.__entity_key__}", key) # Set the entity key
# Build the URL
url = Template(self.__class__.__entity_get_url__).substitute(key=key)
# Build parameters with key substituted
params = {}
if hasattr(self.__class__, "__entity_params__"):
params = {k: Template(v).substitute(key=key) for k, v in self.__class__.__entity_params__.items()}
# Fetch the object data from the URL
r = self._get_url_path(url, params=params)
# Populate attributes
self.from_json(r.json())
def _get_entity(self, key_name: str, get_url: str, params: dict = {}):
"""
Each entity class can use this method to populate its attributes, by defining
its own _get method that calls this one with the key name,
and the URL.
"""
r = self._get_url_path(get_url, params=params) # Fetch the object data from the URL
setattr(self, f"_{key_name}", r.json().get(key_name, None)) # Set the key attribute
self.from_json(r.json()) # Populate other attributes from JSON
def _entity(self, entity_class: type, key_name: str, entity_data: dict) -> object:
"""A generic method to create an entity from JSON data."""
args = { key_name: entity_data.get(key_name) }
e = entity_class(self, **args)
e.from_json(entity_data)
return e
def _entities(self, entity_class: type, key_name: str, list_url: str, params: dict = {}) -> list[object]:
"""A generic method to retrieve a list of entities."""
r = self._get_url_path(list_url, params=params)
entities_data = r.json().get("value", [])
entities_list = []
for entity in entities_data:
entities_list.append(self._entity(entity_class, key_name, entity))
return entities_list
class Organization(DevOps):
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
super().__init__(org_url, token, api_version)
@property @property
def projects(self): def projects(self):
if not hasattr(self, "_projects"): if not hasattr(self, "_projects"):
self._projects = self._entities( # Create Project objects
entity_class=Project, self._projects = [Project(org=self, **proj) for proj in self.get_path("_apis/projects").json().get("value", [])]
key_name="id",
list_url="_apis/projects")
return self._projects return self._projects
def __getitem__(self, key: str) -> Project: def __getitem__(self, key: str) -> Project:
for project in self.projects: for project in self.projects:
if project.id == key or project.name == key: if project.id == key or project.name == key: # type: ignore[attr-defined]
return project return project
raise KeyError(f"Project with ID or name '{key}' not found.") raise KeyError(f"Project with ID or name '{key}' not found.")
def __str__(self):
return f"Organization(url=\"{self._org_url}\")"
@auto_properties({ @auto_properties({
"id": "id",
"name": "name", "name": "name",
"url": "url", "url": "url",
"description": "description" "description": "description"
}) })
@devops("id", "_apis/projects/$key", "_apis/projects") class Project():
class Project(DevOps): def __init__(self, org: Organization, **kwargs):
def _get(self): self._org = org
self._get_entity( self.from_args(**kwargs) # type: ignore[attr-defined]
key_name="id",
get_url=f"_apis/projects/{self._id}"
)
def __init__(self, org: Organization, id: str, **kwargs): if not hasattr(self, "_id") or self._id is None:
super().__init__(org._org_url, org._token, org._api_version) raise ValueError("Project ID must be provided.")
try: try:
self._id = str(UUID(id)) self._id = str(UUID(self._id))
except ValueError: except ValueError:
raise ValueError(f"Invalid project ID: {id}") raise ValueError(f"Invalid project ID: {self._id}")
self.set_auto_properties(**kwargs) log_entity_creation(log, Project, self.id)
def get_auto_properties(self):
r = get_url(
URL=f"{self._org._org_url}_apis/projects/{self._id}",
token=self._org._token,
api_version=self._org._api_version
)
self.from_json(r.json()) # type: ignore[attr-defined]
def __str__(self): def __str__(self):
return f"Project(name={self._name}, id={self._id})" return f"Project(name=\"{self.name}\", id={self.id})" # type: ignore[attr-defined]
@property @property
def id(self): def id(self):
return self._id return self._id
@property
def organization(self):
return self._org
@property @property
def repositories(self): def repositories(self):
if not hasattr(self, "_repositories"): if not hasattr(self, "_repositories"):
self._repositories = self._entities( self._repositories = [Repository(project=self, **repo) for repo in self._org.get_path(f"{self._id}/_apis/git/repositories").json().get("value", [])]
entity_class=Repository,
key_name="id",
list_url=f"{self._id}/_apis/git/repositories")
return self._repositories return self._repositories
def __getitem__(self, key: str) -> Repository: def __getitem__(self, key: str) -> Repository:
for repo in self.repositories: for repo in self.repositories:
if repo.id == key or repo.name == key: if repo.id == key or repo.name == key: # type: ignore[attr-defined]
return repo return repo
raise KeyError(f"Repository with ID or name '{key}' not found.") raise KeyError(f"Repository with ID or name '{key}' not found.")
@auto_properties({ @auto_properties({
"id": "id",
"name": "name", "name": "name",
"url": "url", "url": "url",
"default_branch": "defaultBranch", "default_branch": "defaultBranch",
@@ -206,53 +171,49 @@ class Project(DevOps):
"ssh_url": "sshUrl", "ssh_url": "sshUrl",
"web_url": "webUrl" "web_url": "webUrl"
}) })
class Repository(DevOps): class Repository():
def _get(self): def __init__(self, project: Project, **kwargs):
self._get_entity(
key_name="id",
get_url=f"{self._project.id}/_apis/git/repositories/{self._id}"
)
def __init__(self, project: Project, id: str, **kwargs):
super().__init__(project._org_url, project._token, project._api_version)
self._project = project self._project = project
self._id = id
try: if "id" not in kwargs and "name" not in kwargs:
UUID(id) # Check if it's a valid UUID raise ValueError("Either repository ID or name must be provided.")
except ValueError:
# Called with a repository name, fetch by name if "id" in kwargs:
self._get() try:
if kwargs: UUID(kwargs.get("id")) # Check if it's a valid UUID
raise ValueError("Automatic properties cannot be set when retrieving by name.") except ValueError:
return raise ValueError("Invalid repository ID, must be a valid UUID.")
# set other properties if provided # set other properties if provided
self.set_auto_properties(**kwargs) self.from_args(**kwargs) # type: ignore[attr-defined]
log_entity_creation(log, Repository, self.id)
def get_auto_properties(self):
id = self._id if hasattr(self, "_id") else self._name # type: ignore[attr-defined]
if id is None or id == "":
raise ValueError("Repository ID or name must be set to fetch properties.")
r = self._project.organization.get_path(path=f"{self._project.id}/_apis/git/repositories/{id}")
self.from_json(r.json()) # type: ignore[attr-defined]
@property @property
def id(self): def id(self):
return self._id return self._id # type: ignore[attr-defined]
@property @property
def project(self): def project(self):
return self._project return self._project
def __str__(self): def __str__(self):
return f"Repository(name={self.name}, id={self._id})" return f"Repository(name={self.name}, id={self._id})" # type: ignore[attr-defined]
@property @property
def items(self): def items(self):
log.debug(f"Fetching items for repository '{self.name}'", extra={"repository_name": self.name}) # type: ignore[attr-defined]
if not hasattr(self, "_items"): if not hasattr(self, "_items"):
self._items = self._entities( root = Item(repository=self, path="/", git_object_type="tree")
entity_class=Item, self._items = root.get_child_items()
key_name="path",
list_url=f"{self._project.id}/_apis/git/repositories/{self._id}/items",
params={
"scopePath": "/",
"recursionLevel": "oneLevel"
}
)
return self._items return self._items
def __getitem__(self, key: str) -> Item: def __getitem__(self, key: str) -> Item:
@@ -262,46 +223,73 @@ class Repository(DevOps):
raise KeyError(f"Item with path '{key}' not found.") raise KeyError(f"Item with path '{key}' not found.")
@auto_properties({ @auto_properties({
"path": "path",
"object_id": "objectId", "object_id": "objectId",
"git_object_type": "gitObjectType", "git_object_type": "gitObjectType",
"commit_id": "commitId", "commit_id": "commitId",
"is_folder": "isFolder", "is_folder": "isFolder",
"url": "url" "url": "url"
}) })
class Item(DevOps): class Item():
def __init__(self, repository: Repository, path: str, **kwargs): def __init__(self, repository: Repository, **kwargs):
super().__init__(repository._org_url, repository._token, repository._api_version)
self._repository = repository self._repository = repository
self._path = path self.from_args(**kwargs) # type: ignore[attr-defined]
self.set_auto_properties(**kwargs) # set properties defined in decorator log_entity_creation(log, Item, self.path)
def _get(self): def get_auto_properties(self):
self._get_entity( r = self._repository._project.organization.get_path(
key_name="path", path=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
get_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
params={ params={
"path": self._path, "path": self.path,
"$format": "json", "$format": "json",
"recursionLevel": "none" "recursionLevel": "none"
} }
) )
self.from_json(r.json()) # type: ignore[attr-defined]
def get_content(self) -> bytes:
"""Get the content of the item if it is a file."""
if self.git_object_type != "blob": # type: ignore[attr-defined]
raise ValueError("Content can only be fetched for blob items.")
r = self._repository._project.organization.get_path(
path=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
params={
"path": self.path,
"recursionLevel": "none"
}
)
return r.content
@property @property
def path(self): def path(self):
return self._path return self._path # type: ignore[attr-defined]
def get_child_items(self) -> list[Item]:
"""Get child items if this item is a folder."""
if self.git_object_type != "tree": # type: ignore[attr-defined]
raise ValueError("Child items can only be fetched for folder items.")
# Fetch child objects
objects = self._repository._project.organization.get_path(
path=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items",
params={
"scopePath": self.path,
"recursionLevel": "oneLevel"
}
).json().get("value", [])
child_items = []
for obj in objects:
i = Item(repository=self._repository, path=obj.get("path"))
i.from_json(obj) # type: ignore[attr-defined]
child_items.append(i)
return child_items
@property @property
def children(self): def children(self):
"""List items under this item if it is a folder.""" if not hasattr(self, "_children"):
if self.git_object_type == "tree": self._children = self.get_child_items()
return self._entities( return self._children
entity_class=Item,
key_name="path", def __str__(self):
list_url=f"{self._repository._project.id}/_apis/git/repositories/{self._repository.id}/items", return f"Item(path=\"{self._path}\" type={self.git_object_type})" # type: ignore[attr-defined]
params={
"scopePath": self._path,
"recursionLevel": "oneLevel"
}
)
else:
raise ValueError("Items can only be listed for folder items.")

44
sk/logger.py Normal file
View File

@@ -0,0 +1,44 @@
import logging
from loki_logger_handler.loki_logger_handler import LokiLoggerHandler
LOG_CONSOLE = 1
LOG_FILE = 2
LOG_LOKI = 4
# Quick non-intrusive debug check for sk.devops logger
def setup(name: str, handlers: int) -> logging.Logger:
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s [%(url)]')
if handlers & LOG_CONSOLE:
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(fmt)
logger.addHandler(console)
if handlers & LOG_FILE:
file = logging.FileHandler('devops_debug.log')
file.setLevel(logging.DEBUG)
file.setFormatter(fmt)
logger.addHandler(file)
if handlers & LOG_LOKI:
# Create an instance of the custom handler
custom_handler = LokiLoggerHandler(
url="https://loki.koszewscy.waw.pl/loki/api/v1/push",
labels={"application": "docs-harverster"},
label_keys={"http_method": "http_method"},
timeout=10,
)
logger.addHandler(custom_handler)
return logger
def log_entity_creation(logger: logging.Logger, entity_class: type, entity_key: str):
logger.debug(f'Created new "{entity_class.__name__}" object with key: "{entity_key}"',
extra={
"entity_class": entity_class.__name__,
"entity_key": entity_key
})