From b59cc8d4fae593eb7592455a1696d7ab996a53dd Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Thu, 10 Oct 2024 17:42:32 -0400 Subject: [PATCH 1/2] docs: Add ingestion from GCS sample (#1273) --- samples/snippets/publisher.py | 105 +++++++++++++++++++++++++++++ samples/snippets/publisher_test.py | 34 ++++++++++ samples/snippets/requirements.txt | 2 +- 3 files changed, 140 insertions(+), 1 deletion(-) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index d2be927b8..7cb7ca223 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -103,6 +103,88 @@ def create_topic_with_kinesis_ingestion( # [END pubsub_create_topic_with_kinesis_ingestion] +def create_topic_with_cloud_storage_ingestion( + project_id: str, + topic_id: str, + bucket: str, + input_format: str, + text_delimiter: str, + match_glob: str, + minimum_object_create_time: str, +) -> None: + """Create a new Pub/Sub topic with Cloud Storage Ingestion Settings.""" + # [START pubsub_create_topic_with_cloud_storage_ingestion] + from google.cloud import pubsub_v1 + from google.protobuf import timestamp_pb2 + from google.pubsub_v1.types import Topic + from google.pubsub_v1.types import IngestionDataSourceSettings + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # bucket = "your-bucket" + # input_format = "text" (can be one of "text", "avro", "pubsub_avro") + # text_delimiter = "\n" + # match_glob = "**.txt" + # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + cloud_storage_settings = IngestionDataSourceSettings.CloudStorage( + bucket=bucket, + ) + if input_format == "text": + cloud_storage_settings.text_format = ( + IngestionDataSourceSettings.CloudStorage.TextFormat( + delimiter=text_delimiter + ) + ) + elif input_format == "avro": + cloud_storage_settings.avro_format = ( + IngestionDataSourceSettings.CloudStorage.AvroFormat() + ) + elif input_format == "pubsub_avro": + cloud_storage_settings.pubsub_avro_format = ( + IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat() + ) + else: + print( + "Invalid input_format: " + + input_format + + "; must be in ('text', 'avro', 'pubsub_avro')" + ) + return + + if match_glob: + cloud_storage_settings.match_glob = match_glob + + if minimum_object_create_time: + try: + minimum_object_create_time_timestamp = timestamp_pb2.Timestamp() + minimum_object_create_time_timestamp.FromJsonString( + minimum_object_create_time + ) + cloud_storage_settings.minimum_object_create_time = ( + minimum_object_create_time_timestamp + ) + except ValueError: + print("Invalid minimum_object_create_time: " + minimum_object_create_time) + return + + request = Topic( + name=topic_path, + ingestion_data_source_settings=IngestionDataSourceSettings( + cloud_storage=cloud_storage_settings, + ), + ) + + topic = publisher.create_topic(request=request) + + print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings") + # [END pubsub_create_topic_with_cloud_storage_ingestion] + + def update_topic_type( project_id: str, topic_id: str, @@ -615,6 +697,19 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: create_topic_with_kinesis_ingestion_parser.add_argument("aws_role_arn") create_topic_with_kinesis_ingestion_parser.add_argument("gcp_service_account") + create_topic_with_cloud_storage_ingestion_parser = subparsers.add_parser( + "create_cloud_storage_ingestion", + help=create_topic_with_cloud_storage_ingestion.__doc__, + ) + create_topic_with_cloud_storage_ingestion_parser.add_argument("topic_id") + create_topic_with_cloud_storage_ingestion_parser.add_argument("bucket") + create_topic_with_cloud_storage_ingestion_parser.add_argument("input_format") + create_topic_with_cloud_storage_ingestion_parser.add_argument("text_delimiter") + create_topic_with_cloud_storage_ingestion_parser.add_argument("match_glob") + create_topic_with_cloud_storage_ingestion_parser.add_argument( + "minimum_object_create_time" + ) + update_topic_type_parser = subparsers.add_parser( "update_kinesis_ingestion", help=update_topic_type.__doc__ ) @@ -693,6 +788,16 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: args.aws_role_arn, args.gcp_service_account, ) + elif args.command == "create_cloud_storage_ingestion": + create_topic_with_cloud_storage_ingestion( + args.project_id, + args.topic_id, + args.bucket, + args.input_format, + args.text_delimiter, + args.match_glob, + args.minimum_object_create_time, + ) elif args.command == "update_kinesis_ingestion": update_topic_type( args.project_id, diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index adb015e8a..6f17305cb 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -162,6 +162,40 @@ def test_create_topic_with_kinesis_ingestion( publisher_client.delete_topic(request={"topic": topic_path}) +def test_create_topic_with_cloud_storage_ingestion( + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] +) -> None: + # The scope of `topic_path` is limited to this function. + topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID) + + bucket = "pubsub-cloud-storage-bucket" + input_format = "text" + text_delimiter = "," + match_glob = "**.txt" + minimum_object_create_time = "1970-01-01T00:00:01Z" + + try: + publisher_client.delete_topic(request={"topic": topic_path}) + except NotFound: + pass + + publisher.create_topic_with_cloud_storage_ingestion( + PROJECT_ID, + TOPIC_ID, + bucket, + input_format, + text_delimiter, + match_glob, + minimum_object_create_time, + ) + + out, _ = capsys.readouterr() + assert f"Created topic: {topic_path} with Cloud Storage Ingestion Settings" in out + + # Clean up resource created for the test. + publisher_client.delete_topic(request={"topic": topic_path}) + + def test_update_topic_type( publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] ) -> None: diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index 4e86dfa5b..3a16ebc94 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,4 +1,4 @@ -google-cloud-pubsub==2.25.0 +google-cloud-pubsub==2.26.0 avro==1.12.0 protobuf===4.24.4; python_version == '3.7' protobuf==5.28.0; python_version >= '3.8' From 8febfc7c47fe7d9990944b0d7962803f00ec012a Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Tue, 15 Oct 2024 18:51:27 -0400 Subject: [PATCH 2/2] chore(main): release 2.26.1 (#1276) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- .release-please-manifest.json | 2 +- CHANGELOG.md | 7 +++++++ google/pubsub/gapic_version.py | 2 +- google/pubsub_v1/gapic_version.py | 2 +- .../snippet_metadata_google.pubsub.v1.json | 2 +- 5 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.release-please-manifest.json b/.release-please-manifest.json index d53227a83..9c3477bd7 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,4 +1,4 @@ { - ".": "2.26.0" + ".": "2.26.1" } \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index e1a67b711..64bb863eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ [1]: https://pypi.org/project/google-cloud-pubsub/#history +## [2.26.1](https://github.com/googleapis/python-pubsub/compare/v2.26.0...v2.26.1) (2024-10-10) + + +### Documentation + +* Add ingestion from GCS sample ([#1273](https://github.com/googleapis/python-pubsub/issues/1273)) ([b59cc8d](https://github.com/googleapis/python-pubsub/commit/b59cc8d4fae593eb7592455a1696d7ab996a53dd)) + ## [2.26.0](https://github.com/googleapis/python-pubsub/compare/v2.25.2...v2.26.0) (2024-10-09) diff --git a/google/pubsub/gapic_version.py b/google/pubsub/gapic_version.py index d56eed5c5..040d4e7f1 100644 --- a/google/pubsub/gapic_version.py +++ b/google/pubsub/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "2.26.0" # {x-release-please-version} +__version__ = "2.26.1" # {x-release-please-version} diff --git a/google/pubsub_v1/gapic_version.py b/google/pubsub_v1/gapic_version.py index d56eed5c5..040d4e7f1 100644 --- a/google/pubsub_v1/gapic_version.py +++ b/google/pubsub_v1/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "2.26.0" # {x-release-please-version} +__version__ = "2.26.1" # {x-release-please-version} diff --git a/samples/generated_samples/snippet_metadata_google.pubsub.v1.json b/samples/generated_samples/snippet_metadata_google.pubsub.v1.json index a20353b05..ead6d83f3 100644 --- a/samples/generated_samples/snippet_metadata_google.pubsub.v1.json +++ b/samples/generated_samples/snippet_metadata_google.pubsub.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-pubsub", - "version": "2.26.0" + "version": "2.26.1" }, "snippets": [ {