Skip to content

Commit 3b21b93

Browse files
author
Luke Sneeringer
committed
WIP
1 parent 503d11f commit 3b21b93

File tree

2 files changed

+12
-4
lines changed

2 files changed

+12
-4
lines changed

pubsub/google/cloud/pubsub.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
from google.cloud.pubsub_v1 import PublisherClient
1818
from google.cloud.pubsub_v1 import SubscriberClient
19+
from google.cloud.pubsub_v1 import types
1920

2021

2122
__all__ = (
2223
'PublisherClient',
2324
'SubscriberClient',
25+
'types',
2426
)

pubsub/google/cloud/pubsub_v1/publisher/batch.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,16 @@ class Batch(object):
4848
client (:class:`google.cloud.pubsub_v1.PublisherClient`): The
4949
publisher client used to create this batch. Batch settings are
5050
inferred from this.
51+
topic (str): The topic. The format for this is
52+
``projects/{project}/topics/{topic}``.
5153
settings (:class:`google.cloud.pubsub_v1.types.Batching`): The
5254
settings for batch publishing. These should be considered
5355
immutable once the batch has been opened.
56+
autocommit (bool): Whether to autocommit the batch when the time
57+
has elapsed. Defaults to True unless ``settings.max_latency`` is
58+
inf.
5459
"""
55-
def __init__(self, client, topic, settings):
60+
def __init__(self, client, topic, settings, autocommit=True):
5661
self._client = client
5762
self._topic = topic
5863
self._settings = settings
@@ -63,8 +68,9 @@ def __init__(self, client, topic, settings):
6368

6469
# Continually monitor the thread until it is time to commit the
6570
# batch, or the batch is explicitly committed.
66-
self._process = self._client.thread_class(target=self.monitor)
67-
self._process.start()
71+
if autocommit and self._settings.max_latency < float('inf'):
72+
self._process = self._client.thread_class(target=self.monitor)
73+
self._process.start()
6874

6975
@property
7076
def client(self):
@@ -100,7 +106,7 @@ def commit(self):
100106
self._status = 'in-flight'
101107

102108
# Begin the request to publish these messages.
103-
response = self._client.api.publish(self._topic, self.flush())
109+
response = self._client.api.publish(self._topic, list(self.flush()))
104110

105111
# FIXME (lukesneeringer): How do I check for errors on this?
106112
self._status = 'success'

0 commit comments

Comments
 (0)