Skip to content

Commit 1e879a1

Browse files
author
Luke Sneeringer
committed
wip
1 parent 3b21b93 commit 1e879a1

File tree

6 files changed

+150
-81
lines changed

6 files changed

+150
-81
lines changed

pubsub/google/cloud/pubsub_v1/publisher/batch.py

Lines changed: 70 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222
import six
2323

2424
from google.cloud.pubsub_v1 import types
25+
from google.cloud.pubsub_v1.publisher import exceptions
2526
from google.cloud.pubsub_v1.publisher import future
2627

27-
QueueItem = collections.namedtuple('QueueItem', ['message', 'future'])
28-
2928

3029
class Batch(object):
3130
"""A batch of messages.
@@ -59,16 +58,24 @@ class Batch(object):
5958
"""
6059
def __init__(self, client, topic, settings, autocommit=True):
6160
self._client = client
62-
self._topic = topic
63-
self._settings = settings
64-
self._messages = queue.Queue()
65-
self._futures = queue.Queue()
66-
self._status = 'accepting messages'
67-
self._message_ids = {}
61+
62+
# Create a namespace that is owned by the client manager; this
63+
# is necessary to be able to have these values be communicable between
64+
# processes.
65+
self._ = self.manager.Namespace()
66+
self._.futures = self.manager.list()
67+
self._.messages = self.manager.list()
68+
self._.message_ids = self.manager.dict()
69+
self._.settings = settings
70+
self._.status = 'accepting messages'
71+
self._.topic = topic
72+
73+
# This is purely internal tracking.
74+
self._process = None
6875

6976
# Continually monitor the thread until it is time to commit the
7077
# batch, or the batch is explicitly committed.
71-
if autocommit and self._settings.max_latency < float('inf'):
78+
if autocommit and self._.settings.max_latency < float('inf'):
7279
self._process = self._client.thread_class(target=self.monitor)
7380
self._process.start()
7481

@@ -82,6 +89,16 @@ def client(self):
8289
"""
8390
return self._client
8491

92+
@property
93+
def manager(self):
94+
"""Return the client's manager.
95+
96+
Returns:
97+
:class:`multiprocessing.Manager`: The manager responsible for
98+
handling shared memory objects.
99+
"""
100+
return self._client.manager
101+
85102
@property
86103
def status(self):
87104
"""Return the status of this batch.
@@ -90,7 +107,7 @@ def status(self):
90107
str: The status of this batch. All statuses are human-readable,
91108
all-lowercase strings.
92109
"""
93-
return self._status
110+
return self._.status
94111

95112
def commit(self):
96113
"""Actually publish all of the messages on the active batch.
@@ -99,52 +116,46 @@ def commit(self):
99116
batch on the publisher, and then the batch is discarded upon
100117
completion.
101118
"""
102-
# If this is the active batch on the cleint right now, remove it.
103-
self._client.batch(self._topic, pop=self)
104-
105119
# Update the status.
106-
self._status = 'in-flight'
120+
self._.status = 'in-flight'
107121

108122
# Begin the request to publish these messages.
109-
response = self._client.api.publish(self._topic, list(self.flush()))
123+
if len(self._.messages) == 0:
124+
raise Exception('Empty queue')
125+
response = self._client.api.publish(self._.topic, self._.messages)
126+
127+
# Sanity check: If the number of message IDs is not equal to the
128+
# number of futures I have, then something went wrong.
129+
if len(response.message_ids) != len(self._.futures):
130+
raise exceptions.PublishError(
131+
'Some messages were not successfully published.',
132+
)
110133

111134
# FIXME (lukesneeringer): How do I check for errors on this?
112-
self._status = 'success'
135+
self._.status = 'success'
113136

114137
# Iterate over the futures on the queue and return the response IDs.
115138
# We are trusting that there is a 1:1 mapping, and raise an exception
116139
# if not.
117-
try:
118-
for message_id in response.message_ids:
119-
future_ = self._futures.get(block=False)
120-
self._message_ids[future_] = message_id
121-
future_._trigger()
122-
except queue.Empty:
123-
raise ValueError('More message IDs came back than messages '
124-
'were published.')
125-
126-
# If the queue of futures is not empty, we did not get enough IDs
127-
# back.
128-
if self._futures.empty():
129-
raise ValueError('Fewer message IDs came back than messages '
130-
'were published.')
131-
132-
133-
def flush(self):
134-
"""Flush the messages off of this queue, one at a time.
135-
136-
This method is called when the batch is committed. Calling it outside
137-
of the context of committing will effectively remove messages
138-
from the batch.
139-
140-
Yields:
141-
:class:~`pubsub_v1.types.PubsubMessage`: A Pub/Sub Message.
140+
for mid, fut in zip(response.message_ids, self._.futures):
141+
self._message_ids[fut] = mid
142+
fut._trigger()
143+
144+
def get_message_id(self, publish_future):
145+
"""Return the message ID corresponding to the given future.
146+
147+
Args:
148+
publish_future (:class:~`future.Future`): The future returned
149+
from a ``publish`` call.
150+
151+
Returns:
152+
str: The message ID.
153+
154+
Raises:
155+
KeyError: If the future is not yet done or there is no message
156+
ID corresponding to it.
142157
"""
143-
try:
144-
while True:
145-
yield self._messages.get(block=False)
146-
except queue.Empty:
147-
raise StopIteration
158+
return self._message_ids[publish_future]
148159

149160
def monitor(self):
150161
"""Commit this batch after sufficient time has elapsed.
@@ -156,11 +167,11 @@ def monitor(self):
156167
# in a separate thread.
157168
#
158169
# Sleep for however long we should be waiting.
159-
time.sleep(self._settings.max_latency)
170+
time.sleep(self._.settings.max_latency)
160171

161172
# If, in the intervening period, the batch started to be committed,
162173
# then no-op at this point.
163-
if self._status != 'accepting messages':
174+
if self._.status != 'accepting messages':
164175
return
165176

166177
# Commit.
@@ -216,10 +227,18 @@ def publish(self, data, **attrs):
216227
'be sent as text strings.')
217228

218229
# Store the actual message in the batch's message queue.
219-
self._messages.put(types.PubsubMessage(data=data, attributes=attrs))
230+
self._.messages.append(
231+
types.PubsubMessage(data=data, attributes=attrs),
232+
)
220233

221234
# Return a Future. That future needs to be aware of the status
222235
# of this batch.
223-
f = future.Future(self)
224-
self._futures.put(f)
236+
f = future.Future(self._)
237+
self._.futures.append(f)
225238
return f
239+
240+
241+
# Make a fake batch. This is used by the client to do single-op checks
242+
# for batch existence.
243+
FakeBatch = collections.namedtuple('FakeBatch', ['status'])
244+
FAKE = FakeBatch(status='fake')

pubsub/google/cloud/pubsub_v1/publisher/client.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from google.cloud.pubsub_v1 import _gapic
2626
from google.cloud.pubsub_v1 import types
2727
from google.cloud.pubsub_v1.publisher.batch import Batch
28+
from google.cloud.pubsub_v1.publisher.batch import FAKE
2829

2930

3031
__VERSION__ = pkg_resources.get_distribution('google-cloud-pubsub').version
@@ -58,7 +59,11 @@ def __init__(self, batching=(), thread_class=multiprocessing.Process,
5859
kwargs['lib_name'] = 'gccl'
5960
kwargs['lib_version'] = __VERSION__
6061
self.api = self._gapic_class(**kwargs)
61-
self.batching = types.Batching(batching)
62+
self.batching = types.Batching(*batching)
63+
64+
# Set the manager, which is responsible for granting shared memory
65+
# objects.
66+
self._manager = multiprocessing.Manager()
6267

6368
# Set the thread class.
6469
self._thread_class = thread_class
@@ -67,6 +72,16 @@ def __init__(self, batching=(), thread_class=multiprocessing.Process,
6772
# messages. One batch exists for each topic.
6873
self._batches = {}
6974

75+
@property
76+
def manager(self):
77+
"""Return the manager.
78+
79+
Returns:
80+
:class:`multiprocessing.Manager`: The manager responsible for
81+
handling shared memory objects.
82+
"""
83+
return self._manager
84+
7085
@property
7186
def thread_class(self):
7287
"""Return the thread class provided at instantiation.
@@ -76,7 +91,7 @@ def thread_class(self):
7691
"""
7792
return self._thread_class
7893

79-
def batch(self, topic, create=True, pop=None):
94+
def batch(self, topic, create=True):
8095
"""Return the current batch.
8196
8297
This will create a new batch only if no batch currently exists.
@@ -85,16 +100,13 @@ def batch(self, topic, create=True, pop=None):
85100
topic (str): A string representing the topic.
86101
create (bool): Whether to create a new batch if no batch is
87102
found. Defaults to True.
88-
pop (:class:~`pubsub_v1.batch.Batch`): Pop the batch off
89-
if it is found *and* is the batch that was sent. Defaults
90-
to None (never pop).
91103
92104
Returns:
93105
:class:~`pubsub_v1.batch.Batch` The batch object.
94106
"""
95107
# If there is no matching batch yet, then potentially create one
96108
# and place it on the batches dictionary.
97-
if topic not in self._batches:
109+
if self._batches.get(topic, FAKE).status != 'accepting messages':
98110
if not create:
99111
return None
100112
self._batches[topic] = Batch(
@@ -103,10 +115,6 @@ def batch(self, topic, create=True, pop=None):
103115
topic=topic,
104116
)
105117

106-
# If we are supposed to remove the batch, pop it off and return it.
107-
if pop and self._batches[topic] == pop:
108-
return self._batches.pop(topic)
109-
110118
# Simply return the appropriate batch.
111119
return self._batches[topic]
112120

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Copyright 2017, Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
class PublishError(RuntimeError):
16+
pass

pubsub/google/cloud/pubsub_v1/publisher/future.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ class Future(object):
2828
methods in this library.
2929
3030
Args:
31-
batch (:class:~`pubsub_v1.batch.Batch`): The batch object that
32-
is committing this message.
31+
batch (:class:`multiprocessing.Namespace`): Information about the
32+
batch object that is committing this message.
3333
"""
34-
def __init__(self, batch):
35-
self._batch = batch
34+
def __init__(self, batch_info):
35+
self._batch_info = batch_info
3636
self._hash = hash(uuid.uuid4())
3737
self._callbacks = queue.Queue()
3838

@@ -66,7 +66,7 @@ def done(self):
6666
This still returns True in failure cases; checking `result` or
6767
`exception` is the canonical way to assess success or failure.
6868
"""
69-
return self.batch.status in ('success', 'error')
69+
return self._batch_info.status in ('success', 'error')
7070

7171
def result(self, timeout=None):
7272
"""Return the message ID, or raise an exception.
@@ -88,7 +88,7 @@ def result(self, timeout=None):
8888
# return an appropriate value.
8989
err = self.exception(timeout=timeout)
9090
if err is None:
91-
return self.batch.get_message_id(self._client_id)
91+
return self._batch_info.message_ids[self]
9292
raise err
9393

9494
def exception(self, timeout=None, _wait=1):
@@ -102,18 +102,18 @@ def exception(self, timeout=None, _wait=1):
102102
times out and raises TimeoutError.
103103
104104
Raises:
105-
:class:~`pubsub_v1.TimeoutError`: If the request times out.
105+
:exc:`TimeoutError`: If the request times out.
106106
107107
Returns:
108108
:class:`Exception`: The exception raised by the call, if any.
109109
"""
110110
# If the batch completed successfully, this should return None.
111-
if self.batch.status == 'success':
111+
if self.batch_info.status == 'success':
112112
return None
113113

114114
# If this batch had an error, this should return it.
115-
if self.batch.status == 'error':
116-
return self.batch._error
115+
if self.batch_info.status == 'error':
116+
return self.batch_info.error
117117

118118
# If the timeout has been exceeded, raise TimeoutError.
119119
if timeout < 0:
@@ -151,12 +151,3 @@ def _trigger(self):
151151
callback(self)
152152
except queue.Empty:
153153
return None
154-
155-
156-
class TimeoutError(object):
157-
"""Exception subclass for timeout-related errors.
158-
159-
This exception is only returned by the :class:~`pubsub_v1.future.Future`
160-
class.
161-
"""
162-
pass
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright 2017, Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
def retry(func, delay=0, count=0, err=None, **kwargs):
16+
"""Attempt to retry a function after the provided delay.
17+
18+
If there have been too many retries, raise an exception.
19+
20+
Args:
21+
func (callable): The function to retry.
22+
delay (int): The period to delay before retrying; specified in seconds.
23+
count (int): The number of previous retries that have occurred.
24+
If this is >= 5, an exception will be raised.
25+
**kwargs (dict): Other keyword arguments to pass to the function.
26+
"""
27+
# If there have been too many retries, simply raise the exception.
28+
if count >= 5:
29+
raise err
30+
31+
# Sleep the given delay.
32+
time.sleep(delay)
33+
34+
# Try calling the method again.
35+
return func(delay=delay, count=count, **kwargs)

pubsub/google/cloud/pubsub_v1/types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
)
3434
Batching.__new__.__defaults__ = (
3535
1024 * 1024 * 5, # max_bytes: 5 MB
36-
0.001, # max_latency: 1 millisecond
36+
0.25, # max_latency: 0.25 seconds
3737
1000, # max_messages: 1,000
3838
)
3939

0 commit comments

Comments
 (0)