Change the get_url method to return the Response instead of parsed JSON to allow downloading content of files.

This commit is contained in:
2025-11-08 14:55:45 +01:00
parent 6588313fa1
commit 27a5a13c47

View File

@@ -75,16 +75,16 @@ def get_url(URL: str, token: str, api_version: str, params: dict = {}) -> reques
"Authorization": f"Bearer {token}"
})
r.raise_for_status() # Ensure we raise an error for bad responses
return r.json() # Return parsed JSON response
return r # Return response
class Organization():
def __init__(self, org_url: str, token: str | None = None, api_version: str = DEVOPS_API_VERSION):
def __init__(self, org_url: str, token: str, api_version: str = DEVOPS_API_VERSION):
self._org_url = org_url.rstrip("/") + "/" # Ensure trailing slash
self._token = token
self._api_version = api_version
log_entity_creation(log, Organization, self._org_url)
def get_path(self, path: str, params: dict = {}) -> dict:
def get_path(self, path: str, params: dict = {}) -> requests.Response:
return get_url(
URL=urllib.parse.urljoin(self._org_url, path),
token=self._token,
@@ -96,12 +96,12 @@ class Organization():
def projects(self):
if not hasattr(self, "_projects"):
# Create Project objects
self._projects = [Project(org=self, **proj) for proj in self.get_path("_apis/projects").get("value", [])]
self._projects = [Project(org=self, **proj) for proj in self.get_path("_apis/projects").json().get("value", [])]
return self._projects
def __getitem__(self, key: str) -> Project:
for project in self.projects:
if project.id == key or project.name == key:
if project.id == key or project.name == key: # type: ignore[attr-defined]
return project
raise KeyError(f"Project with ID or name '{key}' not found.")
@@ -117,7 +117,7 @@ class Organization():
class Project():
def __init__(self, org: Organization, **kwargs):
self._org = org
self.from_args(**kwargs)
self.from_args(**kwargs) # type: ignore[attr-defined]
if not hasattr(self, "_id") or self._id is None:
raise ValueError("Project ID must be provided.")
@@ -135,10 +135,10 @@ class Project():
token=self._org._token,
api_version=self._org._api_version
)
self.from_json(r)
self.from_json(r.json()) # type: ignore[attr-defined]
def __str__(self):
return f"Project(name=\"{self.name}\", id={self.id})"
return f"Project(name=\"{self.name}\", id={self.id})" # type: ignore[attr-defined]
@property
def id(self):
@@ -151,12 +151,12 @@ class Project():
@property
def repositories(self):
if not hasattr(self, "_repositories"):
self._repositories = [Repository(project=self, **repo) for repo in self._org.get_path(f"{self._id}/_apis/git/repositories").get("value", [])]
self._repositories = [Repository(project=self, **repo) for repo in self._org.get_path(f"{self._id}/_apis/git/repositories").json().get("value", [])]
return self._repositories
def __getitem__(self, key: str) -> Repository:
for repo in self.repositories:
if repo.id == key or repo.name == key:
if repo.id == key or repo.name == key: # type: ignore[attr-defined]
return repo
raise KeyError(f"Repository with ID or name '{key}' not found.")
@@ -185,31 +185,31 @@ class Repository():
raise ValueError("Invalid repository ID, must be a valid UUID.")
# set other properties if provided
self.from_args(**kwargs)
self.from_args(**kwargs) # type: ignore[attr-defined]
log_entity_creation(log, Repository, self.id)
def get_auto_properties(self):
id = self._id if hasattr(self, "_id") else self._name
id = self._id if hasattr(self, "_id") else self._name # type: ignore[attr-defined]
if id is None or id == "":
raise ValueError("Repository ID or name must be set to fetch properties.")
r = self._project.organization.get_path(path=f"{self._project.id}/_apis/git/repositories/{id}")
self.from_json(r)
self.from_json(r.json()) # type: ignore[attr-defined]
@property
def id(self):
return self._id
return self._id # type: ignore[attr-defined]
@property
def project(self):
return self._project
def __str__(self):
return f"Repository(name={self.name}, id={self._id})"
return f"Repository(name={self.name}, id={self._id})" # type: ignore[attr-defined]
@property
def items(self):
log.debug(f"Fetching items for repository '{self.name}'", extra={"repository_name": self.name})
log.debug(f"Fetching items for repository '{self.name}'", extra={"repository_name": self.name}) # type: ignore[attr-defined]
if not hasattr(self, "_items"):
root = Item(repository=self, path="/", git_object_type="tree")
self._items = root.get_child_items()
@@ -233,7 +233,7 @@ class Repository():
class Item():
def __init__(self, repository: Repository, **kwargs):
self._repository = repository
self.from_args(**kwargs)
self.from_args(**kwargs) # type: ignore[attr-defined]
log_entity_creation(log, Item, self.path)
def get_auto_properties(self):
@@ -245,15 +245,15 @@ class Item():
"recursionLevel": "none"
}
)
self.from_json(r)
self.from_json(r.json()) # type: ignore[attr-defined]
@property
def path(self):
return self._path
return self._path # type: ignore[attr-defined]
def get_child_items(self) -> list[Item]:
"""Get child items if this item is a folder."""
if self.git_object_type != "tree":
if self.git_object_type != "tree": # type: ignore[attr-defined]
raise ValueError("Child items can only be fetched for folder items.")
# Fetch child objects
@@ -263,11 +263,11 @@ class Item():
"scopePath": self.path,
"recursionLevel": "oneLevel"
}
).get("value", [])
).json().get("value", [])
child_items = []
for obj in objects:
i = Item(repository=self._repository, path=obj.get("path"))
i.from_json(obj)
i.from_json(obj) # type: ignore[attr-defined]
child_items.append(i)
return child_items
@@ -278,4 +278,4 @@ class Item():
return self._children
def __str__(self):
return f"Item(path=\"{self._path}\" type={self.git_object_type})"
return f"Item(path=\"{self._path}\" type={self.git_object_type})" # type: ignore[attr-defined]