Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cloudfoundry_client/operations/push/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import shutil
import tempfile
import time
from typing import Tuple, Dict
from typing import Tuple

from cloudfoundry_client.client import CloudFoundryClient
from cloudfoundry_client.operations.push.cf_ignore import CfIgnore
Expand Down Expand Up @@ -211,7 +211,7 @@ def _resolve_new_tcp_route(self, space: Entity, domain: Entity, port: int) -> En
return existing_route

@staticmethod
def _split_route(requested_route: Dict[str, str]) -> Tuple[str, int, str]:
def _split_route(requested_route: dict[str, str]) -> Tuple[str, int, str]:
route_splitted = PushOperation.SPLIT_ROUTE_PATTERN.match(requested_route["route"])
if route_splitted is None:
raise AssertionError("Invalid route: %s" % requested_route["route"])
Expand All @@ -222,7 +222,7 @@ def _split_route(requested_route: Dict[str, str]) -> Tuple[str, int, str]:

@staticmethod
def _resolve_domain(
route: str, private_domains: Dict[str, Entity], shared_domains: Dict[str, Entity]
route: str, private_domains: dict[str, Entity], shared_domains: dict[str, Entity]
) -> Tuple[str, str, Entity]:
for domains in [private_domains, shared_domains]:
if route in domains:
Expand Down
16 changes: 8 additions & 8 deletions cloudfoundry_client/v2/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from http import HTTPStatus
from time import sleep
from typing import Dict, TYPE_CHECKING
from typing import TYPE_CHECKING

from cloudfoundry_client.doppler.client import EnvelopeStream
from cloudfoundry_client.errors import InvalidStatusCode
Expand All @@ -17,7 +17,7 @@


class Application(Entity):
def instances(self) -> Dict[str, JsonObject]:
def instances(self) -> dict[str, JsonObject]:
return self.client.v2.apps.get_instances(self["metadata"]["guid"])

def start(self) -> "Application":
Expand All @@ -29,10 +29,10 @@ def stop(self) -> "Application":
def restart_instance(self, instance_id: int):
return self.client.v2.apps.restart_instance(self["metadata"]["guid"], instance_id)

def stats(self) -> Dict[str, JsonObject]:
def stats(self) -> dict[str, JsonObject]:
return self.client.v2.apps.get_stats(self["metadata"]["guid"])

def env(self) -> Dict[str, JsonObject]:
def env(self) -> dict[str, JsonObject]:
return self.client.v2.apps.get_env(self["metadata"]["guid"])

def summary(self) -> JsonObject:
Expand Down Expand Up @@ -80,13 +80,13 @@ def __init__(self, target_endpoint: str, client: "CloudFoundryClient"):
target_endpoint, client, "/v2/apps", lambda pairs: Application(target_endpoint, client, pairs)
)

def get_stats(self, application_guid: str) -> Dict[str, JsonObject]:
def get_stats(self, application_guid: str) -> dict[str, JsonObject]:
return self._get("%s/%s/stats" % (self.entity_uri, application_guid), JsonObject)

def get_instances(self, application_guid: str) -> Dict[str, JsonObject]:
def get_instances(self, application_guid: str) -> dict[str, JsonObject]:
return self._get("%s/%s/instances" % (self.entity_uri, application_guid), JsonObject)

def get_env(self, application_guid: str) -> Dict[str, JsonObject]:
def get_env(self, application_guid: str) -> dict[str, JsonObject]:
return self._get("%s/%s/env" % (self.entity_uri, application_guid), JsonObject)

def get_summary(self, application_guid: str) -> JsonObject:
Expand Down Expand Up @@ -195,7 +195,7 @@ def _wait_for_instances_in_state(
sleep(check_time)
sum_waiting += check_time

def _safe_get_instances(self, application_guid: str) -> Dict[str, JsonObject]:
def _safe_get_instances(self, application_guid: str) -> dict[str, JsonObject]:
try:
return self.get_instances(application_guid)
except InvalidStatusCode as ex:
Expand Down
4 changes: 2 additions & 2 deletions cloudfoundry_client/v2/service_instances.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, TYPE_CHECKING
from typing import TYPE_CHECKING

from cloudfoundry_client.v2.entities import EntityManager, Entity

Expand Down Expand Up @@ -44,7 +44,7 @@ def update(
params = None if not accepts_incomplete else dict(accepts_incomplete="true")
return super(ServiceInstanceManager, self)._update(instance_guid, request, params=params)

def list_permissions(self, instance_guid: str) -> Dict[str, bool]:
def list_permissions(self, instance_guid: str) -> dict[str, bool]:
return super(ServiceInstanceManager, self)._get("%s/%s/permissions" % (self.entity_uri, instance_guid), dict)

def remove(self, instance_guid: str, accepts_incomplete: bool | None = False, purge: bool | None = False):
Expand Down
8 changes: 4 additions & 4 deletions cloudfoundry_client/v3/service_plans.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, TYPE_CHECKING
from typing import TYPE_CHECKING

from cloudfoundry_client.v3.entities import EntityManager, Entity

Expand Down Expand Up @@ -27,12 +27,12 @@ def update(
def remove(self, guid: str):
super(ServicePlanManager, self)._remove(guid)

def get_visibility(self, service_plan_guid: str) -> Dict:
def get_visibility(self, service_plan_guid: str) -> dict:
return super(ServicePlanManager, self)._get(f"{self.target_endpoint}{self.entity_uri}/{service_plan_guid}/visibility")

# Updates a service plan visibility. It behaves similar to the POST service plan visibility endpoint but
# this endpoint will REPLACE the existing list of organizations when the service plan is organization visible.
def update_visibility(self, service_plan_guid: str, type: str, organizations: list[dict] | None = None) -> Dict:
def update_visibility(self, service_plan_guid: str, type: str, organizations: list[dict] | None = None) -> dict:
payload = {"type": type}
if organizations:
payload["organizations"] = organizations
Expand All @@ -42,7 +42,7 @@ def update_visibility(self, service_plan_guid: str, type: str, organizations: li

# Applies a service plan visibility. It behaves similar to the PATCH service plan visibility endpoint but
# this endpoint will APPEND to the existing list of organizations when the service plan is organization visible.
def apply_visibility_to_extra_orgs(self, service_plan_guid: str, organizations: list[dict]) -> Dict:
def apply_visibility_to_extra_orgs(self, service_plan_guid: str, organizations: list[dict]) -> dict:
payload = {"type": "organization", "organizations": organizations}
return super(ServicePlanManager, self)._post(
url=f"{self.target_endpoint}{self.entity_uri}/{service_plan_guid}/visibility", data=payload, files=None
Expand Down
Loading