Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor: use 'Client._list_resource' in 'Bucket.list_notificatoins'
Also, provide explicit coverage for 'bucket._item_to_notification'.

Toward #38
  • Loading branch information
tseaver committed Jun 8, 2021
commit 68b762ebee38ffbe56918b903956e3fab4cde1fc
12 changes: 2 additions & 10 deletions google/cloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
import base64
import copy
import datetime
import functools
import json
import warnings

import six
from six.moves.urllib.parse import urlsplit

from google.api_core import page_iterator
from google.api_core import datetime_helpers
from google.cloud._helpers import _datetime_to_rfc3339
from google.cloud._helpers import _NOW
Expand Down Expand Up @@ -1392,14 +1390,8 @@ def list_notifications(
"""
client = self._require_client(client)
path = self.path + "/notificationConfigs"
api_request = functools.partial(
client._connection.api_request, timeout=timeout, retry=retry
)
iterator = page_iterator.HTTPIterator(
client=client,
api_request=api_request,
path=path,
item_to_value=_item_to_notification,
iterator = client._list_resource(
path, _item_to_notification, timeout=timeout, retry=retry,
)
iterator.bucket = self
return iterator
Expand Down
141 changes: 75 additions & 66 deletions tests/unit/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,63 +1000,50 @@ def test_list_blobs_w_explicit(self):
retry=retry,
)

def test_list_notifications(self):
from google.cloud.storage.notification import BucketNotification
from google.cloud.storage.notification import _TOPIC_REF_FMT
from google.cloud.storage.notification import (
JSON_API_V1_PAYLOAD_FORMAT,
NONE_PAYLOAD_FORMAT,
)
def test_list_notifications_w_defaults(self):
from google.cloud.storage.bucket import _item_to_notification

NAME = "name"
bucket_name = "name"
client = self._make_client()
client._list_resource = mock.Mock(spec=[])
bucket = self._make_one(client=client, name=bucket_name)

topic_refs = [("my-project-123", "topic-1"), ("other-project-456", "topic-2")]
iterator = bucket.list_notifications()

resources = [
{
"topic": _TOPIC_REF_FMT.format(*topic_refs[0]),
"id": "1",
"etag": "DEADBEEF",
"selfLink": "https://example.com/notification/1",
"payload_format": NONE_PAYLOAD_FORMAT,
},
{
"topic": _TOPIC_REF_FMT.format(*topic_refs[1]),
"id": "2",
"etag": "FACECABB",
"selfLink": "https://example.com/notification/2",
"payload_format": JSON_API_V1_PAYLOAD_FORMAT,
},
]
connection = _Connection({"items": resources})
client = _Client(connection)
bucket = self._make_one(client=client, name=NAME)
self.assertIs(iterator, client._list_resource.return_value)
self.assertIs(iterator.bucket, bucket)

notifications = list(bucket.list_notifications(timeout=42))
expected_path = "/b/{}/notificationConfigs".format(bucket_name)
expected_item_to_value = _item_to_notification
client._list_resource.assert_called_once_with(
expected_path,
expected_item_to_value,
timeout=self._get_default_timeout(),
retry=DEFAULT_RETRY,
)

req_args = client._connection._requested[0]
self.assertEqual(req_args.get("timeout"), 42)
def test_list_notifications_w_explicit(self):
from google.cloud.storage.bucket import _item_to_notification

self.assertEqual(len(notifications), len(resources))
for notification, resource, topic_ref in zip(
notifications, resources, topic_refs
):
self.assertIsInstance(notification, BucketNotification)
self.assertEqual(notification.topic_project, topic_ref[0])
self.assertEqual(notification.topic_name, topic_ref[1])
self.assertEqual(notification.notification_id, resource["id"])
self.assertEqual(notification.etag, resource["etag"])
self.assertEqual(notification.self_link, resource["selfLink"])
self.assertEqual(
notification.custom_attributes, resource.get("custom_attributes")
)
self.assertEqual(notification.event_types, resource.get("event_types"))
self.assertEqual(
notification.blob_name_prefix, resource.get("blob_name_prefix")
)
self.assertEqual(
notification.payload_format, resource.get("payload_format")
)
bucket_name = "name"
other_client = self._make_client()
other_client._list_resource = mock.Mock(spec=[])
bucket = self._make_one(client=None, name=bucket_name)
timeout = 42
retry = mock.Mock(spec=[])

iterator = bucket.list_notifications(
client=other_client, timeout=timeout, retry=retry,
)

self.assertIs(iterator, other_client._list_resource.return_value)
self.assertIs(iterator.bucket, bucket)

expected_path = "/b/{}/notificationConfigs".format(bucket_name)
expected_item_to_value = _item_to_notification
other_client._list_resource.assert_called_once_with(
expected_path, expected_item_to_value, timeout=timeout, retry=retry,
)

def test_get_notification_miss_w_defaults(self):
from google.cloud.exceptions import NotFound
Expand Down Expand Up @@ -1622,7 +1609,7 @@ def test_reload_w_metageneration_match(self):
)

def test_reload_w_generation_match(self):
connection = _Connection({})
connection = _Connection()
client = _Client(connection)
bucket = self._make_one(client=client, name="name")

Expand Down Expand Up @@ -3870,30 +3857,52 @@ def test_generate_signed_url_v4_w_bucket_bound_hostname_w_bare_hostname(self):
self._generate_signed_url_v4_helper(bucket_bound_hostname="cdn.example.com")


class Test__item_to_notification(unittest.TestCase):
def _call_fut(self, iterator, item):
from google.cloud.storage.bucket import _item_to_notification

return _item_to_notification(iterator, item)

def test_it(self):
from google.cloud.storage.notification import BucketNotification
from google.cloud.storage.notification import _TOPIC_REF_FMT
from google.cloud.storage.notification import NONE_PAYLOAD_FORMAT

iterator = mock.Mock(spec=["bucket"])
project = "my-project-123"
topic = "topic-1"
item = {
"topic": _TOPIC_REF_FMT.format(project, topic),
"id": "1",
"etag": "DEADBEEF",
"selfLink": "https://example.com/notification/1",
"payload_format": NONE_PAYLOAD_FORMAT,
}

notification = self._call_fut(iterator, item)

self.assertIsInstance(notification, BucketNotification)
self.assertIs(notification._bucket, iterator.bucket)
self.assertEqual(notification._topic_name, topic)
self.assertEqual(notification._topic_project, project)
self.assertEqual(notification._properties, item)


class _Connection(object):
_delete_bucket = False
credentials = None

def __init__(self, *responses):
self._responses = responses
self._requested = []
self._deleted_buckets = []
self.credentials = None
def __init__(self):
pass

def api_request(self, **kw):
self._requested.append(kw)
response, self._responses = self._responses[0], self._responses[1:]
return response
def api_request(self, **kw): # pragma: NO COVER
pass


class _Client(object):
def __init__(self, connection, project=None):
self._base_connection = connection
self.project = project

@property
def _connection(self):
return self._base_connection

@property
def _credentials(self):
return self._base_connection.credentials