
Commit ee804a1

BigQuery: ensure that `KeyboardInterrupt` during `to_dataframe` no longer hangs. (googleapis#7698)
* fix: `KeyboardInterrupt` during `to_dataframe` (with BQ Storage API) no longer hangs

  I noticed in manually testing `to_dataframe` that it would stop the current cell when I hit Ctrl-C, but data kept on downloading in the background. Trying to exit the Python shell, I'd notice that it would hang until I pressed Ctrl-C a few more times.

  Rather than get the DataFrame for each stream in one big chunk, loop through each block and exit if the function needs to quit early. This follows the pattern at https://stackoverflow.com/a/29237343/101923

  Update tests to ensure multiple progress interval loops.

* Refactor _to_dataframe_bqstorage_stream
1 parent f540d42 commit ee804a1
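
The pattern the commit message cites is cooperative cancellation: worker threads poll a shared flag between units of work instead of blocking in one monolithic call, and the main thread wakes up periodically so it can notice Ctrl-C. Below is a minimal, self-contained sketch of that pattern; `finished` and `download_stream` are illustrative names, not the library's API.

```python
import concurrent.futures
import time

# Shared flag. Rebinding a variable is atomic under the GIL, so no lock is
# needed for a simple "please stop" signal.
finished = False


def download_stream(stream_id):
    """Stand-in for a paged download that checks the flag between pages."""
    pages = []
    for page in range(100):
        if finished:
            return pages  # exit early instead of finishing the download
        time.sleep(0.01)  # simulate fetching one page
        pages.append((stream_id, page))
    return pages


with concurrent.futures.ThreadPoolExecutor() as pool:
    not_done = [pool.submit(download_stream, stream_id) for stream_id in range(4)]
    try:
        while not_done:
            # Wake up periodically so a KeyboardInterrupt raised on the main
            # thread is noticed between waits.
            done, not_done = concurrent.futures.wait(not_done, timeout=0.1)
    finally:
        # On success *or* interrupt, tell the workers to wrap up so the
        # executor's shutdown join returns promptly.
        finished = True
```

The commit's version of this flag is `_to_dataframe_finished`, set in a `finally` block so the workers stop whether the download succeeds or is interrupted (see the table.py diff below).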

File tree: 3 files changed, +228 additions, -18 deletions

bigquery/google/cloud/bigquery/table.py

Lines changed: 59 additions & 8 deletions

```diff
@@ -66,6 +66,7 @@
 )
 _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
 _MARKER = object()
+_PROGRESS_INTERVAL = 1.0  # Time between download status updates, in seconds.
 
 
 def _reference_getter(table):
@@ -1386,6 +1387,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
 
         return pandas.concat(frames)
 
+    def _to_dataframe_bqstorage_stream(
+        self, bqstorage_client, dtypes, columns, session, stream
+    ):
+        position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
+        rowstream = bqstorage_client.read_rows(position).rows(session)
+
+        frames = []
+        for page in rowstream.pages:
+            if self._to_dataframe_finished:
+                return
+            frames.append(page.to_dataframe(dtypes=dtypes))
+
+        # Avoid errors on unlucky streams with no blocks. pandas.concat
+        # will fail on an empty list.
+        if not frames:
+            return pandas.DataFrame(columns=columns)
+
+        # page.to_dataframe() does not preserve column order. Rearrange at
+        # the end using manually-parsed schema.
+        return pandas.concat(frames)[columns]
+
     def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
         """Use (faster, but billable) BQ Storage API to construct DataFrame."""
         if bigquery_storage_v1beta1 is None:
@@ -1421,17 +1443,46 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
         if not session.streams:
             return pandas.DataFrame(columns=columns)
 
-        def get_dataframe(stream):
-            position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
-            rowstream = bqstorage_client.read_rows(position)
-            return rowstream.to_dataframe(session, dtypes=dtypes)
+        # Use _to_dataframe_finished to notify worker threads when to quit.
+        # See: https://stackoverflow.com/a/29237343/101923
+        self._to_dataframe_finished = False
+
+        def get_frames(pool):
+            frames = []
+
+            # Manually submit jobs and wait for download to complete rather
+            # than using pool.map because pool.map continues running in the
+            # background even if there is an exception on the main thread.
+            # See: https://github.com/googleapis/google-cloud-python/pull/7698
+            not_done = [
+                pool.submit(
+                    self._to_dataframe_bqstorage_stream,
+                    bqstorage_client,
+                    dtypes,
+                    columns,
+                    session,
+                    stream,
+                )
+                for stream in session.streams
+            ]
+
+            while not_done:
+                done, not_done = concurrent.futures.wait(
+                    not_done, timeout=_PROGRESS_INTERVAL
+                )
+                frames.extend([future.result() for future in done])
+            return frames
 
         with concurrent.futures.ThreadPoolExecutor() as pool:
-            frames = pool.map(get_dataframe, session.streams)
+            try:
+                frames = get_frames(pool)
+            finally:
+                # No need for a lock because reading/replacing a variable is
+                # defined to be an atomic operation in the Python language
+                # definition (enforced by the global interpreter lock).
+                self._to_dataframe_finished = True
 
-        # rowstream.to_dataframe() does not preserve column order. Rearrange at
-        # the end using manually-parsed schema.
-        return pandas.concat(frames)[columns]
+        return pandas.concat(frames)
 
     def _get_progress_bar(self, progress_bar_type):
         """Construct a tqdm progress bar object, if tqdm is installed."""
```

bigquery/setup.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -37,7 +37,7 @@
 ]
 extras = {
     "bqstorage": [
-        "google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev",
+        "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev",
         "fastavro>=0.21.2",
     ],
     "pandas": ["pandas>=0.17.1"],
```

bigquery/tests/unit/test_table.py

Lines changed: 168 additions & 9 deletions

```diff
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import concurrent.futures
 import itertools
 import json
+import time
 import unittest
 import warnings
 
@@ -1705,7 +1707,7 @@ def test_to_dataframe_error_if_pandas_is_none(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
-    def test_to_dataframe_w_bqstorage_empty(self):
+    def test_to_dataframe_w_bqstorage_no_streams(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
 
@@ -1746,18 +1748,70 @@ def test_to_dataframe_w_bqstorage_empty(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
-    def test_to_dataframe_w_bqstorage_nonempty(self):
+    def test_to_dataframe_w_bqstorage_empty_streams(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader
 
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
+        )
+        session.avro_schema.schema = json.dumps(
+            {
+                "fields": [
+                    {"name": "colA"},
+                    # Not alphabetical to test column order.
+                    {"name": "colC"},
+                    {"name": "colB"},
+                ]
+            }
+        )
+        bqstorage_client.create_read_session.return_value = session
+
         mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
-        mock_rowstream.to_dataframe.return_value = pandas.DataFrame(
-            [
-                {"colA": 1, "colB": "abc", "colC": 2.0},
-                {"colA": -1, "colB": "def", "colC": 4.0},
-            ]
+        bqstorage_client.read_rows.return_value = mock_rowstream
+
+        mock_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_rowstream.rows.return_value = mock_rows
+        mock_pages = mock.PropertyMock(return_value=())
+        type(mock_rows).pages = mock_pages
+
+        schema = [
+            schema.SchemaField("colA", "IGNORED"),
+            schema.SchemaField("colC", "IGNORED"),
+            schema.SchemaField("colB", "IGNORED"),
+        ]
+
+        row_iterator = mut.RowIterator(
+            _mock_client(),
+            None,  # api_request: ignored
+            None,  # path: ignored
+            schema,
+            table=mut.TableReference.from_string("proj.dset.tbl"),
+            selected_fields=schema,
         )
+
+        got = row_iterator.to_dataframe(bqstorage_client)
+
+        column_names = ["colA", "colC", "colB"]
+        self.assertEqual(list(got), column_names)
+        self.assertTrue(got.empty)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_to_dataframe_w_bqstorage_nonempty(self):
+        from google.cloud.bigquery import schema
+        from google.cloud.bigquery import table as mut
+        from google.cloud.bigquery_storage_v1beta1 import reader
+
+        # Speed up testing.
+        mut._PROGRESS_INTERVAL = 0.01
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -1775,7 +1829,27 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
             }
         )
         bqstorage_client.create_read_session.return_value = session
+
+        mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
         bqstorage_client.read_rows.return_value = mock_rowstream
+
+        mock_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_rowstream.rows.return_value = mock_rows
+
+        def blocking_to_dataframe(*args, **kwargs):
+            # Sleep for longer than the waiting interval so that we know we're
+            # only reading one page per loop at most.
+            time.sleep(2 * mut._PROGRESS_INTERVAL)
+            return pandas.DataFrame(
+                {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
+                columns=["colA", "colB", "colC"],
+            )
+
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
+        type(mock_rows).pages = mock_pages
+
         schema = [
             schema.SchemaField("colA", "IGNORED"),
             schema.SchemaField("colC", "IGNORED"),
@@ -1791,10 +1865,95 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
             selected_fields=schema,
         )
 
-        got = row_iterator.to_dataframe(bqstorage_client)
+        with mock.patch(
+            "concurrent.futures.wait", wraps=concurrent.futures.wait
+        ) as mock_wait:
+            got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
 
         column_names = ["colA", "colC", "colB"]
         self.assertEqual(list(got), column_names)
-        self.assertEqual(len(got.index), 2)
+        self.assertEqual(len(got.index), 6)
+        # Make sure that this test looped through multiple progress intervals.
+        self.assertGreaterEqual(mock_wait.call_count, 2)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
+        from google.cloud.bigquery import schema
+        from google.cloud.bigquery import table as mut
+        from google.cloud.bigquery_storage_v1beta1 import reader
+
+        # Speed up testing.
+        mut._PROGRESS_INTERVAL = 0.01
+
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=[
+                # Use two streams because one will fail with a
+                # KeyboardInterrupt, and we want to check that the other stream
+                # ends early.
+                {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
+                {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
+            ]
+        )
+        session.avro_schema.schema = json.dumps(
+            {"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
+        )
+        bqstorage_client.create_read_session.return_value = session
+
+        def blocking_to_dataframe(*args, **kwargs):
+            # Sleep for longer than the waiting interval so that we know we're
+            # only reading one page per loop at most.
+            time.sleep(2 * mut._PROGRESS_INTERVAL)
+            return pandas.DataFrame(
+                {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
+                columns=["colA", "colB", "colC"],
+            )
+
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
+        type(mock_rows).pages = mock_pages
+        mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
+        mock_rowstream.rows.return_value = mock_rows
+
+        mock_cancelled_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_cancelled_pages = mock.PropertyMock(side_effect=KeyboardInterrupt)
+        type(mock_cancelled_rows).pages = mock_cancelled_pages
+        mock_cancelled_rowstream = mock.create_autospec(reader.ReadRowsStream)
+        mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows
+
+        bqstorage_client.read_rows.side_effect = (
+            mock_cancelled_rowstream,
+            mock_rowstream,
+        )
+
+        schema = [
+            schema.SchemaField("colA", "IGNORED"),
+            schema.SchemaField("colB", "IGNORED"),
+            schema.SchemaField("colC", "IGNORED"),
+        ]
+
+        row_iterator = mut.RowIterator(
+            _mock_client(),
+            None,  # api_request: ignored
+            None,  # path: ignored
+            schema,
+            table=mut.TableReference.from_string("proj.dset.tbl"),
+            selected_fields=schema,
+        )
+
+        with pytest.raises(KeyboardInterrupt):
+            row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
+
+        # Should not have fetched the third page of results because exit_early
+        # should have been set.
+        self.assertLessEqual(mock_page.to_dataframe.call_count, 2)
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(
```
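
The cancelled stream above is simulated with `mock.PropertyMock(side_effect=KeyboardInterrupt)` attached through `type(...)`: property mocks must live on the class, because Python looks descriptors up on the type, and the side effect fires on attribute access, modeling an interrupt that arrives as soon as `.pages` is read. A minimal standalone sketch of that mocking technique, with an illustrative `Stream` class:

```python
from unittest import mock


class Stream:
    """Illustrative stand-in for a reader object with a `pages` property."""

    @property
    def pages(self):
        return ()


stream = Stream()

# PropertyMock must be attached to the type, not the instance, because
# properties are looked up on the class. The side effect raises when the
# attribute is *read*, not when any method is called.
type(stream).pages = mock.PropertyMock(side_effect=KeyboardInterrupt)

try:
    stream.pages
except KeyboardInterrupt:
    print("accessing .pages raised KeyboardInterrupt, as the test expects")
```

In the test, the same technique makes the first stream raise the moment its worker touches `.pages`, while the second stream's ordinary pages let the test assert that the shared flag stopped the download before the third page.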
