Skip to content

Commit 0da12f6

Browse files
author
Jon Wayne Parrott
authored
Promote subscribe_experimental() to subscribe(), remove old subscriber implementation. (googleapis#5274)
Promote subscribe_experimental() to subscribe(), remove old subscriber implementation. This removes the following public interfaces: * pubsub_v1.subscriber.policy.base * pubsub_v1.subscriber.policy.thread * pubsub_v1.subscriber.futures.Future * pubsub_v1.subscriber.client.Client.subscribe_experimental
1 parent 1feb52e commit 0da12f6

File tree

17 files changed

+69
-2532
lines changed

17 files changed

+69
-2532
lines changed

docs/pubsub/index.rst

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,25 +90,33 @@ the topic, and subscribe to that.
9090
... project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
9191
... sub='MY_SUBSCRIPTION_NAME', # Set this to something appropriate.
9292
... )
93-
>>> subscription = subscriber.create_subscription(subscription_name, topic)
93+
>>> subscriber.create_subscription(subscription_name, topic)
9494
95-
The subscription is opened asynchronously, and messages are processed by
96-
use of a callback.
95+
To receive messages on the subscription, you *subscribe* to the subscription.
96+
The client opens a stream in a background process and calls a callback for each
97+
message received.
9798

9899
.. code-block:: python
99100
100101
>>> def callback(message):
101102
... print(message.data)
102103
... message.ack()
103-
>>> future = subscription.open(callback)
104+
>>> future = subscriber.subscribe(subscription_name, callback)
104105
105106
You can use the future to block the main thread, and raise any exceptions
106-
that originate asychronously.
107+
that originate asynchronously.
107108

108109
.. code-block:: python
109110
110111
>>> future.result()
111112
113+
You can also cancel the future to stop receiving messages.
114+
115+
116+
.. code-block:: python
117+
118+
>>> future.cancel()
119+
112120
To learn more, consult the :doc:`subscriber documentation <subscriber/index>`.
113121

114122

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Futures
2+
=======
3+
4+
.. automodule:: google.cloud.pubsub_v1.subscriber.futures
5+
:members:
6+
:inherited-members:

docs/pubsub/subscriber/api/policy.rst

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Scheduler
2+
=========
3+
4+
.. automodule:: google.cloud.pubsub_v1.subscriber.scheduler
5+
:members:
6+
:inherited-members:

docs/pubsub/subscriber/index.rst

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -47,37 +47,38 @@ Pulling a Subscription
4747
----------------------
4848

4949
Once you have created a subscription (or if you already had one), the next
50-
step is to pull data from it. This entails two steps: first you must call
51-
:meth:`~.pubsub_v1.subscriber.client.Client.subscribe`, passing in the
52-
subscription string.
50+
step is to pull data from it. The subscriber client uses the
51+
:meth:`~.pubsub_v1.subscriber.client.Client.subscribe` method to start a
52+
background thread to receive messages from Pub/Sub and calls a callback with
53+
each message received.
5354

5455
.. code-block:: python
5556
5657
# As before, substitute {project} and {subscription} with appropriate
5758
# values for your application.
58-
subscription = subscriber.subscribe(
59+
future = subscriber.subscribe(
5960
'projects/{project}/subscriptions/{subscription}',
61+
callback
6062
)
6163
62-
This will return an object with an
63-
:meth:`~.pubsub_v1.subscriber.policy.thread.Policy.open` method; calling
64-
this method will actually begin consumption of the subscription.
64+
This will return a
65+
:class:`~.pubsub_v1.subscriber.futures.StreamingPullFuture`. This future allows
66+
you to control the background thread that is managing the subscription.
6567

6668

6769
Subscription Callbacks
6870
----------------------
6971

70-
Because subscriptions in this Pub/Sub client are opened asynchronously,
71-
processing the messages that are yielded by the subscription is handled
72-
through **callbacks**.
72+
Messages received from a subscription are processed asynchronously through
73+
**callbacks**.
7374

7475
The basic idea: Define a function that takes one argument; this argument
7576
will be a :class:`~.pubsub_v1.subscriber.message.Message` instance. This
7677
function should do whatever processing is necessary. At the end, the
77-
function should :meth:`~.pubsub_v1.subscriber.message.Message.ack` the
78-
message.
78+
function should either :meth:`~.pubsub_v1.subscriber.message.Message.ack`
79+
or :meth:`~.pubsub_v1.subscriber.message.Message.nack` the message.
7980

80-
When you call :meth:`~.pubsub_v1.subscriber.policy.thread.Policy.open`, you
81+
When you call :meth:`~.pubsub_v1.subscriber.client.Client.subscribe`, you
8182
must pass the callback that will be used.
8283

8384
Here is an example:
@@ -91,11 +92,15 @@ Here is an example:
9192
message.ack()
9293
9394
# Open the subscription, passing the callback.
94-
future = subscription.open(callback)
95+
future = subscriber.subscribe(
96+
'projects/{project}/subscriptions/{subscription}',
97+
callback
98+
)
9599
96-
The :meth:`~.pubsub_v1.subscriber.policy.thread.Policy.open` method returns
97-
a :class:`~.pubsub_v1.subscriber.futures.Future`, which is both the interface
98-
to wait on messages (e.g. block the primary thread) and to address exceptions.
100+
The :meth:`~.pubsub_v1.subscriber.client.Client.subscribe` method returns
101+
a :class:`~.pubsub_v1.subscriber.futures.StreamingPullFuture`, which is both
102+
the interface to wait on messages (e.g. block the primary thread) and to
103+
address exceptions.
99104

100105
To block the thread you are in while messages are coming in the stream,
101106
use the :meth:`~.pubsub_v1.subscriber.futures.Future.result` method:
@@ -104,6 +109,9 @@ use the :meth:`~.pubsub_v1.subscriber.futures.Future.result` method:
104109
105110
future.result()
106111
112+
.. note: This will block forever assuming no errors or that ``cancel`` is never
113+
called.
114+
107115
You can also use this for error handling; any exceptions that crop up on a
108116
thread will be set on the future.
109117

@@ -115,6 +123,15 @@ thread will be set on the future.
115123
subscription.close()
116124
raise
117125
126+
Finally, you can use
127+
:meth:`~.pubsub_v1.subscriber.futures.StreamingPullFuture.cancel` to stop
128+
receiving messages.
129+
130+
131+
.. code-block:: python
132+
133+
future.cancel()
134+
118135
119136
Explaining Ack
120137
--------------
@@ -142,5 +159,6 @@ API Reference
142159
:maxdepth: 2
143160

144161
api/client
145-
api/policy
146162
api/message
163+
api/futures
164+
api/scheduler

0 commit comments

Comments
 (0)