Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/spanner_dbapi/parse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@

# DDL statements follow
# https://cloud.google.com/spanner/docs/data-definition-language
RE_DDL = re.compile(r"^\s*(CREATE|ALTER|DROP)", re.IGNORECASE | re.DOTALL)
RE_DDL = re.compile(r"^\s*(CREATE|ALTER|DROP|GRANT|REVOKE)", re.IGNORECASE | re.DOTALL)

RE_IS_INSERT = re.compile(r"^\s*(INSERT)", re.IGNORECASE | re.DOTALL)

Expand Down
99 changes: 95 additions & 4 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import Aborted
from google.api_core import gapic_v1
from google.iam.v1 import iam_policy_pb2
from google.iam.v1 import options_pb2

from google.cloud.spanner_admin_database_v1 import CreateDatabaseRequest
from google.cloud.spanner_admin_database_v1 import Database as DatabasePB
from google.cloud.spanner_admin_database_v1 import ListDatabaseRolesRequest
from google.cloud.spanner_admin_database_v1 import EncryptionConfig
from google.cloud.spanner_admin_database_v1 import RestoreDatabaseEncryptionConfig
from google.cloud.spanner_admin_database_v1 import RestoreDatabaseRequest
Expand Down Expand Up @@ -119,7 +122,8 @@ class Database(object):
:class:`~google.cloud.spanner_admin_database_v1.types.DatabaseDialect`
:param database_dialect:
(Optional) database dialect for the database

:type database_role: str or None
:param database_role: (Optional) user-assigned database_role for the session.
"""

_spanner_api = None
Expand All @@ -133,6 +137,7 @@ def __init__(
logger=None,
encryption_config=None,
database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED,
database_role=None,
):
self.database_id = database_id
self._instance = instance
Expand All @@ -149,9 +154,10 @@ def __init__(
self._logger = logger
self._encryption_config = encryption_config
self._database_dialect = database_dialect
self._database_role = database_role

if pool is None:
pool = BurstyPool()
pool = BurstyPool(database_role=database_role)

self._pool = pool
pool.bind(self)
Expand Down Expand Up @@ -314,6 +320,14 @@ def database_dialect(self):
"""
return self._database_dialect

@property
def database_role(self):
"""User-assigned database_role for sessions created by the pool.
:rtype: str
:returns: a str with the name of the database role.
"""
return self._database_role

@property
def logger(self):
"""Logger used by the database.
Expand Down Expand Up @@ -584,16 +598,22 @@ def execute_pdml():

return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()

def session(self, labels=None):
def session(self, labels=None, database_role=None):
"""Factory to create a session for this database.

:type labels: dict (str -> str) or None
:param labels: (Optional) user-assigned labels for the session.

:type database_role: str
:param database_role: (Optional) user-assigned database_role for the session.

:rtype: :class:`~google.cloud.spanner_v1.session.Session`
:returns: a session bound to this database.
"""
return Session(self, labels=labels)
# If role is specified in param, then that role is used
# instead.
role = database_role or self._database_role
return Session(self, labels=labels, database_role=role)

def snapshot(self, **kw):
"""Return an object which wraps a snapshot.
Expand Down Expand Up @@ -772,6 +792,29 @@ def list_database_operations(self, filter_="", page_size=None):
filter_=database_filter, page_size=page_size
)

def list_database_roles(self, page_size=None):
"""Lists Cloud Spanner database roles.

:type page_size: int
:param page_size:
Optional. The maximum number of database roles in each page of results
from this request. Non-positive values are ignored. Defaults to a
sensible value set by the API.

:type: Iterable
:returns:
Iterable of :class:`~google.cloud.spanner_admin_database_v1.types.spanner_database_admin.DatabaseRole`
resources within the current database.
"""
api = self._instance._client.database_admin_api
metadata = _metadata_with_prefix(self.name)

request = ListDatabaseRolesRequest(
parent=self.name,
page_size=page_size,
)
return api.list_database_roles(request=request, metadata=metadata)

def table(self, table_id):
"""Factory to create a table object within this database.

Expand Down Expand Up @@ -811,6 +854,54 @@ def list_tables(self):
for row in results:
yield self.table(row[0])

def get_iam_policy(self, policy_version=None):
"""Gets the access control policy for a database resource.

:type policy_version: int
:param policy_version:
(Optional) the maximum policy version that will be
used to format the policy. Valid values are 0, 1 ,3.

:rtype: :class:`~google.iam.v1.policy_pb2.Policy`
:returns:
returns an Identity and Access Management (IAM) policy. It is used to
specify access control policies for Cloud Platform
resources.
"""
api = self._instance._client.database_admin_api
metadata = _metadata_with_prefix(self.name)

request = iam_policy_pb2.GetIamPolicyRequest(
resource=self.name,
options=options_pb2.GetPolicyOptions(
requested_policy_version=policy_version
),
)
response = api.get_iam_policy(request=request, metadata=metadata)
return response

def set_iam_policy(self, policy):
"""Sets the access control policy on a database resource.
Replaces any existing policy.

:type policy: :class:`~google.iam.v1.policy_pb2.Policy`
:param policy_version:
the complete policy to be applied to the resource.

:rtype: :class:`~google.iam.v1.policy_pb2.Policy`
:returns:
returns the new Identity and Access Management (IAM) policy.
"""
api = self._instance._client.database_admin_api
metadata = _metadata_with_prefix(self.name)

request = iam_policy_pb2.SetIamPolicyRequest(
resource=self.name,
policy=policy,
)
response = api.set_iam_policy(request=request, metadata=metadata)
return response


class BatchCheckout(object):
"""Context manager for using a batch from a database.
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/spanner_v1/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ def database(
logger=None,
encryption_config=None,
database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED,
database_role=None,
):
"""Factory to create a database within this instance.

Expand Down Expand Up @@ -477,6 +478,7 @@ def database(
logger=logger,
encryption_config=encryption_config,
database_dialect=database_dialect,
database_role=database_role,
)

def list_databases(self, page_size=None):
Expand Down
88 changes: 76 additions & 12 deletions google/cloud/spanner_v1/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import queue

from google.cloud.exceptions import NotFound
from google.cloud.spanner_v1 import BatchCreateSessionsRequest
from google.cloud.spanner_v1 import Session
from google.cloud.spanner_v1._helpers import _metadata_with_prefix


Expand All @@ -30,14 +32,18 @@ class AbstractSessionPool(object):
:type labels: dict (str -> str) or None
:param labels: (Optional) user-assigned labels for sessions created
by the pool.

:type database_role: str
:param database_role: (Optional) user-assigned database_role for the session.
"""

_database = None

def __init__(self, labels=None):
def __init__(self, labels=None, database_role=None):
if labels is None:
labels = {}
self._labels = labels
self._database_role = database_role

@property
def labels(self):
Expand All @@ -48,6 +54,15 @@ def labels(self):
"""
return self._labels

@property
def database_role(self):
"""User-assigned database_role for sessions created by the pool.

:rtype: str
:returns: database_role assigned by the user
"""
return self._database_role

def bind(self, database):
"""Associate the pool with a database.

Expand Down Expand Up @@ -104,9 +119,9 @@ def _new_session(self):
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
:returns: new session instance.
"""
if self.labels:
return self._database.session(labels=self.labels)
return self._database.session()
return self._database.session(
labels=self.labels, database_role=self.database_role
)

def session(self, **kwargs):
"""Check out a session from the pool.
Expand Down Expand Up @@ -146,13 +161,22 @@ class FixedSizePool(AbstractSessionPool):
:type labels: dict (str -> str) or None
:param labels: (Optional) user-assigned labels for sessions created
by the pool.

:type database_role: str
:param database_role: (Optional) user-assigned database_role for the session.
"""

DEFAULT_SIZE = 10
DEFAULT_TIMEOUT = 10

def __init__(self, size=DEFAULT_SIZE, default_timeout=DEFAULT_TIMEOUT, labels=None):
super(FixedSizePool, self).__init__(labels=labels)
def __init__(
self,
size=DEFAULT_SIZE,
default_timeout=DEFAULT_TIMEOUT,
labels=None,
database_role=None,
):
super(FixedSizePool, self).__init__(labels=labels, database_role=database_role)
self.size = size
self.default_timeout = default_timeout
self._sessions = queue.LifoQueue(size)
Expand All @@ -167,9 +191,14 @@ def bind(self, database):
self._database = database
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
self._database_role = self._database_role or self._database.database_role
request = BatchCreateSessionsRequest(
session_template=Session(creator_role=self.database_role),
)

while not self._sessions.full():
resp = api.batch_create_sessions(
request=request,
database=database.name,
session_count=self.size - self._sessions.qsize(),
metadata=metadata,
Expand Down Expand Up @@ -243,10 +272,13 @@ class BurstyPool(AbstractSessionPool):
:type labels: dict (str -> str) or None
:param labels: (Optional) user-assigned labels for sessions created
by the pool.

:type database_role: str
:param database_role: (Optional) user-assigned database_role for the session.
"""

def __init__(self, target_size=10, labels=None):
super(BurstyPool, self).__init__(labels=labels)
def __init__(self, target_size=10, labels=None, database_role=None):
super(BurstyPool, self).__init__(labels=labels, database_role=database_role)
self.target_size = target_size
self._database = None
self._sessions = queue.LifoQueue(target_size)
Expand All @@ -259,6 +291,7 @@ def bind(self, database):
when needed.
"""
self._database = database
self._database_role = self._database_role or self._database.database_role

def get(self):
"""Check a session out from the pool.
Expand Down Expand Up @@ -340,10 +373,20 @@ class PingingPool(AbstractSessionPool):
:type labels: dict (str -> str) or None
:param labels: (Optional) user-assigned labels for sessions created
by the pool.

:type database_role: str
:param database_role: (Optional) user-assigned database_role for the session.
"""

def __init__(self, size=10, default_timeout=10, ping_interval=3000, labels=None):
super(PingingPool, self).__init__(labels=labels)
def __init__(
self,
size=10,
default_timeout=10,
ping_interval=3000,
labels=None,
database_role=None,
):
super(PingingPool, self).__init__(labels=labels, database_role=database_role)
self.size = size
self.default_timeout = default_timeout
self._delta = datetime.timedelta(seconds=ping_interval)
Expand All @@ -360,9 +403,15 @@ def bind(self, database):
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
created_session_count = 0
self._database_role = self._database_role or self._database.database_role

request = BatchCreateSessionsRequest(
session_template=Session(creator_role=self.database_role),
)

while created_session_count < self.size:
resp = api.batch_create_sessions(
request=request,
database=database.name,
session_count=self.size - created_session_count,
metadata=metadata,
Expand Down Expand Up @@ -470,13 +519,27 @@ class TransactionPingingPool(PingingPool):
:type labels: dict (str -> str) or None
:param labels: (Optional) user-assigned labels for sessions created
by the pool.

:type database_role: str
:param database_role: (Optional) user-assigned database_role for the session.
"""

def __init__(self, size=10, default_timeout=10, ping_interval=3000, labels=None):
def __init__(
self,
size=10,
default_timeout=10,
ping_interval=3000,
labels=None,
database_role=None,
):
self._pending_sessions = queue.Queue()

super(TransactionPingingPool, self).__init__(
size, default_timeout, ping_interval, labels=labels
size,
default_timeout,
ping_interval,
labels=labels,
database_role=database_role,
)

self.begin_pending_transactions()
Expand All @@ -489,6 +552,7 @@ def bind(self, database):
when needed.
"""
super(TransactionPingingPool, self).bind(database)
self._database_role = self._database_role or self._database.database_role
self.begin_pending_transactions()

def put(self, session):
Expand Down
Loading